In [4]:
from pyspark.sql import SparkSession

In [5]:
# Obtain the Spark session for this notebook under the application name "ML";
# getOrCreate() reuses an already-active session instead of starting a new one.
spark = (
    SparkSession.builder
    .appName("ML")
    .getOrCreate()
)

In [6]:
# Load the raw events CSV. The first row supplies the column names and the
# column types are inferred from the data rather than declared up front.
df = spark.read.csv(
    "../1m_health_events_dataset.csv",
    inferSchema=True,
    header=True,
)

                                                                                

In [7]:
# Preview the first five rows to sanity-check the parsed columns.
df.show(n=5)

+--------------------+-------------------+--------+--------+--------------------+----------+
|           EventType|          Timestamp|Location|Severity|             Details|Is_Anomaly|
+--------------------+-------------------+--------+--------+--------------------+----------+
|  emergency_incident|2022-01-01 00:00:00|  Boston|    high|This is a simulat...|         0|
|      health_mention|2022-01-01 00:01:00|   Tokyo|     low|This is a simulat...|         0|
|      health_mention|2022-01-01 00:01:00|   Tokyo|  medium|This is a simulat...|         0|
|         vaccination|2022-01-01 00:01:00|  Boston|  medium|This is a simulat...|         0|
|general_health_re...|2022-01-01 00:03:00|   Tokyo|  medium|This is a simulat...|         0|
+--------------------+-------------------+--------+--------+--------------------+----------+
only showing top 5 rows



In [8]:
from pyspark.sql.functions import col, year, month, dayofweek, hour, date_format
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# Reshape the raw frame into the modelling frame in one chained expression:
#   1. discard the free-text Details column,
#   2. derive calendar features from Timestamp,
#   3. discard Timestamp itself once the features have been extracted.
df = (
    df.drop("Details")
    # "yyyy-MM" keeps only the year and month, formatted as a string
    .withColumn("Year_Month", date_format(col("Timestamp"), "yyyy-MM"))
    # "E" is the day-of-week text pattern (see Spark datetime patterns)
    .withColumn("Day_of_Week", date_format(col("Timestamp"), "E"))
    # hour component of the timestamp, kept numeric
    .withColumn("Hour_of_Day", hour(col("Timestamp")))
    .drop("Timestamp")
)

In [9]:
# String-valued columns that are indexed and then one-hot encoded.
categorical_cols = ["EventType", "Location", "Severity", "Day_of_Week", "Year_Month"]

# One StringIndexer per categorical column, writing to "<column>_index".
# handleInvalid="keep" is passed so labels not seen during fit are retained
# rather than raising an error at transform time.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index", handleInvalid="keep")
    for name in categorical_cols
]

# One OneHotEncoder per indexed column, writing to "<column>_encoded".
# `encoder` is a list (one stage per column) so it can be concatenated with
# `indexers` when the pipeline stages are assembled.
encoder = [
    OneHotEncoder(inputCol=f"{name}_index", outputCol=f"{name}_encoded")
    for name in categorical_cols
]

# All encoded categorical vectors plus the numeric hour feature go into a
# single "features" vector column.
feature_cols = [f"{name}_encoded" for name in categorical_cols] + ["Hour_of_Day"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# 80/20 train/test split with a fixed seed.
trainingData, testData = df.randomSplit([0.8, 0.2], seed=843)

In [10]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Single evaluator shared by every model cell; the metric to compute is chosen
# per call through setMetricName(...).
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="Is_Anomaly",
)

In [11]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression: index -> one-hot encode -> assemble -> classify.
logisticReg = LogisticRegression(labelCol="Is_Anomaly", featuresCol="features")
pipeline_stages = [*indexers, *encoder, assembler, logisticReg]
pipeline_log = Pipeline(stages=pipeline_stages)

# Fit on the training split, then score the held-out split.
model_log = pipeline_log.fit(trainingData)
predictions_log = model_log.transform(testData)

# Print each metric in turn; the shared evaluator is re-pointed at the metric
# before every evaluation.
# NOTE(review): precision and recall here are the *weighted* variants; if
# Is_Anomaly is heavily imbalanced they will mostly reflect the majority
# class — confirm with per-class metrics.
for _label, _metric in (
    ("Accuracy = ", "accuracy"),
    ("Precision = ", "weightedPrecision"),
    ("Recall = ", "weightedRecall"),
    ("F1 Score = ", "f1"),
):
    print(_label, evaluator.setMetricName(_metric).evaluate(predictions_log))

24/05/08 20:48:52 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/08 20:48:52 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

Accuracy =  0.9998702031320983


                                                                                

Precision =  0.9998702199803345
Recall =  0.9998702031320983
F1 Score =  0.9998364601587763


In [12]:
from pyspark.ml.classification import DecisionTreeClassifier
# Decision tree capped at depth 3, using the same preprocessing stages.
dtree = DecisionTreeClassifier(
    labelCol="Is_Anomaly",
    featuresCol="features",
    maxDepth=3,
)
pipeline_stages_dt = [*indexers, *encoder, assembler, dtree]
pipeline_dt = Pipeline(stages=pipeline_stages_dt)

# Fit on the training split, then score the held-out split.
model_dt = pipeline_dt.fit(trainingData)
predictions_dt = model_dt.transform(testData)

# Print each metric in turn using the shared evaluator.
for _label, _metric in (
    ("Accuracy = ", "accuracy"),
    ("Precision = ", "weightedPrecision"),
    ("Recall = ", "weightedRecall"),
    ("F1 Score = ", "f1"),
):
    print(_label, evaluator.setMetricName(_metric).evaluate(predictions_dt))

                                                                                

Accuracy =  0.9998702031320983
Precision =  0.9998702199803345
Recall =  0.9998702031320983
F1 Score =  0.9998364601587763


In [13]:
from pyspark.ml.classification import RandomForestClassifier
# Random forest with library-default hyperparameters, using the same
# preprocessing stages.
rf = RandomForestClassifier(labelCol="Is_Anomaly", featuresCol="features")
pipeline_stages_rf = [*indexers, *encoder, assembler, rf]
pipeline_rf = Pipeline(stages=pipeline_stages_rf)

# Fit on the training split, then score the held-out split.
model_rf = pipeline_rf.fit(trainingData)
predictions_rf = model_rf.transform(testData)

# Print each metric in turn using the shared evaluator.
for _label, _metric in (
    ("Accuracy = ", "accuracy"),
    ("Precision = ", "weightedPrecision"),
    ("Recall = ", "weightedRecall"),
    ("F1 Score = ", "f1"),
):
    print(_label, evaluator.setMetricName(_metric).evaluate(predictions_rf))

24/05/08 20:49:13 WARN MemoryStore: Not enough space to cache rdd_466_1 in memory! (computed 20.0 MiB so far)
24/05/08 20:49:13 WARN BlockManager: Persisting block rdd_466_1 to disk instead.
24/05/08 20:49:13 WARN MemoryStore: Not enough space to cache rdd_466_2 in memory! (computed 30.0 MiB so far)
24/05/08 20:49:13 WARN BlockManager: Persisting block rdd_466_2 to disk instead.
24/05/08 20:49:13 WARN MemoryStore: Not enough space to cache rdd_466_0 in memory! (computed 30.0 MiB so far)
24/05/08 20:49:13 WARN BlockManager: Persisting block rdd_466_0 to disk instead.
24/05/08 20:49:13 WARN MemoryStore: Not enough space to cache rdd_466_4 in memory! (computed 30.0 MiB so far)
24/05/08 20:49:13 WARN BlockManager: Persisting block rdd_466_4 to disk instead.
                                                                                

Accuracy =  0.9998102968853744
Precision =  0.9996206297580206
Recall =  0.9998102968853744
F1 Score =  0.9997154543257329


                                                                                

In [14]:
from pyspark.ml.classification import GBTClassifier
# Gradient-boosted trees, using the same preprocessing stages. The prediction
# column is named explicitly so it matches what the shared evaluator reads.
gbt = GBTClassifier(
    labelCol="Is_Anomaly",
    featuresCol="features",
    predictionCol="prediction",
)
pipeline_stages_gbt = [*indexers, *encoder, assembler, gbt]
pipeline_gbt = Pipeline(stages=pipeline_stages_gbt)

# Fit on the training split, then score the held-out split.
model_gbt = pipeline_gbt.fit(trainingData)
predictions_gbt = model_gbt.transform(testData)

# Print each metric in turn using the shared evaluator.
for _label, _metric in (
    ("Accuracy = ", "accuracy"),
    ("Precision = ", "weightedPrecision"),
    ("Recall = ", "weightedRecall"),
    ("F1 Score = ", "f1"),
):
    print(_label, evaluator.setMetricName(_metric).evaluate(predictions_gbt))

                                                                                

Accuracy =  0.9998702031320983
Precision =  0.9998702199803345
Recall =  0.9998702031320983
F1 Score =  0.9998364601587763


In [15]:
# Persist the fitted GBT pipeline (preprocessing stages + classifier) to disk.
model_name = "gbt_model"

# Derived from model_name so the two cannot drift apart; evaluates to "./gbt_model".
model_path = f"./{model_name}"

# A plain .save(path) raises if the target directory already exists, which
# breaks any second execution of this notebook (e.g. Restart & Run All).
# Going through the writer with overwrite() makes the cell safe to re-run;
# note that it replaces a previously saved model at the same path.
model_gbt.write().overwrite().save(model_path)