In [31]:
# %%[1] imports
import os

import kagglehub
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from tqdm.notebook import tqdm

In [2]:
# %%[0] download dataset

# Download the Kaggle dataset through kagglehub, then read malicious_phish.csv
# from the returned directory into a DataFrame.
DATASET_HANDLE = "sid321axn/malicious-urls-dataset"
CSV_NAME = "malicious_phish.csv"

path = kagglehub.dataset_download(DATASET_HANDLE)
file_path = os.path.join(path, CSV_NAME)
df = pd.read_csv(file_path)

In [17]:
# %%[1] preprocess

def map_labels(x):
    """Translate a dataset ``type`` string into its integer class id.

    Matching is case-insensitive: "benign" -> 0, "phishing" -> 1,
    "defacement" -> 2. Any other string falls through to 3.
    """
    known_ids = {"benign": 0, "phishing": 1, "defacement": 2}
    return known_ids.get(x.lower(), 3)

# Reverse lookup: integer class id -> class name.
id_to_label = dict(enumerate(["benign", "phishing", "defacement", "malware"]))

# Keep only the URL and its type, drop incomplete rows, then attach the
# integer class id computed by map_labels.
df = df[['url', 'type']].dropna()
df = df.assign(label=df['type'].apply(map_labels))

# 80/20 split, stratified on the label, with a fixed seed.
train_df, test_df = train_test_split(
    df[['url', 'label']],
    test_size=0.2,
    stratify=df['label'],
    random_state=42,
)

In [34]:
# %%[2] alphabet and encoding

# Characters the encoder recognises: lowercase letters, digits and URL punctuation.
ALPHABET = [ch for ch in "abcdefghijklmnopqrstuvwxyz0123456789-._~:/?#[]@!$&'()*+,;=%"]
# Ids start at 1; id 0 is left free (URLDataset.encode uses it for padding
# and for characters that are not in ALPHABET).
CHAR2IDX = {ch: pos for pos, ch in enumerate(ALPHABET, start=1)}
# One extra slot for the reserved id 0.
VOCAB_SIZE = 1 + len(CHAR2IDX)
# Every encoded URL is truncated or padded to exactly this many ids.
MAX_LEN = 200

class URLDataset(Dataset):
    """Map-style dataset yielding ``(encoded_url, label)`` tensor pairs.

    Built from a frame exposing ``url`` and ``label`` columns.
    """

    def __init__(self, df):
        # Materialise both columns as plain Python lists for positional access.
        self.urls = df['url'].tolist()
        self.labels = df['label'].tolist()

    def __len__(self):
        return len(self.urls)

    def encode(self, url):
        """Turn a URL string into a fixed-length ``torch.long`` id tensor.

        The URL is lower-cased and cut to ``MAX_LEN`` characters. Characters
        missing from ``CHAR2IDX`` become 0, and the tail is padded with 0 up
        to ``MAX_LEN``.
        """
        clipped = url.lower()[:MAX_LEN]
        ids = [CHAR2IDX.get(ch, 0) for ch in clipped]
        ids.extend([0] * (MAX_LEN - len(clipped)))
        return torch.tensor(ids, dtype=torch.long)

    def __getitem__(self, idx):
        encoded = self.encode(self.urls[idx])
        target = torch.tensor(self.labels[idx], dtype=torch.long)
        return encoded, target

class CharCNN(nn.Module):
    """Character-level 1-D CNN that emits 4 raw logits per input sequence."""

    def __init__(self):
        super().__init__()
        embed_dim, channels, hidden = 16, 128, 64
        # Layer names and creation order are kept as-is: they determine the
        # state_dict keys used by the saved snapshots.
        self.embedding = nn.Embedding(VOCAB_SIZE, embed_dim, padding_idx=0)
        self.conv1 = nn.Conv1d(embed_dim, channels, kernel_size=5, padding=2)
        self.conv2 = nn.Conv1d(channels, channels, kernel_size=5, padding=2)
        self.pool = nn.AdaptiveMaxPool1d(1)
        self.fc1 = nn.Linear(channels, hidden)
        self.dropout_1 = nn.Dropout(p=.3)
        self.fc2 = nn.Linear(hidden, hidden)
        self.fc3 = nn.Linear(hidden, 4)

    def forward(self, x):
        """Map a batch of character-id sequences to class logits (no softmax)."""
        # Conv1d wants channels before the sequence axis, so swap axes 1 and 2
        # of the embedding output.
        feats = self.embedding(x).transpose(1, 2)
        for conv in (self.conv1, self.conv2):
            feats = F.relu(conv(feats))
        # Global max over the sequence axis, then drop the length-1 axis.
        pooled = self.pool(feats).squeeze(-1)
        hidden = self.dropout_1(F.relu(self.fc1(pooled)))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)

def train(model, loader, optimizer, criterion):
    """Run one optimisation pass (one epoch) over ``loader``.

    Args:
        model: ``nn.Module`` to optimise; it is switched to training mode.
        loader: iterable yielding ``(inputs, targets)`` batches.
        optimizer: optimiser built over ``model``'s parameters.
        criterion: callable ``criterion(outputs, targets)`` returning a
            scalar loss tensor.

    Returns:
        float: mean of the per-batch losses, or 0.0 if ``loader`` was empty.
    """
    model.train()
    # Follow whatever device the model already lives on instead of forcing
    # CUDA, so the same function also works for CPU (or other) models.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Accumulate on-device and convert once at the end, which avoids a
    # host/device sync on every batch.
    running_loss = torch.zeros((), device=device)
    n_batches = 0
    for x, y in tqdm(loader):
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()
        running_loss += loss.detach()
        n_batches += 1

    if n_batches == 0:
        return 0.0
    return (running_loss / n_batches).item()

def evaluate(model, loader):
    """Compute and print the macro-averaged F1 of ``model`` over ``loader``.

    Args:
        model: ``nn.Module`` returning per-class scores of shape
            ``(batch, n_classes)``; it is switched to eval mode.
        loader: iterable yielding ``(inputs, integer_labels)`` batches.

    Returns:
        The value returned by ``f1_score(..., average='macro')``.
    """
    model.eval()
    # Follow the model's device instead of forcing CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    all_preds, all_labels = [], []
    with torch.no_grad():
        for x, y in loader:
            logits = model(x.to(device))
            # Softmax is monotonic, so the argmax of the raw logits selects
            # the same class without computing probabilities.
            all_preds.extend(logits.argmax(dim=1).cpu().tolist())
            all_labels.extend(y.cpu().tolist())

    f1 = f1_score(all_labels, all_preds, average='macro')
    print(f"F1: {f1:.4f}")
    return f1

# Datasets and loaders for the train/test split; only the training loader shuffles.
BATCH_SIZE = 64
train_ds, test_ds = URLDataset(train_df), URLDataset(test_df)
train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
test_dl = DataLoader(test_ds, batch_size=BATCH_SIZE)

In [35]:
# %%[3] training

# Seed torch's RNG before the model is built so that weight initialisation,
# dropout and DataLoader shuffling are repeatable from run to run.
SEED = 42
NUM_EPOCHS = 5
torch.manual_seed(SEED)

model = CharCNN().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

for epoch in tqdm(range(NUM_EPOCHS)):
    train(model, train_dl, optimizer, criterion)
    # Snapshot files are numbered from 0, while the printed epoch counts from 1.
    torch.save(model.state_dict(), f"char_cnn_snapshot_{epoch}.pth")
    print(f"\nEpoch {epoch + 1}")
    evaluate(model, test_dl)

  0%|          | 0/5 [00:00<?, ?it/s]

  0%|          | 0/8140 [00:00<?, ?it/s]


Epoch 1
F1: 0.9595


  0%|          | 0/8140 [00:00<?, ?it/s]


Epoch 2
F1: 0.9699


  0%|          | 0/8140 [00:00<?, ?it/s]


Epoch 3
F1: 0.9735


  0%|          | 0/8140 [00:00<?, ?it/s]


Epoch 4
F1: 0.9732


  0%|          | 0/8140 [00:00<?, ?it/s]


Epoch 5
F1: 0.9740


In [11]:
# Restore the weights saved after the last training epoch (snapshot index 4).
# weights_only=True limits unpickling to tensors and plain containers, which is
# all a state_dict needs and avoids running arbitrary pickled code.
# map_location="cpu" lets the file load on machines without the device it was
# saved from; load_state_dict then copies the values into the model's own
# parameters wherever they live.
model_dict = torch.load("char_cnn_snapshot_4.pth", map_location="cpu", weights_only=True)
model.load_state_dict(model_dict)

  model_dict = torch.load("char_cnn_snapshot_4.pth")


<All keys matched successfully>

In [23]:
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import StratifiedKFold

def evaluate_metrics(model, loader):
    """Compute and print macro F1 and one-vs-rest ROC AUC over ``loader``.

    Args:
        model: ``nn.Module`` returning raw class logits of shape
            ``(batch, n_classes)``; it is switched to eval mode.
        loader: iterable yielding ``(inputs, integer_labels)`` batches.

    Returns:
        tuple: ``(f1, auc)``, where ``f1`` is the macro-averaged F1 of the
        argmax predictions and ``auc`` is the one-vs-rest ROC AUC of the
        softmax probabilities.
    """
    model.eval()
    # Follow the model's device instead of forcing CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    prob_chunks, all_labels = [], []
    with torch.no_grad():
        for x, y in loader:
            logits = model(x.to(device))
            # Softmax in float64 on the CPU so every row sums to 1 tightly
            # enough for roc_auc_score's probability check.
            probs = F.softmax(logits.cpu().double(), dim=1)
            prob_chunks.append(probs.numpy())
            all_labels.extend(y.cpu().tolist())

    all_preds = np.concatenate(prob_chunks, axis=0)
    preds_bin = all_preds.argmax(axis=1)
    f1 = f1_score(all_labels, preds_bin, average='macro')
    # Multiclass targets need an explicit multi_class strategy; the default
    # ('raise') makes roc_auc_score raise ValueError.
    auc = roc_auc_score(all_labels, all_preds, multi_class='ovr')
    print(f"F1: {f1:.4f}, AUC: {auc:.4f}")
    return f1, auc

# training with scheduler
def train_model(model, train_dl, val_dl=None, epochs=5, lr=1e-3):
    """Train ``model`` with Adam and a plateau-based learning-rate schedule.

    Args:
        model: ``nn.Module`` producing raw class logits ``(batch, n_classes)``.
        train_dl: iterable of ``(inputs, integer_labels)`` training batches.
        val_dl: optional iterable of validation batches. When given, it is
            scored with ``evaluate_metrics`` after every epoch and the AUC
            drives the scheduler. When ``None``, ``evaluate`` is run on
            ``train_dl`` instead and the learning rate stays fixed.
        epochs: number of passes over ``train_dl``.
        lr: initial Adam learning rate.

    Returns:
        The trained model, on CUDA when it is available and on CPU otherwise.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    # `verbose` is deprecated on LR schedulers, so it is not passed here;
    # learning-rate changes are reported explicitly after each step instead.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=1)
    # The model emits unnormalised logits for mutually exclusive classes and
    # the targets are integer class ids, which is CrossEntropyLoss's contract.
    # BCELoss would need probabilities and same-shaped float targets.
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        # The evaluation helpers switch the model to eval mode, so training
        # mode is re-enabled at the start of every epoch.
        model.train()
        for x, y in train_dl:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            out = model(x)
            loss = criterion(out, y)
            loss.backward()
            optimizer.step()
        print(f"\nEpoch {epoch + 1}")
        if val_dl is not None:
            _, auc = evaluate_metrics(model, val_dl)
            lr_before = optimizer.param_groups[0]["lr"]
            scheduler.step(auc)
            lr_after = optimizer.param_groups[0]["lr"]
            if lr_after != lr_before:
                print(f"Reducing learning rate: {lr_before:.2e} -> {lr_after:.2e}")
        else:
            evaluate(model, train_dl)
    return model

# cross-validation loop
def run_cv(df, k=5, epochs=5):
    """Stratified k-fold cross-validation of a freshly initialised CharCNN.

    Args:
        df: frame with ``url`` and ``label`` columns; folds are stratified
            on ``label``.
        k: number of folds.
        epochs: training epochs per fold.

    Returns:
        dict: ``{"f1": [...], "auc": [...], "f1_mean": float,
        "auc_mean": float}`` holding the per-fold validation scores and
        their means, so callers can compare runs instead of reading the
        printed summary.
    """
    skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)
    f1s, aucs = [], []

    for i, (train_idx, val_idx) in enumerate(skf.split(df['url'], df['label'])):
        print(f"\nFold {i + 1}")
        # The split yields positional indices, hence iloc.
        train_fold = df.iloc[train_idx]
        val_fold = df.iloc[val_idx]

        train_dl = DataLoader(URLDataset(train_fold), batch_size=64, shuffle=True)
        val_dl = DataLoader(URLDataset(val_fold), batch_size=64)

        # A new model per fold so no weights carry over between folds.
        model = train_model(CharCNN(), train_dl, val_dl, epochs=epochs)
        f1, auc = evaluate_metrics(model, val_dl)
        f1s.append(f1)
        aucs.append(auc)

    f1_mean = sum(f1s) / len(f1s)
    auc_mean = sum(aucs) / len(aucs)
    print("\nCV Summary")
    print(f"F1 mean: {f1_mean:.4f}")
    print(f"AUC mean: {auc_mean:.4f}")
    return {"f1": f1s, "auc": aucs, "f1_mean": f1_mean, "auc_mean": auc_mean}

# simple hyperparam tuning scaffold
def sweep_hyperparams(param_grid):
    """Grid-search learning rate and batch size on a validation split.

    Configurations are compared on a stratified 20% hold-out carved from the
    module-level ``train_df``. The test set is deliberately not used here:
    picking hyperparameters on it would make the final test score optimistic.

    Args:
        param_grid: dict with optional keys ``"lr"`` and ``"batch_size"``,
            each an iterable of values. Missing keys default to ``[1e-3]``
            and ``[64]`` respectively.

    Returns:
        list[dict]: one entry per configuration, with keys ``"lr"``,
        ``"batch_size"``, ``"f1"`` and ``"auc"`` (validation scores).
    """
    fit_df, val_df = train_test_split(
        train_df, test_size=0.2, stratify=train_df['label'], random_state=42
    )
    # The datasets do not depend on the configuration, so build them once.
    fit_ds, val_ds = URLDataset(fit_df), URLDataset(val_df)

    results = []
    for lr in param_grid.get("lr", [1e-3]):
        for bs in param_grid.get("batch_size", [64]):
            print(f"\nTesting config: lr={lr}, batch_size={bs}")
            train_dl = DataLoader(fit_ds, batch_size=bs, shuffle=True)
            val_dl = DataLoader(val_ds, batch_size=bs)

            model = CharCNN()
            model = train_model(model, train_dl, val_dl, epochs=5, lr=lr)
            f1, auc = evaluate_metrics(model, val_dl)
            results.append({"lr": lr, "batch_size": bs, "f1": f1, "auc": auc})
    return results

# example usage:
# run_cv(train_df, k=5, epochs=5)
# sweep_hyperparams({"lr": [1e-3, 5e-4], "batch_size": [32, 64]})

In [14]:
# Score the current `model` on the test loader; the returned (f1, auc) tuple
# is displayed as the cell's output.
# NOTE(review): this cell's execution count (14) is lower than that of the cell
# defining evaluate_metrics (23), so the saved output may come from an earlier
# version of the function. Re-run after Restart & Run All to confirm.
evaluate_metrics(model, test_dl)

F1: 0.9856, AUC: 0.9986


(0.9856186434023124, 0.9985624621243145)