<a href="https://colab.research.google.com/github/zionvanwyk/email-phishing-detector/blob/main/Additional_Features_Phishing_Email_Model_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Set up environment

*   Install dependencies.
*   Log in to Hugging Face with an access token so that the uploaded dataset can be loaded and the trained model can be pushed to the Hub.





In [None]:
!pip install transformers evaluate accelerate pandas scikit-learn
!pip install huggingface_hub
!pip install torch
!pip install -U datasets

In [None]:
from huggingface_hub import notebook_login
import pandas as pd
import os

# Authenticate with the Hugging Face Hub (access token, per the setup notes
# above); later cells load a Hub dataset and push the trained model.
notebook_login()


# Upload dataset

*   Used phishing_email.csv from the Kaggle phishing email dataset that can be found here: https://www.kaggle.com/datasets/naserabdullahalam/phishing-email-dataset


*   This is a combination of six datasets. The "text_combined" column is the central element of the final dataset in this phishing email analysis. It combines the subject line, the body, date, and sender email text of the emails from the initial datasets.


In [None]:
from datasets import load_dataset
# Load the phishing-email dataset; later cells index its "train" and "test"
# splits and read the "text_combined" and "label" columns.
dataset = load_dataset("zionia/phishing-emails")

## Tokenize the Dataset

*   Tokenize data using AutoTokenizer.
*   Add custom features (stored as extra dataset columns):
    * `text_length`: Total character count of each email text.
    * `token_count`: Number of whitespace-separated tokens (words/symbols) in each email.
    * `avg_token_length`: Average token length (characters per token) for each email.
    * `num_date_tokens`: Count of date-related tokens (month names and abbreviations, years 2001–2025) in each email.
    * `has_attachment_reference`: Binary flag (1/0) indicating whether the email text contains an attachment-related substring (e.g., ".pdf", "attached").
    * `has_operational_keywords`: Binary flag (1/0) indicating whether the email text contains an operational term (e.g., "report", "schedule", "data").
    * `has_phishy_keywords`: Binary flag (1/0) indicating whether the email text contains a suspicious term (e.g., "urgent", "login", "verify").
*   Format the dataset for the trainer to use the custom features.

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
import numpy as np
import evaluate
import os

# Tokenizer for "distilbert-base-uncased", the same checkpoint name the model
# is initialised from in a later cell.
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

# Lookup tables for the hand-engineered features. They are built once here
# instead of once per email inside the loop.
_DATE_TOKENS = frozenset({
    "jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec",
    "january", "february", "march", "april", "may", "june", "july", "august",
    "september", "october", "november", "december",
    "2001", "2002", "2003", "2004", "2005", "2006", "2007", "2008", "2009", "2010",
    "2011", "2012", "2013", "2014", "2015", "2016", "2017", "2018", "2019", "2020",
    "2021", "2022", "2023", "2024", "2025",
})
_ATTACHMENT_MARKERS = (".xls", ".xlsx", ".pdf", ".doc", ".docx",
                       "attachment", "attached", "file")
_OPERATIONAL_KEYWORDS = ("nom", "actual", "vols", "schedule",
                         "attached", "report", "data", "summary")
_PHISHY_KEYWORDS = ("verify", "urgent", "login", "click",
                    "bank", "account", "update", "password",
                    "security", "alert", "confirm", "immediately")


def _contains_any(haystack, needles):
    """Return 1 if any of `needles` occurs as a substring of `haystack`, else 0."""
    return int(any(needle in haystack for needle in needles))


def tokenize_function(examples):
    """Tokenize a batch of emails and attach seven hand-engineered features.

    Args:
        examples: Batch mapping with a "text_combined" entry holding a list of
            email texts. A `None` entry is treated as an empty email.

    Returns:
        The tokenizer output (padded/truncated to 256 tokens) with these
        per-email lists added: "text_length", "token_count",
        "avg_token_length", "num_date_tokens", "has_attachment_reference",
        "has_operational_keywords" and "has_phishy_keywords".
    """
    # Coerce missing texts to "" so one bad row cannot abort the whole map().
    texts = [text if text is not None else "" for text in examples["text_combined"]]

    tokenized_output = tokenizer(
        texts,
        padding="max_length",
        truncation=True,
        max_length=256
    )

    features = {
        "text_length": [],
        "token_count": [],
        "avg_token_length": [],
        "num_date_tokens": [],
        "has_attachment_reference": [],
        "has_operational_keywords": [],
        "has_phishy_keywords": [],
    }

    for text in texts:
        # Lowercase once; every keyword check below is case-insensitive.
        lowered = text.lower()
        tokens = lowered.split()
        token_count = len(tokens)

        features["text_length"].append(len(text))
        features["token_count"].append(token_count)
        # 0.0 (not int 0) for empty emails keeps the column purely float.
        features["avg_token_length"].append(
            sum(len(token) for token in tokens) / token_count if token_count > 0 else 0.0
        )
        # Exact whole-token matches only: "2005," with punctuation is not counted.
        features["num_date_tokens"].append(
            sum(1 for token in tokens if token in _DATE_TOKENS)
        )
        # The three flags are substring tests on the whole lowercased text.
        features["has_attachment_reference"].append(_contains_any(lowered, _ATTACHMENT_MARKERS))
        features["has_operational_keywords"].append(_contains_any(lowered, _OPERATIONAL_KEYWORDS))
        features["has_phishy_keywords"].append(_contains_any(lowered, _PHISHY_KEYWORDS))

    for column, values in features.items():
        tokenized_output[column] = values

    return tokenized_output

# batched=True: tokenize_function receives a dict of column lists per batch
# (it iterates over examples["text_combined"]), not one row at a time.
tokenized_dataset = dataset.map(tokenize_function, batched=True)

In [None]:
def format_dataset_for_trainer(dataset):
    """Map a tokenized split into the fields the model's forward() consumes.

    Args:
        dataset: Object exposing `.map(fn)`. Each example must provide
            "input_ids", "attention_mask", "label" and the seven engineered
            feature columns listed in `feature_columns` below.

    Returns:
        Whatever `dataset.map` returns after applying the per-example
        conversion, which yields "input_ids", "attention_mask", "labels" and
        a 7-element float "extra_features" vector.
    """
    # Imported locally: the notebook's top-level `import torch` lives in a
    # later cell, so on a fresh kernel this function would otherwise raise
    # NameError when it is first called.
    import torch

    # Order matters: it fixes the layout of the "extra_features" vector.
    feature_columns = (
        "text_length",
        "token_count",
        "avg_token_length",
        "num_date_tokens",
        "has_attachment_reference",
        "has_operational_keywords",
        "has_phishy_keywords",
    )

    def to_tensor(example):
        """Convert one example's fields to tensors and pack the features."""
        engineered_features = torch.tensor(
            [example[column] for column in feature_columns],
            dtype=torch.float,
        )

        return {
            'input_ids': torch.tensor(example['input_ids']),
            'attention_mask': torch.tensor(example['attention_mask']),
            # The source column is "label"; the model's forward() takes "labels".
            'labels': torch.tensor(example['label']),
            'extra_features': engineered_features
        }

    return dataset.map(to_tensor)


In [None]:
# Run the same conversion over both splits; the keys mirror the split names.
formatted_dataset = {
    split_name: format_dataset_for_trainer(tokenized_dataset[split_name])
    for split_name in ("train", "test")
}

## Train the Model

*  The chosen model is DistilBERT.
*  Concatenate the 7 custom features with DistilBERT's first-token embedding before the classification layer.
*  Train the model and push it to the Hugging Face Hub.

In [None]:
from transformers import DistilBertPreTrainedModel, DistilBertModel

class DistilBertWithFeatures(DistilBertPreTrainedModel):
    """DistilBERT classifier that also consumes hand-engineered features.

    The hidden state at sequence position 0 is concatenated with a vector of
    numeric features; dropout and a single linear layer then produce the
    class logits.

    The feature-vector width is read from `config.num_extra_features` when
    the config defines it and otherwise defaults to 7.
    """

    # Width of the engineered-feature vector when the config does not set it.
    DEFAULT_NUM_EXTRA_FEATURES = 7

    def __init__(self, config):
        """Build the encoder, dropout and classification head from `config`."""
        super().__init__(config)
        self.num_labels = config.num_labels
        self.num_extra_features = getattr(
            config, "num_extra_features", self.DEFAULT_NUM_EXTRA_FEATURES
        )
        self.distilbert = DistilBertModel(config)
        self.dropout = nn.Dropout(config.dropout)

        # config.dim is the encoder's hidden size (768 for
        # distilbert-base-uncased), so the head tracks the checkpoint instead
        # of assuming a fixed width.
        self.classifier = nn.Linear(config.dim + self.num_extra_features, config.num_labels)
        self.init_weights()

    def forward(self, input_ids=None, attention_mask=None, extra_features=None, labels=None, **kwargs):
        """Compute class logits and, when labels are given, the loss.

        Args:
            input_ids: Token ids passed to the DistilBERT encoder.
            attention_mask: Attention mask passed to the DistilBERT encoder.
            extra_features: Engineered features with one row per example and
                `self.num_extra_features` columns. When omitted, zeros are
                used so the classifier still receives its expected width.
            labels: Optional class indices; when given, cross-entropy loss is
                computed.
            **kwargs: Accepted and ignored.

        Returns:
            `{'loss': loss, 'logits': logits}` when `labels` is provided,
            otherwise the bare logits tensor.
        """
        distilbert_output = self.distilbert(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        hidden_state = distilbert_output[0]
        # Position 0 of every sequence is used as the sentence representation.
        pooled_output = hidden_state[:, 0]

        if extra_features is None:
            # Without this the tensor would be narrower than the classifier's
            # input and the linear layer would fail with a shape error.
            extra_features = pooled_output.new_zeros(
                pooled_output.size(0), self.num_extra_features
            )
        pooled_output = torch.cat((pooled_output, extra_features), dim=1)

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        return {'loss': loss, 'logits': logits} if loss is not None else logits

In [None]:
from transformers import DistilBertConfig
import torch
import torch.nn as nn

# One checkpoint name feeds both the configuration and the model weights.
base_checkpoint = "distilbert-base-uncased"

# Two output classes, with dropout set explicitly to 0.1.
config = DistilBertConfig.from_pretrained(base_checkpoint, num_labels=2, dropout=0.1)

# Instantiate the feature-augmented classifier from the base checkpoint.
model = DistilBertWithFeatures.from_pretrained(base_checkpoint, config=config)

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import Trainer, TrainingArguments
from datasets import Dataset
import torch
import numpy as np
import evaluate

class CustomTrainer(Trainer):
    """Trainer variant that computes cross-entropy loss outside the model.

    Note: the notebook currently constructs a plain `Trainer` further down,
    so this class is defined but not used.
    """

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Run the model without labels and compute cross-entropy on its logits.

        Args:
            model: Model to call with the batch.
            inputs: Batch mapping; must contain "labels". It is not modified.
            return_outputs: When True, also return the model outputs.
            num_items_in_batch: Accepted because recent `Trainer` versions
                pass it as a keyword; unused, as the loss is a per-batch mean.

        Returns:
            The loss, or `(loss, {"logits": logits})` when `return_outputs`
            is True.
        """
        labels = inputs["labels"]
        # Withhold the labels so the model returns logits rather than its own
        # loss. A filtered copy leaves the caller's batch intact.
        model_inputs = {name: value for name, value in inputs.items() if name != "labels"}
        outputs = model(**model_inputs)
        logits = outputs["logits"] if isinstance(outputs, dict) else outputs
        loss = torch.nn.functional.cross_entropy(logits, labels)
        if not return_outputs:
            return loss
        # Trainer's evaluation path expects a dict or tuple of outputs; a bare
        # tensor would be sliced along the batch dimension.
        return loss, {"logits": logits}


training_args = TrainingArguments(
    # Local directory for checkpoints.
    output_dir="./results",
    # Optimisation settings.
    num_train_epochs=3,
    learning_rate=2e-5,
    weight_decay=0.01,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    # Evaluate and checkpoint once per epoch; the two strategies are kept
    # identical so the best checkpoint can be restored when training ends.
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    # Upload to the Hub once, at the end of training.
    push_to_hub=True,
    hub_strategy="end",
    hub_model_id="zionia/email-phishing-detector",
    # Disable external experiment-tracking integrations.
    report_to="none"
)

# Metric objects are loaded on first use and then reused, so each evaluation
# pass does not call evaluate.load() four more times.
_METRIC_CACHE = {}


def _load_metric(name):
    """Return the `evaluate` metric called `name`, loading it only once."""
    if name not in _METRIC_CACHE:
        _METRIC_CACHE[name] = evaluate.load(name)
    return _METRIC_CACHE[name]


def compute_metrics(eval_pred):
    """Compute accuracy, precision, recall and F1 for one evaluation pass.

    Args:
        eval_pred: Pair `(logits, labels)`; the predicted class is the argmax
            of the logits over the last axis.

    Returns:
        Dict with keys "accuracy", "precision", "recall" and "f1". The last
        three are computed with `average='binary'`.
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)

    scores = {
        "accuracy": _load_metric("accuracy").compute(
            predictions=predictions, references=labels
        )['accuracy']
    }
    for metric_name in ("precision", "recall", "f1"):
        scores[metric_name] = _load_metric(metric_name).compute(
            predictions=predictions, references=labels, average='binary'
        )[metric_name]
    return scores

# The stock Trainer is used here rather than CustomTrainer: when labels are in
# the batch, DistilBertWithFeatures.forward returns its own loss.
trainer = Trainer(
    args=training_args,
    model=model,
    compute_metrics=compute_metrics,
    eval_dataset=formatted_dataset["test"],
    train_dataset=formatted_dataset["train"],
)

In [None]:
from transformers import Trainer, TrainingArguments

# Fine-tune with the arguments configured above.
trainer.train()
# Upload the trained model; the string is used as the commit message.
trainer.push_to_hub("Training complete - basic model (with feature extraction)")
# Upload the tokenizer to the same repo id as hub_model_id so the two ship together.
tokenizer.push_to_hub("zionia/email-phishing-detector")