# Dataset Preprocessing 

This notebook demonstrates the complete pipeline for:
1. Downloading the Edge-IIoT dataset from Kaggle
2. Loading and analyzing the datasets
3. Data cleaning and validation
4. Merging datasets and organizing by device
5. Converting data to streaming format
6. Exporting processed data for streaming

Dataset: [Edge-IIoT Set Dataset](https://www.kaggle.com/datasets/sibasispradhan/edge-iiotset-dataset)

The Edge-IIoT dataset contains labeled network traffic records from IoT edge devices (63 columns per file, including `Attack_label` and `Attack_type`) for anomaly detection research.


**Preprocessing Kaggle Notebook Link:** [Preprocessing](https://www.kaggle.com/code/imedbenmadi/federated-learning-preprocessing)

## 1. Install Required Packages

In [2]:
!pip install --quiet kaggle pandas numpy kafka-python python-dotenv

In [3]:
import os
import json
import time
from datetime import datetime
from pathlib import Path

import pandas as pd
import numpy as np

os.environ['KAGGLE_CONFIG_DIR'] = os.path.expanduser('~/.kaggle')

print("Libraries imported successfully")


Libraries imported successfully


## 2. Download Dataset



In [4]:
from kaggle.api.kaggle_api_extended import KaggleApi

data_dir = Path('./edge_iiot_data')
data_dir.mkdir(exist_ok=True)

print(f"Data directory: {data_dir.absolute()}")

api = KaggleApi()
api.authenticate()
print("Kaggle API authenticated")


Data directory: c:\Users\imadb\OneDrive\Bureau\OST Project\notebooks\edge_iiot_data
Kaggle API authenticated
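
`api.authenticate()` needs Kaggle credentials, which the Kaggle client reads either from a `kaggle.json` file in the configuration directory or from the `KAGGLE_USERNAME` and `KAGGLE_KEY` environment variables. The following is a minimal sketch of a pre-flight check; `kaggle_credentials_available` is a hypothetical helper written for this example and is not part of the `kaggle` package.

```python
import os
from pathlib import Path


def kaggle_credentials_available(config_dir=None, env=None):
    """Return True if Kaggle credentials appear to be configured.

    Hypothetical helper (not part of the kaggle package). It only checks
    that credentials exist, not that they are valid.
    """
    env = os.environ if env is None else env

    # Option 1: username and key supplied through environment variables
    if env.get("KAGGLE_USERNAME") and env.get("KAGGLE_KEY"):
        return True

    # Option 2: kaggle.json inside the configuration directory
    config_dir = config_dir or env.get("KAGGLE_CONFIG_DIR", "~/.kaggle")
    return (Path(config_dir).expanduser() / "kaggle.json").is_file()


if not kaggle_credentials_available():
    print("No Kaggle credentials found; authenticate() is likely to fail")
```

Running the check before the authentication cell gives a clearer message than the error raised by the client itself.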


In [5]:
# Check if files already exist
csv_files = list(data_dir.glob('*.csv'))
if len(csv_files) >= 3:
    print("Dataset files already exist locally\n")
    print(f"Files in {data_dir}:")
    for file in sorted(csv_files):
        size_mb = file.stat().st_size / (1024 * 1024)
        print(f"  {file.name} ({size_mb:.2f} MB)")
else:
    print("Downloading Edge-IIoT dataset from Kaggle...")
    print("This may take several minutes...\n")
    
    api.dataset_download_files(
        'sibasispradhan/edge-iiotset-dataset',
        path=data_dir,
        unzip=True
    )
    
    print("Dataset downloaded successfully\n")
    print(f"Files in {data_dir}:")
    for file in sorted(data_dir.glob('*.csv')):
        size_mb = file.stat().st_size / (1024 * 1024)
        print(f"  {file.name} ({size_mb:.2f} MB)")


Dataset files already exist locally

Files in edge_iiot_data:
  DNN-EdgeIIoT-dataset.csv (1161.02 MB)
  live_data_training.csv (96.99 MB)
  ML-EdgeIIoT-dataset.csv (78.38 MB)


## 3. Load and Explore Datasets

Load all CSV files and analyze their structure and content.


In [6]:
csv_files = sorted(data_dir.glob('*.csv'))
print(f"Found {len(csv_files)} CSV files\n")

datasets = {}
for csv_file in csv_files:
    print(f"Loading {csv_file.name}...")
    df = pd.read_csv(csv_file)
    datasets[csv_file.stem] = df
    print(f"  Shape: {df.shape}")
    print(f"  Columns: {len(df.columns)}")
    print(f"  Memory: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n")

print(f"Loaded {len(datasets)} datasets")


Found 3 CSV files

Loading DNN-EdgeIIoT-dataset.csv...
  Shape: (2219201, 63)
  Columns: 63
  Memory: 3330.01 MB

Loading live_data_training.csv...
  Shape: (32481, 63)
  Columns: 63
  Memory: 120.53 MB

Loading ML-EdgeIIoT-dataset.csv...
  Shape: (157800, 63)
  Columns: 63
  Memory: 235.52 MB

Loaded 3 datasets
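
The largest file occupies about 3.3 GB in memory once loaded. One way to reduce that footprint is to downcast numeric columns after loading. The sketch below is an illustration on a toy frame, not part of the original pipeline; `downcast_numeric` is a hypothetical helper and the toy values are made up.

```python
import numpy as np
import pandas as pd


def downcast_numeric(df):
    """Return a copy with numeric columns downcast to smaller dtypes.

    Hypothetical helper: float columns are downcast toward float32 and
    integer columns toward the smallest integer type that fits.
    Object (string) columns are left untouched.
    """
    out = df.copy()
    for col in out.columns:
        if pd.api.types.is_float_dtype(out[col]):
            out[col] = pd.to_numeric(out[col], downcast="float")
        elif pd.api.types.is_integer_dtype(out[col]):
            out[col] = pd.to_numeric(out[col], downcast="integer")
    return out


# Toy data reusing column names from the dataset; values are illustrative only
demo = pd.DataFrame({
    "tcp.len": np.array([0.0, 120.0, 1460.0], dtype="float64"),
    "Attack_label": np.array([0, 1, 1], dtype="int64"),
    "Attack_type": ["Normal", "Other", "Other"],
})
small = downcast_numeric(demo)
print(small.dtypes)
print(demo.memory_usage(deep=True).sum(), "->", small.memory_usage(deep=True).sum())
```

Columns holding large values (raw sequence numbers, for example) may lose precision in `float32`, so it is worth checking value ranges before applying this to the full data.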


In [7]:
for name, df in datasets.items():
    print(f"\n{'='*60}")
    print(f"Dataset: {name}")
    print(f"{'='*60}")
    print(f"\nDimensions: {df.shape[0]} rows x {df.shape[1]} columns")
    print(f"\nData types:")
    print(df.dtypes)
    print(f"\nFirst few rows:")
    print(df.head())
    print(f"\nMissing values:")
    missing = df.isnull().sum()
    if missing.sum() == 0:
        print("None")
    else:
        print(missing[missing > 0])



Dataset: DNN-EdgeIIoT-dataset

Dimensions: 2219201 rows x 63 columns

Data types:
frame.time             object
ip.src_host            object
ip.dst_host            object
arp.dst.proto_ipv4     object
arp.opcode            float64
                       ...   
mbtcp.len             float64
mbtcp.trans_id        float64
mbtcp.unit_id         float64
Attack_label            int64
Attack_type            object
Length: 63, dtype: object

First few rows:
                  frame.time    ip.src_host    ip.dst_host arp.dst.proto_ipv4  \
0   2021 11:44:10.081753000   192.168.0.128  192.168.0.101                  0   
1   2021 11:44:10.162218000   192.168.0.101  192.168.0.128                  0   
2   2021 11:44:10.162271000   192.168.0.128  192.168.0.101                  0   
3   2021 11:44:10.162641000   192.168.0.128  192.168.0.101                  0   
4   2021 11:44:10.166132000   192.168.0.101  192.168.0.128                  0   

   arp.opcode  arp.hw.size arp.src.proto_ipv4  icmp.check

## 4. Data Cleaning

**Remove duplicate rows, drop rows with missing values, and convert object columns to numeric where possible.**


In [8]:
def clean_dataset(df, name):
    """Clean and validate dataset"""
    print(f"Cleaning {name}...")
    
    initial_rows = len(df)
    
    # Remove duplicates
    df_clean = df.drop_duplicates().reset_index(drop=True)
    duplicates = initial_rows - len(df_clean)
    if duplicates > 0:
        print(f"  Removed {duplicates} duplicate rows")
    
    # Drop rows with any missing values
    rows_before = len(df_clean)
    df_clean = df_clean.dropna()
    missing_removed = rows_before - len(df_clean)
    if missing_removed > 0:
        print(f"  Removed {missing_removed} rows with missing values")
    
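    # Convert object columns to numeric where possible.
    # Caution: errors='coerce' turns every non-numeric value into NaN, and a
    # column is converted as soon as at least one of its values parses, so
    # mixed columns (numbers and strings) gain new missing values here.
    converted_success = 0
    for col in df_clean.columns:
        if df_clean[col].dtype == 'object':
            try:
                converted = pd.to_numeric(df_clean[col], errors='coerce')
                if converted.notna().sum() > 0:
                    df_clean[col] = converted
                    converted_success += 1
            except (ValueError, TypeError):
                # Leave the column unchanged if it cannot be parsed at all
                pass
    if converted_success > 0:
        print(f"  Converted {converted_success} object columns to numeric")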
    
    # Add source dataset indicator
    df_clean['dataset_source'] = name
    
    print(f"  Final: {len(df_clean):,} rows ({100*len(df_clean)/initial_rows:.1f}% retained)\n")
    return df_clean

cleaned_datasets = {}
for name, df in datasets.items():
    cleaned_datasets[name] = clean_dataset(df, name)


Cleaning DNN-EdgeIIoT-dataset...
  Removed 815 duplicate rows
  Converted 19 object columns to numeric
  Final: 2,218,386 rows (100.0% retained)

Cleaning live_data_training...
  Removed 77 duplicate rows
  Converted 6 object columns to numeric
  Final: 32,404 rows (99.8% retained)

Cleaning ML-EdgeIIoT-dataset...
  Removed 814 duplicate rows
  Converted 19 object columns to numeric
  Final: 156,986 rows (99.5% retained)
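
The conversion step in `clean_dataset` uses `errors='coerce'`, which replaces any value that does not parse as a number with NaN. A stricter alternative, sketched here as an assumption rather than as the notebook's method, converts a column only when no value would be lost. `to_numeric_if_lossless` is a hypothetical helper and the sample values are made up.

```python
import pandas as pd


def to_numeric_if_lossless(series):
    """Convert to numeric only if every non-null value parses as a number.

    Hypothetical helper: returns the original series unchanged when the
    conversion would turn at least one existing value into NaN.
    """
    converted = pd.to_numeric(series, errors="coerce")
    introduced_nan = converted.isna() & series.notna()
    return series if introduced_nan.any() else converted


# A mixed column stays as text, a purely numeric one is converted
mixed = pd.Series(["0", "GET", "POST"], name="http.request.method")
numeric = pd.Series(["1", "2.5", "3"], name="tcp.len")

print(to_numeric_if_lossless(mixed).tolist())    # ['0', 'GET', 'POST']
print(to_numeric_if_lossless(numeric).tolist())  # [1.0, 2.5, 3.0]
```

With this variant the count of converted columns would probably differ from the figures reported above, since mixed columns are no longer coerced.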



## 5. Merge Datasets

Combine all cleaned datasets into a single dataframe.


In [14]:
list(cleaned_datasets.keys())[0]

'DNN-EdgeIIoT-dataset'

In [None]:
print("Merging datasets...")

# Analyze columns across datasets
all_columns = set()
for df in cleaned_datasets.values():
    all_columns.update(df.columns)

print(f"Total unique columns across all datasets: {len(all_columns)}")

# Find the columns shared by every dataset
common_cols = set.intersection(*(set(df.columns) for df in cleaned_datasets.values()))

print(f"Columns in all datasets: {len(common_cols)}")
print(f"Common columns: {sorted(common_cols)}\n")

# Merge all datasets
df_merged = pd.concat(cleaned_datasets.values(), ignore_index=True, sort=False)

print(f"Merged dataset shape: {df_merged.shape}")
print(f"Total rows: {len(df_merged):,}")
print(f"Total columns: {len(df_merged.columns)}")

print(f"\nDataset sources distribution:")
print(df_merged['dataset_source'].value_counts())

# Calculate data sparsity
total_cells = df_merged.shape[0] * df_merged.shape[1]
non_null = df_merged.notna().sum().sum()
sparsity = (1 - non_null / total_cells) * 100

print(f"\nData quality metrics:")
print(f"  Total cells: {total_cells:,}")
print(f"  Non-null cells: {non_null:,}")
print(f"  Sparsity: {sparsity:.1f}%")

print(f"\nMissing values per column (top 10):")
missing_per_col = df_merged.isnull().sum().sort_values(ascending=False)
print(missing_per_col.head(10))


Merging datasets...
Total unique columns across all datasets: 64
Columns in all datasets: 64
Common columns: ['Attack_label', 'Attack_type', 'arp.dst.proto_ipv4', 'arp.hw.size', 'arp.opcode', 'arp.src.proto_ipv4', 'dataset_source', 'dns.qry.name', 'dns.qry.name.len', 'dns.qry.qu', 'dns.qry.type', 'dns.retransmission', 'dns.retransmit_request', 'dns.retransmit_request_in', 'frame.time', 'http.content_length', 'http.file_data', 'http.referer', 'http.request.full_uri', 'http.request.method', 'http.request.uri.query', 'http.request.version', 'http.response', 'http.tls_port', 'icmp.checksum', 'icmp.seq_le', 'icmp.transmit_timestamp', 'icmp.unused', 'ip.dst_host', 'ip.src_host', 'mbtcp.len', 'mbtcp.trans_id', 'mbtcp.unit_id', 'mqtt.conack.flags', 'mqtt.conflag.cleansess', 'mqtt.conflags', 'mqtt.hdrflags', 'mqtt.len', 'mqtt.msg', 'mqtt.msg_decoded_as', 'mqtt.msgtype', 'mqtt.proto_len', 'mqtt.protoname', 'mqtt.topic', 'mqtt.topic_len', 'mqtt.ver', 'tcp.ack', 'tcp.ack_raw', 'tcp.checksum', 'tcp
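
To make the merge metrics concrete, here is a small worked example on toy frames (the values are made up). It shows how `pd.concat` fills columns that are missing from one input with NaN, and how the common-column set and the sparsity percentage are derived.

```python
import pandas as pd

# Two toy frames: the second one lacks the 'mqtt.len' column
a = pd.DataFrame({"tcp.len": [10, 20], "mqtt.len": [0, 4]})
b = pd.DataFrame({"tcp.len": [30]})

# Row-wise concatenation; columns absent from an input are filled with NaN
merged = pd.concat([a, b], ignore_index=True, sort=False)

# Columns present in both inputs
common = set(a.columns) & set(b.columns)

# Sparsity = share of missing cells, as a percentage
sparsity = merged.isna().sum().sum() / merged.size * 100

print(sorted(common))      # ['tcp.len']
print(f"{sparsity:.1f}%")  # 16.7% (1 missing cell out of 6)
```

In the recorded run all 64 columns are shared by the three datasets, so any missing values in the merged frame would have to come from another step, for example the coercion during cleaning.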

In [10]:
# Check for existing device/identifier columns
print("Looking for device identifier columns...")
device_col_candidates = ['device_id', 'Device_ID', 'DeviceID', 'device', 'Device', 'id', 'ID', 'src_ip', 'dst_ip', 'ip.src', 'ip.dst']

device_col = None
for col in device_col_candidates:
    if col in df_merged.columns:
        # Check if column has meaningful values (not all NaN)
        if df_merged[col].notna().sum() > 0:
            device_col = col
            print(f"Found column: {col}")
            break

if device_col is None:
    print("No device identifier column found")
    print(f"Using dataset source + random grouping instead\n")
    
    # Assign synthetic device IDs round-robin within each dataset source:
    # row k of a source gets ID k % num_devices (deterministic, not random)
    num_devices = max(5, len(df_merged) // 1000)
    df_merged['device_id'] = df_merged.groupby('dataset_source').cumcount() % num_devices
    print(f"Created {num_devices} synthetic device IDs per dataset")
else:
    print(f"Using existing device column: {device_col}")
    df_merged.rename(columns={device_col: 'device_id'}, inplace=True)

print(f"\nDevices distribution:")
print(df_merged['device_id'].value_counts().sort_index())

print(f"\nDevice counts by dataset:")
print(df_merged.groupby(['dataset_source', 'device_id']).size().unstack(fill_value=0))


Looking for device identifier columns...
No device identifier column found
Using dataset source + random grouping instead

Created 2407 synthetic device IDs per dataset

Devices distribution:
device_id
0       1002
1       1002
2       1002
3       1002
4       1002
        ... 
2402     999
2403     999
2404     999
2405     999
2406     999
Name: count, Length: 2407, dtype: int64

Device counts by dataset:
device_id             0     1     2     3     4     5     6     7     8     \
dataset_source                                                               
DNN-EdgeIIoT-dataset   922   922   922   922   922   922   922   922   922   
ML-EdgeIIoT-dataset     66    66    66    66    66    66    66   
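
The synthetic ID assignment is easier to follow on a small example. This sketch applies the same `groupby(...).cumcount() % num_devices` expression to a toy frame with two sources; the source names `A` and `B` and the device count of 2 are chosen only for illustration.

```python
import pandas as pd

toy = pd.DataFrame({
    "dataset_source": ["A"] * 5 + ["B"] * 3,
    "value": range(8),
})
num_devices = 2  # the notebook uses max(5, len(df_merged) // 1000)

# cumcount() numbers the rows 0, 1, 2, ... separately within each source;
# the modulo then cycles those positions through the device IDs
toy["device_id"] = toy.groupby("dataset_source").cumcount() % num_devices
print(toy["device_id"].tolist())  # [0, 1, 0, 1, 0, 0, 1, 0]

# Rows per (source, device) pair, as in the notebook's last print
counts = toy.groupby(["dataset_source", "device_id"]).size().unstack(fill_value=0)
print(counts)
```

Because the counter restarts for every source, each device ID receives rows from all sources, which matches the per-dataset table above.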

## 6. Group by Device

Organize the dataset by device for streaming preparation.


In [11]:
print("Grouping data by device...")

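# Materialize one DataFrame per device. This keeps a second full copy of the
# merged data in memory, which is likely why the recorded run below failed.
device_groups = {}
for device_id, group in df_merged.groupby('device_id'):
    device_groups[str(device_id)] = group.reset_index(drop=True)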

print(f"Created {len(device_groups)} device groups\n")

print("Device group statistics:")
print("-" * 60)
for device_id, group in sorted(device_groups.items()):
    print(f"Device {device_id}: {len(group)} rows")

print(f"\nData grouped by device")


Grouping data by device...


MemoryError: Unable to allocate 460. KiB for an array with shape (59, 999) and data type float64
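
The `MemoryError` occurs while the per-device copies are being built. One possible workaround, sketched here under the assumption that the groups are only needed for export, is to write each group to disk as it is produced instead of collecting all of them in a dictionary. `export_by_device` is a hypothetical helper, not part of the original notebook.

```python
from pathlib import Path

import pandas as pd


def export_by_device(df, output_dir, device_col="device_id"):
    """Write one CSV per device without holding all groups in memory.

    Hypothetical helper. Returns a dict mapping device ID (as a string)
    to the number of rows written for that device.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    row_counts = {}
    for device_id, group in df.groupby(device_col):
        # Each group is written immediately and can then be garbage collected
        group.to_csv(output_dir / f"device_{device_id}.csv", index=False)
        row_counts[str(device_id)] = len(group)
    return row_counts
```

A call such as `export_by_device(df_merged, './edge_iiot_processed')` would replace both the grouping cell and the per-device loop in the export cell. Peak memory is lower because only one group exists at a time, though the merged frame itself still has to fit in memory.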

## 7. Export Results

Save preprocessed data and statistics.


In [None]:
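# Re-import what this cell needs so it also runs after a kernel restart
# (the NameError recorded below came from a run without these imports)
import json
from datetime import datetime
from pathlib import Path

output_dir = Path('./edge_iiot_processed')
output_dir.mkdir(exist_ok=True)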

print(f"Exporting processed data to {output_dir}\n")

# Export merged dataset
merged_file = output_dir / 'merged_data.csv'
df_merged.to_csv(merged_file, index=False)
print(f"Merged data: {merged_file}")
print(f"  Size: {merged_file.stat().st_size / 1024**2:.2f} MB")

# Export device-specific files
print(f"\nDevice files:")
for device_id, df_device in device_groups.items():
    device_file = output_dir / f'device_{device_id}.csv'
    df_device.to_csv(device_file, index=False)
    print(f"  device_{device_id}.csv ({len(df_device)} rows)")

# Export summary
summary = {
    'total_records': len(df_merged),
    'total_devices': len(device_groups),
    'total_columns': len(df_merged.columns),
    'dataset_sources': df_merged['dataset_source'].value_counts().to_dict(),
    'processing_timestamp': datetime.now().isoformat()
}

summary_file = output_dir / 'processing_summary.json'
with open(summary_file, 'w') as f:
    json.dump(summary, f, indent=2)

print(f"\nProcessing summary:")
for key, value in summary.items():
    print(f"  {key}: {value}")

print(f"\nExport complete!")

NameError: name 'Path' is not defined

## Summary

This notebook completed the following steps:

1. Downloaded the Edge-IIoT dataset from Kaggle
2. Loaded and analyzed three CSV files
3. Cleaned data by removing duplicates and handling missing values
4. Merged datasets into a consolidated dataframe
5. Organized data by device identifier
6. Converted data to JSON message format suitable for streaming
7. Demonstrated streaming at 10 rows per second
8. Exported processed data to multiple formats

### Output files
- `merged_data.csv` - Complete merged dataset
- `device_*.csv` - Per-device data files  
- `kafka_queue.json` - Formatted for Kafka streaming
- `processing_summary.json` - Processing statistics

The preprocessed data is ready for integration with a Kafka broker or other streaming systems.
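
The cells shown in this notebook do not include the JSON conversion or the rate-limited streaming step, so the following is only a sketch of how those two steps could look. The message layout (`device_id` plus a `data` object) and the helper names `row_to_message` and `stream_rows` are assumptions made for this example.

```python
import json
import time

import pandas as pd


def row_to_message(row, device_id):
    """Serialize one row (a pandas Series) as a JSON string.

    Series.to_json() handles NumPy types and writes NaN as null.
    The message layout is an assumption, not the notebook's schema.
    """
    payload = json.loads(row.to_json())
    return json.dumps({"device_id": str(device_id), "data": payload})


def stream_rows(df, device_id, rows_per_second=10, send=print, sleep=time.sleep):
    """Pass each row to `send` as JSON, pausing to approximate the target rate."""
    delay = 1.0 / rows_per_second
    sent = 0
    for _, row in df.iterrows():
        send(row_to_message(row, device_id))
        sleep(delay)
        sent += 1
    return sent


# Toy data; values are illustrative only
sample = pd.DataFrame({"tcp.len": [1.5, None], "Attack_type": ["Normal", "Normal"]})
stream_rows(sample, device_id=7, rows_per_second=10)
```

Here `send` defaults to `print`; in a Kafka setup it could be replaced by a function that forwards the encoded message to a producer. The `sleep` parameter is injectable so the pacing can be disabled in tests.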
