<a href="https://colab.research.google.com/github/msorrentino3/Tirocinio/blob/main/DistillbertMalware.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [27]:
import torch
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification
from torch.utils.data import TensorDataset, DataLoader


In [28]:
#caricamento del dataset
data = pd.read_csv("data.csv")
#creo nuova colonna nel dataset dove metto 1 se nella colonna 3 compare attack altrimenti metto 0
data["label"] = np.where(data["class3"] == "Attack", 1, 0)

#creo testo per addestare modello distillbert
#converto ogni colonna nel formato string e se nella cella della colonna del dataset risulta
#senza dato verr√† sostituito con stringa vuota
scr = data["Scr_IP"].astype(str).replace("nan", "")
des = data["Des_IP"].astype(str).replace("nan", "")
proto = data["Protocol"].astype(str).replace("nan", "")
serv = data["Service"].astype(str).replace("nan", "")
alert = data["OSSEC_alert"].astype(str).replace("nan", "")

#creo nel dataset colonna nella quale metto sottoforma di string il contenuto della riga
data["text"] = pd.Series(
    np.where(scr != "", "Da " + scr + " ", "") +
    np.where(des != "", "A " + des + " ", "") +
    np.where(proto != "", "Protocollo " + proto + " ", "") +
    np.where(serv != "", "Servizio " + serv + " ", "") +
    np.where((alert != "") & (alert != "0"), "Alert " + alert, "")
).str.strip()

#divido il set di dati per addestrare il modello (60% dei dati) e sia per testarlo(40%)
train_df, test_df = train_test_split(
    data[["text", "label"]],
    test_size=0.4,
    stratify=data["label"],
    random_state=42
)
data = pd.read_csv("data.csv")

In [29]:
#tokenizzo sia i dati di addestramento che quelli di test
tokenizer = DistilBertTokenizerFast.from_pretrained("distilbert-base-uncased")

train_enc = tokenizer(list(train_df["text"]), padding=True, truncation=True, max_length=128, return_tensors="pt")
test_enc  = tokenizer(list(test_df["text"]),  padding=True, truncation=True, max_length=128, return_tensors="pt")

# Aggiungo le etichette
train_enc['labels'] = torch.tensor(train_df["label"].values)
test_enc['labels']  = torch.tensor(test_df["label"].values)

train_dataset = TensorDataset(train_enc['input_ids'], train_enc['attention_mask'], train_enc['labels'])
test_dataset  = TensorDataset(test_enc['input_ids'],  test_enc['attention_mask'],  test_enc['labels'])

#creazione dataset e dataloder
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, pin_memory=True, num_workers=4)
test_loader  = DataLoader(test_dataset,  batch_size=32, shuffle=False, pin_memory=True, num_workers=4)



In [30]:
#seleziono se utilizzare la gpu se √® libera oppure presente nel dispositivo altrimenti utilizza la gpu
device = "cuda" if torch.cuda.is_available() else "cpu"

#scarico modello pre-addestrato di ditillbert ingrado di fare delle classificazioni
model = DistilBertForSequenceClassification.from_pretrained(
    "distilbert-base-uncased",
    num_labels=2 #utilizzo lables = 2 perch√® devo fare due tipi di classificazione (attacco e normale)
).to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5) #ottimizzo i parametri e passo la velocit√† con cui il modello impara


Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [31]:
#Addestro il modello
model.train()
epoche = 3 #scelto 3 perch√® √® cos√¨ ho prestazioni superiori al 90% anche
for ep in range(epoche): #il modello scansiona i dati per essere addestrato x il numero di epoche
    tot_loss = 0
    for input_ids, attention_mask, labels in train_loader: #prendo i dati per l'allenamento
        input_ids = input_ids.to(device, non_blocking=True) #sposto il testo tokenizzato sulla gpu per rendere pi√π veloce l'esecuzione
        attention_mask = attention_mask.to(device, non_blocking=True) #spsoto nella gpu sullo le parti di testo che non sono rimpieti con il padding
        labels = labels.to(device, non_blocking=True) #porto sulla gpu i valori per indicare se si tratta di un attacco o meno
        optimizer.zero_grad() #azzero i parametri cos√¨ nella prossima epoca i valori non si sommino
        loss = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels).loss #il modello legge il testo e cerca di capire se si tratta di un attacco o meno
        #infine calcola quanto ha sbagliato
        loss.backward() #calcola di quanto devono essere aggiornati i pesi dei parametri per la prossima epoca
        optimizer.step()#cambio i pesi dei parametri (il modello ha "imparato")

        tot_loss += loss.item() #calcolo la media dell'errore alla fine dell'epoca
    print(f"Epoca {ep+1}: Loss media = {tot_loss/len(train_loader):.4f}")


Epoca 1: Loss media = 0.6050
Epoca 2: Loss media = 0.2994
Epoca 3: Loss media = 0.1581


In [32]:
#valuto il modello
model.eval() #setto il modello in mdalit√† valutazione cos√¨ che non aggiorni pi√π i pesi dei parametri
all_preds = []
all_labels = []

with torch.no_grad():
    for input_ids, attention_mask, labels in test_loader:#prendo i dati per effettuare il test del modello
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        labels = labels.to(device)

        logits = model(input_ids=input_ids, attention_mask=attention_mask).logits #il modello effettua la previsione
        preds = logits.argmax(dim=1) #sceglie se si tratta di un attacco oppure no

        all_preds.extend(preds.cpu().numpy()) #salva le predizioni e le sposta sulla cpu in formato numpy
        all_labels.extend(labels.cpu().numpy())#salva i dati del batch e li sposta sulla cpu in formato numpy

# Calcolo metriche
accuracy = accuracy_score(all_labels, all_preds)
precision = precision_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds)

print(f"\nAccuracy:  {accuracy*100:.2f}%")
print(f"Precision: {precision*100:.2f}%")
print(f"Recall:    {recall*100:.2f}%")
print(f"F1-score:  {f1*100:.2f}%")


Accuracy:  95.00%
Precision: 93.60%
Recall:    96.45%
F1-score:  95.00%


In [33]:
#test svolto per capire se il modello √® funzionante o meno
def predici(text):
    x = tokenizer(text, return_tensors="pt", truncation=True, max_length=128).to(device)
    with torch.no_grad():
        logits = model(**x).logits
        probs = torch.softmax(logits, dim=1)
        label = probs.argmax().item()
    return ("ATTACK" if label else "NORMAL"), float(probs[0][label])

esempi = [
    "Da: 10.0.1.5 | A: 131.236.3.92 | Protocollo: udp | Servizio: dns",
    "Da: 192.168.2.199 | A: 192.168.2.10 | Protocollo: tcp | Servizio: http | Alert: TRUE",
    "Da: 172.24.1.80 | A: 172.24.1.1 | Protocollo: udp | Servizio: dns"
]

for i, testo in enumerate(esempi, 1):
    pred, conf = predici(testo)
    print(f"\nüì° Esempio {i}:")
    print(f"Traffico: {testo}")
    print(f"Predizione: {pred} ({conf*100:.1f}% sicuro)")


üì° Esempio 1:
Traffico: Da: 10.0.1.5 | A: 131.236.3.92 | Protocollo: udp | Servizio: dns
Predizione: NORMAL (97.5% sicuro)

üì° Esempio 2:
Traffico: Da: 192.168.2.199 | A: 192.168.2.10 | Protocollo: tcp | Servizio: http | Alert: TRUE
Predizione: ATTACK (87.5% sicuro)

üì° Esempio 3:
Traffico: Da: 172.24.1.80 | A: 172.24.1.1 | Protocollo: udp | Servizio: dns
Predizione: NORMAL (97.4% sicuro)
