In [1]:
# COLAB SETUP: PySpark + HuggingFace datasets

!pip -q install pyspark datasets pyarrow

import os, time, json
import pandas as pd
from datasets import load_dataset

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [2]:
# 1) MOUNT GOOGLE DRIVE + CREATE DIRECTORIES

from google.colab import drive
drive.mount("/content/drive")

BASE_DIR = "/content/drive/MyDrive/CIC_IDS_Collection"
SAMPLE_DIR = os.path.join(BASE_DIR, "sample_data")
REPORTS_DIR = os.path.join(BASE_DIR, "model_reports")

os.makedirs(SAMPLE_DIR, exist_ok=True)
os.makedirs(REPORTS_DIR, exist_ok=True)

SAMPLE_CSV_DIR = os.path.join(SAMPLE_DIR, "cic_ids_sample_csv")  # spark writes folder
METRICS_CSV_PATH = os.path.join(REPORTS_DIR, "model_performance.csv")

print("Folders created:")
print("Sample:", SAMPLE_DIR)
print("Reports:", REPORTS_DIR)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Folders created:
Sample: /content/drive/MyDrive/CIC_IDS_Collection/sample_data
Reports: /content/drive/MyDrive/CIC_IDS_Collection/model_reports


In [3]:
# 2) START SPARK (COLAB SAFE CFG)

spark = (
    SparkSession.builder
    .appName("7006SCN_CIC_IDS")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "2g")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

In [4]:
# 3) LOAD Data USING HUGGINGFACE DATASETS

from datasets import load_dataset

DATASET_NAME = "auliraff/CIC-IDS-Collection"
SPLIT = "train"

# Load normally (not streaming)
ds = load_dataset(DATASET_NAME, split=SPLIT)

# Convert to pandas temporarily
pdf_full = ds.to_pandas()

# Check distribution
print(pdf_full["Label"].value_counts())

# Separate classes
benign_df = pdf_full[pdf_full["Label"] == "Benign"]
attack_df = pdf_full[pdf_full["Label"] != "Benign"]

# Take 50k benign + 50k attack (balanced 100k)
sample_benign = benign_df.sample(n=50000, random_state=42)
sample_attack = attack_df.sample(n=50000, random_state=42)

pdf = pd.concat([sample_benign, sample_attack])

# Shuffle
pdf = pdf.sample(frac=1, random_state=42).reset_index(drop=True)

print(pdf["Label"].value_counts())
print("Final shape:", pdf.shape)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


cic-collection.parquet:   0%|          | 0.00/1.03G [00:00<?, ?B/s]

Generating train split:   0%|          | 0/9167581 [00:00<?, ? examples/s]

Label
Benign                  7186189
DDoS-LOIC-HTTP           575364
DoS-Hulk                 318740
DDoS-HOIC                198861
Botnet                   145968
DDoS                     128062
DDoS-NTP                 121328
DDoS-TFTP                 98833
Bruteforce-SSH            97260
Infiltration              94857
DoS-Goldeneye             52324
DDoS-Syn                  47757
DDoS-UDP                  28863
DoS-Slowloris             15243
DDoS-MSSQL                11784
DDoS-UDPLag                8452
Bruteforce-FTP             5984
DoS-Slowhttptest           5271
DDoS-Ddossim               5115
DDoS-DNS                   3668
DoS-Slowread               2786
Portscan                   2255
DDoS-LDAP                  2092
Webattack-bruteforce       2020
DDoS-SNMP                  2017
DDoS-Slowloris             1858
DoS-Slowheaders            1649
Webattack-XSS               876
DoS-Rudy                    699
DDoS-NetBIOS                675
DoS-Slowbody                621
We

In [5]:
# 4) SAVE SAMPLE DATA TO DRIVE AS A SINGLE CSV


PANDAS_SAMPLE_CSV = os.path.join(SAMPLE_DIR, "cic_ids_sample.csv")
pdf.to_csv(PANDAS_SAMPLE_CSV, index=False)
print("Saved sample CSV:", PANDAS_SAMPLE_CSV)

Saved sample CSV: /content/drive/MyDrive/CIC_IDS_Collection/sample_data/cic_ids_sample.csv


In [6]:
# 5) CONVERT TO SPARK DATAFRAME

df = spark.createDataFrame(pdf)

In [7]:

# 6) BASIC CLEANING + TARGET CREATION (Binary Attack)


# Choose label column safely: prefer 'Label' else 'ClassLabel'
label_col = "Label" if "Label" in df.columns else ("ClassLabel" if "ClassLabel" in df.columns else None)
if label_col is None:
    raise ValueError("No Label/ClassLabel column found!")

# Create binary target: Benign=0, Attack=1
df = df.withColumn("target", F.when(F.col(label_col) == F.lit("Benign"), F.lit(0)).otherwise(F.lit(1)))

# Drop duplicate label columns (keep original if you want, but target is what we use)
# df = df.drop("ClassLabel") if "ClassLabel" in df.columns else df

# Fix negative values in time-like columns (common in this dataset)
time_cols = [c for c in df.columns if "Duration" in c or "IAT" in c or "Idle" in c or "Active" in c]
for c in time_cols:
    # If numeric, convert negatives to abs value (simple fix for report; explain in writeup)
    df = df.withColumn(c, F.when(F.col(c).cast("double").isNotNull(), F.abs(F.col(c).cast("double"))).otherwise(F.col(c)))

# Fill numeric nulls with 0 (safe default for flow features)
# Identify numeric columns
numeric_types = (T.IntegerType, T.LongType, T.DoubleType, T.FloatType, T.ShortType, T.ByteType, T.DecimalType)
num_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, numeric_types) and f.name not in ["target"]]

df = df.na.fill({c: 0 for c in num_cols})

# Drop rows where target is null (should not happen)
df = df.filter(F.col("target").isNotNull())

print("Spark rows:", df.count())
print("Class distribution:")
df.groupBy("target").count().show()

Spark rows: 100000
Class distribution:
+------+-----+
|target|count|
+------+-----+
|     1|50000|
|     0|50000|
+------+-----+



In [8]:
# 7) FEATURE VECTOR PIPELINE

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# SAFE feature selection
exclude_cols = ["target", "Label", "ClassLabel"]

feature_cols = [
    f.name for f in df.schema.fields
    if isinstance(f.dataType, (T.IntegerType, T.LongType, T.DoubleType, T.FloatType))
    and f.name not in exclude_cols
]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="keep")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withMean=False, withStd=True)


In [9]:
# 8) TRAIN/TEST SPLIT

# Simple, stable split for 100k dataset
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Cache for performance (important in Spark)
train_df = train_df.cache()
test_df = test_df.cache()

print("Train count:", train_df.count())
print("Test count:", test_df.count())

print("Train class distribution:")
train_df.groupBy("target").count().show()

print("Test class distribution:")
test_df.groupBy("target").count().show()

Train count: 80062
Test count: 19938
Train class distribution:
+------+-----+
|target|count|
+------+-----+
|     1|40063|
|     0|39999|
+------+-----+

Test class distribution:
+------+-----+
|target|count|
+------+-----+
|     1| 9937|
|     0|10001|
+------+-----+



In [10]:
# 9) METRICS FUNCTION

def eval_binary(pred_df, model_name):
    """
    Returns dict of metrics for binary classification.
    """
    # Some models output probability/rawPrediction; use ROC AUC
    auc_eval = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
    mc_eval_f1 = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="f1")
    mc_eval_acc = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
    mc_eval_prec = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="weightedPrecision")
    mc_eval_rec = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="weightedRecall")

    out = {}
    out["model"] = model_name
    out["rows_train"] = train_df.count()
    out["rows_test"] = test_df.count()
    out["accuracy"] = float(mc_eval_acc.evaluate(pred_df))
    out["precision_w"] = float(mc_eval_prec.evaluate(pred_df))
    out["recall_w"] = float(mc_eval_rec.evaluate(pred_df))
    out["f1"] = float(mc_eval_f1.evaluate(pred_df))

    # AUC may fail if only one class present in test (rare, but handle)
    try:
        out["roc_auc"] = float(auc_eval.evaluate(pred_df))
    except Exception as e:
        out["roc_auc"] = None
        out["roc_auc_error"] = str(e)[:200]

    return out

In [11]:
# 10) TRAIN 4 MODELS + LOG METRICS
# ======================================================
results = []

# --- Logistic Regression ---
lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="target", maxIter=50, regParam=0.0)
pipe_lr = Pipeline(stages=[assembler, scaler, lr])
t0 = time.time()
m_lr = pipe_lr.fit(train_df)
pred_lr = m_lr.transform(test_df)
row = eval_binary(pred_lr, "LogisticRegression")
row["train_time_sec"] = time.time() - t0
results.append(row)

In [12]:
# --- Decision Tree ---
dt = DecisionTreeClassifier(featuresCol="scaledFeatures", labelCol="target", maxDepth=10)
pipe_dt = Pipeline(stages=[assembler, scaler, dt])
t0 = time.time()
m_dt = pipe_dt.fit(train_df)
pred_dt = m_dt.transform(test_df)
row = eval_binary(pred_dt, "DecisionTree")
row["train_time_sec"] = time.time() - t0
results.append(row)

In [14]:
# --- Random Forest ---
SEED = 42

rf = RandomForestClassifier(
    featuresCol="scaledFeatures",
    labelCol="target",
    numTrees=80,
    maxDepth=12,
    seed=SEED
)

pipe_rf = Pipeline(stages=[assembler, scaler, rf])

t0 = time.time()
m_rf = pipe_rf.fit(train_df)
pred_rf = m_rf.transform(test_df)

row = eval_binary(pred_rf, "RandomForest")
row["train_time_sec"] = time.time() - t0

results.append(row)

In [15]:
# --- Gradient Boosted Trees ---
gbt = GBTClassifier(featuresCol="scaledFeatures", labelCol="target", maxIter=60, maxDepth=6, seed=SEED)
pipe_gbt = Pipeline(stages=[assembler, scaler, gbt])
t0 = time.time()
m_gbt = pipe_gbt.fit(train_df)
pred_gbt = m_gbt.transform(test_df)
row = eval_binary(pred_gbt, "GBTClassifier")
row["train_time_sec"] = time.time() - t0
results.append(row)

In [16]:
# 11) SAVE MODEL PERFORMANCE CSV TO GOOGLE DRIVE

metrics_df = pd.DataFrame(results)
metrics_df.to_csv(METRICS_CSV_PATH, index=False)

print("\nSaved model performance CSV:", METRICS_CSV_PATH)
print("\nModel results:")
display(metrics_df)


Saved model performance CSV: /content/drive/MyDrive/CIC_IDS_Collection/model_reports/model_performance.csv

Model results:


Unnamed: 0,model,rows_train,rows_test,accuracy,precision_w,recall_w,f1,roc_auc,train_time_sec
0,LogisticRegression,80062,19938,0.831578,0.832261,0.831578,0.831506,0.911502,28.733965
1,DecisionTree,80062,19938,0.95807,0.958117,0.95807,0.958068,0.953826,11.799696
2,RandomForest,80062,19938,0.970308,0.97084,0.970308,0.970298,0.990007,56.066065
3,GBTClassifier,80062,19938,0.971813,0.972474,0.971813,0.971801,0.990289,68.415563


In [17]:
# 12) SAVE SPARK SAMPLE DATA AS CSV FOLDER TOO

spark_sample_df = df.limit(100000)
spark_sample_df.coalesce(1).write.mode("overwrite").option("header", True).csv(SAMPLE_CSV_DIR)
print("Saved Spark CSV folder:", SAMPLE_CSV_DIR)

# Cleanup cache
train_df.unpersist()
test_df.unpersist()

Saved Spark CSV folder: /content/drive/MyDrive/CIC_IDS_Collection/sample_data/cic_ids_sample_csv


DataFrame[Flow Duration: double, Total Fwd Packets: bigint, Total Backward Packets: bigint, Fwd Packets Length Total: double, Bwd Packets Length Total: double, Fwd Packet Length Max: double, Fwd Packet Length Mean: double, Fwd Packet Length Std: double, Bwd Packet Length Max: double, Bwd Packet Length Mean: double, Bwd Packet Length Std: double, Flow Bytes/s: double, Flow Packets/s: double, Flow IAT Mean: double, Flow IAT Std: double, Flow IAT Max: double, Flow IAT Min: double, Fwd IAT Total: double, Fwd IAT Mean: double, Fwd IAT Std: double, Fwd IAT Max: double, Fwd IAT Min: double, Bwd IAT Total: double, Bwd IAT Mean: double, Bwd IAT Std: double, Bwd IAT Max: double, Bwd IAT Min: double, Fwd PSH Flags: bigint, Fwd Header Length: bigint, Bwd Header Length: bigint, Fwd Packets/s: double, Bwd Packets/s: double, Packet Length Max: double, Packet Length Mean: double, Packet Length Std: double, Packet Length Variance: double, SYN Flag Count: bigint, URG Flag Count: bigint, Avg Packet Size: