In [2]:
# If running inside Jupyter, use the ! prefix to run shell commands
!python -m pip install -q --upgrade pip setuptools wheel

# Core ML + utilities
!pip install -q scikit-learn==1.6.0 numpy pandas joblib tqdm matplotlib seaborn shap==0.46.0 \
    tldextract uritools python-Levenshtein

# API + serving
!pip install -q fastapi==0.115.5 uvicorn[standard]==0.32.0 pydantic==2.9.2 python-multipart

# Optional: ONNX runtime for a “Local mode” inference toggle
!pip install -q onnxruntime==1.19.2

# Optional: Optuna for quick tuning if time permits
!pip install -q optuna


ERROR: Could not find a version that satisfies the requirement onnxruntime==1.19.2 (from versions: 1.20.0, 1.20.1, 1.21.0, 1.21.1, 1.22.0, 1.22.1, 1.23.0, 1.23.1, 1.23.2)
ERROR: No matching distribution found for onnxruntime==1.19.2


In [6]:
!pip install -q onnxruntime==1.23.2


In [8]:
# Notebook cell: load data and select features
import pandas as pd
import numpy as np
from pathlib import Path

DATA_PATH = Path("PhiUSIIL_Phishing_URL_Dataset.csv")  # adjust if different
use_cols = [
    "URL", "Domain", "TLD", "URLLength", "DomainLength", "TLDLength",
    "IsHTTPS", "IsDomainIP", "NoOfSubDomain",
    "NoOfLettersInURL", "NoOfDegitsInURL", "DegitRatioInURL", "LetterRatioInURL",
    "NoOfQMarkInURL", "NoOfAmpersandInURL", "NoOfEqualsInURL", "NoOfOtherSpecialCharsInURL",
    "SpacialCharRatioInURL", "HasObfuscation", "NoOfObfuscatedChar", "ObfuscationRatio",
    "label"
]

df = pd.read_csv(DATA_PATH, usecols=use_cols)
df = df.dropna()  # dataset generally has no missing, but safe
df.head(), df['label'].value_counts(normalize=True)


(                                  URL  URLLength                      Domain  \
 0    https://www.southbankmosaics.com         31    www.southbankmosaics.com   
 1            https://www.uni-mainz.de         23            www.uni-mainz.de   
 2      https://www.voicefmradio.co.uk         29      www.voicefmradio.co.uk   
 3         https://www.sfnmjournal.com         26         www.sfnmjournal.com   
 4  https://www.rewildingargentina.org         33  www.rewildingargentina.org   
 
    DomainLength  IsDomainIP  TLD  TLDLength  NoOfSubDomain  HasObfuscation  \
 0            24           0  com          3              1               0   
 1            16           0   de          2              1               0   
 2            22           0   uk          2              2               0   
 3            19           0  com          3              1               0   
 4            26           0  org          3              1               0   
 
    NoOfObfuscatedChar  ...  LetterR

In [9]:
from sklearn.model_selection import train_test_split

# Collapse rare TLDs
tld_counts = df["TLD"].fillna("unknown").value_counts()
rare = set(tld_counts[tld_counts < 50].index)
df["TLD"] = df["TLD"].fillna("unknown").apply(lambda x: "other" if x in rare else x)

feature_cols = [c for c in df.columns if c not in ["label", "URL", "Domain"]]
X = df[feature_cols].copy()
y = df["label"].astype(int)

# Ensure categorical dtype for TLD (for tree support or for one-hot)
X["TLD"] = X["TLD"].astype("category")

X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
X_train.shape, X_val.shape


((188636, 19), (47159, 19))

In [10]:
#Train + evaluate
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.metrics import roc_auc_score, f1_score, precision_recall_curve, average_precision_score, classification_report
from sklearn.calibration import CalibratedClassifierCV
import numpy as np

# Identify categorical features (only TLD here)
categorical_features = [X_train.columns.get_loc("TLD")]

hgb = HistGradientBoostingClassifier(
    max_depth=None,
    max_leaf_nodes=31,
    learning_rate=0.08,
    max_iter=300,
    validation_fraction=0.1,
    early_stopping=True,
    random_state=42,
    categorical_features=categorical_features
)

hgb.fit(X_train, y_train)

# Calibrate probabilities (sigmoid) on validation split for better dial behavior
cal = CalibratedClassifierCV(hgb, cv="prefit", method="sigmoid")
cal.fit(X_val, y_val)

# Metrics
proba = cal.predict_proba(X_val)[:, 1]
pred = (proba >= 0.5).astype(int)

auc = roc_auc_score(y_val, proba)
ap = average_precision_score(y_val, proba)
f1 = f1_score(y_val, pred)

# Precision@90% recall (safety posture)
prec, rec, th = precision_recall_curve(y_val, proba)
target_recall = 0.90
idx = np.where(rec >= target_recall)[0]
p_at_90 = float(prec[idx[0]]) if len(idx) else float("nan")
thr_at_90 = float(th[idx[0]-1]) if len(idx) else float("nan")

print({"AUC": auc, "AP": ap, "F1@0.5": f1, "Prec@90%Rec": p_at_90, "Thr@90%Rec": thr_at_90})
print(classification_report(y_val, pred, digits=4))



{'AUC': np.float64(0.9989957719351902), 'AP': np.float64(0.9988523965504068), 'F1@0.5': 0.9978532035385128, 'Prec@90%Rec': 0.5718950783519583, 'Thr@90%Rec': 0.9999865414941713}
              precision    recall  f1-score   support

           0     0.9995    0.9948    0.9971     20189
           1     0.9961    0.9996    0.9979     26970

    accuracy                         0.9975     47159
   macro avg     0.9978    0.9972    0.9975     47159
weighted avg     0.9975    0.9975    0.9975     47159



In [11]:
# STEP J: Persist the calibrated model to disk for reuse in API
import joblib, json, os
os.makedirs("models", exist_ok=True)

joblib.dump(cal, "models/phish_url_hgb_cal.joblib")

# STEP K: Save the exact schema: column order, categorical columns, and TLD categories
schema = {
    "feature_cols": list(X_train.columns),          # exact order used by the model
    "categorical_cols": ["TLD"],
    "tld_categories": list(X_train["TLD"].cat.categories)
}
json.dump(schema, open("models/schema.json", "w"))

print("Saved: models/phish_url_hgb_cal.joblib and models/schema.json")


Saved: models/phish_url_hgb_cal.joblib and models/schema.json


In [14]:
# STEP N0b: Create scripts/featureizer.py from scratch
from pathlib import Path
import os

# Make sure scripts folder exists
Path("scripts").mkdir(exist_ok=True)

# Write the featureizer module
featureizer_code = r'''import re
from urllib.parse import urlparse
import tldextract

RARE_TLD = set()
TLD_CATS = []

def _count(pattern, s):
    return len(re.findall(pattern, s))

def features_from_url(url: str):
    u = (url or "").strip()
    parsed = urlparse(u)
    scheme = parsed.scheme or ""
    ext = tldextract.extract(u)
    domain = ".".join([p for p in [ext.subdomain, ext.domain, ext.suffix] if p])
    tld = ext.suffix or "unknown"

    url_len = len(u)
    domain_len = len(domain)
    tld_len = len(tld)
    is_https = 1 if scheme.lower() == "https" else 0
    is_ip = 1 if re.fullmatch(r"\d{1,3}(\.\d{1,3}){3}", ext.domain or "") else 0

    subdomain = ext.subdomain or ""
    no_sub = 0 if not subdomain else subdomain.count(".") + 1

    letters = _count(r"[A-Za-z]", u)
    digits = _count(r"[0-9]", u)
    others = _count(r"[^A-Za-z0-9]", u)

    letter_ratio = round(letters / url_len, 3) if url_len else 0.0
    digit_ratio  = round(digits / url_len, 3) if url_len else 0.0
    special_ratio = round(others / url_len, 3) if url_len else 0.0

    qmarks = u.count("?")
    amps = u.count("&")
    equals = u.count("=")
    other_specials = others

    U = u.upper()
    pct_tokens = ["%2F", "%3A", "%2E"]
    has_obf = 1 if ("@" in u or any(tok in U for tok in pct_tokens)) else 0
    no_obf_chars = sum(U.count(tok) for tok in pct_tokens)
    obf_ratio = round(no_obf_chars / max(1, url_len), 3)

    tld_norm = "other" if tld in RARE_TLD else (tld or "unknown")

    return {
        "URLLength": url_len,
        "DomainLength": domain_len,
        "TLDLength": tld_len,
        "IsHTTPS": is_https,
        "IsDomainIP": is_ip,
        "NoOfSubDomain": no_sub,
        "NoOfLettersInURL": letters,
        "NoOfDegitsInURL": digits,
        "DegitRatioInURL": digit_ratio,
        "LetterRatioInURL": letter_ratio,
        "NoOfQMarkInURL": qmarks,
        "NoOfAmpersandInURL": amps,
        "NoOfEqualsInURL": equals,
        "NoOfOtherSpecialCharsInURL": other_specials,
        "SpacialCharRatioInURL": special_ratio,
        "HasObfuscation": has_obf,
        "NoOfObfuscatedChar": no_obf_chars,
        "ObfuscationRatio": obf_ratio,
        "TLD": tld_norm
    }
'''

with open("scripts/featureizer.py", "w", encoding="utf-8") as f:
    f.write(featureizer_code)

print("Created scripts/featureizer.py")
print("File exists:", os.path.exists("scripts/featureizer.py"))

# Now import
from scripts.featureizer import features_from_url
print("Import successful!")


Created scripts/featureizer.py
File exists: True


ModuleNotFoundError: No module named 'scripts.featureizer'

In [15]:
import json
import re
from urllib.parse import urlparse
import tldextract
import pandas as pd
import numpy as np
from pathlib import Path

# --- Load the schema file we saved from the notebook ---
SCHEMA_PATH = Path(__file__).parent / "models/schema.json"
with open(SCHEMA_PATH, "r") as f:
    SCHEMA = json.load(f)

# Get the lists from the schema
FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATEGORIES = set(SCHEMA["tld_categories"])

# --- Helper functions to calculate features ---

def count_special_chars(url: str) -> dict:
    """Counts individual special characters."""
    counts = {
        "NoOfQMarkInURL": url.count("?"),
        "NoOfAmpersandInURL": url.count("&"),
        "NoOfEqualsInURL": url.count("="),
    }
    
    # For 'NoOfOtherSpecialCharsInURL', we count all non-alphanumeric chars
    # *except* the common ones: / . : - _
    # and the ones we already counted: ? & =
    other_chars = re.findall(r"[^a-zA-Z0-9/:._\-?&=]", url)
    counts["NoOfOtherSpecialCharsInURL"] = len(other_chars)
    return counts

def calculate_ratios(url: str, url_len: int) -> dict:
    """Calculates letter, digit, and special char ratios."""
    if url_len == 0:
        return {
            "NoOfLettersInURL": 0,
            "NoOfDegitsInURL": 0,
            "DegitRatioInURL": 0.0,
            "LetterRatioInURL": 0.0,
            "SpacialCharRatioInURL": 0.0,
        }

    letters = re.findall(r"[a-zA-Z]", url)
    digits = re.findall(r"\d", url)
    special_chars = re.findall(r"[^a-zA-Z0-9]", url) # All non-alphanumeric

    num_letters = len(letters)
    num_digits = len(digits)
    num_special = len(special_chars)

    return {
        "NoOfLettersInURL": num_letters,
        "NoOfDegitsInURL": num_digits,
        "DegitRatioInURL": num_digits / url_len,
        "LetterRatioInURL": num_letters / url_len,
        "SpacialCharRatioInURL": num_special / url_len,
    }

def check_obfuscation(url: str) -> dict:
    """Checks for URL-encoded chars (%XX) as a proxy for obfuscation."""
    # Simple check: does it contain URL encoding?
    has_obfuscation = 1 if re.search(r"%[0-9a-fA-F]{2}", url) else 0
    obfuscated_chars = re.findall(r"%[0-9a-fA-F]{2}", url)
    num_obfuscated_char = len(obfuscated_chars)
    
    # Obfuscation ratio is the number of %-encoded chars vs. total URL length
    ratio = num_obfuscated_char / len(url) if len(url) > 0 else 0.0
    
    return {
        "HasObfuscation": has_obfuscation,
        "NoOfObfuscatedChar": num_obfuscated_char,
        "ObfuscationRatio": ratio,
    }

# --- The Main Featureizer Function ---

def featureize_url(url: str) -> pd.DataFrame:
    """
    Takes a raw URL string and returns a single-row DataFrame
    matching the training schema.
    """
    # Ensure url has a scheme for proper parsing
    if not re.match(r"^[a-zA-Z]+://", url):
        url = "http://" + url
        
    features = {}
    
    try:
        # --- Basic Lengths ---
        features["URLLength"] = len(url)
        
        # --- Parse with urlparse and tldextract ---
        parsed_url = urlparse(url)
        tld_parts = tldextract.extract(url)
        
        domain = tld_parts.domain
        subdomain = tld_parts.subdomain
        tld = tld_parts.suffix
        
        features["Domain"] = f"{domain}.{tld}"
        features["DomainLength"] = len(features["Domain"])
        
        # --- TLD Handling (Critical!) ---
        features["TLDLength"] = len(tld)
        # If the TLD wasn't in our training categories, map it to 'other'
        if tld in TLD_CATEGORIES:
            features["TLD"] = tld
        else:
            features["TLD"] = "other"
            
        # --- Protocol and IP ---
        features["IsHTTPS"] = 1 if parsed_url.scheme == "https" else 0
        features["IsDomainIP"] = 1 if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", parsed_url.netloc) else 0
        
        # --- Subdomain Count ---
        if subdomain:
            features["NoOfSubDomain"] = len(subdomain.split('.'))
        else:
            features["NoOfSubDomain"] = 0
            
        # --- Ratios and Obfuscation ---
        features.update(calculate_ratios(url, features["URLLength"]))
        features.update(count_special_chars(url))
        features.update(check_obfuscation(url))

        # --- Create DataFrame ---
        # Create a single-row DataFrame
        df_row = pd.DataFrame([features])
        
        # --- Final Schema Alignment (Most Important Step) ---
        
        # 1. Set TLD to the correct categorical type
        df_row["TLD"] = pd.Categorical(df_row["TLD"], categories=SCHEMA["tld_categories"])
        
        # 2. Re-order all columns to *exactly* match the training order
        df_row = df_row[FEATURE_COLS]
        
        return df_row

    except Exception as e:
        print(f"Error featureizing URL {url}: {e}")
        # On error, return a DataFrame of NaNs/Zeros, matching the schema
        error_df = pd.DataFrame(columns=FEATURE_COLS)
        error_df.loc[0] = np.nan # Fill with NaN
        # Fill TLD with the first known category to avoid dtype errors
        error_df["TLD"] = pd.Categorical(SCHEMA["tld_categories"][0], categories=SCHEMA["tld_categories"])
        error_df = error_df.fillna(0) # Convert NaNs to 0
        return error_df

# --- Self-Test (to run this file directly) ---
if __name__ == "__main__":
    test_url = "https://www.google-login.com/auth/path?user=1&session=abc%20"
    
    print(f"Testing featureizer with URL: {test_url}")
    features_df = featureize_url(test_url)
    
    print("\n--- Featureizer Output ---")
    print(features_df.to_string())
    
    print("\n--- Output dtypes ---")
    print(features_df.info())
    
    # Test a rare TLD
    test_url_rare = "http://my.bank.login.zz" # .zz is not a real TLD
    print(f"\nTesting rare TLD: {test_url_rare}")
    rare_df = featureize_url(test_url_rare)
    print(f"Mapped TLD: {rare_df['TLD'].iloc[0]}")
    
    # Test an IP
    test_url_ip = "http://192.168.1.1/login.php"
    print(f"\nTesting IP URL: {test_url_ip}")
    ip_df = featureize_url(test_url_ip)
    print(f"IsDomainIP: {ip_df['IsDomainIP'].iloc[0]}")

NameError: name '__file__' is not defined

In [16]:
# Notebook-friendly schema loader (no __file__)
import json
from pathlib import Path

SCHEMA_PATH = Path("models/schema.json")  # relative to your notebook CWD
if not SCHEMA_PATH.exists():
    # If notebook is inside a subfolder, try parent
    alt = Path("..") / "models" / "schema.json"
    if alt.exists():
        SCHEMA_PATH = alt
    else:
        raise FileNotFoundError(f"schema.json not found at {SCHEMA_PATH} or {alt}")

with open(SCHEMA_PATH, "r", encoding="utf-8") as f:
    SCHEMA = json.load(f)

FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATEGORIES = SCHEMA["tld_categories"]  # keep as list; set/categorical later


In [17]:
# Make sure the 'scripts' package is importable from the notebook
import sys, pathlib

project_root = pathlib.Path(".").resolve()  # adjust if notebook is inside a subfolder
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Clear any stale failed import
for k in list(sys.modules.keys()):
    if k.startswith("scripts"):
        del sys.modules[k]

from scripts.featureizer import features_from_url  # should work now


In [18]:
# STEP N: Quick end‑to‑end sanity predictions using the calibrated model 'cal'
import json, pandas as pd

# Load schema to align columns
SCHEMA = json.load(open("models/schema.json"))
FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATEGORIES = SCHEMA["tld_categories"]

from scripts.featureizer import features_from_url  # import now that it works

tests = [
    "https://www.google.com/",
    "http://192.168.1.50/login.php?user=guest",
    "https://secure-paypa1.com.verify-account.co/reset?session=abc123",
    "http://example.com/%2F%2E%2E/redirect?to=http://bad.ru",
    "https://www.uni-mainz.de/",
]

def predict_one(u):
    feats = features_from_url(u)
    X = pd.DataFrame([feats])[FEATURE_COLS]
    p = float(cal.predict_proba(X)[:,1][0])
    label = "phishing" if p >= 0.5 else "benign"
    return u, p, label, feats

for u in tests:
    url, p, label, f = predict_one(u)
    print(f"{url}\n  prob={p:.4f} -> {label}  (len={f['URLLength']}, tld={f['TLD']}, subs={f['NoOfSubDomain']}, https={f['IsHTTPS']})")


https://www.google.com/
  prob=0.0000 -> benign  (len=23, tld=com, subs=1, https=1)
http://192.168.1.50/login.php?user=guest
  prob=0.0000 -> benign  (len=40, tld=unknown, subs=0, https=0)
https://secure-paypa1.com.verify-account.co/reset?session=abc123
  prob=0.0000 -> benign  (len=64, tld=co, subs=2, https=1)
http://example.com/%2F%2E%2E/redirect?to=http://bad.ru
  prob=0.0000 -> benign  (len=54, tld=com, subs=0, https=0)
https://www.uni-mainz.de/
  prob=0.0000 -> benign  (len=25, tld=de, subs=1, https=1)


In [19]:
# Align to training schema on every inference call
import json, pandas as pd

SCHEMA = json.load(open("models/schema.json", "r", encoding="utf-8"))
FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATEGORIES = SCHEMA["tld_categories"]  # ordered list

def align_to_schema(df_row: pd.DataFrame) -> pd.DataFrame:
    # 1) Coerce missing columns (if any) to 0
    for c in FEATURE_COLS:
        if c not in df_row.columns:
            df_row[c] = 0
    # 2) Extra columns (if any) are dropped
    df_row = df_row[FEATURE_COLS]
    # 3) Categorical TLD with training categories (very important)
    df_row["TLD"] = pd.Categorical(df_row["TLD"], categories=TLD_CATEGORIES)
    return df_row


In [20]:
from scripts.featureizer import features_from_url

def predict_one(u):
    feats = features_from_url(u)                 # dict of 19 features
    X = pd.DataFrame([feats])
    X = align_to_schema(X)                       # enforce order and categorical TLD
    p = float(cal.predict_proba(X)[:,1][0])
    label = "phishing" if p >= 0.5 else "benign"
    return u, p, label, feats

for u in tests:
    url, p, label, f = predict_one(u)
    print(f"{url}\n  prob={p:.4f} -> {label}  (len={f['URLLength']}, tld={f['TLD']}, subs={f['NoOfSubDomain']}, https={f['IsHTTPS']})")


https://www.google.com/
  prob=0.0000 -> benign  (len=23, tld=com, subs=1, https=1)
http://192.168.1.50/login.php?user=guest
  prob=0.0000 -> benign  (len=40, tld=unknown, subs=0, https=0)
https://secure-paypa1.com.verify-account.co/reset?session=abc123
  prob=0.0000 -> benign  (len=64, tld=co, subs=2, https=1)
http://example.com/%2F%2E%2E/redirect?to=http://bad.ru
  prob=0.0000 -> benign  (len=54, tld=com, subs=0, https=0)
https://www.uni-mainz.de/
  prob=0.0000 -> benign  (len=25, tld=de, subs=1, https=1)


In [21]:
# Cell 1: Load schema + model and define a strict aligner
import json, pandas as pd, joblib

SCHEMA = json.load(open("models/schema.json", "r", encoding="utf-8"))
FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATEGORIES = SCHEMA["tld_categories"]  # ordered list

# Always reload the model to avoid stale state
cal = joblib.load("models/phish_url_hgb_cal.joblib")

def align_to_schema(df_row: pd.DataFrame) -> pd.DataFrame:
    # Add any missing columns as zeros
    for c in FEATURE_COLS:
        if c not in df_row.columns:
            df_row[c] = 0
    # Drop extras and enforce order
    df_row = df_row[FEATURE_COLS]
    # Enforce categorical dtype for TLD with training categories
    df_row["TLD"] = pd.Categorical(df_row["TLD"], categories=TLD_CATEGORIES)
    # Force numeric dtypes for all numerics (except TLD)
    for c in FEATURE_COLS:
        if c != "TLD":
            df_row[c] = pd.to_numeric(df_row[c], errors="coerce").fillna(0)
    return df_row


In [22]:
# Cell 2: Featureize -> align -> predict; print debug to confirm columns/dtypes
from scripts.featureizer import features_from_url

def predict_one_debug(u: str):
    feats = features_from_url(u)                         # dict of 19 live features
    X = pd.DataFrame([feats])
    X = align_to_schema(X)                               # critical alignment
    # Debug checks
    print("Cols match:", list(X.columns) == FEATURE_COLS)
    print("TLD dtype:", X["TLD"].dtype)
    print("Unique TLD value:", X["TLD"].unique().tolist())
    print("Sample row:\n", X.head(1).to_string(index=False))
    # Predict
    p = float(cal.predict_proba(X)[:, 1][0])
    label = "phishing" if p >= 0.5 else "benign"
    print(f"prob={p:.4f} -> {label}\n")
    return p, label

tests = [
    "https://www.google.com/",
    "http://192.168.1.50/login.php?user=guest",
    "https://secure-paypa1.com.verify-account.co/reset?session=abc123",
    "http://example.com/%2F%2E%2E/redirect?to=http://bad.ru",
    "https://www.uni-mainz.de/",
]
for u in tests:
    print(u)
    predict_one_debug(u)


https://www.google.com/
Cols match: True
TLD dtype: category
Unique TLD value: ['com']
Sample row:
  URLLength  DomainLength  IsDomainIP TLD  TLDLength  NoOfSubDomain  HasObfuscation  NoOfObfuscatedChar  ObfuscationRatio  NoOfLettersInURL  LetterRatioInURL  NoOfDegitsInURL  DegitRatioInURL  NoOfEqualsInURL  NoOfQMarkInURL  NoOfAmpersandInURL  NoOfOtherSpecialCharsInURL  SpacialCharRatioInURL  IsHTTPS
        23            14           0 com          3              1               0                   0               0.0                17             0.739                0              0.0                0               0                   0                           6                  0.261        1
prob=0.0000 -> benign

http://192.168.1.50/login.php?user=guest
Cols match: True
TLD dtype: category
Unique TLD value: [nan]
Sample row:
  URLLength  DomainLength  IsDomainIP TLD  TLDLength  NoOfSubDomain  HasObfuscation  NoOfObfuscatedChar  ObfuscationRatio  NoOfLettersInURL  LetterRatioInU

In [23]:
# Patch: robust schema alignment handling NaN/unknown TLD and numeric types
import json, pandas as pd, joblib
from scripts.featureizer import features_from_url

SCHEMA = json.load(open("models/schema.json", "r", encoding="utf-8"))
FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATEGORIES = SCHEMA["tld_categories"]  # must include "other"
assert "TLD" in FEATURE_COLS, "TLD must be part of features"
assert "other" in TLD_CATEGORIES, "'other' must be in training categories"

cal = joblib.load("models/phish_url_hgb_cal.joblib")

def align_to_schema_strict(feats: dict) -> pd.DataFrame:
    X = pd.DataFrame([feats])
    # 1) Ensure TLD exists; map empty/NaN to 'other'
    if ("TLD" not in X.columns) or (pd.isna(X.at[0, "TLD"])) or (str(X.at[0, "TLD"]).strip() == ""):
        X["TLD"] = "other"
    else:
        t = str(X.at[0, "TLD"])
        X.at[0, "TLD"] = t if t in TLD_CATEGORIES else "other"
    # 2) Add missing columns as zeros
    for c in FEATURE_COLS:
        if c not in X.columns:
            X[c] = 0
    # 3) Drop extra cols, enforce order
    X = X[FEATURE_COLS]
    # 4) Cast numerics and TLD categorical
    for c in FEATURE_COLS:
        if c != "TLD":
            X[c] = pd.to_numeric(X[c], errors="coerce").fillna(0)
    X["TLD"] = pd.Categorical(X["TLD"], categories=TLD_CATEGORIES)
    return X

def predict_one_fixed(u: str):
    feats = features_from_url(u)
    X = align_to_schema_strict(feats)
    p = float(cal.predict_proba(X)[:, 1][0])
    label = "phishing" if p >= 0.5 else "benign"
    return p, label, X

tests = [
    "https://www.google.com/",
    "http://192.168.1.50/login.php?user=guest",
    "https://secure-paypa1.com.verify-account.co/reset?session=abc123",
    "http://example.com/%2F%2E%2E/redirect?to=http://bad.ru",
    "https://www.uni-mainz.de/",
]
for u in tests:
    p, label, X = predict_one_fixed(u)
    print(u, "\n prob=", round(p, 4), "->", label, "| TLD:", X.at[0,"TLD"], "| IsHTTPS:", X.at[0,"IsHTTPS"], "| len:", X.at[0,"URLLength"])


https://www.google.com/ 
 prob= 0.0 -> benign | TLD: com | IsHTTPS: 1 | len: 23
http://192.168.1.50/login.php?user=guest 
 prob= 0.0 -> benign | TLD: other | IsHTTPS: 0 | len: 40
https://secure-paypa1.com.verify-account.co/reset?session=abc123 
 prob= 0.0 -> benign | TLD: co | IsHTTPS: 1 | len: 64
http://example.com/%2F%2E%2E/redirect?to=http://bad.ru 
 prob= 0.0 -> benign | TLD: com | IsHTTPS: 0 | len: 54
https://www.uni-mainz.de/ 
 prob= 0.0 -> benign | TLD: de | IsHTTPS: 1 | len: 25


In [24]:
import json
SCHEMA = json.load(open("models/schema.json","r",encoding="utf-8"))
print("FEATURE_COLS (len):", len(SCHEMA["feature_cols"]))
print(SCHEMA["feature_cols"])
print("TLD categories sample:", SCHEMA["tld_categories"][:10])


FEATURE_COLS (len): 19
['URLLength', 'DomainLength', 'IsDomainIP', 'TLD', 'TLDLength', 'NoOfSubDomain', 'HasObfuscation', 'NoOfObfuscatedChar', 'ObfuscationRatio', 'NoOfLettersInURL', 'LetterRatioInURL', 'NoOfDegitsInURL', 'DegitRatioInURL', 'NoOfEqualsInURL', 'NoOfQMarkInURL', 'NoOfAmpersandInURL', 'NoOfOtherSpecialCharsInURL', 'SpacialCharRatioInURL', 'IsHTTPS']
TLD categories sample: ['ae', 'ai', 'am', 'app', 'ar', 'art', 'asia', 'at', 'au', 'az']


In [25]:
import json, pandas as pd, joblib

SCHEMA = json.load(open("models/schema.json","r",encoding="utf-8"))
FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATS = SCHEMA["tld_categories"]  # ordered list
cal = joblib.load("models/phish_url_hgb_cal.joblib")

def align_to_schema_strict(feats: dict) -> pd.DataFrame:
    X = pd.DataFrame([feats])
    # Normalize TLD
    if "TLD" not in X or pd.isna(X.at[0,"TLD"]) or str(X.at[0,"TLD"]).strip()=="":
        X["TLD"] = "other"
    else:
        t = str(X.at[0,"TLD"]).lower()
        X.at[0,"TLD"] = t if t in TLD_CATS else "other"
    # Ensure all expected cols exist; extras dropped
    for c in FEATURE_COLS:
        if c not in X.columns:
            X[c] = 0
    X = X[FEATURE_COLS]
    # Dtypes
    for c in FEATURE_COLS:
        if c != "TLD":
            X[c] = pd.to_numeric(X[c], errors="coerce").fillna(0)
    X["TLD"] = pd.Categorical(X["TLD"], categories=TLD_CATS)
    return X


In [26]:
# Should print non-trivial values if the model is healthy
import numpy as np
print("val proba sample:", np.round(cal.predict_proba(X_val[:5])[:,1], 4).tolist())


val proba sample: [0.9969, 0.9959, 0.9983, 0.996, 0.9978]


In [None]:
# Again

In [28]:
# STEP A1: Recreate X,y strictly from the 19 live-computable columns in your schema
import pandas as pd
from pathlib import Path

DATA_PATH = Path("PhiUSIIL_Phishing_URL_Dataset.csv")

feature_cols = [
    "URLLength","DomainLength","IsDomainIP","TLD","TLDLength",
    "NoOfSubDomain","HasObfuscation","NoOfObfuscatedChar","ObfuscationRatio",
    "NoOfLettersInURL","LetterRatioInURL","NoOfDegitsInURL","DegitRatioInURL",
    "NoOfEqualsInURL","NoOfQMarkInURL","NoOfAmpersandInURL",
    "NoOfOtherSpecialCharsInURL","SpacialCharRatioInURL","IsHTTPS"
]

use_cols = ["URL","Domain"] + feature_cols + ["label"]
df = pd.read_csv(DATA_PATH, usecols=use_cols).dropna()

# Rare TLD collapse: EXACTLY as used for training previously (threshold 50)
tld_counts = df["TLD"].fillna("unknown").str.lower().value_counts()
rare = set(tld_counts[tld_counts < 50].index)
df["TLD"] = df["TLD"].fillna("unknown").str.lower().apply(lambda x: "other" if x in rare else x)

X = df[feature_cols].copy()
y = df["label"].astype(int)

# TLD categorical with stable order (sorted ensures determinism)
tld_order = sorted(X["TLD"].astype(str).unique().tolist())
if "other" not in tld_order:
    tld_order = ["other"] + [t for t in tld_order if t != "other"]
X["TLD"] = pd.Categorical(X["TLD"], categories=tld_order)

from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.20, stratify=y, random_state=42
)
X_train.shape, X_val.shape


((188636, 19), (47159, 19))

In [29]:
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.metrics import roc_auc_score, average_precision_score, f1_score, precision_recall_curve
from sklearn.calibration import CalibratedClassifierCV
import numpy as np

categorical_features = [X_train.columns.get_loc("TLD")]

hgb = HistGradientBoostingClassifier(
    max_leaf_nodes=31,
    learning_rate=0.08,
    max_iter=300,
    validation_fraction=0.1,
    early_stopping=True,
    random_state=42,
    categorical_features=categorical_features
)
hgb.fit(X_train, y_train)

cal = CalibratedClassifierCV(hgb, cv="prefit", method="sigmoid")
cal.fit(X_val, y_val)

proba = cal.predict_proba(X_val)[:,1]
pred = (proba >= 0.5).astype(int)

auc = roc_auc_score(y_val, proba)
ap = average_precision_score(y_val, proba)
f1 = f1_score(y_val, pred)

prec, rec, th = precision_recall_curve(y_val, proba)
target_recall = 0.90
idx = np.where(rec >= target_recall)[0]
p_at_90 = float(prec[idx[0]]) if len(idx) else float("nan")
thr_at_90 = float(th[idx[0]-1]) if len(idx) else float("nan")

print({"AUC":auc, "AP":ap, "F1@0.5":f1, "Prec@90%Rec":p_at_90, "Thr@90%Rec":thr_at_90})




{'AUC': np.float64(0.9989957719351902), 'AP': np.float64(0.9988523965504068), 'F1@0.5': 0.9978532035385128, 'Prec@90%Rec': 0.5718950783519583, 'Thr@90%Rec': 0.9999865414941713}


In [30]:
import json, joblib, os
os.makedirs("models", exist_ok=True)

joblib.dump(cal, "models/phish_url_hgb_cal.joblib")
SCHEMA = {
    "feature_cols": list(X_train.columns),
    "tld_categories": list(X_train["TLD"].cat.categories)
}
json.dump(SCHEMA, open("models/schema.json","w"), ensure_ascii=False)
print("Saved model + schema for 19 live features")


Saved model + schema for 19 live features


In [31]:
import re, json
from urllib.parse import urlparse
import tldextract
import pandas as pd

SCHEMA = json.load(open("models/schema.json","r",encoding="utf-8"))
FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATS = SCHEMA["tld_categories"]

def featureize_url(url: str) -> pd.DataFrame:
    u = (url or "").strip()
    if not re.match(r"^[a-zA-Z]+://", u):
        u = "http://" + u
    parsed = urlparse(u)
    ext = tldextract.extract(u)
    tld = (ext.suffix or "unknown").lower()

    url_len = len(u)
    domain = ".".join([p for p in [ext.subdomain, ext.domain, ext.suffix] if p])
    domain_len = len(domain)

    is_https = 1 if (parsed.scheme or "").lower()=="https" else 0
    is_ip = 1 if re.fullmatch(r"\d{1,3}(\.\d{1,3}){3}", ext.domain or "") else 0
    subdomain = ext.subdomain or ""
    no_sub = 0 if not subdomain else subdomain.count(".") + 1

    letters = len(re.findall(r"[A-Za-z]", u))
    digits  = len(re.findall(r"[0-9]", u))
    others  = len(re.findall(r"[^A-Za-z0-9]", u))

    letter_ratio = (letters/url_len) if url_len else 0.0
    digit_ratio  = (digits/url_len)  if url_len else 0.0
    special_ratio= (others/url_len)  if url_len else 0.0

    qmarks = u.count("?")
    amps   = u.count("&")
    equals = u.count("=")

    U = u.upper()
    pct_tokens = ["%2F","%3A","%2E"]
    has_obf = 1 if ("@" in u or any(tok in U for tok in pct_tokens)) else 0
    no_obf = sum(U.count(tok) for tok in pct_tokens)
    obf_ratio = (no_obf/url_len) if url_len else 0.0

    tld_norm = tld if tld in TLD_CATS else "other"

    row = {
        "URLLength": url_len,
        "DomainLength": domain_len,
        "IsDomainIP": is_ip,
        "TLD": tld_norm,
        "TLDLength": len(tld),
        "NoOfSubDomain": no_sub,
        "HasObfuscation": has_obf,
        "NoOfObfuscatedChar": no_obf,
        "ObfuscationRatio": obf_ratio,
        "NoOfLettersInURL": letters,
        "LetterRatioInURL": letter_ratio,
        "NoOfDegitsInURL": digits,
        "DegitRatioInURL": digit_ratio,
        "NoOfEqualsInURL": equals,
        "NoOfQMarkInURL": qmarks,
        "NoOfAmpersandInURL": amps,
        "NoOfOtherSpecialCharsInURL": others,
        "SpacialCharRatioInURL": special_ratio,
        "IsHTTPS": is_https
    }
    X = pd.DataFrame([row])
    # enforce order and dtypes
    for c in FEATURE_COLS:
        if c not in X.columns: X[c]=0
    X = X[FEATURE_COLS]
    for c in FEATURE_COLS:
        if c!="TLD":
            X[c]=pd.to_numeric(X[c], errors="coerce").fillna(0)
    X["TLD"] = pd.Categorical(X["TLD"], categories=TLD_CATS)
    return X


In [36]:
import joblib
import json
import re
from urllib.parse import urlparse
import tldextract
import pandas as pd
import numpy as np
from pathlib import Path

# --- 1. Load Schema and Model (Fixed for Notebook) ---
# This path works from your notebook, assuming 'models' folder is in the same directory
SCHEMA_PATH = Path("models/schema.json")
MODEL_PATH = Path("models/phish_url_hgb_cal.joblib")

try:
    with open(SCHEMA_PATH, "r") as f:
        SCHEMA = json.load(f)
    
    # Load the model
    cal = joblib.load(MODEL_PATH)
    
    # Get lists from the schema
    FEATURE_COLS = SCHEMA["feature_cols"]
    TLD_CATEGORIES = set(SCHEMA["tld_categories"])
    
    print("Model and schema.json loaded successfully.")

except FileNotFoundError:
    print(f"FATAL: Could not find '{SCHEMA_PATH}' or '{MODEL_PATH}'.")
    print("Please make sure these files are in a 'models' folder in the same directory as your notebook.")
    # Create dummy vars so the rest of the cell can run
    FEATURE_COLS = []
    TLD_CATEGORIES = set()
    cal = None

# --- 2. Helper Functions (Required by the Featureizer) ---

def count_special_chars(url: str) -> dict:
    counts = {
        "NoOfQMarkInURL": url.count("?"),
        "NoOfAmpersandInURL": url.count("&"),
        "NoOfEqualsInURL": url.count("="),
    }
    other_chars = re.findall(r"[^a-zA-Z0-9/:._\-?&=]", url)
    counts["NoOfOtherSpecialCharsInURL"] = len(other_chars)
    return counts

def calculate_ratios(url: str, url_len: int) -> dict:
    if url_len == 0:
        return {"NoOfLettersInURL": 0, "NoOfDegitsInURL": 0, "DegitRatioInURL": 0.0, "LetterRatioInURL": 0.0, "SpacialCharRatioInURL": 0.0}

    letters = re.findall(r"[a-zA-Z]", url)
    digits = re.findall(r"\d", url)
    special_chars = re.findall(r"[^a-zA-Z0-9]", url)
    num_letters, num_digits, num_special = len(letters), len(digits), len(special_chars)

    return {
        "NoOfLettersInURL": num_letters,
        "NoOfDegitsInURL": num_digits,
        "DegitRatioInURL": num_digits / url_len if url_len > 0 else 0.0,
        "LetterRatioInURL": num_letters / url_len if url_len > 0 else 0.0,
        "SpacialCharRatioInURL": num_special / url_len if url_len > 0 else 0.0,
    }

def check_obfuscation(url: str) -> dict:
    has_obfuscation = 1 if re.search(r"%[0-9a-fA-F]{2}", url) else 0
    obfuscated_chars = re.findall(r"%[0-9a-fA-F]{2}", url)
    num_obfuscated_char = len(obfuscated_chars)
    ratio = num_obfuscated_char / len(url) if len(url) > 0 else 0.0
    return {"HasObfuscation": has_obfuscation, "NoOfObfuscatedChar": num_obfuscated_char, "ObfuscationRatio": ratio}

# --- 3. The Corrected featureize_url Function ---
# (This definition will override any old ones)

def featureize_url(url: str) -> pd.DataFrame:
    """
    Takes a raw URL string and returns a single-row DataFrame
    matching the training schema.
    """
    if not re.match(r"^[a-zA-Z]+://", url):
        url = "http://" + url
        
    features = {}
    
    try:
        features["URLLength"] = len(url)
        
        parsed_url = urlparse(url)
        tld_parts = tldextract.extract(url)
        
        # FIX: Use netloc (hostname) for 'Domain' to match CSV
        hostname = parsed_url.netloc
        if not hostname: # Fallback for simple strings like 'badsite.com'
             hostname = tld_parts.subdomain + "." + tld_parts.domain + "." + tld_parts.suffix
             hostname = hostname.strip('.')
        
        features["DomainLength"] = len(hostname)
        
        tld = tld_parts.suffix
        subdomain_str = tld_parts.subdomain
        
        # FIX: Check for IP Address
        is_ip = 1 if re.match(r"^\d{1,3}(\.\d{1,3}){3}$", tld_parts.domain) and tld_parts.suffix == '' else 0
        features["IsDomainIP"] = is_ip
        
        if is_ip:
            # Logic for IPs
            features["TLD"] = "other"  # Map IPs to 'other' as they have no TLD
            features["TLDLength"] = 0
            features["NoOfSubDomain"] = 0
        else:
            # Logic for normal domains
            features["TLDLength"] = len(tld)
            features["TLD"] = "other" if tld not in TLD_CATEGORIES else tld
            features["NoOfSubDomain"] = len(subdomain_str.split('.')) if subdomain_str else 0
        
        features["IsHTTPS"] = 1 if parsed_url.scheme == "https" else 0
        
        features.update(calculate_ratios(url, features["URLLength"]))
        features.update(count_special_chars(url))
        features.update(check_obfuscation(url))

        df_row = pd.DataFrame([features])
        
        # --- Final Schema Alignment ---
        df_row["TLD"] = pd.Categorical(df_row["TLD"], categories=SCHEMA["tld_categories"])
        
        for col in FEATURE_COLS:
            if col not in df_row.columns:
                df_row[col] = 0
                
        df_row = df_row[FEATURE_COLS]
        df_row = df_row.fillna(0) # Final safety net
        
        return df_row

    except Exception as e:
        print(f"Error featureizing URL {url}: {e}")
        error_df = pd.DataFrame(columns=FEATURE_COLS)
        error_df.loc[0] = 0
        error_df["TLD"] = pd.Categorical("other", categories=SCHEMA["tld_categories"])
        return error_df.fillna(0)

# --- 4. Your Test Script (Now using the function defined above) ---

if cal:
    print("\n--- Running Tests ---")
    tests = [
        "https://www.google.com/",
        "http://192.168.1.50/login.php?user=guest",
        "https://secure-paypa1.com.verify-account.co/reset?session=abc123", # Phishing
        "http://example.com/%2F%2E%2E/redirect?to=http://bad.ru", # Obfuscation
        "https://www.uni-mainz.de/",
    ]
    for u in tests:
        Xinf = featureize_url(u)
        p = float(cal.predict_proba(Xinf)[:,1][0])
        print(u, "->", round(p, 4))
else:
    print("\nTests skipped because model could not be loaded.")

Model and schema.json loaded successfully.

--- Running Tests ---
Error featureizing URL https://www.google.com/: Cannot setitem on a Categorical with a new category (0), set the categories first


TypeError: Categorical input must be list-like

In [37]:
import json, pandas as pd, re, joblib
from urllib.parse import urlparse
import tldextract

# Load schema/model
SCHEMA = json.load(open("models/schema.json","r",encoding="utf-8"))
FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATS = SCHEMA["tld_categories"]  # ordered, should include "other"
cal = joblib.load("models/phish_url_hgb_cal.joblib")

def _count_other_specials(u: str) -> int:
    # Match training: count all non-alphanumeric as "other specials"
    return len(re.findall(r"[^A-Za-z0-9]", u))

def featureize_url(url: str) -> pd.DataFrame:
    u = (url or "").strip()
    if not re.match(r"^[a-zA-Z]+://", u):
        u = "http://" + u

    parsed = urlparse(u)
    ext = tldextract.extract(u)
    tld_raw = (ext.suffix or "unknown").lower()

    url_len = len(u)
    domain = ".".join([p for p in [ext.subdomain, ext.domain, ext.suffix] if p])
    domain_len = len(domain)

    is_https = 1 if (parsed.scheme or "").lower() == "https" else 0
    is_ip = 1 if re.fullmatch(r"\d{1,3}(\.\d{1,3}){3}", ext.domain or "") else 0
    subdomain = ext.subdomain or ""
    no_sub = 0 if not subdomain else subdomain.count(".") + 1

    letters = len(re.findall(r"[A-Za-z]", u))
    digits  = len(re.findall(r"[0-9]", u))
    others  = _count_other_specials(u)

    letter_ratio  = (letters/url_len) if url_len else 0.0
    digit_ratio   = (digits/url_len)  if url_len else 0.0
    special_ratio = (others/url_len)  if url_len else 0.0

    qmarks = u.count("?")
    amps   = u.count("&")
    equals = u.count("=")

    U = u.upper()
    pct_tokens = ["%2F","%3A","%2E"]
    has_obf = 1 if ("@" in u or any(tok in U for tok in pct_tokens)) else 0
    no_obf = sum(U.count(tok) for tok in pct_tokens)
    obf_ratio = (no_obf/url_len) if url_len else 0.0

    # Map TLD into training categories
    tld_norm = tld_raw if tld_raw in TLD_CATS else ("other" if "other" in TLD_CATS else TLD_CATS[0])

    row = {
        "URLLength": url_len,
        "DomainLength": domain_len,
        "IsDomainIP": is_ip,
        "TLD": tld_norm,
        "TLDLength": len(tld_raw),
        "NoOfSubDomain": no_sub,
        "HasObfuscation": has_obf,
        "NoOfObfuscatedChar": no_obf,
        "ObfuscationRatio": obf_ratio,
        "NoOfLettersInURL": letters,
        "LetterRatioInURL": letter_ratio,
        "NoOfDegitsInURL": digits,
        "DegitRatioInURL": digit_ratio,
        "NoOfEqualsInURL": equals,
        "NoOfQMarkInURL": qmarks,
        "NoOfAmpersandInURL": amps,
        "NoOfOtherSpecialCharsInURL": others,
        "SpacialCharRatioInURL": special_ratio,
        "IsHTTPS": is_https
    }

    X = pd.DataFrame([row])
    # Ensure all expected columns exist
    for c in FEATURE_COLS:
        if c not in X.columns:
            X[c] = 0

    # Reorder
    X = X[FEATURE_COLS]

    # Cast numerics and set categorical LAST (avoid fillna on categorical)
    for c in FEATURE_COLS:
        if c != "TLD":
            X[c] = pd.to_numeric(X[c], errors="coerce").fillna(0)
    X["TLD"] = pd.Categorical(X["TLD"], categories=TLD_CATS)

    return X


In [38]:
# Rebuild X directly from URLs for perfect parity
urls = df["URL"].tolist()
rows = []
for u in urls:
    rows.append(featureize_url(u).iloc[0].to_dict())
X2 = pd.DataFrame(rows)[FEATURE_COLS]
y2 = df["label"].astype(int)

# Ensure TLD categorical matches and numerics are numeric
for c in FEATURE_COLS:
    if c != "TLD":
        X2[c] = pd.to_numeric(X2[c], errors="coerce").fillna(0)
X2["TLD"] = pd.Categorical(X2["TLD"], categories=TLD_CATS)

from sklearn.model_selection import train_test_split
Xtr, Xva, ytr, yva = train_test_split(X2, y2, test_size=0.20, stratify=y2, random_state=42)

from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import roc_auc_score, average_precision_score, f1_score, precision_recall_curve
import numpy as np, joblib, json, os

categorical_features = [Xtr.columns.get_loc("TLD")]
hgb = HistGradientBoostingClassifier(
    max_leaf_nodes=31, learning_rate=0.08, max_iter=300,
    validation_fraction=0.1, early_stopping=True, random_state=42,
    categorical_features=categorical_features
)
hgb.fit(Xtr, ytr)
cal2 = CalibratedClassifierCV(hgb, cv="prefit", method="sigmoid")
cal2.fit(Xva, yva)

proba2 = cal2.predict_proba(Xva)[:,1]
pred2 = (proba2 >= 0.5).astype(int)
auc2 = roc_auc_score(yva, proba2)
ap2 = average_precision_score(yva, proba2)
f12 = f1_score(yva, pred2)
prec, rec, th = precision_recall_curve(yva, proba2)
idx = np.where(rec >= 0.90)[0]
p_at_90 = float(prec[idx[0]]) if len(idx) else float("nan")
thr_at_90 = float(th[idx[0]-1]) if len(idx) else float("nan")
print({"AUC":auc2, "AP":ap2, "F1@0.5":f12, "Prec@90%Rec":p_at_90, "Thr@90%Rec":thr_at_90})

# Save parity-locked model and schema
os.makedirs("models", exist_ok=True)
joblib.dump(cal2, "models/phish_url_hgb_cal.joblib")
SCHEMA2 = {"feature_cols": FEATURE_COLS, "tld_categories": TLD_CATS}
json.dump(SCHEMA2, open("models/schema.json","w"), ensure_ascii=False)




{'AUC': np.float64(0.9987836303990693), 'AP': np.float64(0.9986261356966399), 'F1@0.5': 0.9970950134147469, 'Prec@90%Rec': 0.5718950783519583, 'Thr@90%Rec': 0.9999994336470258}


In [39]:
import joblib
import json
import re
from urllib.parse import urlparse
import tldextract
import pandas as pd
import numpy as np
from pathlib import Path

# --- Load the NEWLY SAVED Schema and Model ---
SCHEMA_PATH = Path("models/schema.json")
MODEL_PATH = Path("models/phish_url_hgb_cal.joblib") # This should now point to the retrained model

try:
    with open(SCHEMA_PATH, "r") as f:
        SCHEMA = json.load(f)
    cal = joblib.load(MODEL_PATH) # Load the RETRAINED model
    FEATURE_COLS = SCHEMA["feature_cols"]
    TLD_CATEGORIES = SCHEMA["tld_categories"]
    print("Retrained model and schema loaded successfully.")
except FileNotFoundError:
    print(f"FATAL: Could not find '{SCHEMA_PATH}' or '{MODEL_PATH}'.")
    FEATURE_COLS = []
    TLD_CATEGORIES = []
    cal = None

# --- Use the SAME featureize_url function ---
# (Make sure the definition from the retraining cell is still active in your notebook's memory,
# or redefine it here if necessary)
# Define featureize_url and helpers (count_special_chars, calculate_ratios, check_obfuscation)
# ... [Paste the entire corrected featureize_url function definition here again if needed] ...

# --- Run the Tests ---
if cal:
    print("\n--- Running Tests with Retrained Model ---")
    tests = [
        "https://www.google.com/",
        "http://192.168.1.50/login.php?user=guest",
        "https://secure-paypa1.com.verify-account.co/reset?session=abc123", # Phishing
        "http://example.com/%2F%2E%2E/redirect?to=http://bad.ru", # Obfuscation/Phishing
        "https://www.uni-mainz.de/",
    ]
    for u in tests:
        Xinf = featureize_url(u)
        if not Xinf.empty and cal:
            p = float(cal.predict_proba(Xinf)[:,1][0])
            label = "PHISHING" if p > 0.5 else "benign"
            print(f"{u} -> {round(p, 4)} ({label})")
        else:
            print(f"Featureization failed or model not loaded for {u}, skipping prediction.")
else:
    print("\nTests skipped because model could not be loaded.")

Retrained model and schema loaded successfully.

--- Running Tests with Retrained Model ---
https://www.google.com/ -> 0.1087 (benign)
http://192.168.1.50/login.php?user=guest -> 0.0 (benign)
https://secure-paypa1.com.verify-account.co/reset?session=abc123 -> 0.0 (benign)
http://example.com/%2F%2E%2E/redirect?to=http://bad.ru -> 0.0 (benign)
https://www.uni-mainz.de/ -> 0.9448 (PHISHING)


In [40]:
from urllib.parse import urlparse
import re, pandas as pd, tldextract

def _is_ipv4(host: str) -> bool:
    if not host: return False
    m = re.fullmatch(r"\d{1,3}(\.\d{1,3}){3}", host)
    if not m: return False
    return all(0 <= int(p) <= 255 for p in host.split("."))

def featureize_url(url: str) -> pd.DataFrame:
    u = (url or "").strip()
    if not re.match(r"^[a-zA-Z]+://", u):
        u = "http://" + u

    parsed = urlparse(u)
    host = parsed.hostname or ""  # robust hostname for IP/TLD checks
    ext = tldextract.extract(u)
    tld_raw = (ext.suffix or "unknown").lower()

    url_len = len(u)
    domain = ".".join([p for p in [ext.subdomain, ext.domain, ext.suffix] if p])
    domain_len = len(domain)

    is_https = 1 if (parsed.scheme or "").lower() == "https" else 0
    is_ip = 1 if _is_ipv4(host) else 0  # FIX: IP from hostname
    subdomain = ext.subdomain or ""
    no_sub = 0 if not subdomain else subdomain.count(".") + 1

    letters = len(re.findall(r"[A-Za-z]", u))
    digits  = len(re.findall(r"[0-9]", u))
    others  = len(re.findall(r"[^A-Za-z0-9]", u))  # match training rule

    letter_ratio  = (letters/url_len) if url_len else 0.0
    digit_ratio   = (digits/url_len)  if url_len else 0.0
    special_ratio = (others/url_len)  if url_len else 0.0

    qmarks = u.count("?")
    amps   = u.count("&")
    equals = u.count("=")

    U = u.upper()
    pct_tokens = ["%2F","%3A","%2E"]
    has_obf = 1 if ("@" in u or any(tok in U for tok in pct_tokens)) else 0
    no_obf = sum(U.count(tok) for tok in pct_tokens)
    obf_ratio = (no_obf/url_len) if url_len else 0.0

    # TLD mapping strictly into training categories
    tld_norm = tld_raw if tld_raw in TLD_CATS else ("other" if "other" in TLD_CATS else TLD_CATS[0])

    row = {
        "URLLength": url_len, "DomainLength": domain_len, "IsDomainIP": is_ip,
        "TLD": tld_norm, "TLDLength": len(tld_raw),
        "NoOfSubDomain": no_sub,
        "HasObfuscation": has_obf, "NoOfObfuscatedChar": no_obf, "ObfuscationRatio": obf_ratio,
        "NoOfLettersInURL": letters, "LetterRatioInURL": letter_ratio,
        "NoOfDegitsInURL": digits, "DegitRatioInURL": digit_ratio,
        "NoOfEqualsInURL": equals, "NoOfQMarkInURL": qmarks, "NoOfAmpersandInURL": amps,
        "NoOfOtherSpecialCharsInURL": others, "SpacialCharRatioInURL": special_ratio,
        "IsHTTPS": is_https
    }
    X = pd.DataFrame([row])
    # enforce order & dtypes (numerics first, then categorical)
    for c in FEATURE_COLS:
        if c not in X.columns: X[c] = 0
    X = X[FEATURE_COLS]
    for c in FEATURE_COLS:
        if c != "TLD":
            X[c] = pd.to_numeric(X[c], errors="coerce").fillna(0)
    X["TLD"] = pd.Categorical(X["TLD"], categories=TLD_CATS)
    return X


In [41]:
import json, pandas as pd, numpy as np

# Load schema
SCHEMA = json.load(open("models/schema.json","r",encoding="utf-8"))
FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATS = SCHEMA["tld_categories"]

# 1) Make a dummy URL and build features
X = featureize_url("https://example.com/test?x=1&y=2")

# 2) Column set and order
assert list(X.columns) == FEATURE_COLS, f"Column order mismatch:\n{list(X.columns)}\n!=\n{FEATURE_COLS}"

# 3) Dtypes: all non-TLD numeric, TLD is category with expected categories
for c in FEATURE_COLS:
    if c != "TLD":
        assert pd.api.types.is_numeric_dtype(X[c]), f"{c} is not numeric dtype"
assert pd.api.types.is_categorical_dtype(X["TLD"]), "TLD is not categorical dtype"
cats = list(X["TLD"].cat.categories)
assert cats == TLD_CATS, f"TLD category space mismatch:\n{cats[:10]} ..."

# 4) No NaNs in numerics
num_cols = [c for c in FEATURE_COLS if c != "TLD"]
assert np.isfinite(X[num_cols].to_numpy()).all(), "Found NaN/inf in numeric features"

print("Schema/dtype check: OK")


Schema/dtype check: OK


  assert pd.api.types.is_categorical_dtype(X["TLD"]), "TLD is not categorical dtype"


In [42]:
import joblib
cal = joblib.load("models/phish_url_hgb_cal.joblib")

tests = [
    "https://www.google.com/",
    "http://192.168.1.50/login.php?user=guest",
    "https://secure-paypa1.com.verify-account.co/reset?session=abc123",
    "http://example.com/%2F%2E%2E/redirect?to=http://bad.ru",
    "https://www.uni-mainz.de/",
]
for u in tests:
    Xinf = featureize_url(u)
    p = float(cal.predict_proba(Xinf)[:,1][0])
    assert 0.0 <= p <= 1.0, f"Invalid probability {p} for {u}"
    print(f"{u} -> {p:.4f}")
print("Predict_proba smoke test: OK")


https://www.google.com/ -> 0.1087
http://192.168.1.50/login.php?user=guest -> 0.0000
https://secure-paypa1.com.verify-account.co/reset?session=abc123 -> 0.0000
http://example.com/%2F%2E%2E/redirect?to=http://bad.ru -> 0.0000
https://www.uni-mainz.de/ -> 0.9448
Predict_proba smoke test: OK


In [43]:
# 1) IP host should set IsDomainIP=1 and TLD to 'other' or a valid category
Xi = featureize_url("http://192.168.0.1/admin")
print("IsDomainIP:", int(Xi.at[0,"IsDomainIP"]), "TLD:", str(Xi.at[0,"TLD"]))
assert int(Xi.at[0,"IsDomainIP"]) == 1, "IP detection failed"
assert str(Xi.at[0,"TLD"]) in TLD_CATS, "TLD not in category space"

# 2) Obfuscation tokens should increment counts/ratio
Xo = featureize_url("http://x.com/%2F%2E%2E/%2E%2E?u=1")
print("HasObfuscation:", int(Xo.at[0,"HasObfuscation"]), "NoOfObfuscatedChar:", int(Xo.at[0,"NoOfObfuscatedChar"]))
assert int(Xo.at[0,"HasObfuscation"]) == 1, "Obfuscation flag failed"
assert int(Xo.at[0,"NoOfObfuscatedChar"]) >= 1, "Obfuscated char count failed"

# 3) Ratios are within [0,1] and consistent with counts/length
for Xs in [Xi, Xo]:
    L = int(Xs.at[0,"URLLength"])
    letters = int(Xs.at[0,"NoOfLettersInURL"])
    digits  = int(Xs.at[0,"NoOfDegitsInURL"])
    specials= int(Xs.at[0,"NoOfOtherSpecialCharsInURL"])
    assert 0 <= Xs.at[0,"LetterRatioInURL"] <= 1 and 0 <= Xs.at[0,"DegitRatioInURL"] <= 1 and 0 <= Xs.at[0,"SpacialCharRatioInURL"] <= 1, "Ratio out of bounds"
    assert letters + digits + specials >= L - 5, "Counts vs length look inconsistent (allow small slack)"
print("Edge-case unit tests: OK")


IsDomainIP: 1 TLD: other
HasObfuscation: 1 NoOfObfuscatedChar: 5
Edge-case unit tests: OK


In [44]:
import time
urls = ["https://example.com/?q=test"]*1000
t0 = time.time()
_ = [featureize_url(u) for u in urls]
dt = time.time() - t0
print(f"Featureize throughput: {len(urls)/dt:.1f} URLs/sec")


Featureize throughput: 188.8 URLs/sec


In [45]:
import pandas as pd
# Ensure the latest featureize_url function is defined above this cell

print("\n--- Debugging Features for Misclassified URLs ---")

misclassified_urls = [
    "https://secure-paypa1.com.verify-account.co/reset?session=abc123", # Should be HIGH
    "http://example.com/%2F%2E%2E/redirect?to=http://bad.ru",      # Should be HIGH
    "https://www.uni-mainz.de/",                                     # Should be LOW
]

# Set display options for pandas
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)

for u in misclassified_urls:
    print(f"\n--- Features for: {u} ---")
    # Make sure featureize_url is using the latest patched version
    Xinf = featureize_url(u)
    if not Xinf.empty:
        # Transpose (.T) makes it easier to read
        print(Xinf.T)
    else:
        print("Featureization failed.")

# Reset display options if needed
# pd.reset_option('display.max_columns')
# pd.reset_option('display.width')


--- Debugging Features for Misclassified URLs ---

--- Features for: https://secure-paypa1.com.verify-account.co/reset?session=abc123 ---
                                   0
URLLength                         64
DomainLength                      35
IsDomainIP                         0
TLD                               co
TLDLength                          2
NoOfSubDomain                      2
HasObfuscation                     0
NoOfObfuscatedChar                 0
ObfuscationRatio                 0.0
NoOfLettersInURL                  49
LetterRatioInURL            0.765625
NoOfDegitsInURL                    4
DegitRatioInURL               0.0625
NoOfEqualsInURL                    1
NoOfQMarkInURL                     1
NoOfAmpersandInURL                 0
NoOfOtherSpecialCharsInURL        11
SpacialCharRatioInURL       0.171875
IsHTTPS                            1

--- Features for: http://example.com/%2F%2E%2E/redirect?to=http://bad.ru ---
                                   0
URLLen

In [46]:
# --- Replace the old _count_other_specials with this ---
def _count_other_specials(u: str) -> int:
    """
    Counts special characters EXCLUDING common URL structural/query chars.
    Matches the likely definition used for the CSV features.
    """
    # Exclude: letters, numbers, and common chars: / : . - _ ? = & %
    other_chars = re.findall(r"[^A-Za-z0-9/:.\-_\?=&%]", u)
    return len(other_chars)

# --- AND ---

# --- Find the main featureize_url function again ---
# --- Inside it, find the line calculating 'others' and REPLACE it ---
# Replace this line:
# others  = len(re.findall(r"[^A-Za-z0-9]", u))
# With this line:
    others = _count_other_specials(u) # Use the refined helper function

# (Make sure the rest of the featureize_url function remains the same as the last version)

In [47]:
# --- Re-run this test cell AFTER applying the special char fix ---
if cal:
    print("\n--- Running Tests with Final Featureizer ---")
    tests = [
        "https://www.google.com/",
        "http://192.168.1.50/login.php?user=guest",
        "https://secure-paypa1.com.verify-account.co/reset?session=abc123", # Phishing
        "http://example.com/%2F%2E%2E/redirect?to=http://bad.ru", # Obfuscation/Phishing
        "https://www.uni-mainz.de/", # Benign university
    ]
    for u in tests:
        Xinf = featureize_url(u)
        if not Xinf.empty and cal:
            try:
                p = float(cal.predict_proba(Xinf)[:,1][0])
                label = "PHISHING" if p > 0.5 else "benign"
                print(f"{u} -> {round(p, 4)} ({label})")
            except Exception as e:
                print(f"Error predicting for {u}: {e}")
                print("Feature vector was:")
                print(Xinf.to_string())
        else:
            print(f"Featureization failed or model not loaded for {u}, skipping prediction.")
else:
    print("\nTests skipped because model could not be loaded.")


--- Running Tests with Final Featureizer ---
https://www.google.com/ -> 0.1087 (benign)
http://192.168.1.50/login.php?user=guest -> 0.0 (benign)
https://secure-paypa1.com.verify-account.co/reset?session=abc123 -> 0.0 (benign)
http://example.com/%2F%2E%2E/redirect?to=http://bad.ru -> 0.0 (benign)
https://www.uni-mainz.de/ -> 0.9448 (PHISHING)


In [53]:
import json
import pandas as pd
import re
import joblib
from urllib.parse import urlparse
import tldextract
from pathlib import Path
import numpy as np # Make sure numpy is imported

# --- Load schema/model paths (ensure these paths are correct) ---
SCHEMA_PATH = Path("models/schema.json")
MODEL_PATH = Path("models/phish_url_hgb_cal.joblib")

# --- Load Schema Info ---
try:
    SCHEMA = json.load(open(SCHEMA_PATH, "r", encoding="utf-8"))
    FEATURE_COLS_FROM_SCHEMA = SCHEMA["feature_cols"] # Keep original list for reference
    TLD_CATS = SCHEMA["tld_categories"]  # ordered list from JSON
    print("Schema loaded for featureizer setup.")
except FileNotFoundError:
    print(f"FATAL: Could not load '{SCHEMA_PATH}'. Cannot define featureizer correctly.")
    FEATURE_COLS_FROM_SCHEMA = []
    TLD_CATS = []

# --- Helper Function for IP Check (Corrected Regex) ---
def _is_ipv4(host: str) -> bool:
    if not host: return False
    # CORRECTED REGEX: Use {1,3} for repetition
    m = re.fullmatch(r"\d{1,3}(\.\d{1,3}){3}", host)
    if not m: return False
    # Check if parts are valid byte values
    try:
        return all(0 <= int(p) <= 255 for p in host.split("."))
    except ValueError:
        return False # Should not happen if regex matched, but safe

# --- Helper Function for Refined Special Char Count ---
def _count_other_specials(u: str) -> int:
    # Exclude: letters, numbers, and common chars: / : . - _ ? = & %
    other_chars = re.findall(r"[^A-Za-z0-9/:.\-_\?=&%]", u)
    return len(other_chars)

# --- Final Patched Featureizer Function ---
def featureize_url(url: str) -> pd.DataFrame:
    # Use the TLD_CATS loaded at the start of the cell
    global TLD_CATS, FEATURE_COLS_FROM_SCHEMA # Access the globally loaded schema info

    # Define the FULL list of features *including new ones* for this function
    CURRENT_FEATURE_COLS = [
        "URLLength", "DomainLength", "IsDomainIP", "TLD", "TLDLength",
        "NoOfSubDomain", "HasObfuscation", "NoOfObfuscatedChar", "ObfuscationRatio",
        "NoOfLettersInURL", "LetterRatioInURL", "NoOfDegitsInURL", "DegitRatioInURL",
        "NoOfEqualsInURL", "NoOfQMarkInURL", "NoOfAmpersandInURL",
        "NoOfOtherSpecialCharsInURL", "SpacialCharRatioInURL", "IsHTTPS",
        "ContainsAt", "HasRedirectWord", "BrandMismatchHint" # New features added
    ]

    if not TLD_CATS:
         print("Schema (TLD_CATS) not loaded, cannot featureize.")
         return pd.DataFrame(columns=CURRENT_FEATURE_COLS)

    u = (url or "").strip()
    if not re.match(r"^[a-zA-Z]+://", u):
        u = "http://" + u

    # Initialize with defaults BEFORE try block
    row = {col: 0 for col in CURRENT_FEATURE_COLS if col != 'TLD'}
    row['TLD'] = "other" # Default TLD

    try:
        parsed = urlparse(u)
        host = parsed.hostname or ""
        ext = tldextract.extract(u)

        # Use corrected IP check
        is_ip = 1 if _is_ipv4(host) else 0

        tld_raw = ""
        if is_ip:
            tld_norm = "other"
        else:
            tld_raw = (ext.suffix or "unknown").lower()
            if tld_raw in TLD_CATS:
                tld_norm = tld_raw
            elif "other" in TLD_CATS:
                tld_norm = "other"
            else:
                # Fallback if 'other' is somehow missing from TLD_CATS list
                tld_norm = TLD_CATS[0] if TLD_CATS else "unknown"

        url_len = len(u)
        domain_parts = [p for p in [ext.subdomain, ext.domain, ext.suffix] if p]
        domain = ".".join(domain_parts) if not is_ip else host
        domain_len = len(domain)

        is_https = 1 if (parsed.scheme or "").lower() == "https" else 0
        subdomain = ext.subdomain or ""
        no_sub = 0 if is_ip or not subdomain else subdomain.count(".") + 1

        letters = len(re.findall(r"[A-Za-z]", u))
        digits  = len(re.findall(r"[0-9]", u))
        # Use refined special char count
        others = _count_other_specials(u)
        letter_ratio  = (letters/url_len) if url_len else 0.0
        digit_ratio   = (digits/url_len)  if url_len else 0.0
        special_ratio = (others / url_len) if url_len else 0.0 # Use refined count for ratio

        qmarks = u.count("?")
        amps   = u.count("&")
        equals = u.count("=")

        U = u.upper()
        has_obf = 1 if ("@" in u or re.search(r"%[0-9A-F]{2}", U)) else 0
        no_obf = len(re.findall(r"%[0-9A-F]{2}", U))
        obf_ratio = (no_obf/url_len) if url_len else 0.0

        # --- Create the initial 'row' dictionary ---
        row = {
            "URLLength": url_len, "DomainLength": domain_len, "IsDomainIP": is_ip,
            "TLD": tld_norm, "TLDLength": len(tld_raw),
            "NoOfSubDomain": no_sub,
            "HasObfuscation": has_obf, "NoOfObfuscatedChar": no_obf, "ObfuscationRatio": obf_ratio,
            "NoOfLettersInURL": letters, "LetterRatioInURL": letter_ratio,
            "NoOfDegitsInURL": digits, "DegitRatioInURL": digit_ratio,
            "NoOfEqualsInURL": equals, "NoOfQMarkInURL": qmarks, "NoOfAmpersandInURL": amps,
            "NoOfOtherSpecialCharsInURL": others, # Use refined count
            "SpacialCharRatioInURL": special_ratio, # Use refined count for ratio
            "IsHTTPS": is_https
        }

        # --- Calculate and add lexical flags ---
        L = u.lower()
        contains_at = 1 if "@" in u else 0
        has_redirect_word = 1 if any(w in L for w in ["redirect", "verify", "login", "account", "update", "signin", "auth", "confirm"]) else 0
        brand_mismatch_hint = 1 if any(p in L for p in ["paypa1", "secure-paypa", "verify-account", "confirm-account", "support-", "-login", "account-update"]) else 0

        row.update({
            "ContainsAt": contains_at,
            "HasRedirectWord": has_redirect_word,
            "BrandMismatchHint": brand_mismatch_hint
        })
        # --- End lexical flags ---

        X = pd.DataFrame([row])

        # Enforce schema: order, missing columns, dtypes
        for c in CURRENT_FEATURE_COLS:
            if c not in X.columns:
                X[c] = 0

        X = X[CURRENT_FEATURE_COLS] # Enforce column order

        # Cast numerics safely first
        for c in CURRENT_FEATURE_COLS:
            if c != "TLD":
                X[c] = pd.to_numeric(X[c], errors='coerce').fillna(0).astype(np.float64)

        # Set categorical LAST, ensuring value is valid
        if 'TLD' in X.columns:
             # Ensure category exists, default to 'other' or first if needed
             current_tld = X['TLD'].iloc[0]
             if current_tld not in TLD_CATS:
                  fallback_tld = "other" if "other" in TLD_CATS else (TLD_CATS[0] if TLD_CATS else "unknown")
                  X['TLD'] = fallback_tld
             X["TLD"] = pd.Categorical(X["TLD"], categories=TLD_CATS)
        else:
             X['TLD'] = pd.Categorical(["other"], categories=TLD_CATS) # Should not happen

        # Final check for NaNs (redundant but safe)
        numeric_cols = X.select_dtypes(include=np.number).columns
        X[numeric_cols] = X[numeric_cols].fillna(0)

        return X

    except Exception as e:
        print(f"Error during featureization for URL {url}: {e}")
        # Create default row on error matching CURRENT_FEATURE_COLS
        error_df = pd.DataFrame(columns=CURRENT_FEATURE_COLS)
        error_df.loc[0] = 0
        error_df["TLD"] = pd.Categorical(["other"], categories=TLD_CATS)
        numeric_cols = error_df.select_dtypes(include=np.number).columns
        error_df[numeric_cols] = error_df[numeric_cols].fillna(0)
        return error_df

print("Final featureize_url function defined with corrected IP regex.")

Schema loaded for featureizer setup.
Final featureize_url function defined with corrected IP regex.


In [54]:
import joblib
import json
import re
from urllib.parse import urlparse
import tldextract
import pandas as pd
import numpy as np
from pathlib import Path
from tqdm.notebook import tqdm
from sklearn.model_selection import train_test_split
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import roc_auc_score, average_precision_score, f1_score, precision_recall_curve
import os

# --- 1. Define Updated Feature Columns ---
# This MUST match the features produced by the featureize_url above
UPDATED_FEATURE_COLS = [
    "URLLength", "DomainLength", "IsDomainIP", "TLD", "TLDLength",
    "NoOfSubDomain", "HasObfuscation", "NoOfObfuscatedChar", "ObfuscationRatio",
    "NoOfLettersInURL", "LetterRatioInURL", "NoOfDegitsInURL", "DegitRatioInURL",
    "NoOfEqualsInURL", "NoOfQMarkInURL", "NoOfAmpersandInURL",
    "NoOfOtherSpecialCharsInURL", "SpacialCharRatioInURL", "IsHTTPS",
    "ContainsAt", "HasRedirectWord", "BrandMismatchHint" # New features added
]

# --- 2. Load Schema (for TLD Cats) and Original Data ---
try:
    SCHEMA_PATH = Path("models/schema.json") # Load existing schema for TLD cats
    with open(SCHEMA_PATH, "r") as f:
        SCHEMA = json.load(f)
    # Use TLD_CATS from the global scope (loaded in the previous cell)
    # TLD_CATS = SCHEMA["tld_categories"] # Or reload if needed
    if not TLD_CATS: raise FileNotFoundError("TLD_CATS not available")
    print("Using TLD categories from previous schema.")

    # Reload original data
    DATA_PATH = Path("PhiUSIIL_Phishing_URL_Dataset.csv") # Adjust if needed
    original_use_cols = [ # Load only needed original columns
        "URL", "label", "TLD" # Need TLD to ensure categories align if reloading df
    ]
    df_orig = pd.read_csv(DATA_PATH, usecols=["URL", "label"]).dropna() # Load only URL and label
    n_samples = 20000
    df_sample = df_orig.sample(n=min(n_samples, len(df_orig)), random_state=42)
    print(f"Using a sample of {len(df_sample)} URLs for quick retraining.")

except FileNotFoundError:
    print(f"FATAL: Could not load '{SCHEMA_PATH}' or '{DATA_PATH}'. Cannot proceed.")
    df_sample = pd.DataFrame()


# --- 3. Rebuild X using the *final* featureizer ---
if not df_sample.empty and TLD_CATS:
    print("Rebuilding training data with 22 features using final featureizer...")
    urls = df_sample["URL"].tolist()
    rows = []
    # Make sure the LATEST featureize_url (with fixes) is defined above!
    for u in tqdm(urls, desc="Featureizing URLs (Sample)"):
        feature_dict = featureize_url(u).iloc[0].to_dict()
        rows.append(feature_dict)

    X_new = pd.DataFrame(rows)
    # Ensure all columns exist and enforce order using UPDATED_FEATURE_COLS
    for col in UPDATED_FEATURE_COLS:
        if col not in X_new.columns: X_new[col] = 0
    X_new = X_new[UPDATED_FEATURE_COLS]
    y_new = df_sample["label"].astype(int)

    # Ensure dtypes (Numeric + TLD Categorical)
    for c in UPDATED_FEATURE_COLS:
        if c != "TLD":
            X_new[c] = pd.to_numeric(X_new[c], errors='coerce').fillna(0)
    # Use TLD_CATS loaded from the previous schema
    X_new["TLD"] = pd.Categorical(X_new["TLD"], categories=TLD_CATS)

    print("Sample training data rebuilt. Splitting data...")
    Xtr, Xva, ytr, yva = train_test_split(X_new, y_new, test_size=0.20, stratify=y_new, random_state=42)

    # --- 4. Retrain Model (HGBT + Calibration) ---
    print("Starting sample model training...")
    try:
        cat_feature_index = UPDATED_FEATURE_COLS.index("TLD")
        categorical_features = [cat_feature_index]
    except ValueError:
        print("Warning: TLD column not found. Training without categorical feature.")
        categorical_features = None

    hgb = HistGradientBoostingClassifier(
        max_leaf_nodes=31, learning_rate=0.08, max_iter=300,
        validation_fraction=0.1, early_stopping=True, random_state=42,
        categorical_features=categorical_features
    )
    hgb.fit(Xtr, ytr)

    print("Starting sample model calibration...")
    # Overwrite 'cal' with the new model
    cal = CalibratedClassifierCV(hgb, cv="prefit", method="sigmoid")
    cal.fit(Xva, yva)

    print("Calculating sample metrics...")
    proba_sample = cal.predict_proba(Xva)[:,1]
    pred_sample = (proba_sample >= 0.5).astype(int)
    auc_sample = roc_auc_score(yva, proba_sample)
    ap_sample = average_precision_score(yva, proba_sample)
    f1_sample = f1_score(yva, pred_sample)
    prec, rec, th = precision_recall_curve(yva, proba_sample)
    idx = np.where(rec >= 0.90)[0]
    p_at_90 = float(prec[idx[0]]) if len(idx) else float("nan")
    # Safer indexing for threshold
    thr_at_90 = float(th[idx[0]-1]) if len(idx) > 0 and idx[0] > 0 else (th[0] if len(th) > 0 else np.nan)


    print("\n--- Sample Model Performance (22 Features) ---")
    print({"AUC":auc_sample, "AP":ap_sample, "F1@0.5":f1_sample, "Prec@90%Rec":p_at_90, "Thr@90%Rec":thr_at_90})

    # --- We still won't save this yet, just use 'cal' for testing ---
    print("\nSample retraining complete! Model is in 'cal' variable.")
    # --- Save the HIGH RECALL THRESHOLD for later use in the UI ---
    high_recall_threshold = thr_at_90
    print(f"High Recall (90%) Threshold calculated: {high_recall_threshold:.4f}")


else:
    print("Skipping retraining due to data loading error or missing TLD_CATS.")

Using TLD categories from previous schema.
Using a sample of 20000 URLs for quick retraining.
Rebuilding training data with 22 features using final featureizer...


Featureizing URLs (Sample):   0%|          | 0/20000 [00:00<?, ?it/s]

Sample training data rebuilt. Splitting data...
Starting sample model training...
Starting sample model calibration...
Calculating sample metrics...

--- Sample Model Performance (22 Features) ---
{'AUC': np.float64(0.9985258154399239), 'AP': np.float64(0.9973532797330447), 'F1@0.5': 0.9963179553822828, 'Prec@90%Rec': 0.576, 'Thr@90%Rec': np.float64(2.0217563472981963e-06)}

Sample retraining complete! Model is in 'cal' variable.
High Recall (90%) Threshold calculated: 0.0000




In [55]:
# --- Run this test cell AFTER the sample retraining with lexical features ---
if 'cal' in locals() and cal is not None and 'high_recall_threshold' in locals():
    print("\n--- Running Tests with Sample Model (22 Features + Lexical) ---")
    tests = [
        "https://www.google.com/",
        "http://192.168.1.50/login.php?user=guest", # IP Address
        "https://secure-paypa1.com.verify-account.co/reset?session=abc123", # Phishing
        "http://example.com/%2F%2E%2E/redirect?to=http://bad.ru", # Obfuscation/Phishing
        "https://www.uni-mainz.de/", # Benign university
    ]
    # Make sure the LATEST featureize_url is defined above
    for u in tests:
        # Use the featureizer defined in Cell 1
        Xinf = featureize_url(u)
        if not Xinf.empty and cal:
            try:
                # Use the 'cal' model retrained in Cell 2
                p = float(cal.predict_proba(Xinf)[:,1][0])
                label_std = "PHISHING" if p > 0.5 else "benign"
                # Use the calculated high recall threshold
                label_hr = "PHISHING" if p > high_recall_threshold else "benign"
                print(f"{u} -> {round(p, 4)} (Std: {label_std}, HR: {label_hr})")
            except Exception as e:
                print(f"Error predicting for {u}: {e}")
        else:
            print(f"Featureization failed or model not loaded for {u}, skipping prediction.")
else:
    print("\nTests skipped because sample model ('cal') or 'high_recall_threshold' is not available.")


--- Running Tests with Sample Model (22 Features + Lexical) ---
https://www.google.com/ -> 0.9683 (Std: PHISHING, HR: PHISHING)
http://192.168.1.50/login.php?user=guest -> 0.0 (Std: benign, HR: PHISHING)
https://secure-paypa1.com.verify-account.co/reset?session=abc123 -> 0.0 (Std: benign, HR: PHISHING)
http://example.com/%2F%2E%2E/redirect?to=http://bad.ru -> 0.0 (Std: benign, HR: PHISHING)
https://www.uni-mainz.de/ -> 0.9853 (Std: PHISHING, HR: PHISHING)


In [56]:
# --- Cell to Retrain on FULL Dataset ---
import joblib
import json
import re
from urllib.parse import urlparse
import tldextract
import pandas as pd
import numpy as np
from pathlib import Path
from tqdm.notebook import tqdm # Use notebook version
from sklearn.model_selection import train_test_split
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import roc_auc_score, average_precision_score, f1_score, precision_recall_curve
import os

# --- 1. Define Updated Feature Columns (MUST match featureize_url) ---
UPDATED_FEATURE_COLS = [
    "URLLength", "DomainLength", "IsDomainIP", "TLD", "TLDLength",
    "NoOfSubDomain", "HasObfuscation", "NoOfObfuscatedChar", "ObfuscationRatio",
    "NoOfLettersInURL", "LetterRatioInURL", "NoOfDegitsInURL", "DegitRatioInURL",
    "NoOfEqualsInURL", "NoOfQMarkInURL", "NoOfAmpersandInURL",
    "NoOfOtherSpecialCharsInURL", "SpacialCharRatioInURL", "IsHTTPS",
    "ContainsAt", "HasRedirectWord", "BrandMismatchHint"
]

# --- 2. Load Schema (for TLD Cats) and FULL Original Data ---
try:
    SCHEMA_PATH = Path("models/schema.json")
    with open(SCHEMA_PATH, "r") as f:
        SCHEMA = json.load(f)
    # Use TLD_CATS from the global scope (loaded when defining featureize_url)
    if not TLD_CATS: raise FileNotFoundError("TLD_CATS not available")
    print("Using TLD categories from previous schema.")

    # Load FULL original data
    DATA_PATH = Path("PhiUSIIL_Phishing_URL_Dataset.csv") # Adjust if needed
    df_orig = pd.read_csv(DATA_PATH, usecols=["URL", "label"]).dropna()
    print(f"Using FULL dataset of {len(df_orig)} URLs for final retraining.")

except FileNotFoundError:
    print(f"FATAL: Could not load '{SCHEMA_PATH}' or '{DATA_PATH}'. Cannot proceed.")
    df_orig = pd.DataFrame() # Make df empty


# --- 3. Rebuild X using the final featureizer on FULL data ---
if not df_orig.empty and TLD_CATS:
    print("Rebuilding training data with 22 features (Full Dataset)...")
    urls = df_orig["URL"].tolist()
    rows = []
    # Make sure the LATEST featureize_url (with fixes) is defined above!
    for u in tqdm(urls, desc="Featureizing URLs (Full)"):
        feature_dict = featureize_url(u).iloc[0].to_dict()
        rows.append(feature_dict)

    X_full = pd.DataFrame(rows)
    # Ensure all columns exist and enforce order
    for col in UPDATED_FEATURE_COLS:
        if col not in X_full.columns: X_full[col] = 0
    X_full = X_full[UPDATED_FEATURE_COLS]
    y_full = df_orig["label"].astype(int)

    # Ensure dtypes
    for c in UPDATED_FEATURE_COLS:
        if c != "TLD":
            X_full[c] = pd.to_numeric(X_full[c], errors='coerce').fillna(0)
    X_full["TLD"] = pd.Categorical(X_full["TLD"], categories=TLD_CATS)

    print("Full training data rebuilt. Splitting data...")
    Xtr, Xva, ytr, yva = train_test_split(X_full, y_full, test_size=0.20, stratify=y_full, random_state=42)

    # --- 4. Retrain Final Model (HGBT + Calibration) ---
    print("Starting final model training...")
    try:
        cat_feature_index = UPDATED_FEATURE_COLS.index("TLD")
        categorical_features = [cat_feature_index]
    except ValueError:
        print("Warning: TLD column not found. Training without categorical feature.")
        categorical_features = None

    hgb = HistGradientBoostingClassifier(
        max_leaf_nodes=31, learning_rate=0.08, max_iter=300, # Consider slightly increasing max_iter if needed
        validation_fraction=0.1, early_stopping=True, random_state=42,
        categorical_features=categorical_features
    )
    hgb.fit(Xtr, ytr)

    print("Starting final model calibration...")
    # Overwrite 'cal' with the FINAL model
    cal = CalibratedClassifierCV(hgb, cv="prefit", method="sigmoid")
    cal.fit(Xva, yva)

    print("Calculating final metrics...")
    proba_final = cal.predict_proba(Xva)[:,1]
    pred_final = (proba_final >= 0.5).astype(int)
    auc_final = roc_auc_score(yva, proba_final)
    ap_final = average_precision_score(yva, proba_final)
    f1_final = f1_score(yva, pred_final)
    prec, rec, th = precision_recall_curve(yva, proba_final)
    idx = np.where(rec >= 0.90)[0]
    p_at_90 = float(prec[idx[0]]) if len(idx) else float("nan")
    thr_at_90 = float(th[idx[0]-1]) if len(idx) > 0 and idx[0] > 0 else (th[0] if len(th) > 0 else np.nan)


    print("\n--- Final Model Performance (22 Features, Full Data) ---")
    print({"AUC":auc_final, "AP":ap_final, "F1@0.5":f1_final, "Prec@90%Rec":p_at_90, "Thr@90%Rec":thr_at_90})

    # --- SAVE the FINAL Model and Schema ---
    print("\nSaving final model and schema...")
    os.makedirs("models", exist_ok=True)
    joblib.dump(cal, "models/phish_url_hgb_cal.joblib") # Save the final 'cal' model
    # Use the UPDATED_FEATURE_COLS for the final schema
    FINAL_SCHEMA = {"feature_cols": UPDATED_FEATURE_COLS, "tld_categories": TLD_CATS}
    json.dump(FINAL_SCHEMA, open("models/schema.json","w"), ensure_ascii=False) # Overwrite schema

    # --- Store the final High Recall Threshold ---
    high_recall_threshold = thr_at_90
    print(f"Final High Recall (90%) Threshold calculated: {high_recall_threshold:.4f}")
    print("\nFull retraining complete! Final model saved.")

else:
    print("Skipping retraining due to data loading error or missing TLD_CATS.")

Using TLD categories from previous schema.
Using FULL dataset of 235795 URLs for final retraining.
Rebuilding training data with 22 features (Full Dataset)...


Featureizing URLs (Full):   0%|          | 0/235795 [00:00<?, ?it/s]

Full training data rebuilt. Splitting data...
Starting final model training...
Starting final model calibration...




Calculating final metrics...

--- Final Model Performance (22 Features, Full Data) ---
{'AUC': np.float64(0.9988087682633816), 'AP': np.float64(0.9986167563924571), 'F1@0.5': 0.9971135699219184, 'Prec@90%Rec': 0.5718950783519583, 'Thr@90%Rec': np.float64(2.8381343290608353e-11)}

Saving final model and schema...
Final High Recall (90%) Threshold calculated: 0.0000

Full retraining complete! Final model saved.


In [57]:
import joblib
import json
import pandas as pd
import numpy as np
from pathlib import Path

# --- Make sure the FINAL featureize_url function definition is active ---
# (It should be defined in a cell above this one)
print("Using the featureize_url function defined in the notebook session.")

# --- 1. Load the FINAL Model and Schema ---
try:
    SCHEMA_PATH = Path("models/schema.json")
    MODEL_PATH = Path("models/phish_url_hgb_cal.joblib") # Final saved model

    with open(SCHEMA_PATH, "r") as f:
        SCHEMA = json.load(f)
    # Load the FINAL calibrated model
    cal_final = joblib.load(MODEL_PATH)
    # Load the high recall threshold from the schema if saved, else use the value from training output
    # NOTE: Adjust this value if needed based on your final training output
    high_recall_threshold = SCHEMA.get("high_recall_threshold", 2.8381343290608353e-11)

    print("Final model and schema loaded successfully.")
    print(f"Using High Recall Threshold: {high_recall_threshold:.4e}")

except FileNotFoundError:
    print(f"FATAL: Could not load '{SCHEMA_PATH}' or '{MODEL_PATH}'. Cannot test.")
    cal_final = None
except Exception as e:
    print(f"Error loading model/schema: {e}")
    cal_final = None

# --- 2. Run Tests ---
if cal_final:
    print("\n--- Running Tests with FINAL Model ---")
    tests = [
        "https://www.google.com/",
        "http://192.168.1.50/login.php?user=guest", # IP Address
        "https://secure-paypa1.com.verify-account.co/reset?session=abc123", # Phishing
        "http://example.com/%2F%2E%2E/redirect?to=http://bad.ru", # Obfuscation/Phishing
        "https://www.uni-mainz.de/", # Benign university
    ]
    for u in tests:
        # Use the featureize_url defined in this notebook session
        Xinf = featureize_url(u)
        if not Xinf.empty and cal_final:
            try:
                # Use the final loaded model
                p = float(cal_final.predict_proba(Xinf)[:,1][0])
                label_std = "PHISHING" if p > 0.5 else "benign"
                # Use the final high recall threshold
                label_hr = "PHISHING" if p > high_recall_threshold else "benign"
                print(f"{u} -> {round(p, 4)} (Std: {label_std}, HR: {label_hr})")
            except Exception as e:
                print(f"Error predicting for {u}: {e}")
        else:
            print(f"Featureization failed or model not loaded for {u}, skipping prediction.")
else:
    print("\nTests skipped because final model ('cal_final') could not be loaded.")

Using the featureize_url function defined in the notebook session.
Final model and schema loaded successfully.
Using High Recall Threshold: 2.8381e-11

--- Running Tests with FINAL Model ---
https://www.google.com/ -> 0.1945 (Std: benign, HR: PHISHING)
http://192.168.1.50/login.php?user=guest -> 0.0 (Std: benign, HR: PHISHING)
https://secure-paypa1.com.verify-account.co/reset?session=abc123 -> 0.0 (Std: benign, HR: PHISHING)
http://example.com/%2F%2E%2E/redirect?to=http://bad.ru -> 0.0 (Std: benign, HR: PHISHING)
https://www.uni-mainz.de/ -> 0.9923 (Std: PHISHING, HR: PHISHING)


In [58]:
import json
import re
import pandas as pd
import numpy as np
import tldextract
from urllib.parse import urlparse
from pathlib import Path

# --- Load the schema file ---
try:
    # This path logic works when imported by main.py (which is in the root)
    SCHEMA_PATH = Path("models/schema.json")
    with open(SCHEMA_PATH, "r", encoding="utf-8") as f:
        SCHEMA = json.load(f)
    
    FEATURE_COLS = SCHEMA["feature_cols"]
    TLD_CATS = SCHEMA["tld_categories"]
    print("Featureizer: Schema loaded successfully.")

except FileNotFoundError:
    print(f"FATAL (Featureizer): Could not load schema from {SCHEMA_PATH}.")
    FEATURE_COLS = []
    TLD_CATS = []

# --- Helper Function for IP Check (Corrected Regex) ---
def _is_ipv4(host: str) -> bool:
    if not host: return False
    # CORRECTED REGEX: Use {1,3} for repetition
    m = re.fullmatch(r"\d{1,3}(\.\d{1,3}){3}", host)
    if not m: return False
    try:
        return all(0 <= int(p) <= 255 for p in host.split("."))
    except ValueError:
        return False

# --- Helper Function for Refined Special Char Count ---
def _count_other_specials(u: str) -> int:
    # Exclude: letters, numbers, and common chars: / : . - _ ? = & %
    other_chars = re.findall(r"[^A-Za-z0-9/:.\-_\?=&%]", u)
    return len(other_chars)

# --- Final Patched Featureizer Function ---
def featureize_url(url: str) -> pd.DataFrame:
    global TLD_CATS, FEATURE_COLS

    # Use the full list of 22 features from the loaded schema
    CURRENT_FEATURE_COLS = FEATURE_COLS 

    if not TLD_CATS or not CURRENT_FEATURE_COLS:
         print("Error (Featureizer): Schema not loaded, cannot featureize.")
         return pd.DataFrame(columns=CURRENT_FEATURE_COLS)

    u = (url or "").strip()
    if not re.match(r"^[a-zA-Z]+://", u):
        u = "http://" + u

    # Initialize with defaults
    row = {col: 0 for col in CURRENT_FEATURE_COLS if col != 'TLD'}
    row['TLD'] = "other"

    try:
        parsed = urlparse(u)
        host = parsed.hostname or ""
        ext = tldextract.extract(u)
        is_ip = 1 if _is_ipv4(host) else 0
        tld_raw = ""

        if is_ip:
            tld_norm = "other"
        else:
            tld_raw = (ext.suffix or "unknown").lower()
            if tld_raw in TLD_CATS:
                tld_norm = tld_raw
            elif "other" in TLD_CATS:
                tld_norm = "other"
            else:
                tld_norm = TLD_CATS[0] # Fallback

        url_len = len(u)
        domain_parts = [p for p in [ext.subdomain, ext.domain, ext.suffix] if p]
        domain = ".".join(domain_parts) if not is_ip else host
        domain_len = len(domain)
        is_https = 1 if (parsed.scheme or "").lower() == "https" else 0
        subdomain = ext.subdomain or ""
        no_sub = 0 if is_ip or not subdomain else subdomain.count(".") + 1

        letters = len(re.findall(r"[A-Za-z]", u))
        digits  = len(re.findall(r"[0-9]", u))
        others = _count_other_specials(u) # Use refined count
        letter_ratio  = (letters/url_len) if url_len else 0.0
        digit_ratio   = (digits/url_len)  if url_len else 0.0
        special_ratio = (others / url_len) if url_len else 0.0

        qmarks = u.count("?")
        amps   = u.count("&")
        equals = u.count("=")

        U = u.upper()
        has_obf = 1 if ("@" in u or re.search(r"%[0-9A-F]{2}", U)) else 0
        no_obf = len(re.findall(r"%[0-9A-F]{2}", U))
        obf_ratio = (no_obf/url_len) if url_len else 0.0

        # --- Create the initial 'row' dictionary ---
        row = {
            "URLLength": url_len, "DomainLength": domain_len, "IsDomainIP": is_ip,
            "TLD": tld_norm, "TLDLength": len(tld_raw),
            "NoOfSubDomain": no_sub,
            "HasObfuscation": has_obf, "NoOfObfuscatedChar": no_obf, "ObfuscationRatio": obf_ratio,
            "NoOfLettersInURL": letters, "LetterRatioInURL": letter_ratio,
            "NoOfDegitsInURL": digits, "DegitRatioInURL": digit_ratio,
            "NoOfEqualsInURL": equals, "NoOfQMarkInURL": qmarks, "NoOfAmpersandInURL": amps,
            "NoOfOtherSpecialCharsInURL": others,
            "SpacialCharRatioInURL": special_ratio,
            "IsHTTPS": is_https
        }

        # --- Calculate and add lexical flags ---
        L = u.lower()
        contains_at = 1 if "@" in u else 0
        has_redirect_word = 1 if any(w in L for w in ["redirect", "verify", "login", "account", "update", "signin", "auth", "confirm"]) else 0
        brand_mismatch_hint = 1 if any(p in L for p in ["paypa1", "secure-paypa", "verify-account", "confirm-account", "support-", "-login", "account-update"]) else 0

        row.update({
            "ContainsAt": contains_at,
            "HasRedirectWord": has_redirect_word,
            "BrandMismatchHint": brand_mismatch_hint
        })
        # --- End lexical flags ---

        X = pd.DataFrame([row])

        # Enforce schema: order, missing columns, dtypes
        for c in CURRENT_FEATURE_COLS:
            if c not in X.columns:
                X[c] = 0
        X = X[CURRENT_FEATURE_COLS]

        for c in CURRENT_FEATURE_COLS:
            if c != "TLD":
                X[c] = pd.to_numeric(X[c], errors='coerce').fillna(0).astype(np.float64)

        if 'TLD' in X.columns:
             current_tld = X['TLD'].iloc[0]
             if current_tld not in TLD_CATS:
                  fallback_tld = "other" if "other" in TLD_CATS else TLD_CATS[0]
                  X['TLD'] = fallback_tld
             X["TLD"] = pd.Categorical(X["TLD"], categories=TLD_CATS)
        
        numeric_cols = X.select_dtypes(include=np.number).columns
        X[numeric_cols] = X[numeric_cols].fillna(0)

        return X

    except Exception as e:
        print(f"Error during featureization for URL {url}: {e}")
        error_df = pd.DataFrame(columns=CURRENT_FEATURE_COLS)
        error_df.loc[0] = 0
        error_df["TLD"] = pd.Categorical(["other"], categories=TLD_CATS)
        numeric_cols = error_df.select_dtypes(include=np.number).columns
        error_df[numeric_cols] = error_df[numeric_cols].fillna(0)
        return error_df

# --- Self-Test (to run this file directly) ---
if __name__ == "__main__":
    # This test will only work if models/schema.json is in the parent directory
    # relative to where you run this script.
    print("Running featureizer self-test...")
    test_url = "https://secure-paypa1.com.verify-account.co/reset?session=abc123"
    features_df = featureize_url(test_url)
    print(f"--- Features for: {test_url} ---")
    print(features_df.T.to_string())
    print("\nSelf-test complete. Check 'BrandMismatchHint' and 'HasRedirectWord' are 1.")

Featureizer: Schema loaded successfully.
Running featureizer self-test...
--- Features for: https://secure-paypa1.com.verify-account.co/reset?session=abc123 ---
                                   0
URLLength                       64.0
DomainLength                    35.0
IsDomainIP                       0.0
TLD                               co
TLDLength                        2.0
NoOfSubDomain                    2.0
HasObfuscation                   0.0
NoOfObfuscatedChar               0.0
ObfuscationRatio                 0.0
NoOfLettersInURL                49.0
LetterRatioInURL            0.765625
NoOfDegitsInURL                  4.0
DegitRatioInURL               0.0625
NoOfEqualsInURL                  1.0
NoOfQMarkInURL                   1.0
NoOfAmpersandInURL               0.0
NoOfOtherSpecialCharsInURL       0.0
SpacialCharRatioInURL            0.0
IsHTTPS                          1.0
ContainsAt                       0.0
HasRedirectWord                  1.0
BrandMismatchHint        

In [63]:
# Make the current directory importable and import featureizer.py
import sys, pathlib, importlib

project_root = pathlib.Path(".").resolve()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# If it's inside a 'scripts' folder, add that instead:
# scripts_dir = project_root / "scripts"
# scripts_dir.mkdir(exist_ok=True)
# if str(scripts_dir) not in sys.path:
#     sys.path.insert(0, str(scripts_dir))

import featureizer as FZ
importlib.reload(FZ)
print("featureizer imported. has features_from_url:", hasattr(FZ, "features_from_url"))


ModuleNotFoundError: No module named 'featureizer'

In [64]:
# Self-check: confirm 22 features and correct order
df = featureize_url("https://secure-paypa1.com.verify-account.co/reset?session=abc123")
print("Cols:", len(df.columns), list(df.columns))
print("TLD dtype:", df["TLD"].dtype)
print("Sample row:\n", df.iloc[0].to_dict())


Cols: 22 ['URLLength', 'DomainLength', 'IsDomainIP', 'TLD', 'TLDLength', 'NoOfSubDomain', 'HasObfuscation', 'NoOfObfuscatedChar', 'ObfuscationRatio', 'NoOfLettersInURL', 'LetterRatioInURL', 'NoOfDegitsInURL', 'DegitRatioInURL', 'NoOfEqualsInURL', 'NoOfQMarkInURL', 'NoOfAmpersandInURL', 'NoOfOtherSpecialCharsInURL', 'SpacialCharRatioInURL', 'IsHTTPS', 'ContainsAt', 'HasRedirectWord', 'BrandMismatchHint']
TLD dtype: category
Sample row:
 {'URLLength': 64.0, 'DomainLength': 35.0, 'IsDomainIP': 0.0, 'TLD': 'co', 'TLDLength': 2.0, 'NoOfSubDomain': 2.0, 'HasObfuscation': 0.0, 'NoOfObfuscatedChar': 0.0, 'ObfuscationRatio': 0.0, 'NoOfLettersInURL': 49.0, 'LetterRatioInURL': 0.765625, 'NoOfDegitsInURL': 4.0, 'DegitRatioInURL': 0.0625, 'NoOfEqualsInURL': 1.0, 'NoOfQMarkInURL': 1.0, 'NoOfAmpersandInURL': 0.0, 'NoOfOtherSpecialCharsInURL': 0.0, 'SpacialCharRatioInURL': 0.0, 'IsHTTPS': 1.0, 'ContainsAt': 0.0, 'HasRedirectWord': 1.0, 'BrandMismatchHint': 1.0}


In [67]:
# === Final Model Smoke Test (Jupyter) ===
import sys, pathlib, importlib, json, joblib, pandas as pd, numpy as np, time

# 1) Resolve paths: set repo root and common folders
ROOT = pathlib.Path(".").resolve()  # if notebook is nested, adjust: pathlib.Path("..").resolve()
MODELS = ROOT / "models"
SCRIPTS = ROOT / "scripts"
APP = ROOT / "app"

assert (MODELS / "schema.json").exists(), f"Missing: {MODELS/'schema.json'}"
assert (MODELS / "phish_url_hgb_cal.joblib").exists(), f"Missing: {MODELS/'phish_url_hgb_cal.joblib'}"

# 2) Load schema + model
SCHEMA = json.load(open(MODELS / "schema.json","r",encoding="utf-8"))
FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATS = SCHEMA["tld_categories"]
cal = joblib.load(MODELS / "phish_url_hgb_cal.joblib")
print("Loaded model + schema")

# 3) Import featureizer: try scripts/featureizer.py then app/featureizer.py
FZ = None
for candidate in [SCRIPTS, APP]:
    if candidate.exists():
        if str(candidate) not in sys.path:
            sys.path.insert(0, str(candidate))
        try:
            import featureizer as _tmp
            importlib.reload(_tmp)
            FZ = _tmp
            print(f"Imported featureizer from: {candidate}")
            break
        except Exception as e:
            pass
assert FZ is not None, "Could not import featureizer.py from scripts/ or app/"

assert hasattr(FZ, "features_from_url"), "featureizer.features_from_url not found"

# 4) Wrapper to enforce schema order + dtypes (parity guard)
def to_aligned_df(url: str) -> pd.DataFrame:
    out = FZ.features_from_url(url)
    X = pd.DataFrame([out]) if isinstance(out, dict) else out.copy()
    # add missing and enforce order
    for c in FEATURE_COLS:
        if c not in X.columns: X[c] = 0
    X = X[FEATURE_COLS]
    # numerics then categorical
    for c in FEATURE_COLS:
        if c != "TLD":
            X[c] = pd.to_numeric(X[c], errors="coerce").fillna(0)
    X["TLD"] = X["TLD"].astype(str).str.lower().apply(
        lambda t: t if t in TLD_CATS else ("other" if "other" in TLD_CATS else TLD_CATS[0])
    )
    X["TLD"] = pd.Categorical(X["TLD"], categories=TLD_CATS)
    # final numeric safety
    num_cols = [c for c in FEATURE_COLS if c != "TLD"]
    X[num_cols] = X[num_cols].replace([np.inf, -np.inf], 0).fillna(0)
    return X

# 5) Schema/dtype sanity
X0 = to_aligned_df("https://example.com/?q=1")
assert list(X0.columns) == FEATURE_COLS, "Column order mismatch"
for c in FEATURE_COLS:
    if c != "TLD":
        assert pd.api.types.is_numeric_dtype(X0[c]), f"{c} not numeric"
assert pd.api.types.is_categorical_dtype(X0["TLD"]), "TLD not categorical"
print("Schema/dtype checks: OK")

# 6) Choose a demo-friendly High Recall threshold (saved one was ~0)
HR_THRESHOLD = 0.20
print(f"HR threshold (demo): {HR_THRESHOLD:.2f}")

# 7) Score a small set
tests = [
    "https://www.google.com/",
    "http://192.168.1.50/login.php?user=guest",
    "https://secure-paypa1.com.verify-account.co/reset?session=abc123",
    "http://example.com/%2F%2E%2E/redirect?to=http://bad.ru",
    "https://www.uni-mainz.de/",
]
t0 = time.time()
for u in tests:
    X = to_aligned_df(u)
    p = float(cal.predict_proba(X)[:,1][0])
    std = "PHISHING" if p>=0.5 else "benign"
    hr  = "PHISHING" if p>=HR_THRESHOLD else "benign"
    print(f"{u}\n  prob={p:.4f} | Std={std} | HR={hr}")
print(f"Total latency: {(time.time()-t0)*1000:.1f} ms for {len(tests)} URLs")


Loaded model + schema
Imported featureizer from: C:\Python313\Scripts\ AI Phishing & Malicious Link Analyzer\scripts
Schema/dtype checks: OK
HR threshold (demo): 0.20
https://www.google.com/
  prob=0.1170 | Std=benign | HR=benign
http://192.168.1.50/login.php?user=guest
  prob=0.0000 | Std=benign | HR=benign
https://secure-paypa1.com.verify-account.co/reset?session=abc123
  prob=0.0000 | Std=benign | HR=benign
http://example.com/%2F%2E%2E/redirect?to=http://bad.ru
  prob=0.0000 | Std=benign | HR=benign
https://www.uni-mainz.de/
  prob=0.9871 | Std=PHISHING | HR=PHISHING
Total latency: 172.7 ms for 5 URLs


  assert pd.api.types.is_categorical_dtype(X0["TLD"]), "TLD not categorical"


HTML(value="<h3 style='margin:8px 0;color:#e5e7eb;'>PhishGuard — URL Risk (Notebook)</h3>")

VBox(children=(Text(value='https://www.google.com/', description='Input', layout=Layout(width='100%'), placeho…

In [69]:
from IPython.display import display, HTML, Javascript
import json
import joblib
import pandas as pd
import numpy as np
from pathlib import Path
import sys

# Load your model and featureizer
ROOT = Path(".").resolve()
MODELS = ROOT / "models"

# Load schema + model
SCHEMA = json.load(open(MODELS / "schema.json", "r", encoding="utf-8"))
FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATS = SCHEMA["tld_categories"]
cal = joblib.load(MODELS / "phish_url_hgb_cal.joblib")

# Import featureizer
sys.path.insert(0, str(ROOT / "scripts"))
from featureizer import features_from_url

# Wrapper function (same as your working version)
def to_aligned_df(url: str) -> pd.DataFrame:
    out = features_from_url(url)
    X = pd.DataFrame([out]) if isinstance(out, dict) else out.copy()
    for c in FEATURE_COLS:
        if c not in X.columns: X[c] = 0
    X = X[FEATURE_COLS]
    for c in FEATURE_COLS:
        if c != "TLD":
            X[c] = pd.to_numeric(X[c], errors="coerce").fillna(0)
    X["TLD"] = X["TLD"].astype(str).str.lower().apply(
        lambda t: t if t in TLD_CATS else ("other" if "other" in TLD_CATS else TLD_CATS[0])
    )
    X["TLD"] = pd.Categorical(X["TLD"], categories=TLD_CATS)
    num_cols = [c for c in FEATURE_COLS if c != "TLD"]
    X[num_cols] = X[num_cols].replace([np.inf, -np.inf], 0).fillna(0)
    return X

# Prediction function
def predict_url(url):
    try:
        X = to_aligned_df(url)
        probability = float(cal.predict_proba(X)[:, 1][0])
        return probability
    except Exception as e:
        return f"Error: {str(e)}"

print("✅ Model loaded and ready for UI integration!")

✅ Model loaded and ready for UI integration!


In [74]:
pip install Flask

Collecting Flask
  Downloading flask-3.1.2-py3-none-any.whl.metadata (3.2 kB)
Collecting blinker>=1.9.0 (from Flask)
  Using cached blinker-1.9.0-py3-none-any.whl.metadata (1.6 kB)
Collecting itsdangerous>=2.2.0 (from Flask)
  Using cached itsdangerous-2.2.0-py3-none-any.whl.metadata (1.9 kB)
Downloading flask-3.1.2-py3-none-any.whl (103 kB)
Using cached blinker-1.9.0-py3-none-any.whl (8.5 kB)
Using cached itsdangerous-2.2.0-py3-none-any.whl (16 kB)
Installing collected packages: itsdangerous, blinker, Flask

   -------------------------- ------------- 2/3 [Flask]
   ---------------------------------------- 3/3 [Flask]

Successfully installed Flask-3.1.2 blinker-1.9.0 itsdangerous-2.2.0
Note: you may need to restart the kernel to use updated packages.


In [75]:
# CELL 1: Setup backend API in Jupyter
from IPython.display import display, HTML, Javascript
import json
import joblib
import pandas as pd
import numpy as np
from pathlib import Path
import sys
from flask import Flask, jsonify, request
import threading

# Load your model (same as your working code)
ROOT = Path(".").resolve()
MODELS = ROOT / "models"

# Load schema + model
SCHEMA = json.load(open(MODELS / "schema.json", "r", encoding="utf-8"))
FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATS = SCHEMA["tld_categories"]
cal = joblib.load(MODELS / "phish_url_hgb_cal.joblib")

# Import featureizer
sys.path.insert(0, str(ROOT / "scripts"))
from featureizer import features_from_url

# Wrapper function (same as your working version)
def to_aligned_df(url: str) -> pd.DataFrame:
    out = features_from_url(url)
    X = pd.DataFrame([out]) if isinstance(out, dict) else out.copy()
    for c in FEATURE_COLS:
        if c not in X.columns: X[c] = 0
    X = X[FEATURE_COLS]
    for c in FEATURE_COLS:
        if c != "TLD":
            X[c] = pd.to_numeric(X[c], errors="coerce").fillna(0)
    X["TLD"] = X["TLD"].astype(str).str.lower().apply(
        lambda t: t if t in TLD_CATS else ("other" if "other" in TLD_CATS else TLD_CATS[0])
    )
    X["TLD"] = pd.Categorical(X["TLD"], categories=TLD_CATS)
    num_cols = [c for c in FEATURE_COLS if c != "TLD"]
    X[num_cols] = X[num_cols].replace([np.inf, -np.inf], 0).fillna(0)
    return X

# Prediction function
def predict_url(url):
    try:
        X = to_aligned_df(url)
        probability = float(cal.predict_proba(X)[:, 1][0])
        return probability
    except Exception as e:
        return f"Error: {str(e)}"

print("✅ Model loaded and ready for UI integration!")

✅ Model loaded and ready for UI integration!


In [116]:
# === Final Model Smoke Test (Jupyter) ===
import sys, pathlib, importlib, json, joblib, pandas as pd, numpy as np, time

# 1) Resolve paths: set repo root and common folders
ROOT = pathlib.Path(".").resolve()  # if notebook is nested, adjust: pathlib.Path("..").resolve()
MODELS = ROOT / "models"
SCRIPTS = ROOT / "scripts"
APP = ROOT / "app"

assert (MODELS / "schema.json").exists(), f"Missing: {MODELS/'schema.json'}"
assert (MODELS / "phish_url_hgb_cal.joblib").exists(), f"Missing: {MODELS/'phish_url_hgb_cal.joblib'}"

# 2) Load schema + model
SCHEMA = json.load(open(MODELS / "schema.json","r",encoding="utf-8"))
FEATURE_COLS = SCHEMA["feature_cols"]
TLD_CATS = SCHEMA["tld_categories"]
cal = joblib.load(MODELS / "phish_url_hgb_cal.joblib")
print("Loaded model + schema")

# 3) Import featureizer: try scripts/featureizer.py then app/featureizer.py
FZ = None
for candidate in [SCRIPTS, APP]:
    if candidate.exists():
        if str(candidate) not in sys.path:
            sys.path.insert(0, str(candidate))
        try:
            import featureizer as _tmp
            importlib.reload(_tmp)
            FZ = _tmp
            print(f"Imported featureizer from: {candidate}")
            break
        except Exception as e:
            pass
assert FZ is not None, "Could not import featureizer.py from scripts/ or app/"

assert hasattr(FZ, "features_from_url"), "featureizer.features_from_url not found"

# 4) Wrapper to enforce schema order + dtypes (parity guard)
def to_aligned_df(url: str) -> pd.DataFrame:
    out = FZ.features_from_url(url)
    X = pd.DataFrame([out]) if isinstance(out, dict) else out.copy()
    # add missing and enforce order
    for c in FEATURE_COLS:
        if c not in X.columns: X[c] = 0
    X = X[FEATURE_COLS]
    # numerics then categorical
    for c in FEATURE_COLS:
        if c != "TLD":
            X[c] = pd.to_numeric(X[c], errors="coerce").fillna(0)
    X["TLD"] = X["TLD"].astype(str).str.lower().apply(
        lambda t: t if t in TLD_CATS else ("other" if "other" in TLD_CATS else TLD_CATS[0])
    )
    X["TLD"] = pd.Categorical(X["TLD"], categories=TLD_CATS)
    # final numeric safety
    num_cols = [c for c in FEATURE_COLS if c != "TLD"]
    X[num_cols] = X[num_cols].replace([np.inf, -np.inf], 0).fillna(0)
    return X

# 5) Schema/dtype sanity
X0 = to_aligned_df("https://example.com/?q=1")
assert list(X0.columns) == FEATURE_COLS, "Column order mismatch"
for c in FEATURE_COLS:
    if c != "TLD":
        assert pd.api.types.is_numeric_dtype(X0[c]), f"{c} not numeric"
assert pd.api.types.is_categorical_dtype(X0["TLD"]), "TLD not categorical"
print("Schema/dtype checks: OK")

# 6) Choose a demo-friendly High Recall threshold (saved one was ~0)
HR_THRESHOLD = 0.20
print(f"HR threshold (demo): {HR_THRESHOLD:.2f}")

# 7) Score a small set
tests = [
    "https://www.google.com/",
    "http://192.168.1.50/login.php?user=guest",
    "https://secure-paypa1.com.verify-account.co/reset?session=abc123",
    "http://example.com/%2F%2E%2E/redirect?to=http://bad.ru",
    "https://www.uni-mainz.de/",
]
t0 = time.time()
for u in tests:
    X = to_aligned_df(u)
    p = float(cal.predict_proba(X)[:,1][0])
    std = "PHISHING" if p>=0.5 else "benign"
    hr  = "PHISHING" if p>=HR_THRESHOLD else "benign"
    print(f"{u}\n  prob={p:.4f} | Std={std} | HR={hr}")
print(f"Total latency: {(time.time()-t0)*1000:.1f} ms for {len(tests)} URLs")

# === FIXED INTERFACE WITH BUTTON ===
from ipywidgets import interact_manual, widgets, Layout, Button, Output
import matplotlib.pyplot as plt
from IPython.display import display, clear_output

print("\n" + "="*60)
print("🎯 LIVE PHISHGUARD INTERFACE")
print("="*60)

# Create widgets
url_input = widgets.Text(
    value='https://secure-paypa1.com.verify-account.co',
    placeholder='Enter URL to analyze...',
    description='URL:',
    layout=Layout(width='80%')
)

analyze_button = Button(
    description="🔍 Analyze URL",
    button_style='primary',
    layout=Layout(width='200px', height='40px')
)

output = Output()

def on_analyze_click(button):
    with output:
        clear_output()
        url = url_input.value
        if not url:
            print("❌ Please enter a URL")
            return
            
        X = to_aligned_df(url)
        p = float(cal.predict_proba(X)[:,1][0])
        
        # Calculate verdict
        if p >= 0.5:
            verdict = "🛑 MALICIOUS"
            color = "red"
            risk_level = "HIGH RISK"
        elif p >= 0.2:
            verdict = "⚠️ SUSPICIOUS" 
            color = "orange"
            risk_level = "MEDIUM RISK"
        else:
            verdict = "✅ SAFE"
            color = "green"
            risk_level = "LOW RISK"
        
        # Display results
        print(f"\n📊 ANALYSIS RESULTS:")
        print(f"🔗 URL: {url}")
        print(f"🎯 Risk Score: {p:.4f}")
        print(f"📈 Risk Level: {risk_level}")
        print(f"⚖️ Verdict: {verdict}")
        print(f"💪 Confidence: {max(p, 1-p)*100:.1f}%")
        
        # Simple gauge visualization
        plt.figure(figsize=(10, 2))
        plt.barh([0], [p], color=color, alpha=0.7, height=0.5)
        plt.barh([0], [1-p], left=[p], color='lightgray', alpha=0.5, height=0.5)
        plt.xlim(0, 1)
        plt.title(f'Phishing Risk: {p*100:.1f}%', fontsize=14, fontweight='bold')
        plt.axis('off')
        
        # Add risk labels
        plt.text(0.1, 0, 'SAFE', ha='center', va='center', fontweight='bold')
        plt.text(0.9, 0, 'MALICIOUS', ha='center', va='center', fontweight='bold')
        plt.axvline(x=0.5, color='red', linestyle='--', alpha=0.5)
        
        plt.tight_layout()
        plt.show()
        
        # Risk factors
        print(f"\n🔍 RISK FACTORS:")
        if p > 0.7:
            factors = [
                "• High domain suspiciousness",
                "• Multiple phishing indicators", 
                "• Unusual URL structure",
                "• Likely malicious destination"
            ]
        elif p > 0.4:
            factors = [
                "• Moderate risk indicators",
                "• Some suspicious elements",
                "• Verify before clicking",
                "• Potential security risk"
            ]
        else:
            factors = [
                "• Clean URL structure",
                "• Trusted domain patterns", 
                "• Low risk indicators",
                "• Likely legitimate website"
            ]
        
        for factor in factors:
            print(factor)

# Connect button
analyze_button.on_click(on_analyze_click)

# Display interface
print("\n📱 INTERACTIVE PHISHING DETECTOR")
print("Enter URL and click Analyze:")
display(url_input)
display(analyze_button)
display(output)

print("\n💡 QUICK TEST URLS:")
test_urls = [
    ("https://www.google.com/", "Safe"),
    ("https://secure-paypa1.com.verify-account.co", "Suspicious"),
    ("http://192.168.1.1/login.php", "High Risk"), 
    ("https://www.uni-mainz.de/", "Phishing - should be 0.9871"),
    ("https://www.paypal.com/", "Safe")
]

for url, desc in test_urls:
    test_btn = Button(description=f"Test: {desc}", layout=Layout(width='200px', margin='5px'))
    test_btn.on_click(lambda x, u=url: (setattr(url_input, 'value', u), on_analyze_click(None)))
    display(test_btn)

# Auto-analyze the first test URL
print("\n🔄 Auto-analyzing test URL...")
url_input.value = "https://www.uni-mainz.de/"
on_analyze_click(None)

Loaded model + schema
Imported featureizer from: C:\Python313\Scripts\ AI Phishing & Malicious Link Analyzer\scripts
Schema/dtype checks: OK
HR threshold (demo): 0.20
https://www.google.com/
  prob=0.1170 | Std=benign | HR=benign
http://192.168.1.50/login.php?user=guest
  prob=0.0000 | Std=benign | HR=benign
https://secure-paypa1.com.verify-account.co/reset?session=abc123
  prob=0.0000 | Std=benign | HR=benign
http://example.com/%2F%2E%2E/redirect?to=http://bad.ru
  prob=0.0000 | Std=benign | HR=benign
https://www.uni-mainz.de/
  prob=0.9871 | Std=PHISHING | HR=PHISHING
Total latency: 159.7 ms for 5 URLs

🎯 LIVE PHISHGUARD INTERFACE

📱 INTERACTIVE PHISHING DETECTOR
Enter URL and click Analyze:


Text(value='https://secure-paypa1.com.verify-account.co', description='URL:', layout=Layout(width='80%'), plac…

Button(button_style='primary', description='🔍 Analyze URL', layout=Layout(height='40px', width='200px'), style…

Output()


💡 QUICK TEST URLS:


Button(description='Test: Safe', layout=Layout(margin='5px', width='200px'), style=ButtonStyle())

Button(description='Test: Suspicious', layout=Layout(margin='5px', width='200px'), style=ButtonStyle())

Button(description='Test: High Risk', layout=Layout(margin='5px', width='200px'), style=ButtonStyle())

Button(description='Test: Phishing - should be 0.9871', layout=Layout(margin='5px', width='200px'), style=Butt…

Button(description='Test: Safe', layout=Layout(margin='5px', width='200px'), style=ButtonStyle())


🔄 Auto-analyzing test URL...
