## Read input

In [305]:
with open('dataset/cryptominer_commands.txt', 'r', encoding='utf-8') as f:
    crypto_lines = f.readlines()

with open('dataset/non_cryptominer_commands.txt', 'r', encoding='utf-8') as f:
    noncrypto_lines = f.readlines()

len(crypto_lines), len(noncrypto_lines)

(56, 100)

## Tokenisation and training split

In [307]:
import torch
import tiktoken
enc = tiktoken.get_encoding("gpt2")

command_max_length = 200

def convert_lines_to_token_tensors(lines,): 
    data = [enc.encode(line) for line in lines]
    padded_lists = [lst + [0] * (command_max_length - len(lst)) for lst in data]

    tensor_data = torch.tensor(padded_lists).long()
    return tensor_data

X = convert_lines_to_token_tensors(crypto_lines + noncrypto_lines)
Y = torch.cat([torch.ones(len(crypto_lines)), torch.zeros(len(noncrypto_lines))])

X.shape, Y.shape

(torch.Size([156, 200]), torch.Size([156]))

In [308]:
vocab_size = enc.n_vocab # Total number of tokens in the vocabulary for the tiktoken encoding
embedding_dim = 16 # Dimension of the embedding vector per token. Each token will be converted to this size vector and later will be transformed to have inner meaning

from torch import nn
from torch.nn import functional as F


class CryptoMinerDetectionModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.embedding_layer = torch.nn.Embedding(vocab_size, embedding_dim)
        self.flatten_layer = torch.nn.Flatten()
        self.linear_layer = torch.nn.Linear(embedding_dim * command_max_length, 1)
        self.sigmoid = torch.nn.Sigmoid()

    def forward(self, inputs, targets = None):
        # Forward pass
        token_embeddings = self.embedding_layer(inputs)
        flatten_layer_output = self.flatten_layer(token_embeddings)
        linear_layer_output = self.linear_layer(flatten_layer_output)
        logits = self.sigmoid(linear_layer_output)

        if targets is None:
            loss = None
        else:
            loss = F.cross_entropy(logits.view(-1), targets)
        return logits, loss
    


## Training

In [319]:
model = CryptoMinerDetectionModel()
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)

inputs = X
targets =  Y
Y

tensor([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
        1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
        1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
        1., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

In [334]:
for steps in range(1000):
    logits, loss = model(inputs, targets)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
print(loss)

tensor(253.7101, grad_fn=<DivBackward1>)


## Prediction

In [336]:
test_input = "/usr/bin/node /pitcher/pitcher-agent.cjs containers exec run --id 4074f312-5bae-4d5a-97e1-1b8ca53d9b41 --workspace /project/sandbox --interactive --tty --verbose --command pnpm dev"
test_input_tensor = convert_lines_to_token_tensors([test_input])

with torch.no_grad():
    prediction = model(test_input_tensor)
    print(prediction)

(tensor([[0.9998]]), None)
