In [None]:
import torch
import torch.nn as nn
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, TensorDataset


### Select the compute device (on Apple Silicon, the Metal Performance Shaders backend is used)

In [None]:

# Pick the best available backend: a CUDA GPU first, then Apple Silicon's
# Metal Performance Shaders (MPS) backend, and finally the CPU.
# `getattr` guards against PyTorch builds that do not expose `torch.backends.mps`.
_mps_backend = getattr(torch.backends, "mps", None)
if torch.cuda.is_available():
    device = torch.device("cuda")
elif _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

### Load traces

In [None]:
traces_file = 'traces.csv'
nrows = 1000  # number of traces to use (debug purposes)
batch_size = 50

# Only the first column is parsed here: the row count is all that is needed,
# so there is no reason to load every column of the full file.
print('number of traces: ', len(pd.read_csv(traces_file, usecols=[0])))
print('number of traces used: ', nrows)

traces = pd.read_csv(traces_file, nrows=nrows)

# Split into train and test.
# The split is chronological (shuffle=False): the DataLoader is built with
# shuffle=False and the training loop carries the LSTM state from one batch to
# the next, so the rows must stay in trace order. A random split would scramble
# the sequence and also make the results differ between runs.
train_data, test_data = train_test_split(traces, test_size=0.25, shuffle=False)

# Number of unique values in the last column (the value to predict).
# `.iloc[-1]` is used because `nunique()` returns a Series indexed by column
# name, and positional `[-1]` on a label-indexed Series is deprecated in pandas.
print('number of unique values to predict: ', traces.nunique().iloc[-1])

### Create vocabulary

In [None]:
def _encode_with_unknown(encoder, values):
    """Encode `values` with an already fitted LabelEncoder.

    Labels that the encoder has not seen are mapped to ``len(encoder.classes_)``,
    i.e. one past the last known class, instead of raising an error.
    """
    index_of = {label: idx for idx, label in enumerate(encoder.classes_)}
    unknown_index = len(encoder.classes_)
    return values.map(index_of).fillna(unknown_index).astype('int64')


def load_data(data, batch_size, encoders=None, return_encoders=False):
    """Encode the trace columns as integer ids and wrap them in a DataLoader.

    Args:
        data: DataFrame with the columns 'pc', 'delta_in' and 'delta_out'.
            It is not modified; encoding happens on a copy.
        batch_size: number of rows per batch of the returned DataLoader.
        encoders: optional dict mapping each of the three column names to a
            fitted LabelEncoder. When given, these encoders are reused (values
            they have not seen get the index ``len(classes_)``); when None, new
            encoders are fitted on `data`.
        return_encoders: when True, the encoders dict is appended to the
            returned tuple so it can be reused for another split.

    Returns:
        ``(data_loader, num_pc, num_delta_in, num_delta_out, target_keys)``
        where the three counts are the number of classes known to each encoder
        and `target_keys` is the set of encoded 'delta_out' values present in
        `data`. With ``return_encoders=True`` the encoders dict is added as a
        sixth element.
    """
    columns = ('pc', 'delta_in', 'delta_out')

    # Work on a copy: `data` is usually a slice produced by train_test_split and
    # adding columns to it would mutate the caller's frame.
    data = data.copy()

    if encoders is None:
        encoders = {col: LabelEncoder().fit(data[col]) for col in columns}

    # Transform labels into encoded values
    for col in columns:
        data[col + '_encoded'] = _encode_with_unknown(encoders[col], data[col])

    # Convert dataframes to tensors
    pc = torch.tensor(data['pc_encoded'].values)
    delta_in = torch.tensor(data['delta_in_encoded'].values)
    targets = torch.tensor(data['delta_out_encoded'].values)

    # shuffle=False keeps the rows in their original order within the loader
    dataset = TensorDataset(pc, delta_in, targets)
    data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    # Get unique target keys
    target_keys = set(data['delta_out_encoded'].unique())

    result = (
        data_loader,
        len(encoders['pc'].classes_),
        len(encoders['delta_in'].classes_),
        len(encoders['delta_out'].classes_),
        target_keys,
    )
    if return_encoders:
        return result + (encoders,)
    return result

train_iter, num_pc, num_delta_in, num_output_next, target_keys, train_encoders = load_data(
    train_data, batch_size=batch_size, return_encoders=True
)
# The test split must be encoded with the vocabulary learned on the training
# split; fitting fresh encoders would assign unrelated ids to the same values.
test_iter, _, _, _, _ = load_data(test_data, batch_size=batch_size, encoders=train_encoders)
# add 1 to each vocabulary size: the extra index is the one assigned to values
# that do not occur in the training set
num_pc += 1
num_delta_in += 1
num_output_next += 1

print('number of unique pc: ', num_pc)
print('number of unique input delta: ', num_delta_in)
print('number of unique output delta: ', num_output_next)


### Define LSTM model

In [None]:
class EmbeddingLSTM(nn.Module):
    """Two embedding tables feeding an LSTM that scores the next output delta.

    The pc ids and the input-delta ids are embedded separately, the two
    embeddings are concatenated, passed through a (possibly stacked) LSTM and
    projected by a linear layer to `num_output_next` scores.

    Args:
        num_pc: size of the pc embedding table.
        num_delta_in: size of the input-delta embedding table.
        num_output_next: number of output classes.
        embed_dim: width of each of the two embeddings.
        hidden_dim: LSTM hidden size.
        topPredNum: how many top-scoring classes `forward` returns per step.
        num_layers: number of stacked LSTM layers.
        dropout: dropout probability (between LSTM layers and before the
            output projection).
    """

    def __init__(self, num_pc, num_delta_in, num_output_next, embed_dim, hidden_dim, topPredNum, num_layers, dropout):
        # Layer structure is described in the paper
        super().__init__()
        self.topPredNum = topPredNum
        # Define embedding layers
        self.pc_embed_layer = nn.Embedding(num_pc, embed_dim)
        self.delta_embed_layer = nn.Embedding(num_delta_in, embed_dim)
        # Define LSTM layer
        # LSTM input is the concatenation of the pc and delta embeddings
        lstm_input_dim = embed_dim * 2
        # nn.LSTM applies dropout only between stacked layers and warns when a
        # non-zero value is requested for a single layer, so pass 0 in that case.
        lstm_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(lstm_input_dim, hidden_dim, num_layers, dropout=lstm_dropout)
        # Define output layer
        self.fc = nn.Linear(hidden_dim, num_output_next)
        # Define dropout layer
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, h_c_state, target=None):
        """Run one chunk of the trace through the network.

        Args:
            x: pair ``(pc, delta)`` of integer id tensors of equal shape.
                NOTE(review): the notebook passes 1-D batches, which nn.LSTM
                (batch_first=False) treats as one unbatched sequence -- confirm
                this is the intended interpretation.
            h_c_state: LSTM ``(h, c)`` state to start from, or None.
            target: optional tensor of class ids; when given, the loss is
                computed against it.

        Returns:
            ``(preds, state, loss)``: the indices of the top scoring classes
            per step (unsorted), the new LSTM state, and the cross-entropy loss
            (None when `target` is None).
        """
        pc, delta = x
        # Each id tensor goes through its own embedding table
        pc_embedded = self.pc_embed_layer(pc)
        delta_embedded = self.delta_embed_layer(delta)
        # The concatenated embeddings are the input to the LSTM layer
        lstm_in = torch.cat([pc_embedded, delta_embedded], dim=-1)
        lstm_out, state = self.lstm(lstm_in, h_c_state)
        # Dropout is applied to the hidden features, not to the logits:
        # zeroing logits would randomly corrupt the class scores used by the loss.
        logits = self.fc(self.dropout(lstm_out))
        # Drop a singleton middle dimension if the input carried one
        if logits.dim() == 3:
            logits = logits.squeeze(dim=1)

        ####### Get top k predictions #######
        delta_probabilities = nn.functional.log_softmax(logits, dim=-1)
        # topk raises if k exceeds the number of classes, so clamp it
        k = min(self.topPredNum, delta_probabilities.size(-1))
        _, preds = torch.topk(delta_probabilities, k, sorted=False)

        if target is not None:
            # cross_entropy applies log_softmax itself, so it takes raw logits
            loss = nn.functional.cross_entropy(logits, target)
        else:
            loss = None

        return preds, state, loss

    def predict(self, X, lstm_state):
        """Return ``(preds, state)`` from `forward` without tracking gradients."""
        with torch.no_grad():
            preds, state, _ = self.forward(X, lstm_state)
            return preds, state



### Tune hyperparameters here

In [None]:
# Set up hyperparameters
# Hyperparameter table read by the model construction and the training loop
hparams = dict(
    topPredNum=5,         # number of top predictions returned per step
    embed_dim=256,        # width of each embedding
    hidden_dim=256,       # LSTM hidden size
    num_layers=2,         # stacked LSTM layers
    dropout=0.1,
    learning_rate=0.001,  # Adam step size
    epochs=1,
)

### Train the model

In [None]:
# Initialize the model with hyperparameters
# Build the network from the hyperparameter table and move it to the device
my_model = EmbeddingLSTM(
    num_pc,
    num_delta_in,
    num_output_next,
    embed_dim=hparams["embed_dim"],
    hidden_dim=hparams["hidden_dim"],
    topPredNum=hparams["topPredNum"],
    num_layers=hparams["num_layers"],
    dropout=hparams["dropout"],
).to(device)

# Optimizer and per-iteration loss history
optimizer = torch.optim.Adam(my_model.parameters(), lr=hparams["learning_rate"])
train_loss = []

# Training loop
for epoch in range(hparams["epochs"]):
    # Training mode enables dropout
    my_model.train()
    # Every epoch starts from an empty LSTM state
    lstm_state = None
    for idx, batch in enumerate(train_iter):
        # The last tensor of a batch is the target, the ones before it are inputs
        *inputs, targets = (ds.to(device) for ds in batch)

        optimizer.zero_grad()
        _, lstm_state, batch_loss = my_model(inputs, lstm_state, targets)
        batch_loss.backward()
        optimizer.step()

        train_loss.append(batch_loss.item())
        print(f"Epoch {epoch + 1}, Iteration {idx + 1}, Loss: {train_loss[-1]:.8f}")

        # Cut the state off from this batch's graph so the next backward pass
        # does not try to propagate through it
        hidden, cell = lstm_state
        lstm_state = (hidden.detach(), cell.detach())


### Validate the model

In [None]:
def validate_model(network, data_iterator, relevant_keys, computing_device="cpu", initial_state=None):
    """Evaluate `network` on every batch of `data_iterator` and report accuracy.

    The recurrent state returned for one batch is fed into the next one, the
    same way the training loop does, starting from `initial_state`.

    Args:
        network: model exposing `eval()` and `predict(inputs, state)`.
        data_iterator: iterable of batches; the last tensor of each batch holds
            the labels, the preceding ones are the model inputs.
        relevant_keys: collection of label values that may count as correct.
        computing_device: device the batch tensors are moved to.
        initial_state: recurrent state used for the first batch (None for a
            fresh state).

    Returns:
        The accuracy over all samples as a float (NaN when there were no
        samples). The value is also printed.
    """
    network.eval()

    state = initial_state
    weighted_correct = 0.0
    total_samples = 0
    for i, batch_data in enumerate(data_iterator):
        batch_len = len(batch_data[-1])
        if batch_len == 0:
            continue
        accuracy, state = process_batch(
            i, batch_data, network, computing_device, state, relevant_keys, return_state=True
        )
        # Weight each batch by its size so a short final batch does not count
        # as much as a full one
        weighted_correct += float(accuracy) * batch_len
        total_samples += batch_len

    average_accuracy = weighted_correct / total_samples if total_samples else float("nan")
    print("Average Validation Accuracy: {:.4f}".format(average_accuracy))
    return average_accuracy

def process_batch(batch_index, batch_data, network, device, state, keys, return_state=False):
    """Run one batch through `network.predict` and score it.

    Args:
        batch_index: position of the batch, used only for the progress message.
        batch_data: sequence of tensors; the last one holds the labels.
        network: model exposing `predict(inputs, state)`.
        device: device the tensors are moved to before prediction.
        state: recurrent state passed to `predict`.
        keys: collection of label values that may count as correct.
        return_state: when True, also return the state produced by `predict`.

    Returns:
        The batch accuracy, or ``(accuracy, new_state)`` when `return_state`
        is True.
    """
    print(f"Processing batch {batch_index}")

    batch_data = [item.to(device) for item in batch_data]
    input_data = batch_data[:-1]
    labels = batch_data[-1]

    predictions, state = network.predict(input_data, state)

    accuracy = compute_accuracy(predictions, labels, keys)

    if return_state:
        return accuracy, state
    return accuracy

def compute_accuracy(predictions, labels, keys):
    """Fraction of samples whose label is among that sample's predictions.

    A sample counts as correct only when its label value is contained in `keys`
    and also appears in the corresponding row of `predictions`.

    Args:
        predictions: tensor whose first dimension indexes samples; each row
            holds the candidate class ids for one sample.
        labels: 1-D tensor of true class ids, one per sample.
        keys: collection of label values that may count as correct.

    Returns:
        Number of correct samples divided by ``len(labels)``; 0.0 for an empty
        batch.
    """
    if len(labels) == 0:
        # Nothing to score; avoid dividing by zero
        return 0.0

    # One row of candidates per sample, whatever the trailing shape is
    candidates = predictions.reshape(predictions.shape[0], -1)
    # Compare every candidate with its sample's label in one tensor operation
    # and copy the result to the host once, instead of once per sample
    hits = (candidates == labels.unsqueeze(-1)).any(dim=-1).tolist()
    label_values = labels.tolist()

    count_correct = sum(1 for value, hit in zip(label_values, hits) if hit and value in keys)

    return count_correct / len(labels)

# Evaluate on the device the model was moved to. A hard-coded "mps" fails on
# machines where `device` fell back to another backend.
validate_model(my_model, test_iter, target_keys, computing_device=device)
