# Phishing, Malware, and Spam Detection Model Training
This notebook trains models for phishing, malware, spam, and attachment-malware detection using the Enron spam dataset.


In [7]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.ensemble import IsolationForest
import pickle
import re
from urllib.parse import urlparse
import math

def extract_urls(text):
    """Return all URL-like substrings of ``text`` in order of appearance.

    A match is either an ``http://`` / ``https://`` URL or a bare token that
    starts with ``www.``; each match extends until the next whitespace,
    ``<``, ``>`` or ``"`` character.

    Args:
        text: String to scan.

    Returns:
        List of matched substrings (empty when nothing matches).
    """
    matcher = re.compile(r'https?://[^\s<>"]+|www\.[^\s<>"]+')
    return matcher.findall(text)

def compute_entropy(text):
    """Return the Shannon entropy (in bits per symbol) of ``text``.

    Args:
        text: Sequence of hashable symbols, normally a string.

    Returns:
        ``0.0`` for empty/falsy input, otherwise the entropy of the symbol
        frequency distribution as a float.
    """
    if not text:
        return 0.0
    # Local import keeps this cell self-contained; Counter tallies every
    # symbol in one pass instead of rescanning the text once per distinct
    # symbol with ``text.count``.
    from collections import Counter

    total = len(text)
    entropy = 0.0
    for occurrences in Counter(text).values():
        p_x = occurrences / total
        entropy -= p_x * math.log2(p_x)
    return entropy

def _split_url(url):
    """Return ``(hostname, path, query)`` for ``url``, or empty strings if it cannot be parsed."""
    try:
        parsed = urlparse(url)
        # urlparse only recognises a network location after '//'. Bare
        # 'www.' URLs (which extract_urls also emits) would otherwise end up
        # with an empty hostname and the whole URL counted as path.
        if not parsed.netloc and url.lower().startswith('www.'):
            parsed = urlparse('//' + url)
        return parsed.hostname or '', parsed.path, parsed.query
    except ValueError:
        # e.g. unbalanced '[' / ']' in the host part ("Invalid IPv6 URL").
        # One malformed URL must not abort feature extraction for a dataset.
        return '', '', ''


def compute_url_features(url, email_body="", include_extended_features=False):
    """Build the feature dictionary for one URL (and optionally its e-mail).

    Lexical features are derived from the raw URL string and from its parsed
    hostname, path and query. The page-content features (``PctExtHyperlinks``
    through ``ImagesOnlyInForm`` and the last four ``*RT`` / ``*R`` entries)
    are hard-coded constants here: no page is fetched by this function.

    Scheme-less URLs starting with ``www.`` are parsed as if they began with
    ``//`` so that their hostname features are populated. URLs that
    ``urlparse`` rejects fall back to an empty hostname, path and query while
    the string-level features are still computed from the raw URL.

    Args:
        url: URL string; may be ``''`` when the e-mail contains no URL.
        email_body: E-mail text, only used when ``include_extended_features``
            is true.
        include_extended_features: When true, append ``SpamKeywordCount``,
            ``EmailLength``, ``SenderReputation`` (constant 0) and
            ``UrlEntropy`` to the result.

    Returns:
        Dict mapping feature name to numeric value. Key order is stable and
        determines the column order of DataFrames built from it.
    """
    hostname, path, query = _split_url(url)
    host_labels = [label for label in hostname.split('.') if label]
    # Negative for hosts with fewer than two labels (e.g. 'localhost').
    levels_above_domain = len(host_labels) - 2
    url_lower = url.lower()
    hostname_lower = hostname.lower()

    features = {
        'NumDots': hostname.count('.'),
        'SubdomainLevel': levels_above_domain if hostname else 0,
        'PathLevel': len([segment for segment in path.split('/') if segment]),
        'UrlLength': len(url),
        'NumDash': url.count('-'),
        'NumDashInHostname': hostname.count('-'),
        'AtSymbol': 1 if '@' in url else 0,
        'TildeSymbol': 1 if '~' in url else 0,
        'NumUnderscore': url.count('_'),
        'NumPercent': url.count('%'),
        'NumQueryComponents': len(query.split('&')) if query else 0,
        'NumAmpersand': url.count('&'),
        'NumHash': url.count('#'),
        'NumNumericChars': sum(c.isdigit() for c in url),
        'NoHttps': 0 if url.startswith('https') else 1,
        'RandomString': 1 if any(c.isalpha() for c in hostname) and len(hostname) > 20 else 0,
        'IpAddress': 1 if re.match(r'^\d+\.\d+\.\d+\.\d+$', hostname) else 0,
        'DomainInSubdomains': 1 if 'domain' in hostname_lower else 0,
        'DomainInPaths': 1 if 'domain' in path.lower() else 0,
        'HttpsInHostname': 1 if 'https' in hostname_lower else 0,
        'HostnameLength': len(hostname),
        'PathLength': len(path),
        'QueryLength': len(query),
        'DoubleSlashInPath': 1 if '//' in path else 0,
        'NumSensitiveWords': sum(1 for word in ['login', 'secure', 'account', 'bank'] if word in url_lower),
        'EmbeddedBrandName': 1 if any(brand in hostname_lower for brand in ['paypal', 'google', 'facebook']) else 0,
        # Page-content features: constant placeholders (no HTML is inspected).
        'PctExtHyperlinks': 0.0,
        'PctExtResourceUrls': 0.0,
        'ExtFavicon': 0,
        'InsecureForms': 0,
        'RelativeFormAction': 0,
        'ExtFormAction': 0,
        'AbnormalFormAction': 0,
        'PctNullSelfRedirectHyperlinks': 0.0,
        'FrequentDomainNameMismatch': 0,
        'FakeLinkInStatusBar': 0,
        'RightClickDisabled': 0,
        'PopUpWindow': 0,
        'SubmitInfoToEmail': 0,
        'IframeOrFrame': 0,
        'MissingTitle': 0,
        'ImagesOnlyInForm': 0,
        'SubdomainLevelRT': 1 if levels_above_domain <= 1 else 0,
        'UrlLengthRT': 1 if len(url) <= 75 else 0,
        'PctExtResourceUrlsRT': 1,
        'AbnormalExtFormActionR': 1,
        'ExtMetaScriptLinkRT': 1,
        'PctExtNullSelfRedirectHyperlinksRT': 1
    }
    if include_extended_features:
        spam_keywords = ['win', 'free', 'urgent', 'lottery', 'click here']
        body_lower = email_body.lower()
        email_features = {
            # Substring presence per keyword, not an occurrence count.
            'SpamKeywordCount': sum(1 for word in spam_keywords if word.lower() in body_lower),
            'EmailLength': len(email_body),
            'SenderReputation': 0,
            'UrlEntropy': compute_entropy(url)
        }
        features.update(email_features)
    return features

def compute_attachment_features(attachment_info):
    """Derive attachment features from a ``"<filename>,<size>"`` string.

    The size is an integer byte count, or an integer followed by ``KB``
    (any letter case), which is converted to bytes. The string is split on
    its last comma so filenames that themselves contain commas still parse.

    Args:
        attachment_info: Text such as ``'doc.exe,500KB'``.

    Returns:
        Dict with ``FileSize`` (bytes), ``IsRiskyExtension`` (1 when the
        extension is exe/bat/js/vbs/scr), ``FileNameLength`` and
        ``FileNameEntropy``. All four values are 0 when the input is not a
        string or does not match the expected format.
    """
    try:
        filename, size_str = attachment_info.rsplit(',', 1)
        size_text = size_str.strip().upper()
        if 'KB' in size_text:
            size = int(size_text.replace('KB', '')) * 1024
        else:
            size = int(size_text)
        file_ext = filename.split('.')[-1].lower() if '.' in filename else ''
        risky_extensions = ['exe', 'bat', 'js', 'vbs', 'scr']
        features = {
            'FileSize': size,
            'IsRiskyExtension': 1 if file_ext in risky_extensions else 0,
            'FileNameLength': len(filename),
            'FileNameEntropy': compute_entropy(filename)
        }
        return features
    except (AttributeError, TypeError, ValueError):
        # Best-effort fallback for missing or malformed attachment info.
        # Only parsing failures are caught, so interrupts and genuine
        # programming errors still surface.
        return {'FileSize': 0, 'IsRiskyExtension': 0, 'FileNameLength': 0, 'FileNameEntropy': 0}

In [8]:
# Load the Enron spam CSV and derive the columns the feature extractors need.
data = pd.read_csv('enron_spam_data.csv')
print(data.columns)
data['email_body'] = data['Message'].fillna('')
# First URL found in the body, or '' when there is none. extract_urls runs
# once per row instead of twice.
data['url'] = data['email_body'].apply(lambda body: (extract_urls(body) or [''])[0])
# NOTE(review): attachment metadata is synthesized from the Spam/Ham column
# (spam -> 'doc.exe,500KB', ham -> 'doc.pdf,200KB'), so attachment_label
# below is 1 exactly for the spam rows and the attachment features are a
# direct function of the label. Replace with real attachment data before
# relying on the attachment model.
data['attachment_info'] = np.where(data['Spam/Ham'] == 'spam', 'doc.exe,500KB', 'doc.pdf,200KB')
# Values other than 'spam'/'ham' map to NaN.
data['label'] = data['Spam/Ham'].map({'spam': 1, 'ham': 0})
data['attachment_label'] = np.where((data['label'] == 1) & (data['attachment_info'].str.contains('exe')), 1, 0)

# One feature dict per row -> one DataFrame row per e-mail (fresh 0..n-1 index).
url_features = data.apply(lambda row: compute_url_features(row['url'], row['email_body'], include_extended_features=True), axis=1)
url_feature_df = pd.DataFrame(url_features.tolist())
attachment_features = data['attachment_info'].apply(compute_attachment_features)
attachment_feature_df = pd.DataFrame(attachment_features.tolist())

# 80/20 hold-out splits; the same seed is used for both feature sets.
X_train_url, X_test_url, y_train_url, y_test_url = train_test_split(url_feature_df, data['label'], test_size=0.2, random_state=42)
X_train_att, X_test_att, y_train_att, y_test_att = train_test_split(attachment_feature_df, data['attachment_label'], test_size=0.2, random_state=42)

Index(['Unnamed: 0', 'Subject', 'Message', 'Spam/Ham', 'Date'], dtype='object')


In [9]:
# Fit every model on the training split, then pickle each one to disk.
# NOTE(review): lgb_model, malware_model and spam_model are the same
# estimator fitted on the same features and labels, so they should be
# identical; confirm whether separate malware/spam labels were intended.
xgb_model = XGBClassifier(random_state=42).fit(X_train_url, y_train_url)
lgb_model = LGBMClassifier(random_state=42).fit(X_train_url, y_train_url)

# Unsupervised detector: fitted on the URL features only, labels are not used.
anomaly_model = IsolationForest(random_state=42, contamination=0.1).fit(X_train_url)

malware_model = LGBMClassifier(random_state=42).fit(X_train_url, y_train_url)
spam_model = LGBMClassifier(random_state=42).fit(X_train_url, y_train_url)

attachment_malware_model = LGBMClassifier(random_state=42).fit(X_train_att, y_train_att)

# Output file -> fitted model; written in this order.
model_artifacts = [
    ('url_threat_xgb_model.pkl', xgb_model),
    ('url_threat_lgb_model.pkl', lgb_model),
    ('url_anomaly_model.pkl', anomaly_model),
    ('malware_model.pkl', malware_model),
    ('spam_model.pkl', spam_model),
    ('attachment_malware_model.pkl', attachment_malware_model),
]
for artifact_path, fitted_model in model_artifacts:
    with open(artifact_path, 'wb') as f:
        pickle.dump(fitted_model, f)

[LightGBM] [Info] Number of positive: 13703, number of negative: 13269
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.000123 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 260
[LightGBM] [Info] Number of data points in the train set: 26972, number of used features: 2
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.508045 -> initscore=0.032184
[LightGBM] [Info] Start training from score 0.032184
[LightGBM] [Info] Number of positive: 13703, number of negative: 13269
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.000122 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 260
[LightGBM] [Info] Number of data points in the train set: 26972, number of used features: 2
[LightGBM] [Info] [binar