In [1]:
!pip install -q pandas matplotlib seaborn tqdm fastparquet python-snappy

## Importing Libraries and Installation

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
from pathlib import Path
import warnings
import json
from tqdm import tqdm
import fastparquet
import gc

# Configure plotting style and suppress warnings
sns.set_theme(style="whitegrid", palette="husl")
warnings.filterwarnings('ignore')

print("All necessary libraries are installed and imported.")

All necessary libraries are installed and imported.


## Configuration and Data Discovery

In [None]:
BASE_DATA_DIR = Path("iot23_csv_data")
RESULTS_DIR = Path("results")
PROCESSED_DATA_DIR = Path("processed_data")

# Create output directories
RESULTS_DIR.mkdir(exist_ok=True)
PROCESSED_DATA_DIR.mkdir(exist_ok=True)

# Discover scenario files
scenarios = {}
if not BASE_DATA_DIR.exists():
    print(f"FATAL ERROR: The directory '{BASE_DATA_DIR}' does not exist.")
else:
    for path in BASE_DATA_DIR.iterdir():
        if path.is_dir() and path.name.startswith("CTU-"):
            log_file = path / 'conn.log.labeled.csv'
            if log_file.exists():
                scenarios[path.name] = log_file

print(f"Discovered {len(scenarios)} scenarios ready for processing.")

Discovered 18 scenarios ready for processing.


In [8]:
scenarios

{'CTU-IoT-Malware-Capture-20-1': PosixPath('iot23_csv_data/CTU-IoT-Malware-Capture-20-1/conn.log.labeled.csv'),
 'CTU-Honeypot-Capture-4-1': PosixPath('iot23_csv_data/CTU-Honeypot-Capture-4-1/conn.log.labeled.csv'),
 'CTU-IoT-Malware-Capture-52-1': PosixPath('iot23_csv_data/CTU-IoT-Malware-Capture-52-1/conn.log.labeled.csv'),
 'CTU-IoT-Malware-Capture-60-1': PosixPath('iot23_csv_data/CTU-IoT-Malware-Capture-60-1/conn.log.labeled.csv'),
 'CTU-IoT-Malware-Capture-8-1': PosixPath('iot23_csv_data/CTU-IoT-Malware-Capture-8-1/conn.log.labeled.csv'),
 'CTU-IoT-Malware-Capture-49-1': PosixPath('iot23_csv_data/CTU-IoT-Malware-Capture-49-1/conn.log.labeled.csv'),
 'CTU-IoT-Malware-Capture-3-1': PosixPath('iot23_csv_data/CTU-IoT-Malware-Capture-3-1/conn.log.labeled.csv'),
 'CTU-IoT-Malware-Capture-21-1': PosixPath('iot23_csv_data/CTU-IoT-Malware-Capture-21-1/conn.log.labeled.csv'),
 'CTU-IoT-Malware-Capture-9-1': PosixPath('iot23_csv_data/CTU-IoT-Malware-Capture-9-1/conn.log.labeled.csv'),
 'CTU-

## Master Parquet File Creation

In [None]:
combined_parquet_path = PROCESSED_DATA_DIR / 'master_iot23_data.parquet'
chunksize = 250000  
is_first_chunk = True

print(f"--- Starting memory-safe combination to Parquet: {combined_parquet_path} ---")

for scenario_name, file_path in tqdm(scenarios.items(), desc="Combining Scenarios"):
    try:
        # Create an iterator that reads the CSV in chunks
        chunk_iterator = pd.read_csv(
            file_path, 
            chunksize=chunksize, 
            low_memory=False, 
            na_values='-' 
        )
        
        for chunk in chunk_iterator:
            chunk['scenario'] = scenario_name
            
            if is_first_chunk:
                # For the very first chunk, create a new Parquet file
                fastparquet.write(
                    combined_parquet_path, 
                    chunk, 
                    compression='SNAPPY',
                    append=False 
                )
                is_first_chunk = False
            else:
                # For all subsequent chunks, append to the existing file
                fastparquet.write(
                    combined_parquet_path, 
                    chunk, 
                    compression='SNAPPY',
                    append=True
                )
    except Exception as e:
        print(f"  - An error occurred while processing {scenario_name}: {e}")

print(f"\nSuccessfully created master Parquet file.")

--- Starting memory-safe combination to Parquet: processed_data/master_iot23_data.parquet ---


Combining Scenarios:  83%|████████▎ | 15/18 [04:25<00:43, 14.35s/it]

  - An error occurred while processing CTU-IoT-Malware-Capture-34-1: Error converting column "ts" to bytes using encoding None. Original error: could not convert string to float: 'CrDn63WjJEmrWGjqf'


Combining Scenarios: 100%|██████████| 18/18 [06:16<00:00, 20.89s/it]


Successfully created master Parquet file.





## Feature Engineering

In [None]:
master_parquet_path = PROCESSED_DATA_DIR / 'master_iot23_data.parquet'
enriched_parquet_path = PROCESSED_DATA_DIR / 'enriched_iot23_data.parquet'

# Pre-calculate the global minimum timestamp for the 'time_since_start' feature
print("Pre-calculating global minimum timestamp...")
min_ts = fastparquet.ParquetFile(master_parquet_path).to_pandas(['ts'])['ts'].min()
global_min_datetime = pd.to_datetime(min_ts, unit='s')
print(f"Global minimum timestamp found: {global_min_datetime}")

# Main Processing Loop 
pf = fastparquet.ParquetFile(master_parquet_path)
is_first_chunk = True

print(f"\n--- Starting memory-safe feature engineering to: {enriched_parquet_path} ---")

for chunk_df in tqdm(pf.iter_row_groups(), desc="Enriching Data"):
    try:
        # Feature Engineering: Temporal Features 
        chunk_df.dropna(subset=['ts'], inplace=True)
        chunk_df['datetime'] = pd.to_datetime(chunk_df['ts'], unit='s', errors='coerce')
        chunk_df['hour'] = chunk_df['datetime'].dt.hour
        chunk_df['day_of_week'] = chunk_df['datetime'].dt.dayofweek
        chunk_df['time_since_start'] = (chunk_df['datetime'] - global_min_datetime).dt.total_seconds()

        # Feature Engineering: Standardize Labels & Phases 
        if 'label' in chunk_df.columns:
            chunk_df['label'].fillna('Unknown', inplace=True)
            label_mapping = {
                'Benign': 'Benign', 'benign': 'Benign', 'Malicious': 'Malicious', 'C&C': 'Command_Control',
                'CC': 'Command_Control', 'DDoS': 'DDoS_Attack', 'DDOS': 'DDoS_Attack',
                'PartOfAHorizontalPortScan': 'Port_Scan', 'PartOfHorizontalPortScan': 'Port_Scan',
                'Attack': 'Direct_Attack', 'FileDownload': 'File_Download', 'HeartBeat': 'Heartbeat',
                'Okiru': 'Okiru_Activity', 'Mirai': 'Mirai_Activity', 'Torii': 'Torii_Activity'
            }
            chunk_df['standardized_label'] = chunk_df['label'].astype(str).map(label_mapping).fillna(chunk_df['label'])
            
            def classify_attack_phase(label):
                label = str(label).lower()
                if 'benign' in label: return 'Normal_Activity'
                elif 'scan' in label: return 'Reconnaissance'
                elif 'command' in label or 'control' in label: return 'C&C_Communication'
                elif 'ddos' in label: return 'Denial_of_Service'
                elif 'attack' in label: return 'Direct_Attack'
                elif 'download' in label: return 'Payload_Delivery'
                else: return 'Other_Malicious'
            chunk_df['attack_phase'] = chunk_df['standardized_label'].apply(classify_attack_phase)

        # Feature Engineering: Calculate Flow Metrics 
        numeric_cols = ['duration', 'orig_bytes', 'resp_bytes', 'orig_pkts', 'resp_pkts']
        for col in numeric_cols:
            if col in chunk_df.columns: chunk_df[col].fillna(0, inplace=True)
        
        chunk_df['total_bytes'] = chunk_df['orig_bytes'] + chunk_df['resp_bytes']
        if 'conn_state' in chunk_df.columns:
            chunk_df['connection_successful'] = chunk_df['conn_state'].isin(['SF', 'S1', 'S2', 'S3'])

        # Write the enriched chunk to the new Parquet file 
        if is_first_chunk:
            fastparquet.write(enriched_parquet_path, chunk_df, compression='SNAPPY', append=False)
            is_first_chunk = False
        else:
            fastparquet.write(enriched_parquet_path, chunk_df, compression='SNAPPY', append=True)
            
    except Exception as e:
        print(f"  - An error occurred while processing a chunk: {e}")

print(f"\nSuccessfully created final enriched Parquet file.")

Pre-calculating global minimum timestamp...
Global minimum timestamp found: 2018-05-09 15:30:31.015073061

--- Starting memory-safe feature engineering to: processed_data/enriched_iot23_data.parquet ---


Enriching Data: 313it [05:01,  1.04it/s]


Successfully created final enriched Parquet file.





In [11]:
del chunk_df
gc.collect()

238