In [1]:
!pip install transformers
!pip install scikit-learn
!pip install torch



In [8]:
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
from tqdm import tqdm
import math
print("[INF] base loaded")
import torch
import torch.nn as nn
import torch.nn.functional as F
print("[INF] torch loaded")
from transformers import BertTokenizer, BertModel
print("[INF] transformers loaded")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

[INF] base loaded
[INF] torch loaded
[INF] transformers loaded


In [10]:
print("[INF] parsing dataset...")

csv_path = "ssh_anomaly_dataset.csv"
df = pd.read_csv(csv_path)

df["target"] = (df["label"] != "normal").astype(int)

df["timestamp"] = pd.to_datetime(df["timestamp"])
df["hour"] = df["timestamp"].dt.hour
df["minute"] = df["timestamp"].dt.minute

df = df.drop(columns=["label", "timestamp", "detail"])

X = df.drop(columns=["target"])
y = df["target"]

tmpX = (X["hour"].astype(str)
    + " " + X["minute"].astype(str)
    + " " + X["source_ip"]
    + " " + X["username"]
    + " " + X["event_type"]
    + " " + X["status"])
X = tmpX.tolist()

categorical_cols = ["source_ip", "username", "event_type", "status"]
numeric_cols = ["hour", "minute"]

[INF] parsing dataset...


In [None]:
print("[INF] tokenizing...")
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
model = BertModel.from_pretrained("bert-base-uncased")
model.eval()

def bert_embeddings(texts):
    embeddings = []
    for text in tqdm(texts):
        with torch.no_grad():
            inputs = tokenizer(text, return_tensors='pt', truncation=True, max_length=80, padding='max_length')
            outputs = model(**inputs)

            embeddings.append(outputs.last_hidden_state[:, 0, :].squeeze().cpu().numpy())
    # embeddings = np.array(embeddings, axis=1)
    return embeddings

embeddings = bert_embeddings(X)

print(embeddings[0].shape)
print(y.shape)

X_train, X_test, mask_train, mask_test, y_train, y_test = train_test_split(
    embeddings,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y
)

[INF] tokenizing...


  4%|‚ñç         | 1797/41825 [06:06<2:11:45,  5.06it/s]

In [None]:
class SimpleTransformer(nn.Module):
    def __init__(self, vocab_size, d_model=128):
        super().__init__()

        self.embedding = nn.Embedding(
            vocab_size,
            d_model,
            padding_idx=tokenizer.pad_token_id
        )

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=4,
            dim_feedforward=256,
            batch_first=True
        )

        self.encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=2
        )

        self.classifier = nn.Linear(d_model, len(pd.unique(y)))

    def forward(self, input_ids, attention_mask):
        pad_mask = attention_mask == 0

        x = self.embedding(input_ids)
        x = self.encoder(x, src_key_padding_mask=pad_mask)

        attention_mask = attention_mask.unsqueeze(-1)
        x = (x * attention_mask).sum(dim=1) / attention_mask.sum(dim=1)

        return self.classifier(x)

model = SimpleTransformer(vocab_size=tokenizer.vocab_size)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()

In [None]:
for epoch in tqdm(range(10)):
    model.train()
    optimizer.zero_grad()

    for i in range(len(X_train)):
        logits = model(X_train[i], mask_train[i])
        loss = loss_fn(logits, y_train[i])

        loss.backward()

    optimizer.step()

    print(f"Epoch {epoch+1} | Loss: {loss.item():.4f}")

model.eval()
with torch.no_grad():
    preds = model(X_test, mask_test).argmax(dim=1)
    acc = (preds == y_test).float().mean()

print("Test accuracy:", acc.item())