## Load Data

In [23]:
import kagglehub
import os
import json
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report
from sklearn.compose import ColumnTransformer
from xgboost import XGBClassifier
from scipy.stats import uniform, loguniform


path = kagglehub.dataset_download("naserabdullahalam/phishing-email-dataset")
data = []

datasets = [
    "Nigerian_Fraud.csv",
    "Ling.csv",
    "Nazario.csv",
    "SpamAssasin.csv",
    "CEAS_08.csv",
    # "Enron.csv"
]
for file in datasets:
    print(file)
    csv_path = os.path.join(path, file)
    subset_data = pd.read_csv(csv_path)
    data.append(subset_data)

all_data = pd.concat(data)


Nigerian_Fraud.csv
Ling.csv
Nazario.csv
SpamAssasin.csv
CEAS_08.csv


## Clean & Split Data

In [24]:
all_data['receiver'] = all_data['receiver'].str.replace('undisclosed-recipients:;', 'Unknown')
# -- Timestamp features -- 
all_data['date_parsed'] = pd.to_datetime(all_data['date'], errors='coerce', utc=True)

# Week-of-year
iso_week = all_data['date_parsed'].dt.isocalendar().week
iso_week = iso_week.astype(float)
week0 = ((iso_week - 1) % 52)
theta_week = 2.0 * np.pi * week0 / 52
all_data['sin_week'] = np.where(week0.notna(), np.sin(theta_week), 0.0)
all_data['cos_week'] = np.where(week0.notna(), np.cos(theta_week), 0.0)

# Hour-of-day 
hour = all_data['date_parsed'].dt.hour.astype(float)  # NaN for missing
theta_hour = 2.0 * np.pi * hour / 24
all_data['sin_hour'] = np.where(hour.notna(), np.sin(theta_hour), 0.0)
all_data['cos_hour'] = np.where(hour.notna(), np.cos(theta_hour), 0.0)

# Weekend binary (0/1)
weekday = all_data['date_parsed'].dt.weekday
all_data['is_weekend'] = np.where(weekday.isna(), 0, ((weekday >= 5).astype(int)))

# Timestamp feature list
timestamp_features = [
    "sin_week",
    "cos_week",
    "sin_hour",
    "cos_hour",
    "is_weekend"
]

# -- Sender/reciever feature engineering -- 
with open('domains.json', 'r') as file:
    public_email_domains = json.load(file)
    
email_regex = r'([a-zA-Z0-9._%+\-|{}^&"\'=]+@(?:[a-zA-Z0-9.-]+|\[[0-9.]+\]))'    
for column_name in ('sender', 'receiver'):
    all_data[f'{column_name}_email'] = all_data[column_name].str.extract(email_regex, expand=False)
    all_data[f'{column_name}_domain'] = all_data[f'{column_name}_email'].str.split('@', n=1).str[1]
    all_data[f'{column_name}_domain_len'] = all_data[f'{column_name}_domain'].str.len()
    all_data[f'{column_name}_domain_public'] = all_data[f'{column_name}_domain'].str.lower().isin(public_email_domains).astype(int)
    all_data[f'{column_name}_n_subdomains'] = all_data[f'{column_name}_domain'].str.lower().str.count(r'\.')
    all_data[f'{column_name}_email_n_digits'] = all_data[f'{column_name}_domain'].str.lower().str.count(r'\d')
    
    all_data[f'{column_name}_name'] = all_data[column_name].str.replace(email_regex, '', regex=True)
    all_data[f'{column_name}_name'] = all_data[f'{column_name}_name'].str.replace(r'[<>"\'\(\)]', '', regex=True).str.strip()
    
all_data['is_internal_email'] = (
    (all_data['sender_domain'] == all_data['receiver_domain']) & 
    (all_data['sender_domain'].notna())
).astype(int)

all_data['sender_name_contains_email'] = all_data['sender_name'].str.contains('@', na=False).astype(int)

# Sender/reciever feature list
email_features = [
    "sender_domain_public",
    "sender_domain_len",
    "sender_n_subdomains",
    "sender_email_n_digits",
    "sender_name_contains_email",
    # "is_internal_email"
]

# -- Fill in url count for missing entries -- 
url_regex = r'((?:https?|ftp)://\S+|www\.\S+)'
text_column = 'body' 
missing_count_mask = all_data['urls'].isna()
all_data.loc[missing_count_mask, 'urls'] = (
    all_data.loc[missing_count_mask, text_column]
    .astype(str)
    .str.count(url_regex)
)

all_data[['body', 'subject']] = all_data[['body', 'subject']].fillna('Unknown')

feature_set= [
    'subject',
    'body',
    *email_features,
    *timestamp_features
]

X = all_data[feature_set]
y = all_data['label'] 

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)


In [None]:
# -- Initialize Model --
vectorizer = ColumnTransformer([
    ('subject_word_vectorizer', TfidfVectorizer(lowercase=False, analyzer='word'), 'subject'),
    ('subject_charwb_vectorizer', TfidfVectorizer(lowercase=False, analyzer='char_wb'), 'subject'),
    ('body_vectorizer', TfidfVectorizer(lowercase=True, analyzer='word'), 'body' )
    ],
    remainder="passthrough"
)

model_pipeline = Pipeline([
    ('text_vectorizer', vectorizer),
    ('xgboost', XGBClassifier())
])

# -- Hyperparameter Tuning --
param_dist = {
    'xgboost__n_estimators': range(100, 500),
    'xgboost__learning_rate': loguniform(0.01, 0.3),
    'xgboost__max_depth': range(3, 10),
    'xgboost__subsample': uniform(0.6, 0.1), 
    
    'text_vectorizer__body_vectorizer__ngram_range': [(1, 1), (1, 2)],
    'text_vectorizer__body_vectorizer__min_df': loguniform(1e-4, 0.1),
    
}

random_search = RandomizedSearchCV(
    model_pipeline, 
    param_distributions=param_dist,
    n_iter=50,
    cv=3,
    scoring='f1',
    n_jobs=-1,
    verbose=1,
    random_state=42
)

print("Starting Random Search...")
random_search.fit(X_train, y_train)

print(f"Best Score: {random_search.best_score_}")
print(f"Best Params: {random_search.best_params_}")


Starting Random Search...
Fitting 3 folds for each of 50 candidates, totalling 150 fits


In [None]:
# -- Analyze results --

y_pred = random_search.predict(X_test)
model_result = classification_report(y_test, y_pred, digits=5)

print("\nClassification Report:")
print(model_result)


xg_model = model_pipeline.named_steps['xgboost']
preprocessor = model_pipeline.named_steps['text_vectorizer']

feature_names = preprocessor.get_feature_names_out()
importances = xg_model.feature_importances_

feats_df = pd.DataFrame({
    'Feature': feature_names,
    'Importance': importances
})

print(feats_df.sort_values(by='Importance', ascending=False).head(30))
with open('results.txt', 'a') as file:
    file.write(f"""
\n--- RESULTS ---
Datasets Used: {datasets}
Processing: {preprocessor.named_transformers_}
Features: {X.columns.tolist()}
{model_result}
""")



Classification Report:
              precision    recall  f1-score   support

           0    0.99267   0.98397   0.98830      4678
           1    0.98730   0.99420   0.99074      5866

    accuracy                        0.98966     10544
   macro avg    0.98999   0.98909   0.98952     10544
weighted avg    0.98968   0.98966   0.98966     10544



NotFittedError: This ColumnTransformer instance is not fitted yet. Call 'fit' with appropriate arguments before using this estimator.