In [None]:
# Libraries
import matplotlib.pyplot as plt
import numpy as np
import gdown
import pandas as pd
from sklearn.model_selection import cross_val_score, KFold
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
import warnings
import zipfile

# Ignore all warnings for the rest of the session.
# NOTE(review): a blanket "ignore" filter also hides any warnings emitted by the
# pandas / scikit-learn calls made later in this notebook — consider narrowing it.
warnings.filterwarnings("ignore")

# Install command kept for reference (commented out, so it does not run):
#!pip install scikit-learn-intelex -q --progress-bar off
from sklearnex import patch_sklearn
# Enable the Intel extension for scikit-learn. LogisticRegression is imported
# further down, i.e. after this call.
# NOTE(review): presumably that ordering is what lets the patch apply to it — confirm
# against the sklearnex documentation before moving imports around.
patch_sklearn()

Intel(R) Extension for Scikit-learn* enabled (https://github.com/intel/scikit-learn-intelex)




---
Dataset preparation


In [None]:
## Download dataset

# Google Drive source of the archive and the local file name it is saved under
url = 'https://drive.google.com/uc?id=1iUC1Pv-1JfYWUKMGZQ2xSOk0nYAM2f6i'
output = 'UNSW_NB15.zip'

# Fetch the archive with the progress bar enabled; the returned value is the cell output
gdown.download(url, output, quiet=False)

Downloading...
From (original): https://drive.google.com/uc?id=1iUC1Pv-1JfYWUKMGZQ2xSOk0nYAM2f6i
From (redirected): https://drive.google.com/uc?id=1iUC1Pv-1JfYWUKMGZQ2xSOk0nYAM2f6i&confirm=t&uuid=685347ce-041d-4ffd-ae31-b37d988107a0
To: /content/UNSW_NB15.zip
100%|██████████| 156M/156M [00:02<00:00, 52.5MB/s]


'UNSW_NB15.zip'

In [None]:
# Unzip dataset: extract every archive member into the current working directory
with zipfile.ZipFile('UNSW_NB15.zip', mode='r') as archive:
    archive.extractall()

In [None]:
## Load dataset for learning
# Detect the text encoding of the feature-description file from its raw bytes
import chardet
with open('NUSW-NB15_features.csv', 'rb') as fh:
    encoding = chardet.detect(fh.read())['encoding']

# Column names come from the 'Name' column of the feature-description file
feature_table = pd.read_csv('NUSW-NB15_features.csv', encoding=encoding)
cols = feature_table['Name'].tolist()

file_paths = ['UNSW-NB15_1.csv','UNSW-NB15_2.csv','UNSW-NB15_3.csv','UNSW-NB15_4.csv']

# Read every part into its own DataFrame, labelling the columns with `cols`
dfs = []
for file in file_paths:
    dfs.append(pd.read_csv(file, names=cols))

# Stack the parts into one frame
data = pd.concat(dfs, ignore_index=True)

# Normalise the category labels: trim whitespace, merge 'Backdoors' into 'Backdoor',
# and label rows without a category as 'Normal'
attack_cat = data['attack_cat'].str.strip().str.replace('Backdoors', 'Backdoor')
data['attack_cat'] = attack_cat.fillna(value='Normal')

# Drop duplicate rows, then fill the remaining missing values with 0
data = data.drop_duplicates(ignore_index=True).fillna(0)

In [None]:
def balance_class(df, cls_col, cls, cls_size):
    """Resample one class of `df` to exactly `cls_size` rows.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; it is not modified.
    cls_col : str
        Name of the column holding the class label.
    cls : object
        Label of the class to resample.
    cls_size : int
        Number of rows the class should have in the result.

    Returns
    -------
    pandas.DataFrame
        All rows of the other classes followed by the resampled rows of `cls`,
        with a fresh 0..n-1 index.
    """
    other_rows = df[df[cls_col] != cls]
    target_rows = df[df[cls_col] == cls]
    n_current = len(target_rows)

    if n_current == cls_size:
        # Already at the requested size: keep the rows untouched
        resampled = target_rows
    else:
        # Shrinking draws without replacement, growing draws with replacement;
        # the seed is fixed so the draw is repeatable
        resampled = target_rows.sample(cls_size, replace=n_current < cls_size, random_state=42)

    return pd.concat([other_rows, resampled]).reset_index(drop=True)

In [None]:
# Target number of records per attack category, applied in this order
target_sizes = {
    'Analysis': 677,
    'Backdoor': 577,
    'DoS': 4089,
    'Exploits': 7061,
    'Fuzzers': 12062,
    'Generic': 5016,
    'Normal': 31395,
    'Reconnaissance': 1695,
    'Shellcode': 378,
    'Worms': 44,
}
for category, size in target_sizes.items():
    data = balance_class(data, 'attack_cat', category, size)

In [None]:
# Number of columns in the balanced frame
n_columns = data.shape[1]
print('Number of features: ', n_columns)

Number of features:  49


In [None]:
# Rows per attack category, followed by the overall row count
class_counts = data.groupby('attack_cat').size()
print('Record per class:\n', class_counts)
print('\nSum:\t\t', data['attack_cat'].size)

Record per class:
 attack_cat
Analysis            677
Backdoor            577
DoS                4089
Exploits           7061
Fuzzers           12062
Generic            5016
Normal            31395
Reconnaissance     1695
Shellcode           378
Worms                44
dtype: int64

Sum:		 62994





Dataset preparation


---






---

Data preprocessing

In [None]:
# Encode categorical variables into numeric values
# Remember the original category names before the column is overwritten with codes
labels = data['attack_cat'].unique()

# One fitted encoder per object-typed column, kept so codes can be decoded later
label_encoders = {}
object_columns = data.select_dtypes(include=['object']).columns
for column in object_columns:
    encoder = LabelEncoder()
    # Cast to str first so every value in the column has a single type
    data[column] = data[column].astype(str)
    data[column] = encoder.fit_transform(data[column])
    label_encoders[column] = encoder

In [None]:
# Features: everything except the two target columns
X = data.drop(columns=['attack_cat', 'Label'])
# Target: the attack category, decoded back from its integer code to the original name
y = label_encoders['attack_cat'].inverse_transform(data['attack_cat'])

In [None]:
# Number of columns left in the feature matrix
n_train_features = X.shape[1]
print('Number of train features: ', n_train_features)

Number of train features:  47


In [None]:
## Min-max scale every feature column (MinMaxScaler with its default settings)
scaler = MinMaxScaler()
scaled_values = scaler.fit_transform(X)
# Wrap the scaled array back into a DataFrame so the column names are kept
X = pd.DataFrame(scaled_values, columns=X.columns)


Data preprocessing


---





---

Feature selection


In [None]:
def initialize_population(size, num_features, chromo_len):
    """Create the initial population of feature-index chromosomes.

    Each chromosome is `chromo_len` distinct indices drawn at random from
    `range(num_features)`.

    Returns
    -------
    numpy.ndarray
        Array with one chromosome per row (`size` rows).
    """
    return np.array([
        np.random.choice(num_features, size=chromo_len, replace=False)
        for _ in range(size)
    ])

In [None]:
from sklearn.model_selection import KFold, cross_val_score
from sklearn.linear_model import LogisticRegression
def compute_fitness(population, X, y):
    """Score every chromosome of the population.

    The fitness of a chromosome is the average of two terms computed on the
    columns it selects:

    * the mean cross-validated accuracy of a logistic regression trained on
      those columns only, and
    * ``1 - |mean off-diagonal correlation|`` between those columns, so that
      less correlated feature subsets score higher.

    Parameters
    ----------
    population : iterable of integer index arrays
        Each chromosome holds positional column indices into `X`.
    X : pandas.DataFrame
        Feature matrix.
    y : array-like
        Target labels, aligned with the rows of `X`.

    Returns
    -------
    list of float
        One fitness value per chromosome, in population order.
    """
    fitness_values = []
    # A single shuffled splitter with a fixed seed: every chromosome is scored on
    # exactly the same folds, so fitness values are comparable with each other.
    kf = KFold(shuffle=True, random_state=42)
    for chromosome in population:
        # Restrict the data to the features selected by this chromosome
        Xfs = X.iloc[:, chromosome]

        # Accuracy of a model trained on the selected features only
        # (scoring the full X here would give every chromosome the same accuracy)
        accuracy = cross_val_score(LogisticRegression(), Xfs, y, cv=kf, scoring='accuracy').mean()

        # Correlation matrix of the selected features; NaN entries are replaced by 0
        corr_matrix = np.corrcoef(Xfs, rowvar=False)
        corr_matrix = pd.DataFrame(corr_matrix).fillna(0).to_numpy()

        # Mean of the off-diagonal entries (sum minus the diagonal, divided by the
        # number of off-diagonal cells), turned into a "lower is better" score
        corr_t_avg = 1 - abs((np.sum(corr_matrix) - np.trace(corr_matrix)) / (corr_matrix.size - corr_matrix.shape[0]))

        fitness_values.append((accuracy + corr_t_avg) / 2)

    return fitness_values

In [None]:
def parents_selection(population, fitness_values):
    """Pick parents by roulette-wheel (fitness-proportionate) selection.

    Draws `len(population)` chromosomes with replacement, each one chosen with
    probability equal to its share of the total fitness.

    Returns
    -------
    numpy.ndarray
        The selected chromosomes, one per row.
    """
    # Share of the total fitness held by each chromosome
    probabilities = fitness_values / np.sum(fitness_values)
    n_chromosomes = len(population)
    # Spin the wheel once per slot in the new parent pool
    picks = np.random.choice(np.arange(n_chromosomes), size=n_chromosomes, p=probabilities)
    return population[picks]

In [None]:
def crossover(parents, crossover_rate=0.5):
    """Produce one child per parent by combining it with the next parent.

    For parent `i`, the child starts with the last `cross_g` genes of that parent
    (`cross_g = int(num_genes * crossover_rate)`) and is completed with the first
    genes of parent `i + 1` (wrapping around) that are not already in the child.

    Parameters
    ----------
    parents : numpy.ndarray
        2-D integer array, one chromosome per row.
    crossover_rate : float, default 0.5
        Fraction of each child's genes taken from the first parent.

    Returns
    -------
    numpy.ndarray
        Offspring with the same shape and dtype as `parents`.
    """
    num_parents, num_genes = parents.shape
    offspring = np.empty_like(parents)
    # Number of genes inherited from the first parent, kept within [0, num_genes]
    cross_g = min(max(int(num_genes * crossover_rate), 0), num_genes)
    for i in range(num_parents):
        # Select two parents for crossover
        parent1 = parents[i]
        parent2 = parents[(i + 1) % num_parents]
        # Take exactly cross_g genes from the end of parent1. Slicing from
        # `num_genes - cross_g` (rather than from `cross_g`) keeps the piece the
        # right length for odd chromosome sizes and for rates other than 0.5.
        head = parent1[num_genes - cross_g:]
        # Fill the rest with parent2's genes, in order, skipping those already taken
        tail = parent2[~np.isin(parent2, head)][:num_genes - cross_g]
        offspring[i] = np.concatenate([head, tail])
    return offspring

In [None]:
def mutation(parents, num_features, mutation_rate=0.5):
    """Mutate every parent by replacing part of its genes with new random features.

    `int(num_genes * mutation_rate)` genes, taken at evenly spread positions of the
    parent, are kept and placed at the front of the child; the remaining positions
    are filled with random feature indices that are not among the kept genes.
    Note that `mutation_rate` is therefore the fraction of genes that is *kept*.

    Parameters
    ----------
    parents : numpy.ndarray
        2-D integer array, one chromosome per row.
    num_features : int
        Total number of features; new genes are drawn from `range(num_features)`.
    mutation_rate : float, default 0.5
        Fraction of each parent's genes carried over unchanged.

    Returns
    -------
    numpy.ndarray
        Offspring with the same shape and dtype as `parents`.
    """
    num_parents, num_genes = parents.shape
    offspring = np.empty_like(parents)

    # Positions of the genes to keep: position j is kept whenever the running
    # quota int(k * mutation_rate) increases between k = j and k = j + 1. For rates
    # in [0, 1] this yields exactly int(num_genes * mutation_rate) positions,
    # without the rounding drift of a floating-point accumulator.
    keep_idx = np.array(
        [j for j in range(num_genes)
         if int((j + 1) * mutation_rate) > int(j * mutation_rate)],
        dtype=int,
    )
    all_features = np.arange(num_features)

    for i in range(num_parents):
        # First part: genes carried over from the parent
        kept = parents[i][keep_idx]

        # Second part: fresh features, excluding only the genes actually kept
        # (never compare against an uninitialised buffer)
        pool = np.setdiff1d(all_features, kept)
        fresh = np.random.choice(pool, size=num_genes - len(kept), replace=False)

        # Add the mutated child to the offspring
        offspring[i] = np.concatenate([kept, fresh])
    return offspring

In [None]:
def generations(X, y, pop_size, chromo_len,
                mutation_rate=0.5, crossover_rate=0.5):
    """Run the genetic feature search until a generation stops improving.

    Starts from a random population, then repeats: select parents and apply
    crossover, score; select parents and apply mutation, score. The best fitness
    seen in the two scored populations is that generation's result. The search
    continues while each generation beats the best fitness found so far and stops
    at the first one that does not. The best fitness of each generation is printed.

    Returns
    -------
    tuple
        `(best_chromo, mf)`: the best chromosome found and its fitness.
    """
    num_features = X.shape[1]
    population = initialize_population(size=pop_size, num_features=num_features, chromo_len=chromo_len)
    fitness_values = compute_fitness(population=population, X=X, y=y)

    # Generation 0 is the random population itself
    gen = 0
    mf = np.max(fitness_values)
    best_chromo = population[np.argmax(fitness_values)]
    print("Max fitness of generation", gen, ": ", mf)

    # The two breeding steps applied, in order, within every generation
    breeding_steps = (
        lambda chosen: crossover(chosen, crossover_rate=crossover_rate),
        lambda chosen: mutation(chosen, num_features=num_features, mutation_rate=mutation_rate),
    )

    improved = True
    while improved:
        gen += 1
        # Best result of the current generation; -1 means "nothing recorded yet"
        mf_g = -1
        best_chromo_g = best_chromo

        for breed in breeding_steps:
            selected_parents = parents_selection(population, fitness_values)
            population = breed(selected_parents)
            fitness_values = compute_fitness(population=population, X=X, y=y)
            step_best = np.max(fitness_values)
            if step_best > mf_g:
                mf_g = step_best
                best_chromo_g = population[np.argmax(fitness_values)]

        print("Max fitness of generation", gen, ": ", mf_g)

        # Keep going only when this generation beat the overall best
        improved = mf_g > mf
        if improved:
            mf = mf_g
            best_chromo = best_chromo_g

    return best_chromo, mf

In [None]:
# Run the genetic search with 100 chromosomes of 10 features each
best_chromo, max_fitness = generations(
    X=X, y=y,
    pop_size=100, chromo_len=10,
    mutation_rate=0.5, crossover_rate=0.5,
)
# Translate the winning column positions back into feature names
selected_features = list(X.columns[best_chromo])
print("Best feature: ", selected_features)
print("Max fitness: ", max_fitness)

Max fitness of generation 0 :  0.7737636077004644
Max fitness of generation 1 :  0.7742008979204442
Max fitness of generation 2 :  0.7741007003088096
Best feature:  ['Dintpkt', 'dmeansz', 'dttl', 'ct_dst_ltm', 'srcip', 'ct_state_ttl', 'synack', 'dbytes', 'dur', 'ct_dst_sport_ltm']
Max fitness:  0.7742008979204442


Feature Selection


---



In [None]:
# Persist the selected feature indices to a .npy file
np.save(file='UNSW_best_chromo.npy', arr=best_chromo)

In [None]:
# Block this cell forever; interrupt the kernel to stop it.
# NOTE(review): presumably here to keep a hosted session from going idle — confirm
# it is still needed, since "Run All" never finishes while it is present.
# Sleeping between iterations avoids pinning a CPU core the way a bare spin does.
import time
while True:
    time.sleep(60)