In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

In [2]:
from gensim.models.fasttext import FastText
import pickle

In [3]:
class MyLSTM(nn.Module):
    """Single-layer LSTM sequence classifier.

    The input sequence is run through an LSTM (``batch_first=True``) and the
    hidden output of the last time step is projected to ``output_size`` class
    scores by a linear layer.

    Args:
        input_size: Number of features per time step.
        hidden_size: Number of features in the LSTM hidden state.
        output_size: Number of class scores produced by the final linear layer.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super(MyLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        # Not applied in forward(): nn.CrossEntropyLoss expects raw logits.
        # Kept as an attribute for predict_proba() and for existing callers.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, input_seq, lengths=None):
        """Return unnormalized class scores (logits) for a batch of sequences.

        Args:
            input_seq: Tensor of shape (batch, seq_len, input_size).
            lengths: Optional true (unpadded) length of every sequence in the
                batch, as a list or 1-D integer tensor with positive entries.
                When given, the hidden state at each sequence's last real time
                step is used instead of the last (possibly padded) position.

        Returns:
            Tensor of shape (batch, output_size) holding logits. Use
            ``predict_proba`` to obtain probabilities.
        """
        if lengths is None:
            # nn.LSTM initialises the hidden and cell state to zeros by default.
            lstm_out, _ = self.lstm(input_seq)
            last_hidden = lstm_out[:, -1, :]
        else:
            # pack_padded_sequence requires the lengths on the CPU.
            lengths = torch.as_tensor(lengths, dtype=torch.int64).cpu()
            packed = nn.utils.rnn.pack_padded_sequence(
                input_seq, lengths, batch_first=True, enforce_sorted=False
            )
            # h_n holds the hidden state after the last *real* step of each
            # sequence, returned in the original batch order.
            _, (h_n, _) = self.lstm(packed)
            last_hidden = h_n[-1]
        return self.fc(last_hidden)

    def predict_proba(self, input_seq, lengths=None):
        """Return class probabilities, i.e. softmax over the logits of ``forward``."""
        return self.softmax(self(input_seq, lengths))


In [4]:
# Restore the previously saved FastText model from disk; later cells look up
# token vectors through its `.wv` attribute.
fasttext_model_path = "drive/MyDrive/PFE-Colab/FastText6/fasttext_model"
FastTextmodel = FastText.load(fasttext_model_path)

In [5]:
# Fix the RNG seed before the model is created so that the random weight
# initialisation is the same after every "Restart & Run All".
seed = 0
torch.manual_seed(seed)

# Initialize the LSTM model
input_size = 100  # Assuming each token is a 100-dimensional vector
hidden_size = 64
output_size = 2  # Number of output classes
batch_size = 16
num_epochs = 1
model = MyLSTM(input_size, hidden_size, output_size)

# Define loss function and optimizer
# nn.CrossEntropyLoss applies log-softmax internally, so it expects
# unnormalized logits as input.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [6]:
# Load the pickled tokens. Later cells iterate it as programs -> lines -> tokens.
# SECURITY: pickle.load can execute arbitrary code while deserialising; only
# open files from a trusted source.
tokens_path = "drive/MyDrive/tokens_malware.pkl"
with open(tokens_path, "rb") as token_file:
    tokenized_log = pickle.load(token_file)

# Dummy labels: every loaded entry is assigned class 1.
labels = [1] * len(tokenized_log)

In [7]:
# Step 1: embed every program as one tensor by stacking the FastText vectors of
# all its tokens (lines are flattened, token order is preserved).
program_tensors = []
for program in tokenized_log:
    token_vectors = [
        torch.tensor(FastTextmodel.wv[token], dtype=torch.float32)
        for line in program
        for token in line
    ]
    # One row per token -- presumably (num_tokens, embedding_dim); verify.
    program_tensors.append(torch.stack(token_vectors))

# Step 2: group consecutive programs into lists of at most `batch_size` tensors
# so several programs can be trained on at once. The final group may be shorter
# when the number of programs is not a multiple of `batch_size`.
batches = [
    program_tensors[start:start + batch_size]
    for start in range(0, len(program_tensors), batch_size)
]

# Rebind the names that are no longer needed to empty lists so the large
# objects they referred to can be garbage-collected.
tokenized_log = []
FastTextmodel = []
sequence_list = []
sequence_tensor = []
batch_tensors = []

In [8]:
# Prepare labels: cut the flat label list into consecutive chunks of at most
# `batch_size` entries and turn each chunk into a 1-D int64 tensor.
label_tensors = [
    torch.tensor(labels[start:start + batch_size], dtype=torch.long)
    for start in range(0, len(labels), batch_size)
]

# No intermediate list of label chunks is kept around.
labels_per_batches = []

In [15]:
# Old custom version - better version below
# Minimal training loop: one optimizer step per pre-built batch, no metrics.
# Relies on batches[k] and label_tensors[k] describing the same programs,
# i.e. len(batches) == len(label_tensors).
for epoch in range(num_epochs):
    for batch_idx, sequences in enumerate(batches):
        targets = label_tensors[batch_idx]
        # Zero-pad the variable-length sequences to the longest one in the batch.
        padded = nn.utils.rnn.pad_sequence(sequences, batch_first=True)

        optimizer.zero_grad()
        loss = criterion(model(padded), targets)
        loss.backward()
        optimizer.step()

In [None]:
# ChatGPT-enhanced version
# Train the LSTM model and report the mean loss and accuracy of every epoch.
# NOTE(review): the cell marked "Old custom version" also updates `model`;
# running both cells trains the model twice -- confirm this is intended.
model.train()  # make sure the model is in training mode

for epoch in range(num_epochs):
    loss_sum = 0.0  # Sum of per-sample losses over the epoch
    correct_predictions = 0
    total_predictions = 0

    for i in range(len(batches)):
        batch_tensors = batches[i]
        batch_labels = label_tensors[i]
        # Zero-pad the variable-length sequences to the longest one in the batch.
        # NOTE(review): if the model reads the last time step of the padded
        # batch, shorter sequences are classified from padding -- verify.
        padded_batch = nn.utils.rnn.pad_sequence(batch_tensors, batch_first=True)
        n_samples = batch_labels.size(0)

        # Forward pass
        optimizer.zero_grad()
        outputs = model(padded_batch)

        # Calculate loss. The criterion returns the batch mean, so weight it by
        # the batch size; otherwise a short final batch is over-represented.
        loss = criterion(outputs, batch_labels)
        loss_sum += loss.item() * n_samples

        # Calculate accuracy
        _, predicted = torch.max(outputs, 1)
        correct_predictions += (predicted == batch_labels).sum().item()
        total_predictions += n_samples

        # Backward pass and update weights
        loss.backward()
        optimizer.step()

    # Without any sample the averages below would divide by zero.
    if total_predictions == 0:
        print(f"Epoch [{epoch + 1}/{num_epochs}] - no training samples, metrics skipped")
        continue

    # Calculate epoch-level metrics
    epoch_loss = loss_sum / total_predictions  # Mean per-sample loss
    accuracy = correct_predictions / total_predictions  # Fraction classified correctly

    # Print metrics for the epoch
    print(f"Epoch [{epoch + 1}/{num_epochs}] - Loss: {epoch_loss:.4f} - Accuracy: {accuracy:.4f}")
