In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime, hour, when
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import matplotlib.pyplot as plt

2023-09-21 16:27:31.404911: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-09-21 16:27:31.405019: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


In [2]:
# Load the data
spark = SparkSession.builder.appName("TelecomActivityPreprocessing").getOrCreate()
data = spark.read.text("/user1/data/*.txt")

In [53]:
data.show(3, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                       |
+----------------------------------------------------------------------------------------------------------------------------+
|1\t1383260400000\t0\t0.08136262351125882\t\t\t\t                                                                            |
|1\t1383260400000\t39\t0.14186425470242922\t0.1567870050390246\t0.16093793691701822\t0.052274848528573205\t11.028366381681026|
|1\t1383261000000\t0\t0.13658782275823106\t\t\t0.02730046487718618\t                                                         |
+----------------------------------------------------------------------------------------------------------------------------+
only showing top 3 rows



In [84]:
# Remove lines with empty fields
data1 = data.filter((col("value").rlike(r"\S")) & (col("value").rlike(r"\S+\s\S+\s\S+\s\S+\s\S+\s\S+\s\S+\s\S+")))

In [85]:
data1.show(3, truncate=False)



+----------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                       |
+----------------------------------------------------------------------------------------------------------------------------+
|1\t1383260400000\t39\t0.14186425470242922\t0.1567870050390246\t0.16093793691701822\t0.052274848528573205\t11.028366381681026|
|1\t1383261000000\t39\t0.27845207746066025\t0.11992572014174135\t0.1887771729145041\t0.13363747203983203\t11.100963451409388 |
|1\t1383261600000\t39\t0.3306414276169333\t0.1709520296851148\t0.13417624316013174\t0.05460092975437236\t10.892770602791096  |
+----------------------------------------------------------------------------------------------------------------------------+
only showing top 3 rows



                                                                                

In [86]:
from pyspark.sql.functions import split, unix_timestamp, col, sum

In [87]:
data1 = data1.withColumn("columns", split(col("value"), "\t")) \
    .select(
        col("columns").getItem(0).alias("square_id"),
        (col("columns").getItem(1).cast("bigint") / 1000).cast("timestamp").alias("timestamp"),
        col("columns").getItem(2).cast("int").alias("country_code"),
        col("columns").getItem(3).cast("double").alias("sms_in_activity"),
        col("columns").getItem(4).cast("double").alias("sms_out_activity"),
        col("columns").getItem(5).cast("double").alias("call_in_activity"),
        col("columns").getItem(6).cast("double").alias("call_out_activity"),
        col("columns").getItem(7).cast("double").alias("internet_traffic_activity")
    )

In [88]:
data1.printSchema()

root
 |-- square_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- country_code: integer (nullable = true)
 |-- sms_in_activity: double (nullable = true)
 |-- sms_out_activity: double (nullable = true)
 |-- call_in_activity: double (nullable = true)
 |-- call_out_activity: double (nullable = true)
 |-- internet_traffic_activity: double (nullable = true)



In [89]:
data1.show(3, truncate=False)

+---------+-------------------+------------+-------------------+-------------------+-------------------+--------------------+-------------------------+
|square_id|timestamp          |country_code|sms_in_activity    |sms_out_activity   |call_in_activity   |call_out_activity   |internet_traffic_activity|
+---------+-------------------+------------+-------------------+-------------------+-------------------+--------------------+-------------------------+
|1        |2013-10-31 23:00:00|39          |0.14186425470242922|0.1567870050390246 |0.16093793691701822|0.052274848528573205|11.028366381681026       |
|1        |2013-10-31 23:10:00|39          |0.27845207746066025|0.11992572014174135|0.1887771729145041 |0.13363747203983203 |11.100963451409388       |
|1        |2013-10-31 23:20:00|39          |0.3306414276169333 |0.1709520296851148 |0.13417624316013174|0.05460092975437236 |10.892770602791096       |
+---------+-------------------+------------+-------------------+-------------------+----

In [90]:
# Sum the data from the current 10-minute interval to 1-hour based on the timestamp
data1 = data1.withColumn(
    "hourly_timestamp",
    from_unixtime((unix_timestamp(col("timestamp")) / 3600).cast("bigint") * 3600)
)

In [91]:
data1.printSchema()

root
 |-- square_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- country_code: integer (nullable = true)
 |-- sms_in_activity: double (nullable = true)
 |-- sms_out_activity: double (nullable = true)
 |-- call_in_activity: double (nullable = true)
 |-- call_out_activity: double (nullable = true)
 |-- internet_traffic_activity: double (nullable = true)
 |-- hourly_timestamp: string (nullable = true)



In [64]:
# sum call and SMS activity per square_id and timestamp   TO DELETE
data1 = data1.agg(  #.groupBy(
   # "square_id",
   # "hourly_timestamp"
#).agg(
    sum(col("call_in_activity") + col("call_out_activity")).alias("call_activity"),
    sum(col("sms_in_activity") + col("sms_out_activity")).alias("sms_activity"),
    sum(col("internet_traffic_activity")).alias("internet_traffic_activity")

)

In [92]:
# let's reduce the dataframe size summarizing SMS and call data
from pyspark.sql import functions as F

data1 = data1.withColumn('SMS_activity', F.col('sms_in_activity') + F.col('sms_out_activity'))
data1 = data1.withColumn('call_activity', F.col('call_in_activity') + F.col('call_out_activity'))

In [93]:
# Add a column 'weekend_or_weekday'
data1 = data1.withColumn(
    "weekend_or_weekday",
    when(hour(col("hourly_timestamp")).between(0, 5), "weekend").otherwise("weekday")
)

In [94]:
data1.printSchema()

root
 |-- square_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- country_code: integer (nullable = true)
 |-- sms_in_activity: double (nullable = true)
 |-- sms_out_activity: double (nullable = true)
 |-- call_in_activity: double (nullable = true)
 |-- call_out_activity: double (nullable = true)
 |-- internet_traffic_activity: double (nullable = true)
 |-- hourly_timestamp: string (nullable = true)
 |-- SMS_activity: double (nullable = true)
 |-- call_activity: double (nullable = true)
 |-- weekend_or_weekday: string (nullable = false)



In [95]:
data1 = data1.drop('sms_in_activity', 'sms_out_activity', 'call_in_activity', 'call_out_activity')

In [99]:
print(data1.select("*"))

DataFrame[square_id: string, timestamp: timestamp, country_code: int, internet_traffic_activity: double, hourly_timestamp: string, SMS_activity: double, call_activity: double, weekend_or_weekday: string]


In [97]:
from pyspark.sql.functions import avg

activity_summary = data1.groupBy('weekend_or_weekday').agg(avg('SMS_activity').alias('Average_SMS_activity'), avg('call_activity').alias('Average_Call_activity'), avg('internet_traffic_activity').alias('Average_Internet_traffic_activity'))
activity_summary.show()



+------------------+--------------------+---------------------+---------------------------------+
|weekend_or_weekday|Average_SMS_activity|Average_Call_activity|Average_Internet_traffic_activity|
+------------------+--------------------+---------------------+---------------------------------+
|           weekday|    7.49561325612599|    8.697384310443645|                  82.180836362139|
|           weekend|  2.0342061005246967|   1.0514926201658503|               63.894598042644766|
+------------------+--------------------+---------------------+---------------------------------+



                                                                                

In [104]:
# exporting data
data1.write.parquet("/user1/data/parquet")

                                                                                

In [None]:
data1.write.csv("/user1/data/csv")



In [None]:
#preparing for modeling (cat values, scaling, splitting)
# handle categorical values
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Create StringIndexer for categorical columns
indexer = StringIndexer(inputCol='weekend_or_weekday', outputCol='weekend_or_weekday_index')

# Apply StringIndexer to get numerical indices
indexed_df = indexer.fit(data1).transform(data1)

In [None]:
# Apply OneHotEncoder to convert indices into binary vectors
encoder = OneHotEncoder(inputCols=['weekend_or_weekday_index'],
                        outputCols=['weekend_or_weekday_encoded'])
encoded_model = encoder.fit(indexed_df)
encoded_df = encoded_model.transform(indexed_df)

In [None]:
# scaling numerical features
from pyspark.ml.feature import StandardScaler

# Create StandardScaler for numerical columns
scaler = StandardScaler(inputCols=['Square_id', 'Timestamp', 'Internet_traffic_activity'],
                        outputCols=['Scaled_Square_id', 'Scaled_Timestamp', 'Scaled_Internet_traffic_activity'])

# Fit the scaler on the data and transform the features
scaled_df = scaler.fit(encoded_df).transform(encoded_df)


In [17]:
# Prepare the data for training the autoencoder
X_train = preprocessed_data.select("*")  # "*" to take all input features
# train_data, test_data = scaled_df.randomSplit([0.8, 0.2], seed=42)


In [None]:
# Build the autoencoder model
#input_dim = X_train.shape[1]
input_dim = len(X_train.columns)
encoding_dim = 32

autoencoder = Sequential([
    Dense(encoding_dim, activation='relu', input_shape=(input_dim,)),
    Dense(input_dim, activation='sigmoid')
])

In [None]:
# Compile and train the autoencoder model
#autoencoder.compile(optimizer='adam', loss='mean_squared_error')

# autoencoder.fit(X_train, X_train, epochs=10, batch_size=32) 
# ^ this won't work as we can't train a Keras model on a SparkDataframe 
# and data is too big to be converted to a Pandas Dataframe.

Since we cannot directly fit a Keras model on a Spark DataFrame, we need to convert it to an RDD (Resilient Distributed Dataset). We use the rdd property of the Spark DataFrame to achieve this. <br><br>

We use the foreachPartition method of the RDD to train the autoencoder model on each partition of the data. This allows to distribute the training process across multiple workers in the Spark cluster.

In [None]:
rdd = X_train.rdd

In [None]:
# Define a function to train the autoencoder model on each partition
def train_autoencoder(partition):

    # Create a new autoencoder model for each partition
    autoencoder = Sequential([
        Dense(encoding_dim, activation='relu', input_shape=(input_dim,)),
        Dense(input_dim, activation='sigmoid')
    ])

    # Compile and fit the autoencoder model on the partition data
    autoencoder.compile(optimizer='adam', loss='mse')
    autoencoder.fit(partition, partition, epochs=10, batch_size=32)

# Train the autoencoder model on each partition of the RDD
rdd.foreachPartition(train_autoencoder)

# Detect anomalies using the trained autoencoder
reconstructed_data = autoencoder.predict(X_train)
mse = tf.keras.losses.mean_squared_error(X_train, reconstructed_data)
anomaly_threshold = mse.mean() + mse.std() * 2  # Adjust the threshold as per your requirements

anomalies = X_train[mse > anomaly_threshold]


To measure the performance of the autoencoder model, we can use the reconstruction error as a metric. The reconstruction error is typically measured as the mean squared error (MSE) between the original input and its reconstruction. A lower reconstruction error indicates better performance.

To evaluate the model’s performance, we can calculate the MSE between the original input data X_train and its corresponding reconstructed data reconstructed_data

In [None]:
mse = tf.keras.losses.mean_squared_error(X_train, reconstructed_data)

In [None]:
# Plotting anomalies versus normal items
plt.scatter(X_train['Timestamp'], X_train['SMS_in_activity'], label='Normal')
plt.scatter(anomalies['Timestamp'], anomalies['SMS_in_activity'], color='red', label='Anomaly')
plt.xlabel('Timestamp')
plt.ylabel('SMS-in Activity')
plt.title('Anomalies Detected')
plt.legend()
plt.show()

In [None]:
# Stop the Spark session
spark.stop()

In [2]:
# =============================================

In [3]:
# tuning model

In [None]:
# Build multiple autoencoder models with different initializations or hyperparameters
num_models = 5
autoencoders = []
for _ in range(num_models):
    autoencoder = Sequential([
        Dense(encoding_dim, activation='relu', input_shape=(input_dim,)),
        Dense(input_dim, activation='sigmoid')
    ])
    autoencoder.compile(optimizer='adam', loss='mean_squared_error')
    autoencoders.append(autoencoder)

# Train each autoencoder model separately
for autoencoder in autoencoders:
    autoencoder.fit(X_train, X_train, epochs=10, batch_size=32)

# Combine the outputs of all autoencoder models using ensemble methods (e.g., averaging)
reconstructed_data = np.mean([autoencoder.predict(X_train) for autoencoder in autoencoders], axis=0)

# Calculate the mean squared error (MSE) between the original input and ensemble reconstruction
mse = tf.keras.losses.mean_squared_error(X_train, reconstructed_data)

# Set a threshold based on your specific requirements to classify data points as anomalies or normal
anomaly_threshold = mse.mean() + mse.std() * 2

# Identify anomalies based on the MSE threshold
anomalies = X_train[mse > anomaly_threshold]