## Step 1: Install PySpark and Import Libraries

In [1]:
# Install PySpark (Colab only)
!pip install pyspark -q

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType, StringType
import os
import time

print("‚úÖ Libraries imported successfully")

‚úÖ Libraries imported successfully


## Step 2: Mount Google Drive

In [2]:
# Mount Google Drive
try:
    from google.colab import drive
    drive.mount('/content/drive')
    BASE_DIR = "/content/drive/MyDrive/NetworkIDS"
    print(f"‚úÖ Google Drive mounted successfully!")
    print(f"üìÅ Base directory: {BASE_DIR}")
    IS_COLAB = True
except:
    BASE_DIR = "d:/Coding/real-time-network-intrusion-detection-spark-kafka/data"
    print(f"‚úÖ Running locally. Data directory: {BASE_DIR}")
    IS_COLAB = False

# Define paths
INPUT_DIR = f"{BASE_DIR}/output/parquet"
OUTPUT_DIR = f"{BASE_DIR}/output"

print(f"üìÇ Input directory: {INPUT_DIR}")
print(f"üìÇ Output directory: {OUTPUT_DIR}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
‚úÖ Google Drive mounted successfully!
üìÅ Base directory: /content/drive/MyDrive/NetworkIDS
üìÇ Input directory: /content/drive/MyDrive/NetworkIDS/output/parquet
üìÇ Output directory: /content/drive/MyDrive/NetworkIDS/output


## Step 3: Create Spark Session

In [3]:
# Create Spark session with MEMORY OPTIMIZED settings for Colab
spark = SparkSession.builder \
    .appName("NIDS-LabelHarmonization") \
    .config("spark.driver.memory", "10g") \
    .config("spark.sql.shuffle.partitions", "400") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.parquet.columnarReaderBatchSize", "1024") \
    .config("spark.sql.parquet.enableVectorizedReader", "false") \
    .config("spark.sql.files.maxPartitionBytes", "64m") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
print(f"‚úÖ Spark session created (memory optimized)")
print(f"üìä Spark version: {spark.version}")

‚úÖ Spark session created (memory optimized)
üìä Spark version: 4.0.1


## Step 4: Load Preprocessed Datasets

In [4]:
# Load CIC-IDS 2017
print("Loading CIC-IDS 2017...")
df_2017 = spark.read.parquet(f"{INPUT_DIR}/cicids2017_preprocessed")
count_2017 = df_2017.count()
print(f"  ‚úÖ Loaded {count_2017:,} records")
print(f"  üìä Columns: {len(df_2017.columns)}")

# Load CIC-IDS 2018
print("\nLoading CIC-IDS 2018...")
df_2018 = spark.read.parquet(f"{INPUT_DIR}/cicids2018_preprocessed")
count_2018 = df_2018.count()
print(f"  ‚úÖ Loaded {count_2018:,} records")
print(f"  üìä Columns: {len(df_2018.columns)}")

print(f"\nüìà Total records: {count_2017 + count_2018:,}")

Loading CIC-IDS 2017...
  ‚úÖ Loaded 2,522,009 records
  üìä Columns: 82

Loading CIC-IDS 2018...
  ‚úÖ Loaded 15,741,406 records
  üìä Columns: 83

üìà Total records: 18,263,415


## Step 5: Inspect Label Distributions

In [5]:
# Check column names to find the label column
print("CIC-IDS 2017 columns:")
print([c for c in df_2017.columns if 'label' in c.lower()])

print("\nCIC-IDS 2018 columns:")
print([c for c in df_2018.columns if 'label' in c.lower()])

CIC-IDS 2017 columns:
['label', 'binary_label']

CIC-IDS 2018 columns:
['label', 'binary_label']


In [6]:
# Get label distributions for 2017
print("="*60)
print("CIC-IDS 2017 - Attack Type Distribution")
print("="*60)
labels_2017 = df_2017.groupBy('label').count().orderBy(F.desc('count'))
labels_2017.show(20, truncate=False)

CIC-IDS 2017 - Attack Type Distribution
+--------------------------+-------+
|label                     |count  |
+--------------------------+-------+
|BENIGN                    |2096134|
|DoS Hulk                  |172846 |
|DDoS                      |128016 |
|PortScan                  |90819  |
|DoS GoldenEye             |10286  |
|FTP-Patator               |5933   |
|DoS slowloris             |5385   |
|DoS Slowhttptest          |5228   |
|SSH-Patator               |3219   |
|Bot                       |1953   |
|Web Attack ÔøΩ Brute Force  |1470   |
|Web Attack ÔøΩ XSS          |652    |
|Infiltration              |36     |
|Web Attack ÔøΩ Sql Injection|21     |
|Heartbleed                |11     |
+--------------------------+-------+



In [7]:
# Get label distributions for 2018
print("="*60)
print("CIC-IDS 2018 - Attack Type Distribution")
print("="*60)
labels_2018 = df_2018.groupBy('label').count().orderBy(F.desc('count'))
labels_2018.show(20, truncate=False)

CIC-IDS 2018 - Attack Type Distribution
+------------------------+--------+
|label                   |count   |
+------------------------+--------+
|Benign                  |13388000|
|DDOS attack-HOIC        |668461  |
|DDoS attacks-LOIC-HTTP  |576175  |
|DoS attacks-Hulk        |434873  |
|Bot                     |282310  |
|Infilteration           |161059  |
|SSH-Bruteforce          |117322  |
|DoS attacks-GoldenEye   |41455   |
|FTP-BruteForce          |39346   |
|DoS attacks-SlowHTTPTest|19462   |
|DoS attacks-Slowloris   |10285   |
|DDOS attack-LOIC-UDP    |1730    |
|Brute Force -Web        |611     |
|Brute Force -XSS        |230     |
|SQL Injection           |87      |
+------------------------+--------+



## Step 6: Define Unified Label Mapping

We create 9 unified attack categories:
- 0: Benign (Normal traffic)
- 1: DoS (Denial of Service)
- 2: DDoS (Distributed DoS)
- 3: Brute Force (Password attacks)
- 4: Web Attack (XSS, SQL Injection)
- 5: Infiltration
- 6: Botnet
- 7: PortScan (Reconnaissance)
- 8: Other (Heartbleed, etc.)

In [8]:
# Unified label mapping for CIC-IDS 2017
LABEL_MAP_2017 = {
    # Benign
    'BENIGN': 0,

    # DoS attacks
    'DoS Hulk': 1,
    'DoS GoldenEye': 1,
    'DoS slowloris': 1,
    'DoS Slowhttptest': 1,

    # DDoS
    'DDoS': 2,

    # Brute Force
    'FTP-Patator': 3,
    'SSH-Patator': 3,

    # Web Attacks
    'Web Attack \x96 Brute Force': 4,
    'Web Attack \x96 XSS': 4,
    'Web Attack \x96 Sql Injection': 4,

    # Infiltration
    'Infiltration': 5,

    # Botnet
    'Bot': 6,

    # PortScan
    'PortScan': 7,

    # Other
    'Heartbleed': 8
}

# Unified label mapping for CIC-IDS 2018
LABEL_MAP_2018 = {
    # Benign
    'Benign': 0,

    # DoS attacks
    'DoS attacks-Hulk': 1,
    'DoS attacks-GoldenEye': 1,
    'DoS attacks-Slowloris': 1,
    'DoS attacks-SlowHTTPTest': 1,

    # DDoS
    'DDoS attack-HOIC': 2,
    'DDoS attacks-LOIC-HTTP': 2,
    'DDOS attack-LOIC-UDP': 2,
    'DDoS attack-LOIC-UDP': 2,

    # Brute Force
    'FTP-BruteForce': 3,
    'SSH-Bruteforce': 3,

    # Web Attacks
    'Brute Force -Web': 4,
    'Brute Force -XSS': 4,
    'SQL Injection': 4,

    # Infiltration
    'Infilteration': 5,

    # Botnet
    'Bot': 6
}

# Label names for reference
UNIFIED_LABELS = {
    0: 'Benign',
    1: 'DoS',
    2: 'DDoS',
    3: 'Brute_Force',
    4: 'Web_Attack',
    5: 'Infiltration',
    6: 'Botnet',
    7: 'PortScan',
    8: 'Other'
}

print("‚úÖ Label mappings defined")
print("\nUnified Categories:")
for k, v in UNIFIED_LABELS.items():
    print(f"  {k}: {v}")

‚úÖ Label mappings defined

Unified Categories:
  0: Benign
  1: DoS
  2: DDoS
  3: Brute_Force
  4: Web_Attack
  5: Infiltration
  6: Botnet
  7: PortScan
  8: Other


## Step 7: Apply Label Mapping

In [9]:
def apply_label_mapping(df, label_map, default_value=8):
    """
    Apply unified label mapping to dataframe.
    Unknown labels are mapped to 'Other' (8).
    """
    # Build CASE WHEN expression
    mapping_expr = F.lit(default_value)  # Default for unknown labels

    for original_label, unified_label in label_map.items():
        mapping_expr = F.when(F.col('label') == original_label, unified_label).otherwise(mapping_expr)

    df = df.withColumn('unified_label', mapping_expr.cast(IntegerType()))
    return df

print("‚úÖ Mapping function defined")

‚úÖ Mapping function defined


In [10]:
# Apply mapping to 2017
print("Applying label mapping to CIC-IDS 2017...")
df_2017 = apply_label_mapping(df_2017, LABEL_MAP_2017)
df_2017 = df_2017.withColumn('dataset_source', F.lit('cicids2017'))

# Verify mapping
print("\nCIC-IDS 2017 - Unified Label Distribution:")
df_2017.groupBy('unified_label').count().orderBy('unified_label').show()

Applying label mapping to CIC-IDS 2017...

CIC-IDS 2017 - Unified Label Distribution:
+-------------+-------+
|unified_label|  count|
+-------------+-------+
|            0|2096134|
|            1| 193745|
|            2| 128016|
|            3|   9152|
|            5|     36|
|            6|   1953|
|            7|  90819|
|            8|   2154|
+-------------+-------+



In [11]:
# Apply mapping to 2018
print("Applying label mapping to CIC-IDS 2018...")
df_2018 = apply_label_mapping(df_2018, LABEL_MAP_2018)
df_2018 = df_2018.withColumn('dataset_source', F.lit('cicids2018'))

# Verify mapping
print("\nCIC-IDS 2018 - Unified Label Distribution:")
df_2018.groupBy('unified_label').count().orderBy('unified_label').show()

Applying label mapping to CIC-IDS 2018...

CIC-IDS 2018 - Unified Label Distribution:
+-------------+--------+
|unified_label|   count|
+-------------+--------+
|            0|13388000|
|            1|  506075|
|            2|  577905|
|            3|  156668|
|            4|     928|
|            5|  161059|
|            6|  282310|
|            8|  668461|
+-------------+--------+



## Step 8: Align Columns and Merge Datasets

In [12]:
# Find common columns
cols_2017 = set(df_2017.columns)
cols_2018 = set(df_2018.columns)

common_cols = cols_2017.intersection(cols_2018)
only_2017 = cols_2017 - cols_2018
only_2018 = cols_2018 - cols_2017

print(f"Common columns: {len(common_cols)}")
print(f"Only in 2017: {len(only_2017)} - {only_2017}")
print(f"Only in 2018: {len(only_2018)} - {only_2018}")

Common columns: 33
Only in 2017: 51 - {'fin_flag_count', 'bwd_packet_length_mean', 'bwd_avg_bytes_bulk', 'flow_bytes_s', 'subflow_bwd_packets', 'fwd_packet_length_mean', 'destination_port', 'bwd_header_length', 'average_packet_size', 'init_win_bytes_forward', 'total_length_of_fwd_packets', 'bwd_iat_total', 'fwd_iat_total', 'fwd_packet_length_std', 'rst_flag_count', 'min_packet_length', 'packet_length_variance', 'fwd_header_length34', 'bwd_packet_length_std', 'fwd_packet_length_max', 'total_length_of_bwd_packets', 'act_data_pkt_fwd', 'flow_packets_s', 'packet_length_std', 'urg_flag_count', 'fwd_packets_s', 'psh_flag_count', 'max_packet_length', 'avg_fwd_segment_size', 'fwd_header_length55', 'packet_length_mean', 'avg_bwd_segment_size', 'fwd_avg_packets_bulk', 'min_seg_size_forward', 'bwd_packets_s', 'fwd_avg_bytes_bulk', 'bwd_packet_length_min', 'bwd_packet_length_max', 'bwd_avg_bulk_rate', 'syn_flag_count', 'subflow_fwd_bytes', 'subflow_bwd_bytes', 'fwd_avg_bulk_rate', 'bwd_avg_packets

In [13]:
# Select common columns for merging
# Ensure we include important columns
essential_cols = ['binary_label', 'unified_label', 'sample_weight', 'features_scaled', 'label', 'dataset_source']
merge_cols = list(common_cols)

# Add any essential columns that might be missing
for col in essential_cols:
    if col not in merge_cols and col in cols_2017 and col in cols_2018:
        merge_cols.append(col)

print(f"Columns to merge: {len(merge_cols)}")

# Select same columns from both dataframes
df_2017_aligned = df_2017.select(merge_cols)
df_2018_aligned = df_2018.select(merge_cols)

print(f"\n2017 aligned: {len(df_2017_aligned.columns)} columns")
print(f"2018 aligned: {len(df_2018_aligned.columns)} columns")

Columns to merge: 33

2017 aligned: 33 columns
2018 aligned: 33 columns


In [14]:
# Merge datasets - MEMORY OPTIMIZED VERSION
print("Merging datasets...")
start_time = time.time()

# Don't cache - let Spark handle memory automatically
df_merged = df_2017_aligned.union(df_2018_aligned)

# Don't count the full dataset - it triggers full materialization
# Just verify it works with a small sample
sample_count = df_merged.limit(10).count()
print(f"‚úÖ Merge successful (verified with sample)")

elapsed = time.time() - start_time
print(f"‚è±Ô∏è Merge operation took {elapsed:.2f} seconds")

# We'll get the total count when we save (more efficient)
print("üìù Total count will be calculated during save operation")

Merging datasets...
‚úÖ Merge successful (verified with sample)
‚è±Ô∏è Merge operation took 1.40 seconds
üìù Total count will be calculated during save operation


## Step 9: Verify Merged Dataset

In [15]:
# Check distribution by dataset source
print("Records by Dataset Source:")
df_merged.groupBy('dataset_source').count().show()

# Check unified label distribution
print("\nUnified Label Distribution (Merged):")
label_dist = df_merged.groupBy('unified_label').count().orderBy('unified_label')
label_dist.show()

Records by Dataset Source:
+--------------+--------+
|dataset_source|   count|
+--------------+--------+
|    cicids2017| 2522009|
|    cicids2018|15741406|
+--------------+--------+


Unified Label Distribution (Merged):
+-------------+--------+
|unified_label|   count|
+-------------+--------+
|            0|15484134|
|            1|  699820|
|            2|  705921|
|            3|  165820|
|            4|     928|
|            5|  161095|
|            6|  284263|
|            7|   90819|
|            8|  670615|
+-------------+--------+



In [16]:
# Add label names for readability
print("Unified Label Distribution with Names:")
print("="*50)

label_counts = df_merged.groupBy('unified_label').count().orderBy('unified_label').collect()
total = sum([row['count'] for row in label_counts])

for row in label_counts:
    label_id = row['unified_label']
    count = row['count']
    pct = (count / total) * 100
    label_name = UNIFIED_LABELS.get(label_id, 'Unknown')
    print(f"{label_id}: {label_name:15} - {count:>10,} ({pct:5.2f}%)")

print("="*50)
print(f"Total: {total:,}")

Unified Label Distribution with Names:
0: Benign          - 15,484,134 (84.78%)
1: DoS             -    699,820 ( 3.83%)
2: DDoS            -    705,921 ( 3.87%)
3: Brute_Force     -    165,820 ( 0.91%)
4: Web_Attack      -        928 ( 0.01%)
5: Infiltration    -    161,095 ( 0.88%)
6: Botnet          -    284,263 ( 1.56%)
7: PortScan        -     90,819 ( 0.50%)
8: Other           -    670,615 ( 3.67%)
Total: 18,263,415


## Step 10: Recalculate Class Weights

In [17]:
# Calculate class weights for unified labels (for multi-class)
def calculate_multiclass_weights(df, label_col):
    """Calculate inverse frequency weights for multi-class classification"""
    class_counts = df.groupBy(label_col).count().collect()
    max_count = max([row['count'] for row in class_counts])
    weights = {row[label_col]: float(max_count) / row['count'] for row in class_counts}
    return weights

# Calculate weights for unified labels
multiclass_weights = calculate_multiclass_weights(df_merged, 'unified_label')

print("Multi-class Weights (Unified Labels):")
print("="*50)
for label_id in sorted(multiclass_weights.keys()):
    weight = multiclass_weights[label_id]
    label_name = UNIFIED_LABELS.get(label_id, 'Unknown')
    print(f"{label_id}: {label_name:15} - weight: {weight:.4f}")

Multi-class Weights (Unified Labels):
0: Benign          - weight: 1.0000
1: DoS             - weight: 22.1259
2: DDoS            - weight: 21.9347
3: Brute_Force     - weight: 93.3792
4: Web_Attack      - weight: 16685.4892
5: Infiltration    - weight: 96.1180
6: Botnet          - weight: 54.4712
7: PortScan        - weight: 170.4944
8: Other           - weight: 23.0895


In [18]:
# Add multiclass sample weights
def add_multiclass_weights(df, label_col, weights):
    """Add sample weights for multi-class classification"""
    weight_expr = F.lit(1.0)  # Default weight

    for label_val, weight_val in weights.items():
        weight_expr = F.when(F.col(label_col) == label_val, weight_val).otherwise(weight_expr)

    df = df.withColumn('multiclass_weight', weight_expr)
    return df

# Apply multiclass weights
df_merged = add_multiclass_weights(df_merged, 'unified_label', multiclass_weights)
print("‚úÖ Multi-class weights added")

# Verify
df_merged.select('unified_label', 'multiclass_weight').distinct().orderBy('unified_label').show()

‚úÖ Multi-class weights added
+-------------+------------------+
|unified_label| multiclass_weight|
+-------------+------------------+
|            0|               1.0|
|            1|22.125880940813353|
|            2|21.934655577607128|
|            3| 93.37917018453744|
|            4| 16685.48922413793|
|            5| 96.11802973400788|
|            6| 54.47115523300605|
|            7|170.49443398407823|
|            8|23.089453710400154|
+-------------+------------------+



## Step 11: Save Harmonized Dataset

In [19]:
# Re-verify Google Drive connection before saving
if IS_COLAB:
    try:
        os.listdir(BASE_DIR)
        print("‚úÖ Google Drive connection verified")
    except:
        print("‚ö†Ô∏è Drive disconnected! Remounting...")
        from google.colab import drive
        drive.mount('/content/drive', force_remount=True)
        print("‚úÖ Drive remounted successfully")

‚úÖ Google Drive connection verified


In [20]:
# Save in chunks to avoid memory issues
print("Saving harmonized dataset in optimized way...")
start_time = time.time()

output_path = f"{OUTPUT_DIR}/parquet/cicids_merged_harmonized"

# Write without repartition - let Spark handle it naturally
# coalesce reduces partitions without shuffle (less memory)
df_merged.coalesce(50).write.mode('overwrite').parquet(output_path)

elapsed = time.time() - start_time
print(f"‚úÖ Saved to: {output_path}")
print(f"‚è±Ô∏è Time: {elapsed:.2f} seconds")

Saving harmonized dataset in optimized way...
‚úÖ Saved to: /content/drive/MyDrive/NetworkIDS/output/parquet/cicids_merged_harmonized
‚è±Ô∏è Time: 210.02 seconds


In [22]:
# Save metadata/weights for later use
import json

# Get count from the saved file
print("Counting records from saved file...")
total_count = spark.read.parquet(f"{OUTPUT_DIR}/parquet/cicids_merged_harmonized").count()
print(f"üìä Total records: {total_count:,}")

metadata = {
    'total_records': total_count,
    'num_features': len([c for c in df_merged.columns if c not in ['label', 'binary_label', 'unified_label', 'sample_weight', 'multiclass_weight', 'features_scaled', 'dataset_source']]),
    'unified_labels': UNIFIED_LABELS,
    'multiclass_weights': {str(k): v for k, v in multiclass_weights.items()},
    'records_by_source': {
        'cicids2017': count_2017,
        'cicids2018': count_2018
    }
}

metadata_path = f"{OUTPUT_DIR}/cicids_merged_metadata.json"
with open(metadata_path, 'w') as f:
    json.dump(metadata, f, indent=2)

print(f"‚úÖ Metadata saved to: {metadata_path}")
print("\nMetadata:")
print(json.dumps(metadata, indent=2))

Counting records from saved file...
üìä Total records: 18,263,415
‚úÖ Metadata saved to: /content/drive/MyDrive/NetworkIDS/output/cicids_merged_metadata.json

Metadata:
{
  "total_records": 18263415,
  "num_features": 27,
  "unified_labels": {
    "0": "Benign",
    "1": "DoS",
    "2": "DDoS",
    "3": "Brute_Force",
    "4": "Web_Attack",
    "5": "Infiltration",
    "6": "Botnet",
    "7": "PortScan",
    "8": "Other"
  },
  "multiclass_weights": {
    "3": 93.37917018453744,
    "5": 96.11802973400788,
    "7": 170.49443398407823,
    "0": 1.0,
    "1": 22.125880940813353,
    "6": 54.47115523300605,
    "8": 23.089453710400154,
    "2": 21.934655577607128,
    "4": 16685.48922413793
  },
  "records_by_source": {
    "cicids2017": 2522009,
    "cicids2018": 15741406
  }
}


## Step 12: Verification - Load and Check Saved Data

In [23]:
# Verify saved data
print("Verifying saved data...")

df_verify = spark.read.parquet(output_path)
verify_count = df_verify.count()

print(f"\n‚úÖ Verification successful!")
print(f"  Records: {verify_count:,}")
print(f"  Columns: {len(df_verify.columns)}")

# Show sample
print("\nSample data:")
df_verify.select('label', 'binary_label', 'unified_label', 'multiclass_weight', 'dataset_source').show(5)

Verifying saved data...

‚úÖ Verification successful!
  Records: 18,263,415
  Columns: 34

Sample data:
+------+------------+-------------+-----------------+--------------+
| label|binary_label|unified_label|multiclass_weight|dataset_source|
+------+------------+-------------+-----------------+--------------+
|BENIGN|           0|            0|              1.0|    cicids2017|
|BENIGN|           0|            0|              1.0|    cicids2017|
|BENIGN|           0|            0|              1.0|    cicids2017|
|BENIGN|           0|            0|              1.0|    cicids2017|
|BENIGN|           0|            0|              1.0|    cicids2017|
+------+------------+-------------+-----------------+--------------+
only showing top 5 rows


## Summary

### What was created:
1. **Harmonized Dataset**: `cicids_merged_harmonized.parquet`
   - Combined CIC-IDS 2017 + 2018
   - ~19 million records
   - Unified attack labels (9 categories)

2. **Metadata**: `cicids_merged_metadata.json`
   - Class weights for training
   - Label mappings
   - Record counts

### Columns available for training:
- `features_scaled`: Scaled feature vector (for ML models)
- `binary_label`: 0=Benign, 1=Attack (for binary classification)
- `unified_label`: 0-8 (for multi-class classification)
- `sample_weight`: Binary class weights
- `multiclass_weight`: Multi-class weights
- `label`: Original attack type string
- `dataset_source`: cicids2017 or cicids2018

### Next Steps:
1. Open `model_training.ipynb`
2. Load harmonized dataset
3. Train binary and multi-class classifiers
4. Evaluate model performance

In [24]:
# Cleanup
spark.stop()
print("‚úÖ Spark session stopped")
print("\nüéâ Label harmonization complete! Ready for model training.")

‚úÖ Spark session stopped

üéâ Label harmonization complete! Ready for model training.
