In [1]:
import dask.dataframe as dd
import pandas as pd
from dask_ml.preprocessing import StandardScaler
from dask_ml.model_selection import train_test_split

In [2]:
# Show every column when a frame is rendered (None removes the column limit).
pd.options.display.max_columns = None

In [3]:
# Read the parquet data stored under dataset/ through dask.dataframe.
df = dd.read_parquet("dataset/")

In [4]:
# Preview the first rows to inspect the available columns and sample values.
df.head()

Unnamed: 0,IPV4_SRC_ADDR,L4_SRC_PORT,IPV4_DST_ADDR,L4_DST_PORT,PROTOCOL,L7_PROTO,IN_BYTES,IN_PKTS,OUT_BYTES,OUT_PKTS,TCP_FLAGS,CLIENT_TCP_FLAGS,SERVER_TCP_FLAGS,FLOW_DURATION_MILLISECONDS,DURATION_IN,DURATION_OUT,MIN_TTL,MAX_TTL,LONGEST_FLOW_PKT,SHORTEST_FLOW_PKT,MIN_IP_PKT_LEN,MAX_IP_PKT_LEN,SRC_TO_DST_SECOND_BYTES,DST_TO_SRC_SECOND_BYTES,RETRANSMITTED_IN_BYTES,RETRANSMITTED_IN_PKTS,RETRANSMITTED_OUT_BYTES,RETRANSMITTED_OUT_PKTS,SRC_TO_DST_AVG_THROUGHPUT,DST_TO_SRC_AVG_THROUGHPUT,NUM_PKTS_UP_TO_128_BYTES,NUM_PKTS_128_TO_256_BYTES,NUM_PKTS_256_TO_512_BYTES,NUM_PKTS_512_TO_1024_BYTES,NUM_PKTS_1024_TO_1514_BYTES,TCP_WIN_MAX_IN,TCP_WIN_MAX_OUT,ICMP_TYPE,ICMP_IPV4_TYPE,DNS_QUERY_ID,DNS_QUERY_TYPE,DNS_TTL_ANSWER,FTP_COMMAND_RET_CODE,Label,Attack
0,192.168.1.193,49235,192.168.1.33,4444,6,0.0,155392,202,34552,149,24,24,24,4294952,15,15,128,128,1500,40,40,1500,155392.0,34552.0,0,0,0,0,77696000,17272000,56,150,36,14,95,45555,4805,0,0,0,0,0,0,1,ransomware
1,192.168.1.193,49228,192.168.1.152,1880,6,0.0,1600,40,35741,65,24,16,24,4294952,15,15,128,128,1286,40,40,1286,1600.0,35741.0,0,0,0,0,800000,17864000,47,3,30,19,6,16425,237,0,0,0,0,0,0,0,Benign
2,192.168.1.152,0,192.168.1.193,0,1,0.0,212,2,0,0,0,0,0,0,0,0,64,64,106,106,0,106,212.0,0.0,0,0,0,0,1696000,0,2,0,0,0,0,0,0,771,3,0,0,0,0,0,Benign
3,192.168.1.169,65317,239.255.255.250,1900,17,0.0,165,1,0,0,0,0,0,0,0,0,0,0,165,165,0,165,165.0,0.0,0,0,0,0,1320000,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,Benign
4,192.168.1.79,60766,192.168.1.255,15600,17,0.0,63,1,0,0,0,0,0,0,0,0,0,0,63,63,0,63,63.0,0.0,0,0,0,0,504000,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,Benign


In [5]:
# Remove the FTP_COMMAND_RET_CODE column from the frame.
df = df.drop(columns=["FTP_COMMAND_RET_CODE"])

In [6]:
# Discard rows that contain a missing value in any column.
df = df.dropna()

In [7]:
# Remove rows that are duplicated across all columns.
df = df.drop_duplicates()

In [8]:
# Break each dotted-quad IPv4 string into four integer octet columns.
# n=3: three splits on "." yield exactly the four octets. dask sizes the
# expanded result's column metadata from n (n splits -> n + 1 columns), so the
# previous n=4 declared a fifth column that the data never contains.
df[['SRC_ADDR_OCTET_1', 'SRC_ADDR_OCTET_2', 'SRC_ADDR_OCTET_3', 'SRC_ADDR_OCTET_4']] = df['IPV4_SRC_ADDR'].str.split('.', expand=True, n=3).astype(int)
df[['DST_ADDR_OCTET_1', 'DST_ADDR_OCTET_2', 'DST_ADDR_OCTET_3', 'DST_ADDR_OCTET_4']] = df['IPV4_DST_ADDR'].str.split('.', expand=True, n=3).astype(int)

# The original address strings are now represented by the octet columns.
df = df.drop(columns=['IPV4_SRC_ADDR', 'IPV4_DST_ADDR'])

In [9]:
def drop_outliers_iqr(df, column):
    """Keep only the rows whose ``column`` value lies within the 1.5 * IQR fences.

    Parameters
    ----------
    df : dask DataFrame
        Frame to filter. It is not modified; a filtered frame is returned.
    column : str
        Name of the column whose distribution defines the fences.

    Returns
    -------
    dask DataFrame
        ``df`` restricted to rows with
        ``q1 - 1.5 * iqr <= df[column] <= q3 + 1.5 * iqr``. When ``column`` is
        absent from ``df`` or its dtype kind is not one of ``'biufc'``
        (bool, int, unsigned, float, complex), ``df`` is returned unchanged.
    """
    # Guard: nothing to filter for unknown or non-numeric columns.
    if column not in df.columns or df[column].dtype.kind not in 'biufc':
        return df

    # Ask for both quartiles in one quantile() call so a single compute()
    # evaluates the upstream graph once instead of once per quartile.
    quartiles = df[column].quantile([0.25, 0.75]).compute()
    q1 = quartiles.loc[0.25]
    q3 = quartiles.loc[0.75]

    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr

    # Both fences are inclusive.
    return df[(df[column] >= lower_bound) & (df[column] <= upper_bound)]

In [10]:
# Apply the IQR outlier filter to the source-to-destination byte-rate column.
df = drop_outliers_iqr(df, column="SRC_TO_DST_SECOND_BYTES")

In [11]:
# Apply the IQR outlier filter to the destination-to-source byte-rate column.
df = drop_outliers_iqr(df, column="DST_TO_SRC_SECOND_BYTES")

In [12]:
# Integer code assigned to each value of the "Attack" column.
attack_mapping = {
    "Benign": 0,
    "scanning": 1,
    "xss": 2,
    "ddos": 3,
    "dos": 4,
    "injection": 5,
    "password": 6,
    "mitm": 7,
    "ransomware": 8, 
    "backdoor": 9,
}

# A dict-based map yields int64 values, while meta declares the result as
# int32. Cast explicitly so the computed partitions carry the dtype that the
# metadata advertises.
# NOTE: a label missing from attack_mapping maps to NaN, which makes the
# integer cast raise at compute time instead of passing through silently.
df["Attack"] = (
    df["Attack"]
    .map(attack_mapping, meta=('Attack', 'int32'))
    .astype('int32')
)

In [13]:
# Separate the two target columns (y) from the remaining feature columns (X).
target_columns = ['Label', 'Attack']
y = df[target_columns]
X = df.drop(columns=target_columns)

In [14]:
# Standardise the feature columns with dask-ml's StandardScaler.
# NOTE(review): the scaler is fitted on all of X, i.e. before the
# train/validation/test split done in the following cell — confirm intended.
scaler = StandardScaler()
X_scaled = scaler.fit(X).transform(X)

In [15]:
# First cut: hold out a nominal 10% of the rows as the test set.
holdout_split = train_test_split(X_scaled, y, test_size=0.1, random_state=42)
X_train_valid, X_test, y_train_valid, y_test = holdout_split

# Second cut: take 12.5% of the remaining rows as the validation set
# (0.125 * 0.9 = 11.25% of the full data, nominally).
validation_split = train_test_split(
    X_train_valid, y_train_valid, test_size=0.125, random_state=42
)
X_train, X_valid, y_train, y_valid = validation_split

In [16]:
# Pull the 'Label' and 'Attack' target columns out of each split.
y_train_label, y_train_attack = y_train['Label'], y_train['Attack']
y_valid_label, y_valid_attack = y_valid['Label'], y_valid['Attack']
y_test_label, y_test_attack = y_test['Label'], y_test['Attack']

In [17]:
# Write each feature split to its own parquet directory, reporting progress
# after every write.
feature_outputs = [
    (X_train, "dask_output/X_train/", "X_train saved"),
    (X_valid, "dask_output/X_valid/", "X_valid saved"),
    (X_test, "dask_output/X_test/", "X_test saved"),
]
for frame, target_dir, message in feature_outputs:
    frame.to_parquet(target_dir)
    print(message)

X_train saved
X_valid saved
X_test saved


In [18]:
# Turn every target series into a single-column frame, rebinding the
# original names to the converted frames.
(
    y_train_label,
    y_train_attack,
    y_valid_label,
    y_valid_attack,
    y_test_label,
    y_test_attack,
) = (
    target.to_frame()
    for target in (
        y_train_label,
        y_train_attack,
        y_valid_label,
        y_valid_attack,
        y_test_label,
        y_test_attack,
    )
)

# Write each target frame to its own parquet directory, in the same order as
# before, reporting progress after every write.
target_outputs = [
    (y_train_label, "dask_output/y_train_label/", "y_train_label saved"),
    (y_train_attack, "dask_output/y_train_attack/", "y_train_attack saved"),
    (y_valid_label, "dask_output/y_valid_label/", "y_valid_label saved"),
    (y_valid_attack, "dask_output/y_valid_attack/", "y_valid_attack saved"),
    (y_test_label, "dask_output/y_test_label/", "y_test_label saved"),
    (y_test_attack, "dask_output/y_test_attack/", "y_test_attack saved"),
]
for frame, target_dir, message in target_outputs:
    frame.to_parquet(target_dir)
    print(message)

y_train_label saved
y_train_attack saved
y_valid_label saved
y_valid_attack saved
y_test_label saved
y_test_attack saved
