In [10]:
!pip install pytorch-tabnet imbalanced-learn scikit-learn dask_ml




In [11]:
import pandas as pd
import numpy as np
from dask_ml.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder, label_binarize
from imblearn.over_sampling import SMOTE
import dask.dataframe as dd
from sklearn.feature_selection import mutual_info_classif
from dask_ml.model_selection import train_test_split
from pytorch_tabnet.tab_model import TabNetClassifier
import torch
from sklearn.metrics import (confusion_matrix, classification_report,
                             hamming_loss, f1_score, roc_curve, roc_auc_score)
import matplotlib.pyplot as plt




In [12]:
df = dd.read_csv(
    "/kaggle/input/gotham/merged_dataset.csv",
    dtype={
        'tcp.checksum': 'object',
        'tcp.flags': 'object',
        'tcp.options': 'object'
    },
    assume_missing=True,  # allows mixed types
    blocksize="256MB"      # adjust based on RAM
)
df

Unnamed: 0_level_0,frame.time,frame.len,frame.protocols,eth.src,eth.dst,ip.dst,ip.src,ip.flags,ip.ttl,ip.proto,ip.checksum,ip.tos,tcp.srcport,tcp.dstport,tcp.flags,tcp.window_size_value,tcp.window_size_scalefactor,tcp.checksum,tcp.options,tcp.pdu.size,udp.srcport,udp.dstport,label
npartitions=27,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1
,string,float64,string,string,string,string,string,string,float64,float64,string,float64,float64,float64,string,float64,float64,string,string,float64,float64,float64,string
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [13]:
df.columns

Index(['frame.time', 'frame.len', 'frame.protocols', 'eth.src', 'eth.dst',
       'ip.dst', 'ip.src', 'ip.flags', 'ip.ttl', 'ip.proto', 'ip.checksum',
       'ip.tos', 'tcp.srcport', 'tcp.dstport', 'tcp.flags',
       'tcp.window_size_value', 'tcp.window_size_scalefactor', 'tcp.checksum',
       'tcp.options', 'tcp.pdu.size', 'udp.srcport', 'udp.dstport', 'label'],
      dtype='object')

In [14]:
df.dtypes

frame.time                     string[pyarrow]
frame.len                              float64
frame.protocols                string[pyarrow]
eth.src                        string[pyarrow]
eth.dst                        string[pyarrow]
ip.dst                         string[pyarrow]
ip.src                         string[pyarrow]
ip.flags                       string[pyarrow]
ip.ttl                                 float64
ip.proto                               float64
ip.checksum                    string[pyarrow]
ip.tos                                 float64
tcp.srcport                            float64
tcp.dstport                            float64
tcp.flags                      string[pyarrow]
tcp.window_size_value                  float64
tcp.window_size_scalefactor            float64
tcp.checksum                   string[pyarrow]
tcp.options                    string[pyarrow]
tcp.pdu.size                           float64
udp.srcport                            float64
udp.dstport  

In [15]:
attack_counts = df['label'].value_counts().compute()
print(attack_counts)

label
CoAP Amplification            274837
Merlin TCP Flooding           120000
Reporting                        450
Unknown                         7670
Telnet Brute Force            227649
Mirai GRE Flooding           5911401
UDP Scan                        4242
Mirai UDP Flooding           8897895
File Download                   7196
C&C Communication                528
TCP Scan                      737764
Merlin C&C Communication       29356
Ingress Tool Transfer          21587
Benign                      12256883
Merlin UDP Flooding            29996
Mirai C&C Communication         1074
Mirai TCP Flooding           6548173
Merlin ICMP Flooding           57580
Name: count, dtype: int64[pyarrow]


In [16]:
# Drop unnecessary columns
df = df.drop(columns=['ip.checksum', 'tcp.checksum'], errors='ignore')

# -------------------
# Time-based features
# -------------------
df['frame.time'] = dd.to_datetime(df['frame.time'], errors='coerce')
df['hour'] = df['frame.time'].dt.hour.fillna(-1).astype('int32')
df['minute'] = df['frame.time'].dt.minute.fillna(-1).astype('int32')
df['second'] = df['frame.time'].dt.second.fillna(-1).astype('int32')

# -------------------
# Protocol type indicators
# -------------------
df['is_tcp'] = df['tcp.srcport'].notnull().astype('int8')
df['is_udp'] = df['udp.srcport'].notnull().astype('int8')

# -------------------
# Handle nulls in numeric columns safely
# -------------------
null_fill_cols = ['ip.tos','tcp.srcport','tcp.dstport','tcp.options',
                  'tcp.pdu.size','udp.srcport','udp.dstport']
for col in null_fill_cols:
    if col in df.columns:
        df[col] = dd.to_numeric(df[col], errors='coerce').fillna(0).astype('float32')

# -------------------
# Packet size features
# -------------------
df['total_bytes'] = df['frame.len'].fillna(0)
df['tcp.window_size_value'] = df['tcp.window_size_value'].fillna(0)
df['tcp.window_size_scalefactor'] = df['tcp.window_size_scalefactor'].fillna(0)
df['src_dst_bytes_diff'] = df['tcp.window_size_value'] - df['tcp.window_size_scalefactor']

# -------------------
# TCP flags handling
# -------------------
if 'tcp.flags' in df.columns:
    df['tcp.flags_num'] = dd.to_numeric(df['tcp.flags'], errors='coerce').fillna(0).astype('int32')
    df['syn_flag'] = ((df['tcp.flags_num'] & 0x02) > 0).astype('int8')
    df['ack_flag'] = ((df['tcp.flags_num'] & 0x10) > 0).astype('int8')
    df = df.drop(columns=['tcp.flags_num'])

# -------------------
# Frequency encoding for IP/MAC addresses (Fixed approach)
# -------------------
ip_cols = ['ip.src', 'ip.dst', 'eth.src', 'eth.dst']
for col in ip_cols:
    if col in df.columns:
        # Create frequency mapping
        freq_map = df[col].value_counts().compute()
        # Apply mapping using map_partitions
        df[col + '_freq'] = df[col].map_partitions(
            lambda x: x.map(freq_map).fillna(0), 
            meta=('freq', 'f4')
        )

# -------------------
# Drop redundant columns (Fixed - removed frame.protocols from drop list since it's used later)
# -------------------
drop_cols = ['frame.time', 'tcp.flags']
df = df.drop(columns=[c for c in drop_cols if c in df.columns])

# -------------------
# Frequency encode categorical columns (Fixed approach)
# -------------------
cat_cols = ['frame.protocols', 'eth.src', 'eth.dst', 'ip.dst', 'ip.src', 'ip.flags', 'label']
for col in cat_cols:
     # Skip already processed
    req_map = df[col].value_counts().compute()
    df[col] = df[col].map_partitions(lambda x: x.map(freq_map).fillna(0),meta=(col, 'f4'))

# -------------------
# Fill any remaining nulls in numeric columns
# -------------------
num_cols = df.select_dtypes(include=['float32', 'float64', 'int32', 'int64']).columns
for col in num_cols:
    df[col] = df[col].fillna(0)

print("Feature engineering completed successfully!")
print(f"Final dataframe shape: {df.shape}")
print(f"Columns: {list(df.columns)}")

Feature engineering completed successfully!
Final dataframe shape: (<dask_expr.expr.Scalar: expr=(Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Drop(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Drop(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Drop(frame=ArrowStringConversion(frame=FromMapProjectable(b8a554e)), columns=['ip.checksum', 'tcp.checksum'], err

In [17]:
null_counts = df.isnull().sum().compute()
print(null_counts)

frame.len                      0
frame.protocols                0
eth.src                        0
eth.dst                        0
ip.dst                         0
ip.src                         0
ip.flags                       0
ip.ttl                         0
ip.proto                       0
ip.tos                         0
tcp.srcport                    0
tcp.dstport                    0
tcp.window_size_value          0
tcp.window_size_scalefactor    0
tcp.options                    0
tcp.pdu.size                   0
udp.srcport                    0
udp.dstport                    0
label                          0
hour                           0
minute                         0
second                         0
is_tcp                         0
is_udp                         0
total_bytes                    0
src_dst_bytes_diff             0
syn_flag                       0
ack_flag                       0
ip.src_freq                    0
ip.dst_freq                    0
eth.src_fr

In [18]:
df.dtypes

frame.len                      float64
frame.protocols                float32
eth.src                        float32
eth.dst                        float32
ip.dst                         float32
ip.src                         float32
ip.flags                       float32
ip.ttl                         float64
ip.proto                       float64
ip.tos                         float32
tcp.srcport                    float32
tcp.dstport                    float32
tcp.window_size_value          float64
tcp.window_size_scalefactor    float64
tcp.options                    float32
tcp.pdu.size                   float32
udp.srcport                    float32
udp.dstport                    float32
label                          float32
hour                             int32
minute                           int32
second                           int32
is_tcp                            int8
is_udp                            int8
total_bytes                    float64
src_dst_bytes_diff       

In [19]:
X = df.drop(columns=['label'])

# Target
y = df['label']

In [20]:
X = X.astype('float64')

In [21]:
#X_pd = X.compute()
#y_pd = y.compute()



# Compute mutual information
#mi_scores = mutual_info_classif(X_pd, y_pd, discrete_features='auto', random_state=42)

# Create a Series with scores
#mi_series = pd.Series(mi_scores, index=X_pd.columns)

# Select top 20 features
#top_15_features = mi_series.sort_values(ascending=False).head(15).index.tolist()

#print("Top 15 features based on Mutual Information:")
#print(top_15_features)

# Optional: reduce your Dask DataFrame to only top 20 features
#X = X[top_15_features]

In [22]:
scaler = StandardScaler()
X = scaler.fit_transform(X)


In [23]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)



In [24]:
tabnet_clf = TabNetClassifier(
    n_d=32,
    n_a=32,
    n_steps=5,
    gamma=1.5,
    n_independent=2,
    n_shared=2,
    optimizer_fn=torch.optim.Adam,
    optimizer_params=dict(lr=2e-2),
    mask_type='sparsemax'
)




In [25]:
# X_train_pd = X_train.compute()
# y_train_pd = y_train.compute()

# # Apply SMOTE
# smote = SMOTE(random_state=42)
# X_train, y_train = smote.fit_resample(X_train_pd, y_train_pd)

# # Optional: convert back to Dask if you want
# X_train = dd.from_pandas(X_train, npartitions=1)
# y_train = dd.from_pandas(y_train, npartitions=1)

In [26]:
for i in tqdm(range(X_train.npartitions), desc="Training on Dask partitions"):
    # Load one partition at a time into memory
    X_batch = X_train.get_partition(i).compute()
    y_batch = y_train.get_partition(i).compute()
    
    # Safety check: skip empty partitions
    if len(X_batch) == 0:
        continue

    # Optional downsample if still too big for memory
    if len(X_batch) > 50000:
        X_batch = X_batch.sample(n=50000, random_state=42)
        y_batch = y_batch.loc[X_batch.index]
    
    # Apply SMOTE for class balancing
    
    # Convert to numpy arrays for TabNet
    X_np = X_batch.compute() 
    y_np = y_batch.to_numpy()
    
    # Ensure y_np is 1D
    y_np = np.ravel(y_np)

    # Train model incrementally
    if i == 0:
        tabnet_clf.fit(
            X_train=X_np, y_train=y_np,
            max_epochs=10,
            patience=5,
            batch_size=1024,
            virtual_batch_size=128
        )
    else:
        tabnet_clf.fit(
            X_train=X_np, y_train=y_np,
            max_epochs=10,
            patience=5,
            batch_size=1024,
            virtual_batch_size=128,
            from_unsupervised=tabnet_clf  # continue training from previous weights
        )

NameError: name 'tqdm' is not defined

In [None]:
# classes_array = y_train.unique().compute().to_numpy()

# # Iterate over Dask partitions (memory-efficient)
# for X_part_delayed, y_part_delayed in zip(X_train.to_delayed(), y_train.to_delayed()):
#     X_part = X_part_delayed.compute().to_numpy()
#     y_part = y_part_delayed.compute().to_numpy()
    
#     tabnet_clf.partial_fit(
#         X_part, y_part,
#         classes=classes_array
#     )

In [None]:
# for X_part_delayed in X_test.to_delayed():
#     X_part = X_part_delayed.compute().to_numpy()
#     preds_list.append(tabnet_clf.predict(X_part))

# preds = np.concatenate(preds_list)

In [None]:
# y_true_list = []
# y_pred_list = []

# for X_part_delayed, y_part_delayed in zip(X_test.to_delayed(), y_test.to_delayed()):
#     X_part = X_part_delayed.compute().to_numpy()
#     y_part = y_part_delayed.compute().to_numpy()
    
#     preds_part = tabnet_clf.predict(X_part)
    
#     y_true_list.append(y_part)
#     y_pred_list.append(preds_part)

# # Combine all partitions
# y_true = np.concatenate(y_true_list)
# y_pred = np.concatenate(y_pred_list)

In [None]:
classes = np.unique(y_true)
y_true_bin = label_binarize(y_true, classes=classes)
y_pred_bin = label_binarize(y_pred, classes=classes)

plt.figure(figsize=(10, 8))

for i, cls in enumerate(classes):
    fpr, tpr, _ = roc_curve(y_true_bin[:, i], y_pred_bin[:, i])
    auc_score = roc_auc_score(y_true_bin[:, i], y_pred_bin[:, i])
    plt.plot(fpr, tpr, label=f"Class {cls} (AUC = {auc_score:.2f})")

plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("Multi-class ROC Curve")
plt.legend()
plt.show()

In [None]:
cm = confusion_matrix(y_true, y_pred)
print("Confusion Matrix:\n", cm)

# Classification report
cr = classification_report(y_true, y_pred)
print("Classification Report:\n", cr)

# Hamming loss
hl = hamming_loss(y_true, y_pred)
print("Hamming Loss:", hl)

# F1 scores
macro_f1 = f1_score(y_true, y_pred, average='macro')
micro_f1 = f1_score(y_true, y_pred, average='micro')
weighted_f1 = f1_score(y_true, y_pred, average='weighted')

print(f"Macro F1: {macro_f1}, Micro F1: {micro_f1}, Weighted F1: {weighted_f1}")
