In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

## dataset

In [3]:
!pip install ucimlrepo



In [4]:
from ucimlrepo import fetch_ucirepo 

In [6]:
# Fetch the dataset with id 327 through the ucimlrepo client.
phishing_websites = fetch_ucirepo(id=327)

# Feature table and target table exposed by the fetched dataset object.
X = phishing_websites.data.features 
y = phishing_websites.data.targets 

# Preview the first target rows (the output below shows a `result` column holding -1 / 1).
y.head()

Unnamed: 0,result
0,-1
1,-1
2,-1
3,-1
4,1


In [7]:
# Preview the first feature rows.
X.head()

Unnamed: 0,having_ip_address,url_length,shortining_service,having_at_symbol,double_slash_redirecting,prefix_suffix,having_sub_domain,sslfinal_state,domain_registration_length,favicon,...,rightclick,popupwindow,iframe,age_of_domain,dnsrecord,web_traffic,page_rank,google_index,links_pointing_to_page,statistical_report
0,-1,1,1,1,-1,-1,-1,-1,-1,1,...,1,1,1,-1,-1,-1,-1,1,1,-1
1,1,1,1,1,1,-1,0,1,-1,1,...,1,1,1,-1,-1,0,-1,1,1,1
2,1,0,1,1,1,-1,-1,-1,-1,1,...,1,1,1,1,-1,1,-1,1,0,-1
3,1,0,1,1,1,-1,-1,-1,1,1,...,1,1,1,-1,-1,1,-1,1,-1,1
4,1,0,-1,1,1,-1,1,1,-1,1,...,1,-1,1,-1,-1,0,-1,1,1,1


In [9]:
# Split the data into training and test sets (20% held out for testing; fixed random_state so the split is repeatable)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [11]:
print(X_train.dtypes)  # Check the data types of the features
print(y_train.dtypes)  # ... and of the target

having_ip_address             int64
url_length                    int64
shortining_service            int64
having_at_symbol              int64
double_slash_redirecting      int64
prefix_suffix                 int64
having_sub_domain             int64
sslfinal_state                int64
domain_registration_length    int64
favicon                       int64
port                          int64
https_token                   int64
request_url                   int64
url_of_anchor                 int64
links_in_tags                 int64
sfh                           int64
submitting_to_email           int64
abnormal_url                  int64
redirect                      int64
on_mouseover                  int64
rightclick                    int64
popupwindow                   int64
iframe                        int64
age_of_domain                 int64
dnsrecord                     int64
web_traffic                   int64
page_rank                     int64
google_index                

In [55]:
# Convert the train/test features and targets to float32 PyTorch tensors
X_train_tensor = torch.tensor(X_train.values.astype(float), dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values.astype(float), dtype=torch.float32)
X_test_tensor = torch.tensor(X_test.values.astype(float), dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.values.astype(float), dtype=torch.float32)

In [45]:
# NOTE(review): every line in this cell reassigns a tensor from itself, so each
# re-run of the cell inserts one more dimension — run it at most once per kernel.
X_train_tensor = X_train_tensor.unsqueeze(1)  # Gives the input a single channel for the network: [batch_size, 1, 30]
X_test_tensor = X_test_tensor.unsqueeze(1)

# Only if needed for the loss computation. TODO: still unclear whether this is required.
# NOTE(review): if the targets are already 2-D (e.g. built from a single-column
# DataFrame), these lines produce [batch_size, 1, 1] instead — confirm.
y_train_tensor = y_train_tensor.unsqueeze(1)  # Intended to become [batch_size, 1]
y_test_tensor = y_test_tensor.unsqueeze(1)    # Intended to become [batch_size, 1]

Reference — what `TensorDataset` and `DataLoader` do: https://medium.com/biased-algorithms/what-do-tensordataset-and-dataloader-do-0e1d74c433d3

In [56]:
# Build the datasets by pairing each feature tensor (X) with its target tensor (y)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

In [57]:
# Create the DataLoaders that serve mini-batches of 64 during training and testing.
# Training batches are reshuffled; test batches keep the dataset order.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Neural Network for malware detection

A simple feedforward neural network with fully connected layers, suitable for binary classification (malware detection)

In [58]:
class MalwareDetector(nn.Module):
    """Feed-forward binary classifier: three ReLU hidden layers and a sigmoid output.

    Layer widths are ``input_dim -> 64 -> 32 -> 16 -> 1``.

    Args:
        input_dim: Number of features per sample. Defaults to 30, the width
            this notebook was written for, so ``MalwareDetector()`` builds the
            same network as before.

    Raises:
        ValueError: If ``input_dim`` is smaller than 1.
    """

    def __init__(self, input_dim=30):
        super().__init__()

        if input_dim < 1:
            raise ValueError(f"input_dim must be >= 1, got {input_dim}")

        # First fully connected layer: input_dim features in, 64 units out
        self.fc1 = nn.Linear(input_dim, 64)
        self.relu1 = nn.ReLU()

        # Second fully connected layer: 64 in, 32 out
        self.fc2 = nn.Linear(64, 32)
        self.relu2 = nn.ReLU()

        # Third fully connected layer: 32 in, 16 out
        self.fc3 = nn.Linear(32, 16)
        self.relu3 = nn.ReLU()

        # Output layer: 16 in, a single unit for binary classification
        self.fc4 = nn.Linear(16, 1)
        self.sigmoid = nn.Sigmoid()  # Squashes the output into (0, 1)

    def forward(self, x):
        """Return one value in (0, 1) per sample.

        Args:
            x: Float tensor whose last dimension has size ``input_dim``.

        Returns:
            Tensor with the same leading dimensions as ``x`` and a last
            dimension of size 1.
        """
        x = self.relu1(self.fc1(x))  # First hidden layer + ReLU
        x = self.relu2(self.fc2(x))  # Second hidden layer + ReLU
        x = self.relu3(self.fc3(x))  # Third hidden layer + ReLU
        x = self.fc4(x)              # Final linear layer
        x = self.sigmoid(x)          # Sigmoid so the output lies between 0 and 1
        return x

In [59]:
# Instantiate the classifier defined above.
model = MalwareDetector()

### Train Hyperparameters

In [60]:
# Hyperparameters
criterion = nn.BCELoss()  # Binary Cross Entropy Loss
learning_rate = 0.001
# The optimizer is bound to the parameters of the current `model` object;
# re-run this cell if `model` is re-created.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
NUM_EPOCHS = 50

## Train model

In [63]:
# Example training loop
def train_model(model, criterion, optimizer, train_loader, num_epochs=20):
    """Train ``model`` in place and print the mean batch loss after every epoch.

    Args:
        model: Network to train; it must already live on the desired device.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer built over ``model``'s parameters.
        train_loader: Iterable yielding ``(inputs, labels)`` batches. Labels
            equal to 1 are treated as the positive class; every other value
            (e.g. -1 or 0) becomes 0.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        None.
    """
    model.train()  # Set the model to training mode

    # Follow the model's own device instead of relying on a notebook-level
    # `device` global that may not exist yet (or may disagree with the model).
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    for epoch in range(num_epochs):
        running_loss = 0.0
        batch_count = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(model_device), labels.to(model_device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)

            # Map labels to {0, 1} (-1 -> 0, 1 stays 1) and give them the shape
            # of the outputs so the loss compares them element by element.
            labels = (labels == 1).float().reshape(outputs.shape)
            loss = criterion(outputs, labels)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            batch_count += 1

        # An empty loader yields NaN instead of a ZeroDivisionError.
        mean_loss = running_loss / batch_count if batch_count else float('nan')

        # Print loss every epoch
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {mean_loss:.4f}")

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Move the model to the selected device.
model = model.to(device)

In [64]:
# Train for NUM_EPOCHS epochs; the per-epoch loss is printed below.
train_model(model, criterion, optimizer, train_loader, num_epochs=NUM_EPOCHS)

Epoch [1/50], Loss: 0.3707
Epoch [2/50], Loss: 0.1827
Epoch [3/50], Loss: 0.1674
Epoch [4/50], Loss: 0.1584
Epoch [5/50], Loss: 0.1457
Epoch [6/50], Loss: 0.1376
Epoch [7/50], Loss: 0.1294
Epoch [8/50], Loss: 0.1196
Epoch [9/50], Loss: 0.1129
Epoch [10/50], Loss: 0.1061
Epoch [11/50], Loss: 0.1038
Epoch [12/50], Loss: 0.0991
Epoch [13/50], Loss: 0.0925
Epoch [14/50], Loss: 0.0916
Epoch [15/50], Loss: 0.0928
Epoch [16/50], Loss: 0.0865
Epoch [17/50], Loss: 0.0839
Epoch [18/50], Loss: 0.0783
Epoch [19/50], Loss: 0.0788
Epoch [20/50], Loss: 0.0738
Epoch [21/50], Loss: 0.0734
Epoch [22/50], Loss: 0.0720
Epoch [23/50], Loss: 0.0692
Epoch [24/50], Loss: 0.0716
Epoch [25/50], Loss: 0.0709
Epoch [26/50], Loss: 0.0665
Epoch [27/50], Loss: 0.0651
Epoch [28/50], Loss: 0.0608
Epoch [29/50], Loss: 0.0628
Epoch [30/50], Loss: 0.0616
Epoch [31/50], Loss: 0.0620
Epoch [32/50], Loss: 0.0575
Epoch [33/50], Loss: 0.0553
Epoch [34/50], Loss: 0.0572
Epoch [35/50], Loss: 0.0585
Epoch [36/50], Loss: 0.0570
E

## Evaluate model

In [None]:
def evaluate_model(model, test_loader):
    """Print the classification accuracy of ``model`` over ``test_loader``.

    Outputs above 0.5 count as class 1, everything else as class 0. Labels are
    mapped the same way as during training: 1 stays 1 and any other value
    (e.g. -1) becomes 0.

    Args:
        model: Trained network producing one probability per sample.
        test_loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        None. The accuracy is printed.
    """
    model.eval()  # Set the model to evaluation mode

    # Follow the model's own device rather than a notebook-level global.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(model_device), labels.to(model_device)
            outputs = model(inputs)

            # Flatten both sides so they are compared sample by sample; comparing
            # tensors of different rank would broadcast into a (B, B, ...) grid
            # and inflate the count of "correct" predictions.
            predicted = (outputs > 0.5).float().reshape(-1)
            targets = (labels == 1).float().reshape(-1)

            total += targets.numel()
            correct += (predicted == targets).sum().item()

    if total == 0:
        # Nothing to score; avoid dividing by zero.
        print('Accuracy: n/a (no test samples)')
        return

    print(f'Accuracy: {100 * correct / total:.2f}%')

# Report accuracy on the test data; test_loader is expected to be the DataLoader over the test split.
evaluate_model(model, test_loader)


# NOISE
`noise_factor` controls how much noise is added: it scales the Gaussian noise applied to every model weight. A small value such as 0.01 adds subtle noise, while a larger value such as 0.1 adds more significant perturbations.

In [None]:
# Scale applied to the Gaussian noise that is added to the model weights.
NOISE_FACTOR = 0.01

def add_noise_to_weights(model, noise_factor):
    """Add random Gaussian noise to every parameter of ``model``, in place.

    Each parameter receives independent standard-normal noise multiplied by
    ``noise_factor``.

    Args:
        model: PyTorch neural network model; its parameters are modified.
        noise_factor: The magnitude (scale) of the noise added to the weights.
    """
    with torch.no_grad():  # No need to track gradients
        for param in model.parameters():
            # randn_like creates the noise on the parameter's own device and
            # with its dtype, so this also works for a model living on the GPU.
            noise = torch.randn_like(param) * noise_factor
            param.add_(noise)  # Add noise to the current parameters

    print(f"Added noise with factor {noise_factor} to model weights.")


## evaluate model with noise

In [None]:
def test_with_noise(model, test_loader, noise_factor):
    """Evaluate the model with noisy weights, then restore the original weights.

    Args:
        model: PyTorch model to perturb temporarily.
        test_loader: DataLoader passed on to ``evaluate_model``.
        noise_factor: Noise magnitude passed on to ``add_noise_to_weights``.
    """
    # Save the original weights. state_dict() hands back references to the live
    # tensors, so each one is cloned; without the clone the in-place noise would
    # also alter this "backup" and the restore below would change nothing.
    original_state_dict = {
        name: tensor.detach().clone()
        for name, tensor in model.state_dict().items()
    }

    try:
        # Add noise to the model
        add_noise_to_weights(model, noise_factor=noise_factor)

        # Evaluate the model with noisy weights
        print("Testing model with noisy weights...")
        evaluate_model(model, test_loader)
    finally:
        # Restore the original weights even if the evaluation fails
        model.load_state_dict(original_state_dict)
        print("Restored original model weights.")


In [None]:
# Evaluate once with weights perturbed by NOISE_FACTOR.
test_with_noise(model, test_loader, noise_factor=NOISE_FACTOR)

### Alternative evaluation: accuracy on poisoned vs. clean samples

In [None]:
def evaluate_with_poisoned_tracking(model, test_loader, poisoned_indices, noise_factor=0.01):
    """Evaluate the noisy model separately on poisoned and clean samples.

    Noise is added to the weights, accuracy is measured for the two groups,
    and the original weights are restored before returning.

    Args:
        model: PyTorch model producing one probability per sample.
        test_loader: Iterable yielding ``(inputs, labels)`` batches. A sample's
            index is its position in iteration order, so the loader must not
            shuffle for ``poisoned_indices`` to be meaningful.
        poisoned_indices: Iterable of integer sample indices that count as
            poisoned; every other sample counts as clean.
        noise_factor: Noise magnitude passed on to ``add_noise_to_weights``.

    Returns:
        None. Both accuracies are printed.
    """
    model.eval()

    # Follow the model's own device rather than a notebook-level global.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    # Set membership keeps the per-sample lookup cheap.
    poisoned_lookup = {int(index) for index in poisoned_indices}

    correct_poisoned = 0
    correct_clean = 0
    poisoned_samples = 0
    clean_samples = 0

    # Snapshot the weights before perturbing them. The tensors are cloned
    # because state_dict() returns references to the live parameters.
    original_state_dict = {
        name: tensor.detach().clone()
        for name, tensor in model.state_dict().items()
    }

    # Add noise to the model
    add_noise_to_weights(model, noise_factor=noise_factor)

    try:
        # Index of the first sample of the current batch. A running offset stays
        # correct when the last batch is shorter than the others.
        batch_start = 0

        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(model_device), labels.to(model_device)
                outputs = model(inputs)

                # Compare in {0, 1} space, one flat entry per sample
                # (label 1 stays 1, anything else such as -1 becomes 0).
                predicted = (outputs > 0.5).float().reshape(-1)
                targets = (labels == 1).float().reshape(-1)
                hits = (predicted == targets).tolist()

                # Track accuracy for poisoned vs. clean samples
                for position, hit in enumerate(hits):
                    if batch_start + position in poisoned_lookup:
                        poisoned_samples += 1
                        correct_poisoned += int(hit)
                    else:
                        clean_samples += 1
                        correct_clean += int(hit)

                batch_start += len(hits)
    finally:
        # Restore the original weights even if the evaluation fails
        model.load_state_dict(original_state_dict)

    # Calculate accuracy for poisoned and clean samples
    accuracy_poisoned = 100 * correct_poisoned / poisoned_samples if poisoned_samples > 0 else 0
    accuracy_clean = 100 * correct_clean / clean_samples if clean_samples > 0 else 0

    print(f"Accuracy on poisoned samples: {accuracy_poisoned:.2f}%")
    print(f"Accuracy on clean samples: {accuracy_clean:.2f}%")
