In [None]:
# Fetch the PLC-LD dataset repository into the current working directory.
# Config below points at /content/PLC-LD-dataset/{legitimate,malicious}, so this
# cell is presumably meant to run with /content as the working directory.
!git clone https://github.com/UniboSecurityResearch/PLC-LD-dataset


In [None]:
!pip install -q transformers datasets accelerate torch xmltodict evaluate peft optuna wandb scikit-learn


In [None]:
# 2. Imports & Config
import os
import xmltodict
import numpy as np
from dataclasses import dataclass
from sklearn.model_selection import train_test_split
from datasets import Dataset
import evaluate
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding,
    set_seed
)
import wandb

@dataclass
class Config:
    """Paths and hyperparameters shared by every cell of the notebook."""

    # --- Dataset, checkpoint and output locations ---
    legit_dir: str = "/content/PLC-LD-dataset/legitimate"
    mal_dir: str = "/content/PLC-LD-dataset/malicious"
    model_name: str = "microsoft/codebert-base"
    output_dir: str = "/content/model_output"

    # --- Tokenization ---
    max_length: int = 512  # token limit applied when truncating/padding inputs

    # --- Optimisation ---
    train_batch_size: int = 16  # per-device batch size for training
    eval_batch_size: int = 32  # per-device batch size for evaluation
    epochs: int = 100
    lr: float = 2e-5
    weight_decay: float = 0.01
    seed: int = 42
    eval_split: float = 0.2  # fraction of samples held out for validation
    accum_steps: int = 2  # gradient accumulation steps
    fp16: bool = True  # request mixed-precision training

    # --- Logging and checkpointing ---
    logging_steps: int = 100
    save_steps: int = 500
    project_name: str = "PLC-LD-Detection"  # Weights & Biases project name

# Instantiate the default configuration and apply its seed through
# transformers' set_seed helper.
cfg = Config()
set_seed(cfg.seed)

# Start a Weights & Biases run; every Config field is attached as run config.
wandb.init(
    config=vars(cfg),
    project=cfg.project_name,
)

In [None]:
# 3. Load and Prepare Data

def xml_to_text(path):
    """Read the XML file at ``path`` and return the string form of its parsed content.

    The file is decoded as UTF-8, parsed with ``xmltodict.parse`` and the
    resulting object is converted with ``str()``.
    """
    with open(path, 'r', encoding='utf-8') as handle:
        raw_xml = handle.read()
    parsed = xmltodict.parse(raw_xml)
    return str(parsed)

# Collect matched legitimate/malicious pairs: a legitimate file "l<rest>" is used
# only when "m<rest>" exists in the malicious directory, so both classes
# contribute the same number of samples (label 0 = legitimate, 1 = malicious).
texts, labels = [], []
# os.listdir() returns entries in arbitrary order; sorting keeps the sample order,
# and therefore the seeded split below, reproducible across runs and machines.
for fn in sorted(os.listdir(cfg.legit_dir)):
    if not (fn.endswith('.xml') and fn.startswith('l')):
        continue
    legit_path = os.path.join(cfg.legit_dir, fn)
    mal_path = os.path.join(cfg.mal_dir, 'm' + fn[1:])
    if not os.path.exists(mal_path):
        continue
    texts.append(xml_to_text(legit_path))
    labels.append(0)
    texts.append(xml_to_text(mal_path))
    labels.append(1)

# Fail early with an actionable message instead of an opaque splitter error.
if not texts:
    raise ValueError(
        f"No legitimate/malicious XML pairs found in {cfg.legit_dir!r} and {cfg.mal_dir!r}"
    )

# Stratified hold-out split so both classes keep the same ratio in each part.
train_texts, val_texts, train_labels, val_labels = train_test_split(
    texts, labels, test_size=cfg.eval_split, random_state=cfg.seed, stratify=labels
)

# Single dataset whose rows are ordered: all training rows, then all validation rows.
dataset = Dataset.from_dict({
    'text': train_texts + val_texts,
    'label': train_labels + val_labels
})

# 4. Tokenization

tokenizer = AutoTokenizer.from_pretrained(cfg.model_name)

def preprocess(examples):
    """Tokenize a batch of examples and attach their labels.

    ``examples`` must provide a ``'text'`` and a ``'label'`` entry. Texts are
    truncated and padded to ``cfg.max_length``; the labels are copied into the
    ``'labels'`` key of the returned encoding.
    """
    tokenizer_kwargs = dict(
        padding='max_length',
        truncation=True,
        max_length=cfg.max_length,
    )
    batch = tokenizer(examples['text'], **tokenizer_kwargs)
    batch['labels'] = examples['label']
    return batch

# Tokenize every row; map() keeps the rows in their existing order.
dataset = dataset.map(preprocess, batched=True)

# The rows were assembled as train_texts followed by val_texts, so slicing by
# position recovers exactly the stratified split produced by train_test_split.
# Calling dataset.train_test_split() here would reshuffle all rows and silently
# replace that stratified split with an unstratified random one.
n_train = len(train_texts)
train_dataset = dataset.select(range(n_train))
eval_dataset  = dataset.select(range(n_train, len(dataset)))
assert len(train_dataset) == len(train_texts)
assert len(eval_dataset) == len(val_texts)

data_collator = DataCollatorWithPadding(tokenizer)

# 5. Metrics
metric_acc = evaluate.load('accuracy')
metric_f1 = evaluate.load('f1')


def compute_metrics(pred):
    """Return accuracy and weighted F1 for a ``(logits, labels)`` pair.

    The predicted class of each row is the argmax over the last axis of the logits.
    """
    logits, labels = pred
    predicted = np.argmax(logits, axis=-1)

    scores = {}
    scores.update(metric_acc.compute(predictions=predicted, references=labels))
    scores.update(
        metric_f1.compute(predictions=predicted, references=labels, average='weighted')
    )
    return scores

# 6. TrainingArguments & Trainer
# fp16 mixed precision needs a CUDA device. Requesting it unconditionally makes
# this cell fail on a CPU-only runtime, so honour cfg.fp16 only when a GPU is
# actually available (the inference cell already falls back to CPU the same way).
use_fp16 = cfg.fp16 and torch.cuda.is_available()

training_args = TrainingArguments(
    output_dir=cfg.output_dir,
    num_train_epochs=cfg.epochs,
    per_device_train_batch_size=cfg.train_batch_size,
    per_device_eval_batch_size=cfg.eval_batch_size,
    gradient_accumulation_steps=cfg.accum_steps,
    learning_rate=cfg.lr,
    weight_decay=cfg.weight_decay,
    fp16=use_fp16,
    logging_steps=cfg.logging_steps,
    save_steps=cfg.save_steps,
    seed=cfg.seed,
    report_to="wandb"
)

In [None]:
# Load the pretrained checkpoint with a two-label sequence-classification head.
model = AutoModelForSequenceClassification.from_pretrained(
    cfg.model_name,
    num_labels=2,
)

# Bundle the model, training arguments, data and metric function into a Trainer.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    tokenizer=tokenizer,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    compute_metrics=compute_metrics,
)


In [None]:
# 7. Train & Evaluate
# Fine-tune with the Trainer built in the previous cell, then score the model on
# the Trainer's eval_dataset. The result is kept in `eval_results` because later
# cells read it.
trainer.train()
eval_results = trainer.evaluate()
print("Evaluation Results:", eval_results)

In [None]:
# Persist the fine-tuned model and its tokenizer side by side in the output directory.
for artifact in (model, tokenizer):
    artifact.save_pretrained(cfg.output_dir)
print(f"Model and tokenizer saved to {cfg.output_dir}")

In [None]:
import pandas as pd
from IPython.display import display

# Wrap eval_results in a one-element list so the frame holds the evaluation
# output as a single row, then render it with the notebook's rich display.
eval_df = pd.DataFrame([eval_results])
display(eval_df)


In [None]:
import random

# Gather the candidate files of each class. Listings are sorted because
# os.listdir() returns entries in arbitrary order, which would otherwise make
# the random pick depend on the filesystem.
legit_files = [
    os.path.join(cfg.legit_dir, f)
    for f in sorted(os.listdir(cfg.legit_dir))
    if f.endswith('.xml') and f.startswith('l')
]
mal_files = [
    os.path.join(cfg.mal_dir, f)
    for f in sorted(os.listdir(cfg.mal_dir))
    if f.endswith('.xml') and f.startswith('m')
]
all_files = legit_files + mal_files

# Select a random file
# NOTE: the pool is drawn from the same directories as the training data, so
# this is a smoke test of the inference path, not an unbiased evaluation.
random_file_path = random.choice(all_files)
print(f"Selected random file: {random_file_path}")

# Determine the true label from the listing the file came from. A substring
# check such as `'legitimate' in path` mislabels malicious files whenever the
# configured path itself contains that word.
true_label = 0 if random_file_path in legit_files else 1
print(f"True label: {true_label}")


# Preprocess the file with the same tokenization used for training
text_to_classify = xml_to_text(random_file_path)
processed_input = preprocess({'text': [text_to_classify], 'label': [true_label]})

# Convert to PyTorch tensors and place model and inputs on the same device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
input_ids = torch.tensor(processed_input['input_ids']).to(device)
attention_mask = torch.tensor(processed_input['attention_mask']).to(device)


# Make prediction
model.eval()  # Set the model to evaluation mode
with torch.no_grad():  # gradients are not needed for inference
    outputs = model(input_ids, attention_mask=attention_mask)
    logits = outputs.logits
    predicted_class = torch.argmax(logits, dim=-1).item()

print(f"Predicted class: {predicted_class}")

# Map predicted class to label name
label_map = {0: 'legitimate', 1: 'malicious'}
print(f"Predicted label: {label_map[predicted_class]}")