In [None]:
pip install --upgrade transformers

In [None]:
import pandas as pd
import re
from urllib.parse import urlparse, parse_qs
import os


# Data Preprocessing before training

# Read the URL list: one URL per line, lines starting with "#" are comments.
# Each line is stripped *before* the comment test so an indented "#" line is
# not mistaken for a URL, and the encoding is pinned so the result does not
# depend on the platform default.
with open("/content/urlhaus.abuse.ch.txt", "r", encoding="utf-8") as f:
    stripped_lines = (line.strip() for line in f)
    urls = [line for line in stripped_lines if line and not line.startswith("#")]

# Single-column frame; the URL component columns are derived from 'url' later.
df = pd.DataFrame(urls, columns=['url'])

# Function to extract components of urls, as BERT trains on text
def extract_url_components(url):
    """Split a URL into the textual pieces used as model features.

    Parameters
    ----------
    url : str
        Either an absolute URL ("http://host/path?q=1") or a scheme-less one
        ("host/path"), which is how URLs built as Domain + Path look.

    Returns
    -------
    pd.Series
        Six positional values, in this order:
        domain (parsed host name, ``None`` when the URL has none),
        subdomain (host labels before the last two, ``""`` if none or if IP),
        path, file extension (with leading dot, ``""`` if none),
        raw query string, and a bool telling whether the host is a dotted
        IPv4 address.
        A URL that ``urlparse`` rejects yields ``[None, "", "", "", "", False]``
        instead of raising, so one bad row cannot abort a whole ``.apply``.
    """
    # urlparse only recognises a host after "//". Without a scheme the whole
    # string would land in .path (host None, "example.com" -> extension ".com"),
    # so scheme-less input is parsed as a network-path reference instead.
    has_scheme = re.match(r"[A-Za-z][A-Za-z0-9+.\-]*://", url) is not None
    target = url if has_scheme or url.startswith("//") else "//" + url

    try:
        parsed = urlparse(target)
        hostname = parsed.hostname
    except ValueError:
        # e.g. an unbalanced "[" in the authority ("Invalid IPv6 URL")
        return pd.Series([None, "", "", "", "", False])

    ip_pattern = r"\d{1,3}(\.\d{1,3}){3}"
    is_ip = bool(re.fullmatch(ip_pattern, hostname)) if hostname else False

    # The full host is used as the domain for both names and IP literals.
    domain = hostname

    # Heuristic: everything left of the last two labels counts as subdomain
    # (multi-part public suffixes such as "co.uk" are not special-cased).
    if hostname and not is_ip and hostname.count('.') > 1:
        subdomain = '.'.join(hostname.split('.')[:-2])
    else:
        subdomain = ""

    path = parsed.path
    # splitext only looks at the last path segment, so dots in directory
    # names never produce an extension.
    file_ext = os.path.splitext(path)[1]
    query_params = parsed.query
    return pd.Series([domain, subdomain, path, file_ext, query_params, is_ip])


# Expand each URL into its six component columns, then tag every row of this
# frame with label 1.
df[[
    'Domain', 'Subdomain', 'Path',
    'File Extension', 'Query Parameters', 'Contains_IP',
]] = df['url'].apply(extract_url_components)
df['label'] = [1] * len(df)


# Data for training from these files
def _load_labelled_urls(csv_path, label):
    """Read a CSV with 'Domain' and 'Path' columns and add 'label' and 'url'.

    Parameters
    ----------
    csv_path : str
        Location of the CSV file.
    label : int
        Class id written to every row (1 = phishing, 0 = legitimate).

    Returns
    -------
    pd.DataFrame
        The CSV contents plus a constant 'label' column and a 'url' column
        formed by concatenating Domain and Path.
    """
    frame = pd.read_csv(csv_path)
    frame['label'] = label
    # An empty cell is read as NaN and NaN + str is NaN, so a row without a
    # path would lose its URL entirely; treat missing pieces as empty text.
    frame['url'] = frame['Domain'].fillna('') + frame['Path'].fillna('')
    return frame


phis = _load_labelled_urls('/content/phishing-urls.csv', 1)
phis_df = phis[['url', 'label']]
leg = _load_labelled_urls('/content/legitimate-urls.csv', 0)
leg_df = leg[['url', 'label']]



# Getting data into the right format
df_combined = pd.concat([phis_df, leg_df], ignore_index=True)
# Fill missing URLs *before* the string cast: astype(str) turns NaN into the
# literal text "nan", after which fillna('') has nothing left to replace.
df_combined['url'] = df_combined['url'].fillna('').astype(str)
# extract_url_components returns one pd.Series per URL, so apply() already
# yields a six-column frame; a second .apply(pd.Series) pass is not needed.
df_combined[['Domain', 'Subdomain', 'Path', 'File Extension', 'Query Parameters', 'Contains_IP']] = (
    df_combined['url'].apply(extract_url_components)
)
df_combined['label'] = df_combined['label'].astype(bool)

# Stack the text-file URLs (df) on top of the CSV URLs and normalise the
# mixed int/bool labels to a single integer column (0 or 1).
df_final_last = pd.concat([df, df_combined], ignore_index=True)
df_final_last['label'] = df_final_last['label'].astype(int)



In [None]:


# Training Model
import pandas as pd
import torch
import time
from transformers import BertTokenizer, BertForSequenceClassification
from torch.optim import AdamW

from torch.utils.data import DataLoader, Dataset, RandomSampler, SequentialSampler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix




# Hold out 20% of the rows for evaluation; the fixed random_state keeps the
# split identical from run to run.
train_df, test_df = train_test_split(
    df_final_last,
    test_size=0.2,
    random_state=42,
)

# Tokenizer for the 'bert-base-uncased' checkpoint
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Custom Dataset Class to reuse code
class URLDataset(Dataset):
    """Dataset that turns one row of URL features into tokenized tensors.

    Parameters
    ----------
    dataframe : pd.DataFrame
        Must provide the columns 'url', 'Domain', 'Subdomain', 'Path',
        'File Extension', 'Query Parameters', 'Contains_IP' and 'label'.
        Rows are addressed by position, so the index need not be reset.
    tokenizer : callable
        Tokenizer invoked as ``tokenizer(text, **options)``; it must return a
        mapping with 'input_ids' and 'attention_mask' tensors.
    max_len : int
        Length every sample is padded or truncated to.
    """

    def __init__(self, dataframe, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.max_len = max_len

    def __len__(self):
        """Number of rows in the wrapped frame."""
        return len(self.data)

    def __getitem__(self, index):
        """Return a dict with 'input_ids', 'attention_mask' and 'label' for row ``index``."""
        # Fetch the row once: every .iloc[index] builds a fresh row Series,
        # so repeating it per column multiplies the per-sample cost.
        row = self.data.iloc[index]

        # Combine features into a single string
        combined_text = (
            f"URL: {row['url']} Domain: {row['Domain']} Subdomain: {row['Subdomain']} "
            f"Path: {row['Path']} File Extension: {row['File Extension']} "
            f"Query Parameters: {row['Query Parameters']} Contains IP: {row['Contains_IP']}"
        )

        # Call the tokenizer directly (the supported entry point; encode_plus
        # is the deprecated spelling of the same operation).
        encoding = self.tokenizer(
            combined_text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            # return_tensors='pt' adds a leading batch axis; flatten drops it
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'label': torch.tensor(row['label'], dtype=torch.long)
        }




# Making Dataset
max_len = 128  # every sample is padded/truncated to this many tokens
train_dataset = URLDataset(dataframe=train_df, tokenizer=tokenizer, max_len=max_len)
test_dataset = URLDataset(dataframe=test_df, tokenizer=tokenizer, max_len=max_len)

# Training batches are drawn in random order; test batches keep row order.
train_dataloader = DataLoader(
    train_dataset,
    batch_size=16,
    sampler=RandomSampler(train_dataset),
)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=16,
    sampler=SequentialSampler(test_dataset),
)


In [None]:

# Loading the BERT model for sequence classification
# Seed torch's global RNG first so repeated runs of this cell are comparable.
torch.manual_seed(42)
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)
optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8)
# Use the GPU when one is available, otherwise fall back to the CPU
# (the original training run used a GPU).
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)


epochs = 2   # Number of epochs can be changed, but due to computational constraints kept it as 2

for epoch in range(epochs):
    model.train()
    total_loss = 0
    start_time = time.time()

    for batch_idx, batch in enumerate(train_dataloader):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)

        # Clear gradients left over from the previous step
        optimizer.zero_grad()
        # Passing labels makes the model return the classification loss
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        # Read the scalar once; it is reused for the running total and the log
        batch_loss = loss.item()
        total_loss += batch_loss

        loss.backward()
        optimizer.step()

        if batch_idx % 10 == 0:
            print(f'Epoch {epoch + 1}/{epochs}, Batch {batch_idx}/{len(train_dataloader)}, Loss: {batch_loss}')

    # Report the epoch summary; it was previously computed and then discarded
    avg_train_loss = total_loss / len(train_dataloader)
    end_time = time.time()
    print(f'Epoch {epoch + 1}/{epochs} - Training Loss: {avg_train_loss}, Time: {end_time - start_time} seconds')

# Evaluating the model
# Switch to inference mode and collect predictions and true labels batch by batch
model.eval()
all_preds, all_labels = [], []

with torch.no_grad():
    for batch in test_dataloader:
        logits = model(
            batch['input_ids'].to(device),
            attention_mask=batch['attention_mask'].to(device),
        ).logits

        # Predicted class = index of the largest logit in each row
        all_preds.extend(torch.argmax(logits, dim=1).cpu().numpy())
        all_labels.extend(batch['label'].cpu().numpy())

# Calculate evaluation metrics
accuracy = accuracy_score(all_labels, all_preds)
precision = precision_score(all_labels, all_preds)
conf_matrix = confusion_matrix(all_labels, all_preds)

for report_line in (
    f'Test Accuracy: {accuracy}',
    f'Precision: {precision}',
    f'Confusion Matrix:\n{conf_matrix}',
):
    print(report_line)

In [None]:
# Second pass over the test loader that reports accuracy only
model.eval()
test_preds, test_labels = [], []

with torch.no_grad():
    for batch in test_dataloader:
        # Everything except the label is forwarded to the model as keyword inputs
        inputs = {name: tensor.to(device) for name, tensor in batch.items() if name != 'label'}
        outputs = model(**inputs)
        preds = torch.argmax(outputs.logits, dim=1)
        test_preds.extend(preds.cpu().numpy())
        test_labels.extend(batch['label'].cpu().numpy())

print(f"Accuracy: {accuracy_score(test_labels, test_preds)}")

Accuracy: 0.9986
