In [None]:
# `drive` is imported in this cell because it runs before the notebook's main
# import cell; without the import a fresh kernel (Restart & Run All) stops here
# with a NameError.
from google.colab import drive

# Mount Google Drive so the files under /content/drive can be read.
drive.mount('/content/drive')

Mounted at /content/drive


## Importações, carregamento e rotulação dos dados

In [None]:
import numpy as np
import pandas as pd
from google.colab import drive
import os
import re
import matplotlib.pyplot as plt # Gráficos
import seaborn as sns # Gráficos
from tqdm.notebook import tqdm # Barras de Progresso

In [None]:
from sklearn.ensemble import IsolationForest

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from sklearn.metrics import silhouette_score
from sklearn.metrics import roc_curve, roc_auc_score

from sklearn.metrics import confusion_matrix
from sklearn.decomposition import PCA

In [None]:
sns.set_theme() # Apply seaborn's default theme to the plots

In [None]:
# Directory on the mounted Google Drive that holds every CSV read below.
# Kept in a single constant so the notebook can be pointed at another copy of
# the data by editing one line.
DATA_DIR = "/content/drive/My Drive/UFPE/UFPE 2024.1/PET/Imersão/Network Databases"


def load_capture(filename, data_dir=DATA_DIR):
  """Read the CSV file `filename` located in `data_dir` into a DataFrame."""
  return pd.read_csv(os.path.join(data_dir, filename))


# Data later used to fit the scaler and the model
# (presumably attack-free traffic, going by the file name -- confirm).
X_train = load_capture("normal_network.csv")

# One capture per attack type; these are labelled and split in later cells.
df_fuzzy = load_capture("fuzzy_attack.csv")
df_spoofing = load_capture("spoofing_attack.csv")
df_replay = load_capture("replay_attack.csv")
df_ddos = load_capture("DDoS_attack.csv")

## Tratamento dos dados

In [None]:
# Drop every record that holds a NaN/Null/NA value and report how many rows were removed
initial_len = len(X_train)
X_train = X_train.dropna(axis=0, how='any')
print(f'Tamanho inicial: {initial_len}, tamanho final {X_train.shape[0]} | Descartados {initial_len - X_train.shape[0]} registros com valores NA')

Tamanho inicial: 2713011, tamanho final 2713010 | Descartados 1 registros com valores NA


In [None]:
# Label the records ## 0 - BENIGN | 1 - MALICIOUS
# The label is taken from the B1 column: rows whose B1 equals 1 are marked
# malicious, every other row benign.
# NOTE(review): this assumes B1 == 1 only ever flags an attack row in these
# captures -- confirm against the dataset description.
# NOTE: this cell overwrites B1, so re-running it without reloading the CSVs
# would recompute the labels from the already modified column.

# Seed for the random byte values written into B1 below. The notebook only
# seeds NumPy's global generator in a later cell, so without a local seed these
# values would change on every run. Same value as RANDOM_SEED defined there.
B1_FILL_SEED = 33

dfs = [df_fuzzy, df_spoofing, df_replay, df_ddos]
for df in dfs:
  # Vectorized comparison instead of a Python lambda evaluated once per row.
  df['Label'] = (df['B1'] == 1).astype(float)

## Replay and DDoS - B1 is set to 0
df_ddos['B1'] = 0
df_replay['B1'] = 0

## Spoofing and Fuzzy - B1 of the flagged rows is filled with a random byte value
rng = np.random.default_rng(B1_FILL_SEED)
for attack_df in (df_fuzzy, df_spoofing):
  mask = attack_df['B1'] == 1
  # One draw in [0, 255] per flagged row.
  random_hex_values = rng.integers(0, 256, size=int(mask.sum()))
  attack_df.loc[mask, 'B1'] = random_hex_values


In [None]:
# Rewrite each Timestamp column as seconds elapsed since that frame's earliest timestamp

dfs = [df_fuzzy, df_spoofing, df_replay, df_ddos, X_train]
for df in dfs:
  # Parse the raw values as epoch seconds, then measure the offset from the minimum.
  stamps = pd.to_datetime(df['Timestamp'], unit='s')
  df['Timestamp'] = (stamps - stamps.min()).dt.total_seconds()

In [None]:
# Convert the hexadecimal 'Arbitration ID' strings to decimal integers
# NOTE(review): X_train is not converted here -- presumably its IDs are already
# numeric; confirm against normal_network.csv.

dfs = [df_fuzzy, df_spoofing, df_replay, df_ddos]
for df in dfs:
  df['Arbitration ID'] = df['Arbitration ID'].map(lambda hex_id: int(hex_id, 16))

In [None]:
# Cast the ID, DLC and payload-byte columns to float

dfs = [df_fuzzy, df_spoofing, df_replay, df_ddos, X_train]
columns = ['Arbitration ID', 'DLC', 'B1', 'B2', 'B3', 'B4', 'B5', 'B6', 'B7', 'B8']

for df in dfs:
  for column in columns:
    # astype converts the whole column at once; the previous per-element
    # Python lambda was needlessly slow on frames with millions of rows.
    df[column] = df[column].astype(float)

## Divisão dos dados

In [None]:
# Split every attack capture sequentially (no shuffling) into validation and test sets

# Fraction of each capture, counted from its first row, assigned to validation;
# the remaining rows form the test set.
VAL_FRACTION = .65


def split_val_test(df, val_fraction=VAL_FRACTION):
  """Split a labelled capture sequentially into validation and test parts.

  Parameters:
    df: DataFrame that contains a 'Label' column.
    val_fraction: share of the rows, taken from the start, used for validation.

  Returns:
    (val_size, X_val, X_test, Y_val, Y_test) where val_size is the number of
    validation rows, X_* are the features without 'Label' and Y_* are the
    'Label' columns. Both parts are re-indexed from 0; `df` is not modified.
  """
  val_size = int(len(df) * val_fraction)
  val_part = df[:val_size].reset_index(drop=True)
  test_part = df[val_size:].reset_index(drop=True)
  X_val = val_part.drop(columns=['Label'])
  X_test = test_part.drop(columns=['Label'])
  return val_size, X_val, X_test, val_part['Label'], test_part['Label']


val_size_fuzzy, X_val_fuzzy, X_test_fuzzy, Y_val_fuzzy, Y_test_fuzzy = split_val_test(df_fuzzy)

val_size_spoofing, X_val_spoofing, X_test_spoofing, Y_val_spoofing, Y_test_spoofing = split_val_test(df_spoofing)

val_size_replay, X_val_replay, X_test_replay, Y_val_replay, Y_test_replay = split_val_test(df_replay)

val_size_ddos, X_val_ddos, X_test_ddos, Y_val_ddos, Y_test_ddos = split_val_test(df_ddos)

## Normalização dos dados

In [None]:
# Fit the standardization on the training data only, then apply that same
# fitted transform to the training data and to every validation/test split.
std_scaler = StandardScaler().fit(X_train)

norm_X_train = std_scaler.transform(X_train)

norm_X_val_fuzzy = std_scaler.transform(X_val_fuzzy)
norm_X_test_fuzzy = std_scaler.transform(X_test_fuzzy)

norm_X_val_spoofing = std_scaler.transform(X_val_spoofing)
norm_X_test_spoofing = std_scaler.transform(X_test_spoofing)

norm_X_val_replay = std_scaler.transform(X_val_replay)
norm_X_test_replay = std_scaler.transform(X_test_replay)

norm_X_val_ddos = std_scaler.transform(X_val_ddos)
norm_X_test_ddos = std_scaler.transform(X_test_ddos)

In [None]:
# Remove the un-normalized frames from the namespace; the model cell below
# works on the norm_* arrays.
del X_train
del (
    X_val_fuzzy, X_test_fuzzy,
    X_val_spoofing, X_test_spoofing,
    X_val_replay, X_test_replay,
    X_val_ddos, X_test_ddos,
)

## Isolation Forest (IF)

In [None]:
# Seed shared by NumPy's global random generator and the model's random_state.
RANDOM_SEED = 33
# Seeds NumPy's global generator from this point on; random draws made through
# that generator in cells above this one are not affected by it.
np.random.seed(RANDOM_SEED)

In [None]:
# Number of trees in the Isolation Forest ensemble.
N_ESTIMATORS = 100

# Build the forest with a fixed random_state, then fit it on the normalized
# training data.
model = IsolationForest(
    n_estimators=N_ESTIMATORS,
    random_state=RANDOM_SEED,
)
model.fit(norm_X_train)

In [None]:
from pickle import dump

import joblib

# Persist the fitted model with pickle (protocol 5) in the working directory.
with open("IF.pkl", "wb") as model_file:
    dump(model, model_file, protocol=5)

# Persist the fitted scaler with joblib next to the model.
joblib.dump(std_scaler, 'scalerIF.pkl')

['scalerIF.pkl']