In [1]:
import datetime
from numpy import array, sqrt

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, unix_timestamp, expr, when
from pyspark.sql.types import FloatType

from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml.evaluation import ClusteringEvaluator, MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


In [2]:


# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Anomaly Detection Model") \
    .getOrCreate()
spark = SparkSession.builder \
    .appName("KafkaDataSparkAnalysis") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .getOrCreate()

# the following line gets the bucket name attached to our cluster
bucket = spark._jsc.hadoopConfiguration().get("fs.gs.system.bucket")

# specifying the path to our bucket where the data is located (no need to edit this path anymore)
data = "gs://" + bucket + "/notebooks/jupyter/"

df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load(data + "data.csv")\
    .coalesce(5)


# df = df.withColumn("timestamp_unix", unix_timestamp("Timestamp"))


df.cache()
df.printSchema()
print("This datasets consists of {} rows.".format(df.count()))


24/05/09 16:43:43 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/05/09 16:43:43 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

root
 |-- EventType: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- Details: string (nullable = true)
 |-- Is_Anomaly: integer (nullable = true)





This datasets consists of 1000000 rows.


                                                                                

In [3]:
df.show(25)

+--------------------+-------------------+-----------+--------+--------------------+----------+
|           EventType|          Timestamp|   Location|Severity|             Details|Is_Anomaly|
+--------------------+-------------------+-----------+--------+--------------------+----------+
|  emergency_incident|2022-01-01 00:00:00|     Boston|    high|This is a simulat...|         0|
|      health_mention|2022-01-01 00:01:00|      Tokyo|     low|This is a simulat...|         0|
|      health_mention|2022-01-01 00:01:00|      Tokyo|  medium|This is a simulat...|         0|
|         vaccination|2022-01-01 00:01:00|     Boston|  medium|This is a simulat...|         0|
|general_health_re...|2022-01-01 00:03:00|      Tokyo|  medium|This is a simulat...|         0|
|  hospital_admission|2022-01-01 00:03:00|    Chicago|  medium|This is a simulat...|         0|
|general_health_re...|2022-01-01 00:03:00|    Chicago|  medium|This is a simulat...|         0|
|general_health_re...|2022-01-01 00:05:0

In [4]:

train_df, test_df = df.randomSplit([0.7, 0.3], seed=42)
print("Training data size: {}".format(train_df.count()))
print("Testing data size: {}".format(test_df.count()))

                                                                                

Training data size: 700072




Testing data size: 299928


                                                                                

In [5]:

# Index and encode categorical features
indexer_event = StringIndexer(inputCol="EventType", outputCol="EventType_Index")
indexer_location = StringIndexer(inputCol="Location", outputCol="Location_Index")

# Convert 'Severity' to a numerical scale
severity_scale = {"low": 1, "medium": 2, "high": 3}
train_df = train_df.withColumn("Severity_Num", when(col("Severity") == "low", severity_scale["low"])
                                                        .when(col("Severity") == "medium", severity_scale["medium"])
                                                        .when(col("Severity") == "high", severity_scale["high"]))
test_df = test_df.withColumn("Severity_Num", when(col("Severity") == "low", severity_scale["low"])
                                                      .when(col("Severity") == "medium", severity_scale["medium"])
                                                      .when(col("Severity") == "high", severity_scale["high"]))


In [6]:

# Assemble features into a single vector column
assembler = VectorAssembler(inputCols=["EventType_Index", "Location_Index", "Severity_Num"], outputCol="features")

# Scale the features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")


In [7]:

# Define the KMeans model
kmeans = KMeans().setK(9).setSeed(7).setFeaturesCol("scaledFeatures")

# Build the pipeline
pipeline = Pipeline(stages=[indexer_event, indexer_location, assembler, scaler, kmeans])


In [8]:

# Define a parameter grid
paramGrid = (ParamGridBuilder()
             .addGrid(kmeans.k, [12])  # Number of clusters
             .addGrid(scaler.withStd, [True, False])  # Standard deviation scaling
             .build())


In [9]:

# Define the evaluator
evaluator = ClusteringEvaluator(predictionCol="prediction", featuresCol="scaledFeatures", metricName="silhouette")


In [10]:

# Set up the CrossValidator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=9)  # Adjusted for demonstration

# Run cross-validation on the training data, and choose the best set of parameters.
cvModel = crossval.fit(train_df)

                                                                                

In [11]:
# Fetch the best model
bestModel = cvModel.bestModel
bestKMeansModel = bestModel.stages[-1]  # The last stage is KMeans in the pipeline

In [12]:
# Make predictions on the test data
predictions = bestModel.transform(test_df)

In [13]:
# Now choose an anomaly cluster as before (this part would be manual and interpretive)
anomaly_cluster = 1  # This needs to be checked based on new cluster centers
predictions = predictions.withColumn("predicted_label", (col("prediction") == anomaly_cluster).cast("double"))

In [14]:
predictions.show(25)

+------------------+-------------------+-----------+--------+--------------------+----------+------------+---------------+--------------+-------------+--------------------+----------+---------------+
|         EventType|          Timestamp|   Location|Severity|             Details|Is_Anomaly|Severity_Num|EventType_Index|Location_Index|     features|      scaledFeatures|prediction|predicted_label|
+------------------+-------------------+-----------+--------+--------------------+----------+------------+---------------+--------------+-------------+--------------------+----------+---------------+
|emergency_incident|2022-01-01 00:31:00|   New York|    high|This is a simulat...|         0|           3|            3.0|           5.0|[3.0,5.0,3.0]|[1.75582044243775...|         4|            0.0|
|emergency_incident|2022-01-01 00:42:00|Los Angeles|    high|This is a simulat...|         0|           3|            3.0|           4.0|[3.0,4.0,3.0]|[1.75582044243775...|         4|            0.0|


In [15]:

# Calculate Accuracy and F1 Score (as before, or consider using binary evaluators)
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="Is_Anomaly", predictionCol="predicted_label", metricName="accuracy")
accuracy = evaluator_accuracy.evaluate(predictions)

evaluator_f1 = MulticlassClassificationEvaluator(labelCol="Is_Anomaly", predictionCol="predicted_label", metricName="f1")
f1_score = evaluator_f1.evaluate(predictions)

print(f"Best number of clusters: {bestKMeansModel.getK()}")
print(f"Accuracy: {accuracy}")
print(f"F1 Score: {f1_score}")


[Stage 657:>                                                        (0 + 2) / 2]

Best number of clusters: 12
Accuracy: 0.9371682537142247
F1 Score: 0.9673765009491587


                                                                                