In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import hour, dayofweek, col, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

spark = SparkSession.builder.appName("Traffic Intrusion Detection").getOrCreate()

df = spark.read.csv("/content/traffic.csv.csv", header=True, inferSchema=True)

df.printSchema()
df.show(5)

df = df.withColumn("Hour", hour(col("DateTime"))) \
       .withColumn("DayOfWeek", dayofweek(col("DateTime")))

df = df.withColumn("Intrusion", when(col("Vehicles") > 20, 1).otherwise(0))

feature_cols = ["Vehicles", "Junction", "Hour", "DayOfWeek"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

rf = RandomForestClassifier(labelCol="Intrusion", featuresCol="features", numTrees=50)
pipeline = Pipeline(stages=[assembler, rf])

train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)
model = pipeline.fit(train_data)
predictions = model.transform(test_data)

evaluator_auc = BinaryClassificationEvaluator(labelCol="Intrusion", metricName="areaUnderROC")
auc = evaluator_auc.evaluate(predictions)
print(f"AUC Score: {auc:.4f}")

evaluator_acc = MulticlassClassificationEvaluator(labelCol="Intrusion", predictionCol="prediction", metricName="accuracy")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="Intrusion", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="Intrusion", predictionCol="prediction", metricName="weightedRecall")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="Intrusion", predictionCol="prediction", metricName="f1")

accuracy = evaluator_acc.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)
f1 = evaluator_f1.evaluate(predictions)

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

predictions.select("Vehicles", "Junction", "Hour", "DayOfWeek", "Intrusion", "prediction").show(10)

spark.stop()


root
 |-- DateTime: timestamp (nullable = true)
 |-- Junction: integer (nullable = true)
 |-- Vehicles: integer (nullable = true)
 |-- ID: long (nullable = true)

+-------------------+--------+--------+-----------+
|           DateTime|Junction|Vehicles|         ID|
+-------------------+--------+--------+-----------+
|2015-11-01 00:00:00|       1|      15|20151101001|
|2015-11-01 01:00:00|       1|      13|20151101011|
|2015-11-01 02:00:00|       1|      10|20151101021|
|2015-11-01 03:00:00|       1|       7|20151101031|
|2015-11-01 04:00:00|       1|       9|20151101041|
+-------------------+--------+--------+-----------+
only showing top 5 rows

AUC Score: 1.0000
Accuracy: 1.0000
Precision: 1.0000
Recall: 1.0000
F1 Score: 1.0000
+--------+--------+----+---------+---------+----------+
|Vehicles|Junction|Hour|DayOfWeek|Intrusion|prediction|
+--------+--------+----+---------+---------+----------+
|       9|       3|   0|        1|        0|       0.0|
|      10|       1|   2|        1| 