In [None]:
# BaitDetector: Phishing URL Detection

# Setup and Imports
!pip install requests tldextract scikit-learn patool

In [None]:
import requests
import pandas as pd
import numpy as np
from urllib.parse import urlparse
import tldextract
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, roc_auc_score
from sklearn.feature_extraction.text import TfidfVectorizer
from io import StringIO
import matplotlib.pyplot as plt
from sklearn.feature_selection import RFE
import re
import seaborn as sns
import random
import time
from sklearn.utils import resample
from concurrent.futures import ThreadPoolExecutor, as_completed
import patoolib
import tarfile
import io

In [None]:
"""# 0. Preparing the dataset
url = 'https://raw.githubusercontent.com/mitchellkrogza/Phishing.Database/master/ALL-phishing-links.tar.gz'
response = requests.get(url, stream=True)
file = 'temp.tar.gz'

with open(file, 'wb') as f:
    for chunk in response.iter_content(chunk_size=1024):
        if chunk:
            f.write(chunk)

patoolib.extract_archive(file)

with open('YOUR_FILE.txt', 'r') as f:
    text = f.read()"""

In [None]:
# 1. Data Collection

def fetch_phishing_urls_github(limit=50000):
    """Fetch phishing URLs from Mitchell Krogza's Phishing.Database on GitHub.

    Downloads the ``ALL-phishing-links.tar.gz`` archive into memory, reads every
    ``.lst`` member and collects one URL per non-blank line.

    Args:
        limit: Maximum number of URLs to return (the first ``limit`` collected).

    Returns:
        A list of URL strings with surrounding whitespace removed and blank
        lines dropped. If the download or the extraction fails, a message is
        printed and an empty list is returned.
    """
    github_url = "https://raw.githubusercontent.com/mitchellkrogza/Phishing.Database/master/ALL-phishing-links.tar.gz"
    try:
        # A timeout prevents the cell from hanging forever on a stalled connection.
        response = requests.get(github_url, timeout=60)
        response.raise_for_status()

        urls = []
        # The archive is processed entirely in memory; nothing is written to disk.
        with tarfile.open(fileobj=io.BytesIO(response.content), mode='r:gz') as tar:
            for member in tar.getmembers():
                if not member.name.endswith('.lst'):
                    continue
                f = tar.extractfile(member)  # None for members that are not regular files
                if f is None:
                    continue
                # errors='replace' keeps one bad byte from discarding the whole archive.
                content = f.read().decode('utf-8', errors='replace')
                # strip() removes the '\r' of CRLF line endings; empty lines are skipped.
                stripped_lines = (line.strip() for line in content.split('\n'))
                urls.extend(line for line in stripped_lines if line)

        print(f"Fetched {len(urls)} URLs from GitHub")
        return urls[:limit]
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data from GitHub Phishing.Database: {e}")
        return []
    except tarfile.TarError as e:
        print(f"Error processing tar.gz file: {e}")
        return []
    except Exception as e:
        # Deliberate best-effort: report anything unexpected and let the caller
        # decide what to do with an empty result.
        print(f"Unexpected error: {e}")
        return []

def fetch_legitimate_urls_majestic(limit=50000):
    """Fetch legitimate URLs from Majestic Million.

    Downloads the Majestic Million CSV, reads the ``Domain`` column of the
    first ``limit`` rows and prefixes every domain with ``http://``.

    Args:
        limit: Maximum number of CSV rows to read.

    Returns:
        A list of ``"http://<domain>"`` strings; rows without a domain are
        skipped. If the download fails or the CSV cannot be parsed (empty file,
        missing ``Domain`` column), a message is printed and an empty list is
        returned.
    """
    majestic_url = "https://downloads.majestic.com/majestic_million.csv"
    try:
        # A timeout prevents the cell from hanging forever on a stalled connection.
        response = requests.get(majestic_url, timeout=60)
        response.raise_for_status()
        df = pd.read_csv(StringIO(response.text), usecols=['Domain'], nrows=limit)
        # Missing values are read as NaN; concatenating "http://" with NaN would
        # raise a TypeError, so drop them before building the URLs.
        domains = df['Domain'].dropna().astype(str).str.strip()
        return ["http://" + domain for domain in domains if domain]
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data from Majestic Million: {e}")
        return []
    except pd.errors.EmptyDataError:
        # Must come before ValueError: EmptyDataError is a ValueError subclass.
        print("The CSV file from Majestic Million is empty.")
        return []
    except ValueError as e:
        # Raised by read_csv when the 'Domain' column is absent or the file is malformed.
        print(f"Error parsing the Majestic Million CSV: {e}")
        return []

# Collect data
print("Fetching phishing URLs from GitHub...")
phishing_urls = fetch_phishing_urls_github(50000)
print(f"Fetched a total of {len(phishing_urls)} phishing URLs")

print("\nFetching legitimate URLs...")
legitimate_urls = fetch_legitimate_urls_majestic(50000)
print(f"Fetched {len(legitimate_urls)} legitimate URLs")

# Remove duplicates and keep only phishing entries that start with http/https.
# The de-duplicated sets are sorted so the starting order is stable: iteration
# order of a set of strings differs between interpreter runs, which would make
# even a seeded shuffle non-reproducible.
phishing_urls = sorted({url.strip() for url in phishing_urls if url.strip().startswith('http')})
legitimate_urls = sorted(set(legitimate_urls))

# Dedicated seeded generator: reproducible shuffles without touching the
# global `random` state used elsewhere.
shuffle_rng = random.Random(42)
shuffle_rng.shuffle(phishing_urls)
shuffle_rng.shuffle(legitimate_urls)

# Balance the dataset
min_urls = min(len(phishing_urls), len(legitimate_urls))
if min_urls == 0:
    # Stop here with a clear message: continuing would leave `all_urls` and
    # `labels` undefined (or stale from a previous run) for the following cells.
    raise RuntimeError(
        "Error: Unable to collect enough URLs. Please check your internet connection and the source URLs."
    )

phishing_urls = phishing_urls[:min_urls]
legitimate_urls = legitimate_urls[:min_urls]

print(f"\nFinal dataset:")
print(f"Collected {len(phishing_urls)} phishing URLs")
print(f"Collected {len(legitimate_urls)} legitimate URLs")

# Display some examples
print("\nExample phishing URLs:")
print(phishing_urls[:5])
print("\nExample legitimate URLs:")
print(legitimate_urls[:5])

# Combine all URLs and create labels (1 = phishing, 0 = legitimate)
all_urls = phishing_urls + legitimate_urls
labels = [1] * len(phishing_urls) + [0] * len(legitimate_urls)

# Shuffle URLs and labels together so each URL keeps its label
combined = list(zip(all_urls, labels))
shuffle_rng.shuffle(combined)
all_urls, labels = zip(*combined)

assert len(all_urls) == len(labels) == 2 * min_urls

print("\nDataset is ready for feature extraction and model training.")


In [None]:
def extract_features(url):
    """Compute hand-crafted lexical features for a single URL.

    Character n-gram TF-IDF is deliberately not computed here: a vectorizer
    fitted on one URL has no meaningful IDF and produces a different set of
    column names for every URL. TF-IDF features must come from a vectorizer
    fitted once on the whole corpus.

    Args:
        url: The URL string to analyse.

    Returns:
        dict mapping feature name to an int (counts, lengths and 0/1 flags).
        The set of keys is the same for every URL.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse rejects some malformed URLs (e.g. an unbalanced '[' in the
        # host). Fall back to an empty parse so one bad entry does not abort
        # feature extraction for the whole dataset.
        parsed = urlparse('')
    extracted = tldextract.extract(url)
    url_lower = url.lower()  # computed once for the keyword checks

    return {
        'length': len(url),
        'num_dots': url.count('.'),
        'num_hyphens': url.count('-'),
        'num_underscores': url.count('_'),
        'num_digits': sum(c.isdigit() for c in url),
        'has_https': int(parsed.scheme == 'https'),
        'domain_length': len(extracted.domain),
        'num_subdomains': len(extracted.subdomain.split('.')) if extracted.subdomain else 0,
        'has_ip': int(bool(re.match(r'\d+\.\d+\.\d+\.\d+', extracted.domain))),
        'has_at_symbol': int('@' in url),
        'has_double_slash': int('//' in parsed.path),
        # Percent-encoded byte such as %20 or %2F. Testing for "any digit or
        # a-f letter" is true for practically every URL and carries no signal.
        'has_hex': int(bool(re.search(r'%[0-9a-fA-F]{2}', url))),
        'num_params': len(parsed.query.split('&')) if parsed.query else 0,
        # Additional lexical features: keywords commonly used as bait
        'has_login': int('login' in url_lower),
        'has_verify': int('verify' in url_lower),
        'has_secure': int('secure' in url_lower),
    }

# Extract hand-crafted features for every URL
all_features = [extract_features(url) for url in all_urls]

# Keep only the hand-crafted columns in the lexical frame. Any per-URL
# 'tfidf_*' entries would be mostly NaN and would collide by name with the
# corpus-level TF-IDF columns built below.
df = pd.DataFrame(
    [{name: value for name, value in feats.items() if not name.startswith('tfidf_')}
     for feats in all_features]
)
df['label'] = list(labels)

# Character n-gram TF-IDF fitted once on the whole corpus, so that every URL
# shares the same vocabulary and IDF weights.
tfidf = TfidfVectorizer(analyzer='char', ngram_range=(3,5), max_features=50)
url_features = tfidf.fit_transform(all_urls)
feature_names = ['tfidf_' + f for f in tfidf.get_feature_names_out()]
url_feature_df = pd.DataFrame(url_features.toarray(), columns=feature_names)

# Combine all features (both frames use the default 0..n-1 index, so rows align)
df_combined = pd.concat([df, url_feature_df], axis=1)
assert df_combined.columns.is_unique, "duplicate feature names in df_combined"
assert len(df_combined) == len(all_urls)

# Prepare data for modeling
X = df_combined.drop('label', axis=1)
y = df_combined['label']

# Stratified k-fold cross-validation with feature selection INSIDE each fold.
# Running RFE on the full dataset before splitting lets the selector see the
# held-out rows and makes the reported scores optimistic. The price is one RFE
# run per fold, so this cell is slow on the full dataset.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = []

for train_index, test_index in skf.split(X, y):
    # skf.split yields positions, so index positionally with .iloc
    X_train_full, X_test_full = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]

    # Select the 20 best features using the training fold only
    rfe = RFE(estimator=RandomForestClassifier(n_estimators=100, random_state=42), n_features_to_select=20)
    X_train = rfe.fit_transform(X_train_full, y_train)
    X_test = rfe.transform(X_test_full)

    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    cv_scores.append(accuracy_score(y_test, y_pred))

print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV score: {np.mean(cv_scores):.4f}")


In [None]:
# 3. Model Training and Evaluation

# Separate the target column from the feature matrix
y = df_combined['label']
X = df_combined.drop(columns='label')

# Hold out 20% of the rows for testing, preserving the class ratio
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)

# Fit a Random Forest on the training portion
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Hard predictions plus the predicted probability of the second class (column 1)
y_pred_proba = model.predict_proba(X_test)[:, 1]
y_pred = model.predict(X_test)

# Score the held-out predictions
roc_auc = roc_auc_score(y_test, y_pred_proba)
accuracy = accuracy_score(y_test, y_pred)

print(f"Accuracy: {accuracy:.4f}")
print(f"ROC AUC: {roc_auc:.4f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

# 5-fold stratified cross-validation on the full feature matrix, scored by ROC AUC
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(model, X, y, scoring='roc_auc', cv=cv)
print("\nCross-validation ROC AUC scores:", cv_scores)
print("Mean CV ROC AUC score:", cv_scores.mean())

In [None]:
# 4. Feature Importance

# Pair every input column with the importance the fitted model assigned to it,
# most important first
feature_importance = (
    pd.DataFrame({'feature': X.columns, 'importance': model.feature_importances_})
    .sort_values('importance', ascending=False)
)
top_features = feature_importance.head(10)

print("\nTop 10 Most Important Features:")
print(top_features)

# Bar chart of the ten highest-ranked features
plt.figure(figsize=(10, 6))
plt.bar(top_features['feature'], top_features['importance'])
plt.title('Top 10 Most Important Features')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

# Feature lists to visualise; update them to match the columns of df_combined
numerical_features = ['length', 'num_dots', 'num_hyphens', 'num_digits', 'domain_length', 'num_params']
binary_features = ['has_https', 'has_ip', 'has_at_symbol', 'has_double_slash', 'has_hex']

# Plot against a readable class name instead of relabelling the legend afterwards.
# Overwriting a seaborn hue legend with ax.legend([...]) assigns the names by
# artist order, which is not guaranteed to follow the hue order and can swap
# the class labels.
class_names = {0: 'Legitimate', 1: 'Phishing'}
hue_order = list(class_names.values())
plot_df = df_combined[numerical_features + binary_features].copy()
plot_df['url_type'] = df_combined['label'].map(class_names)

# Histograms of the numerical features, stacked by URL type
fig, axes = plt.subplots(2, 3, figsize=(20, 12))
fig.suptitle('Distribution of Features by URL Type (Updated)', fontsize=16)

axes = axes.flatten()

for i, feature in enumerate(numerical_features):
    sns.histplot(data=plot_df, x=feature, hue='url_type', hue_order=hue_order, multiple="stack", ax=axes[i])
    axes[i].set_title(f'Distribution of {feature}')
    legend = axes[i].get_legend()
    if legend is not None:
        legend.set_title('URL type')

# Hide any grid cell that has no feature to show
for unused_ax in axes[len(numerical_features):]:
    unused_ax.set_visible(False)

plt.tight_layout()
plt.show()

# Count plots of the binary (0/1) features, split by URL type
fig, axes = plt.subplots(2, 3, figsize=(20, 12))
fig.suptitle('Distribution of Binary Features by URL Type', fontsize=16)

axes = axes.flatten()

for i, feature in enumerate(binary_features):
    sns.countplot(data=plot_df, x=feature, hue='url_type', hue_order=hue_order, ax=axes[i])
    axes[i].set_title(f'Distribution of {feature}')
    legend = axes[i].get_legend()
    if legend is not None:
        legend.set_title('URL type')

# Five features in a 2x3 grid: hide the empty sixth subplot
for unused_ax in axes[len(binary_features):]:
    unused_ax.set_visible(False)

plt.tight_layout()
plt.show()

# Pairwise correlations between all columns, drawn as a heatmap
correlation_matrix = df_combined.corr()
plt.figure(figsize=(20, 16))
sns.heatmap(correlation_matrix, cmap='coolwarm', annot=False, linewidths=0.5)
plt.title('Correlation Heatmap of Features')
plt.show()

# Summary statistics per class (label 0 first, then label 1)
for stats_heading, class_label in (
    ("Statistics for Legitimate URLs:", 0),
    ("\nStatistics for Phishing URLs:", 1),
):
    print(stats_heading)
    print(df_combined[df_combined['label'] == class_label].describe())

# Check for any perfect separators: a feature whose value ranges for the two
# classes do not overlap at all. Both directions are tested (legitimate
# entirely below phishing, or phishing entirely below legitimate).
legitimate_rows = df_combined[df_combined['label'] == 0].drop(columns='label')
phishing_rows = df_combined[df_combined['label'] == 1].drop(columns='label')

# Columns are visited by position so the check also works if two columns
# happen to share a name.
for position, feature in enumerate(legitimate_rows.columns):
    legitimate_values = legitimate_rows.iloc[:, position]
    phishing_values = phishing_rows.iloc[:, position]

    legitimate_min, legitimate_max = legitimate_values.min(), legitimate_values.max()
    phishing_min, phishing_max = phishing_values.min(), phishing_values.max()

    if legitimate_max < phishing_min or phishing_max < legitimate_min:
        print(f"Perfect separator found: {feature}")
        print(f"Legitimate range: [{legitimate_min}, {legitimate_max}]")
        print(f"Phishing range: [{phishing_min}, {phishing_max}]")


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Requires `y_test` (true labels) and `y_pred` (model predictions) from the
# model training cell; run that cell first.

# Calculate the confusion matrix. labels=[0, 1] pins the layout to 2x2
# (row/column 0 = legitimate, 1 = phishing) even if one class is absent.
cm = confusion_matrix(y_test, y_pred, labels=[0, 1])

# Create a confusion matrix display
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Legitimate', 'Phishing'])

# Plot onto an explicitly created axes. disp.plot() opens its own figure when
# no axes is given, which would leave a separately created figure blank.
fig, ax = plt.subplots(figsize=(10, 8))
disp.plot(cmap='Blues', values_format='d', ax=ax)
ax.set_title('Confusion Matrix')
plt.show()

# Unpack the four cells: rows are true classes, columns are predicted classes
true_negatives, false_positives, false_negatives, true_positives = (int(v) for v in cm.ravel())

# Print out the confusion matrix values
print("Confusion Matrix:")
print(f"True Negatives: {true_negatives}")
print(f"False Positives: {false_positives}")
print(f"False Negatives: {false_negatives}")
print(f"True Positives: {true_positives}")

# Calculate and print additional metrics; a metric with a zero denominator
# (e.g. no positive predictions at all) is reported as 0.0 instead of NaN.
total = true_negatives + false_positives + false_negatives + true_positives
predicted_positives = true_positives + false_positives
actual_positives = true_positives + false_negatives

accuracy = (true_negatives + true_positives) / total if total else 0.0
precision = true_positives / predicted_positives if predicted_positives else 0.0
recall = true_positives / actual_positives if actual_positives else 0.0
f1_score = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0

print(f"\nAccuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1_score:.4f}")