In [0]:
# Data loading and ingestion
# Read the CSV with a header row and let Spark infer the column types.
cyber_data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/FileStore/tables/Global_Cybersecurity_Threats_2015_2024.csv")
)

# Quick look at the first rows and the inferred schema.
cyber_data.show(5)
cyber_data.printSchema()

# Count only once: every count() is a separate Spark action over the data, and a
# bare count() that is not the cell's last expression has its result discarded.
print("row count:", cyber_data.count())


+-------+----+-----------------+------------------+-----------------------------+------------------------+-------------+---------------------------+----------------------+-----------------------------------+
|Country|Year|      Attack Type|   Target Industry|Financial Loss (in Million $)|Number of Affected Users|Attack Source|Security Vulnerability Type|Defense Mechanism Used|Incident Resolution Time (in Hours)|
+-------+----+-----------------+------------------+-----------------------------+------------------------+-------------+---------------------------+----------------------+-----------------------------------+
|  China|2019|         Phishing|         Education|                        80.53|                  773169| Hacker Group|         Unpatched Software|                   VPN|                                 63|
|  China|2019|       Ransomware|            Retail|                        62.19|                  295961| Hacker Group|         Unpatched Software|              Firewa

In [0]:
# SQL data exploration
# Register the DataFrame as a temporary view so the %sql cells can query it by name.
threats_view_name = "cyber_threats"
cyber_data.createOrReplaceTempView(threats_view_name)

In [0]:
%sql
-- Five most frequent attack types by number of recorded incidents.
-- The secondary sort key keeps the ranking (and the LIMIT cut-off) deterministic
-- when two attack types have the same count.
SELECT `Attack Type`, COUNT(*) AS total
FROM cyber_threats
GROUP BY `Attack Type`
ORDER BY total DESC, `Attack Type` ASC
LIMIT 5;

Attack Type,total
DDoS,531
Phishing,529
SQL Injection,503
Ransomware,493
Malware,485


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- Total financial loss per year (column is in million $), rounded to two
-- decimals and listed in chronological order.
SELECT
    Year,
    ROUND(SUM(`Financial Loss (in Million $)`), 2) AS Total_Loss
FROM
    cyber_threats
GROUP BY
    Year
ORDER BY
    Year ASC;


Year,Total_Loss
2015,14510.21
2016,13947.26
2017,16261.68
2018,14720.48
2019,13134.69
2020,15767.95
2021,15873.41
2022,15870.86
2023,15958.08
2024,15434.29


Databricks visualization. Run in Databricks to view.

In [0]:
# Feature engineering
from pyspark.sql.functions import when, col

resolution_col = "Incident Resolution Time (in Hours)"
affected_users_col = "Number of Affected Users"

# Approximate median (relative error 0.01) of the resolution time; it is the
# threshold for the binary label built below.
# NOTE(review): the median is computed on the full dataset, i.e. before any
# train/test split — confirm this is acceptable for the evaluation.
median_time = cyber_data.approxQuantile(resolution_col, [0.5], 0.01)[0]
print("Median Resolution Time:", median_time)

# Label: 1 when the incident took strictly longer than the median, otherwise 0.
is_long_resolution = col(resolution_col) > median_time
cyber_data = cyber_data.withColumn(
    "Long_Resolution",
    when(is_long_resolution, 1).otherwise(0),
)
cyber_data.groupBy("Long_Resolution").count().show()

# Derived user-impact features.
# NOTE(review): both are rescalings of the affected-user count
# (User_Impact_Index is an exact linear one, and Users_Per_Year divides by the
# calendar year + 1) — confirm they carry information beyond the raw column.
affected_users = col(affected_users_col)
cyber_data = (
    cyber_data
    .withColumn("Users_Per_Year", affected_users / (col("Year") + 1))
    .withColumn("User_Impact_Index", affected_users / 100000)
)


Median Resolution Time: 36.0
+---------------+-----+
|Long_Resolution|count|
+---------------+-----+
|              1| 1501|
|              0| 1499|
+---------------+-----+



In [0]:
# Index categorical features
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

# String-typed columns that are turned into numeric indices.
cat_cols = [
    "Country",
    "Attack Type",
    "Target Industry",
    "Attack Source",
    "Security Vulnerability Type",
    "Defense Mechanism Used",
]

# Each categorical column "<name>" is indexed into "<name>_idx".
indexed_cols = [f"{name}_idx" for name in cat_cols]

# One StringIndexer per categorical column; handleInvalid="keep" is set so
# values not seen during fitting do not raise at transform time.
indexers = []
for source_col, target_col in zip(cat_cols, indexed_cols):
    indexers.append(
        StringIndexer(inputCol=source_col, outputCol=target_col, handleInvalid="keep")
    )

# Model inputs: the numeric columns first, then the indexed categoricals.
# This order is also the order of the entries in the assembled vector.
numeric_features = [
    "Year",
    "Number of Affected Users",
    "Users_Per_Year",
    "User_Impact_Index",
]
input_features = numeric_features + indexed_cols

assembler = VectorAssembler(inputCols=input_features, outputCol="features")


In [0]:
# Model training
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline

# Single seed shared by the split and the estimator so the run is reproducible
# even if subsampling or feature-subset parameters are tuned later.
RANDOM_SEED = 42

# Gradient-boosted trees predicting the binary Long_Resolution label.
gbt = GBTClassifier(
    labelCol="Long_Resolution",
    featuresCol="features",
    maxIter=50,
    seed=RANDOM_SEED,
)

# Full pipeline: index the categoricals, assemble the feature vector, fit the model.
pipeline = Pipeline(stages=indexers + [assembler, gbt])

# 80/20 train/test split.
train, test = cyber_data.randomSplit([0.8, 0.2], seed=RANDOM_SEED)

# Fit on the training split only, then score the held-out split.
model = pipeline.fit(train)
preds = model.transform(test)


In [0]:
# Model performance evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve on the held-out predictions; 0.5 corresponds to
# random guessing, so values at or below it mean the model has no usable signal.
evaluator = BinaryClassificationEvaluator(
    labelCol="Long_Resolution",
    metricName="areaUnderROC",
)
auc = evaluator.evaluate(preds)
print("AUC ROC (Long Resolution):", round(auc, 4))


AUC ROC (Long Resolution): 0.4779


In [0]:
# Feature importance
# The fitted classifier is the last stage of the fitted pipeline.
gbt_model = model.stages[-1]

# Importances as a plain array; entries are paired positionally with
# input_features, the column list that was given to the assembler.
importances = gbt_model.featureImportances.toArray()

for name, score in zip(input_features, importances):
    print("{}: {:.4f}".format(name, score))

Year: 0.0820
Number of Affected Users: 0.1147
Users_Per_Year: 0.0515
User_Impact_Index: 0.0000
Country_idx: 0.2095
Attack Type_idx: 0.1484
Target Industry_idx: 0.1475
Attack Source_idx: 0.0764
Security Vulnerability Type_idx: 0.0662
Defense Mechanism Used_idx: 0.1038


In [0]:
# Metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


def _make_metric_evaluator(metric_name):
    """Build an evaluator that scores `prediction` against `Long_Resolution` using `metric_name`."""
    return MulticlassClassificationEvaluator(
        labelCol="Long_Resolution",
        predictionCol="prediction",
        metricName=metric_name,
    )


# One evaluator per reported metric. Precision and recall are requested via the
# "weightedPrecision" / "weightedRecall" metric names.
acc_evaluator = _make_metric_evaluator("accuracy")
precision_evaluator = _make_metric_evaluator("weightedPrecision")
recall_evaluator = _make_metric_evaluator("weightedRecall")
f1_evaluator = _make_metric_evaluator("f1")

# Score the held-out predictions with each evaluator.
accuracy = acc_evaluator.evaluate(preds)
precision = precision_evaluator.evaluate(preds)
recall = recall_evaluator.evaluate(preds)
f1_score = f1_evaluator.evaluate(preds)

print(f"Accuracy      : {accuracy:.4f}")
print(f"Precision     : {precision:.4f}")
print(f"Recall        : {recall:.4f}")
print(f"F1 Score      : {f1_score:.4f}")


Accuracy      : 0.4736
Precision     : 0.4736
Recall        : 0.4736
F1 Score      : 0.4736


In [0]:
# Confusion matrix
# Row counts per (predicted, actual) pair. groupBy gives no ordering guarantee,
# so sort explicitly by actual label and then prediction to keep the table
# readable and identical from run to run.
(
    preds
    .groupBy("prediction", "Long_Resolution")
    .count()
    .orderBy("Long_Resolution", "prediction")
    .show()
)

+----------+---------------+-----+
|prediction|Long_Resolution|count|
+----------+---------------+-----+
|       1.0|              0|  143|
|       0.0|              0|  132|
|       0.0|              1|  146|
|       1.0|              1|  128|
+----------+---------------+-----+

