### Imports

In [None]:
import glob
import json
import pandas as pd
import numpy as np
import os

## Define the objective

1. Mnemonics groups
2. Strings per BERT
3. Section entropy
4. Functions
5. Generic information
6. All per BERT

In [None]:
# Configuration for the "mnemonics groups" experiment.

# Name of the per-sample feature file searched for in the dataset tree.
target = "groups.feature"

# CSV file the collected features are written to and later read back from.
data = "/home/sergio/Documents/TFM/repos/BinaryIntelligence/Models/DATA/groups.csv"

# Output artefacts for this experiment (paths relative to the working directory).
model_pickle = "data-vault/model_mnemonics.pkl"
all_models_pickle = "data-vault/all_models_mnemonics.pkl"
output_history = "data-vault/scores_mnemonics.json"

# The image path is resolved to an absolute path at definition time.
output_image = os.path.abspath("data-vault/mnemonics-results.png")

### Prepare data to csv

In [None]:
# Collect every `target` feature file found below `location` into one CSV
# file (`data`): one line per sample, with a trailing "Malware" label column.
location = "/home/sergio/Documents/TFM/data/dataset"

# Number of comma-separated fields a line must have to be kept (the feature
# columns plus the "Malware" label). Lines with any other count are skipped.
expected_columns = 15

csv = []
header = None
for file in glob.glob(f"**/{target}", recursive=True, root_dir=location):
    with open(f"{location}/{file}", "r") as f:
        current_data = json.load(f)

    # The label must be decided per file. Keeping it in a variable that is
    # only ever set to 1 would label every sample after the first malware
    # file as malware as well.
    malware = 1 if "malware" in file else 0

    row = [",".join([str(x) for x in current_data.values()]) + f",{malware}"]
    if len(row[0].split(",")) != expected_columns:
        # A value containing commas (or a missing feature) would shift the
        # columns, so the whole sample is dropped.
        continue

    if header is None:
        # Take the header from the first accepted sample so that it can never
        # be filtered out or replaced by a later, differently shaped file.
        # NOTE(review): assumes all accepted files share the same key order.
        header = [",".join(current_data.keys()) + ",Malware"]
    csv.append(row)

if header is None:
    raise FileNotFoundError(
        f"No usable '{target}' file with {expected_columns} columns found under {location}"
    )
csv.insert(0, header)

# The context manager guarantees the file is closed even if a write fails.
with open(data, "w") as output_file:
    output_file.writelines(f"{element[0]}\n" for element in csv)

### Data to DF

In [None]:
# Read the generated CSV and shuffle all rows (frac=1 keeps every row).
df = pd.read_csv(data).sample(frac=1)
# Preview the first rows of the shuffled frame.
df.head()

In [None]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
from torch import nn, optim
from tqdm import tqdm

In [None]:
class Data(Dataset):
    """Dataset wrapping a NumPy feature matrix and its label vector.

    Features are converted to float32 tensors and labels to int64 tensors.
    Indexing returns a ``(features, label)`` pair.
    """

    def __init__(self, X_train, y_train):
        features = X_train.astype(np.float32)
        self.X = torch.from_numpy(features)
        self.y = torch.from_numpy(y_train).long()
        # Number of samples, i.e. the size of the first dimension of X.
        self.len = self.X.size(0)

    def __len__(self):
        return self.len

    def __getitem__(self, index):
        sample = self.X[index]
        label = self.y[index]
        return sample, label

In [None]:
def get_data_loaders(X, y, batch_size=4, val_size=0.1, test_size=0.2, *,
                     num_workers=2, random_state=None):
    """Split ``X``/``y`` into train, validation and test sets and wrap them in DataLoaders.

    Args:
        X: Feature array, passed to ``train_test_split`` and then to ``Data``.
        y: Label array aligned with ``X``.
        batch_size: Batch size used by all three loaders.
        val_size: Fraction of the *whole* dataset used for validation.
        test_size: Fraction of the *whole* dataset used for testing.
        num_workers: Worker processes per DataLoader (0 loads in the main process).
        random_state: Seed forwarded to both ``train_test_split`` calls;
            ``None`` keeps the splits non-deterministic.

    Returns:
        Tuple ``(trainloader, valloader, testloader)``. Only the training
        loader shuffles its data.

    Raises:
        ValueError: If the fractions do not leave room for all three subsets.
    """
    # Both fractions refer to the full dataset, so together they must stay
    # below 1; otherwise the rescaled validation fraction below is invalid.
    if not (0 < test_size < 1 and 0 < val_size < 1 - test_size):
        raise ValueError(
            "val_size and test_size must be fractions in (0, 1) whose sum is below 1, "
            f"got val_size={val_size}, test_size={test_size}"
        )

    # Split data into train+val and test sets
    X_train_val, X_test, y_train_val, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    # The second split acts on the remaining (1 - test_size) share, so the
    # validation fraction is rescaled to stay relative to the whole dataset.
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_val,
        y_train_val,
        test_size=val_size / (1 - test_size),
        random_state=random_state,
    )

    # Wrap the data into our custom datasets
    traindata = Data(X_train, y_train)
    valdata = Data(X_val, y_val)
    testdata = Data(X_test, y_test)

    # Create DataLoaders
    trainloader = DataLoader(traindata, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    valloader = DataLoader(valdata, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    testloader = DataLoader(testdata, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    return trainloader, valloader, testloader

In [None]:
def train_model(model, trainloader, valloader, optimizer, criterion, epochs=10):
    """Train ``model`` and report validation accuracy after every epoch.

    Args:
        model: ``nn.Module`` whose output has one score per class along dim 1
            (the predicted class is the index of the largest score).
        trainloader: Iterable of ``(inputs, labels)`` training batches.
        valloader: Iterable of ``(inputs, labels)`` validation batches.
        optimizer: Optimizer updating the parameters of ``model``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        epochs: Number of passes over ``trainloader``.

    The model is left in evaluation mode after the final validation pass.
    """
    for epoch in range(epochs):
        # Dropout and BatchNorm behave differently in training and evaluation,
        # so the mode has to be set explicitly for each phase.
        model.train()
        running_loss = 0.0
        num_batches = 0

        # Progress bar for training data
        pbar = tqdm(enumerate(trainloader, start=1), total=len(trainloader), desc="Training", leave=True)
        for num_batches, (inputs, labels) in pbar:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            # Update the progress bar with the mean loss so far.
            pbar.set_postfix({'Running Loss': running_loss / num_batches})

        # Validate in evaluation mode: dropout is disabled and BatchNorm uses
        # its running statistics instead of the current batch statistics.
        model.eval()
        correct, total = 0, 0
        with torch.no_grad():
            # Progress bar for validation data
            val_pbar = tqdm(valloader, desc="Validating", leave=True)
            for inputs, labels in val_pbar:
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                # Calculate accuracy and update the progress bar.
                val_pbar.set_postfix({'Validation Accuracy': f'{100 * correct / total:.2f}%'})

        # Empty loaders yield NaN instead of failing on an undefined variable.
        mean_loss = running_loss / num_batches if num_batches else float("nan")
        accuracy = 100 * correct / total if total else float("nan")

        # Print stats at the end of each epoch.
        print(f'Epoch: {epoch + 1}, Loss: {mean_loss:.5f}, Validation Accuracy: {accuracy:.2f}%')

In [None]:
def evaluate_model(model, testloader):
    """Evaluate ``model`` on ``testloader`` and print classification metrics.

    Prints accuracy, macro-averaged precision, recall and F1, and the
    confusion matrix. The predicted class of each sample is the index of the
    largest model output along dim 1.

    Args:
        model: ``nn.Module`` to evaluate.
        testloader: Iterable of ``(inputs, labels)`` batches.

    Raises:
        ValueError: If ``testloader`` yields no samples.
    """
    correct, total = 0, 0
    all_labels = []
    all_predictions = []

    # Evaluate with dropout disabled and BatchNorm using its running
    # statistics; the previous mode is restored afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, labels in testloader:
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                # Accumulate all true labels and predictions for later metrics computation
                all_labels.extend(labels.tolist())
                all_predictions.extend(predicted.tolist())
    finally:
        model.train(was_training)

    if total == 0:
        raise ValueError("testloader yielded no samples; nothing to evaluate")

    accuracy = 100 * correct / total
    print(f'Test Accuracy: {accuracy:.2f}%')

    # Now we compute precision, recall, f1_score, and confusion matrix with the accumulated labels and predictions
    precision = precision_score(all_labels, all_predictions, average='macro')
    recall = recall_score(all_labels, all_predictions, average='macro')
    f1 = f1_score(all_labels, all_predictions, average='macro')
    confusion = confusion_matrix(all_labels, all_predictions)

    print(f'Precision: {precision:.2f}')
    print(f'Recall: {recall:.2f}')
    print(f'F1 score: {f1:.2f}')
    print('Confusion Matrix:')
    print(confusion)

In [None]:
# Divide in characteristics and tags
X = df.drop('Malware', axis=1).values
y = df['Malware'].values

class Network(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout_prob=0.5):
        super(Network, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout_prob)
        )
        self.layer2 = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout_prob)
        )
        self.out = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = torch.sigmoid(self.out(x))
        return x

input_dim = len(X_train[0])
hidden_dim = 50
dropout_prob = 0.5
output_dim = 1
clf = Network(input_dim, hidden_dim, output_dim, dropout_prob)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(clf.parameters(), lr=0.001)

In [None]:
# Build the three loaders, train for five epochs, then report test metrics.
# NOTE(review): with batch_size=4 a final training batch holding a single
# sample would make BatchNorm1d fail in training mode - confirm dataset size.
trainloader, valloader, testloader = get_data_loaders(X, y, batch_size=4)
train_model(
    clf,
    trainloader,
    valloader,
    optimizer,
    criterion,
    epochs=5,
)
evaluate_model(clf, testloader)