#Mount google drive

In [1]:
from google.colab import drive
drive.mount('/content/drive')

file_path = '/content/drive/My Drive/'

print("‚úÖ Google Drive mounted successfully!")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
‚úÖ Google Drive mounted successfully!



#install package

In [2]:
!pip install fastapi uvicorn nest-asyncio python-multipart openai requests tensorflow scikit-learn tldextract whois

print("‚úÖ Dependencies installed successfully!")

‚úÖ Dependencies installed successfully!


#import library

In [3]:
import nest_asyncio
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
import requests
import numpy as np
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Layer
import re
import math
from collections import Counter
from urllib.parse import urlparse
import joblib
from openai import OpenAI
import json
import tensorflow as tf
from tensorflow.keras.utils import register_keras_serializable
import os
import warnings
from getpass import getpass
from threading import Thread
import time
warnings.filterwarnings('ignore')

print("‚úÖ All libraries imported successfully!")

‚úÖ All libraries imported successfully!


#setup colab public url

In [4]:
nest_asyncio.apply()

# Get Colab's public URL
try:
    from google.colab.output import eval_js
    colab_public_url = eval_js("google.colab.kernel.proxyPort(8000)")
    print(f"üåê Colab Public URL: {colab_public_url}")
except:
    colab_public_url = "http://localhost:8000"
    print("üîß Using localhost")

print("‚úÖ Colab setup complete!")

üåê Colab Public URL: https://8000-gpu-t4-s-21wrve7j1hugn-c.us-east1-1.prod.colab.dev
‚úÖ Colab setup complete!


#Setup fastapi

In [5]:
app = FastAPI(title="Phishing URL Analyzer - Colab Version")

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

print("‚úÖ FastAPI app initialized!")

‚úÖ FastAPI app initialized!


In [6]:
try:
    api_key = getpass("üîë Enter your OpenAI API key: ")
    client = OpenAI(api_key=api_key)
    print("‚úÖ OpenAI client configured!")
except:
    print("‚ö†Ô∏è No OpenAI API key provided. LLM features will be disabled.")
    client = None

üîë Enter your OpenAI API key: ¬∑¬∑¬∑¬∑¬∑¬∑¬∑¬∑¬∑¬∑
‚úÖ OpenAI client configured!


#Check files

In [None]:
print("üìÅ Please upload your model files:")
print("   - scaler.joblib")
print("   - tokenizer.joblib")
print("   - labelencoder.joblib")
print("   - model.keras")
print()
print("üí° If you don't have these files, the system will use demo predictions.")

existing_files = []
for file in ['scaler path',
             'tokenizer path',
             'label_encoder path',
             'model path']:
    if os.path.exists(file):
        existing_files.append(file)

if existing_files:
    print(f"‚úÖ Found files: {existing_files}")
else:
    print("üîß No model files found. Using demo mode.")

üìÅ Please upload your model files:
   - scaler.joblib
   - tokenizer.joblib
   - labelencoder.joblib
   - model.keras

üí° If you don't have these files, the system will use demo predictions.
‚úÖ Found files: ['/content/drive/MyDrive/Project/phissing_research/LLMAntiPhish/models/scaler.joblib', '/content/drive/MyDrive/Project/phissing_research/LLMAntiPhish/models/tokenizer.joblib', '/content/drive/MyDrive/Project/phissing_research/LLMAntiPhish/models/label_encoder.joblib', '/content/drive/MyDrive/Project/phissing_research/LLMAntiPhish/models/bilstm_model.keras']


#Setup Model

In [None]:
maxlen = 50

# Custom Attention Layer
@register_keras_serializable()
class Attention(Layer):
    def build(self, input_shape):
        self.W = self.add_weight(shape=(input_shape[-1], input_shape[-1]),
                                 initializer='glorot_uniform', trainable=True)
        self.b = self.add_weight(shape=(input_shape[-1],), initializer='zeros', trainable=True)
        self.u = self.add_weight(shape=(input_shape[-1], 1),
                                 initializer='glorot_uniform', trainable=True)

    def call(self, x):
        u_it = tf.tanh(tf.tensordot(x, self.W, axes=1) + self.b)
        a_it = tf.nn.softmax(tf.tensordot(u_it, self.u, axes=1), axis=1)
        return tf.reduce_sum(x * a_it, axis=1)

#Fallback model
class FallbackModel:
    def __init__(self):
        self.pattern_probs = {
            # Safe domains
            'google.com': [0.85, 0.15],
            'facebook.com': [0.82, 0.18],
            'github.com': [0.88, 0.12],
            'amazon.com': [0.80, 0.20],
            'microsoft.com': [0.83, 0.17],
            'example.com': [0.75, 0.25],
            # Suspicious patterns
            'paypal': [0.25, 0.75],
            'login': [0.35, 0.65],
            'verify': [0.30, 0.70],
            'banking': [0.28, 0.72],
            'secure': [0.32, 0.68],
            # Default
            'default': [0.60, 0.40]
        }

    def predict(self, x):
        if hasattr(self, 'last_url'):
            url = self.last_url.lower()

            for pattern, prob in self.pattern_probs.items():
                if pattern in url and pattern != 'default':
                    return np.array([prob])

            suspicious_keywords = ['paypal', 'login', 'verify', 'banking', 'secure', 'account', 'password']
            safe_keywords = ['google', 'facebook', 'amazon', 'microsoft', 'github', 'official']

            if any(suspicious in url for suspicious in suspicious_keywords):
                return np.array([[0.3, 0.7]])
            elif any(safe in url for safe in safe_keywords):
                return np.array([[0.8, 0.2]])

        return np.array([[0.6, 0.4]])

scaler, tokenizer, le, model = None, None, None, None
model_files = {
    'scaler': 'scaler path',
    'tokenizer': 'tokenzizer path',
    'labelencoder': 'label_encoder path',
    'model': 'model path'
}

print("üîç Loading REAL models from Google Drive...")

try:
    # Load Scaler
    if os.path.exists(model_files['scaler']):
        scaler = joblib.load(model_files['scaler'])
        print("‚úÖ Scaler loaded successfully!")
    else:
        print("‚ùå Scaler file not found")

    # Load Tokenizer
    if os.path.exists(model_files['tokenizer']):
        tokenizer = joblib.load(model_files['tokenizer'])
        print("‚úÖ Tokenizer loaded successfully!")
    else:
        print("‚ùå Tokenizer file not found")

    # Load LabelEncoder
    if os.path.exists(model_files['labelencoder']):
        le = joblib.load(model_files['labelencoder'])
        print("‚úÖ LabelEncoder loaded successfully!")
    else:
        print("‚ùå LabelEncoder file not found")

    # Load Main Model
    if os.path.exists(model_files['model']):
        model = load_model(model_files['model'], custom_objects={"Attention": Attention})
        print("‚úÖ BiLSTM Model loaded successfully!")
    else:
        print("‚ùå Model file not found")

except Exception as e:
    print(f"‚ö†Ô∏è Error loading model files: {e}")

# Check if all models loaded successfully
if all([scaler, tokenizer, le, model]):
    print("\nüéâ SUCCESS: All REAL models loaded! Using trained BiLSTM model.")
    print("üìä Model will provide consistent and accurate predictions.")
else:
    print("\n‚ö†Ô∏è WARNING: Some model files are missing. Using fallback model.")
    missing_components = []
    if scaler is None: missing_components.append("scaler")
    if tokenizer is None: missing_components.append("tokenizer")
    if le is None: missing_components.append("labelencoder")
    if model is None: missing_components.append("model")

    print(f"‚ùå Missing: {', '.join(missing_components)}")
    print("üîß Using fallback pattern-based model (consistent predictions)")
    model = FallbackModel()

print("\n‚úÖ Model components setup complete!")

üîç Loading REAL models from Google Drive...
‚úÖ Scaler loaded successfully!
‚úÖ Tokenizer loaded successfully!
‚úÖ LabelEncoder loaded successfully!
‚úÖ BiLSTM Model loaded successfully!

üéâ SUCCESS: All REAL models loaded! Using trained BiLSTM model.
üìä Model will provide consistent and accurate predictions.

‚úÖ Model components setup complete!


#Build FallbackModel

In [None]:
class FallbackModel:
    def __init__(self):
        self.pattern_probs = {
            # Safe domains
            'google.com': [0.85, 0.15],
            'facebook.com': [0.82, 0.18],
            'github.com': [0.88, 0.12],
            'amazon.com': [0.80, 0.20],
            'microsoft.com': [0.83, 0.17],
            'example.com': [0.75, 0.25],
            # Suspicious patterns
            'paypal': [0.25, 0.75],
            'login': [0.35, 0.65],
            'verify': [0.30, 0.70],
            'banking': [0.28, 0.72],
            'secure': [0.32, 0.68],
            # Default
            'default': [0.60, 0.40]
        }

    def predict_proper(self, url):
        url_lower = url.lower()

        for pattern, prob in self.pattern_probs.items():
            if pattern in url_lower and pattern != 'default':
                return np.array([prob])

        suspicious_keywords = ['paypal', 'login', 'verify', 'banking', 'secure', 'account', 'password']
        safe_keywords = ['google', 'facebook', 'amazon', 'microsoft', 'github', 'official']

        if any(suspicious in url_lower for suspicious in suspicious_keywords):
            return np.array([[0.3, 0.7]])
        elif any(safe in url_lower for safe in safe_keywords):
            return np.array([[0.8, 0.2]])

        return np.array([[0.6, 0.4]])

    def predict(self, x):
        return np.array([[0.5, 0.5]])

#Feature extraction

In [10]:
BRAND_KEYWORDS = ["paypal","apple","amazon","bank","chase","facebook","meta","google","microsoft",
                  "outlook","office365","instagram","line","kbank","scb","krungsri","kplus"]

COMMON_TLDS = set([
 "com","net","org","info","biz","co","io","ai","app","edu","gov","mil","ru","de","uk","cn","fr","jp","br","in","it","es","au","nl","se","no"
])

def parse_host_and_scheme(url: str):
    try:
        p = urlparse(url if '://' in url else 'http://' + url)
        return (p.hostname or "").lower(), (p.scheme or "").lower()
    except:
        return "", ""

def is_ip_host(host: str):
    return bool(re.fullmatch(r"(?:\d{1,3}\.){3}\d{1,3}", host or ""))

def count_subdomains(host: str):
    if not host: return 0
    return max(0, len(host.split(".")) - 2)

def has_double_slash_in_path(url: str):
    return "//" in (urlparse(url if '://' in url else 'http://' + url).path or "")

def has_tld_in_path(url: str):
    path = (urlparse(url if '://' in url else 'http://' + url).path or "").lower()
    return any(("."+tld) in path for tld in COMMON_TLDS)

def has_symbols_in_domain(host: str):
    return bool(re.search(r"[^a-z0-9\.-]", host or ""))

def domain_prefix_suffix_like_brand(host: str):
    if not host: return False
    first = host.split(".")[0]
    return any(b in first and "-" in first for b in BRAND_KEYWORDS)

def brand_in_path_or_subdomain(host: str, url: str):
    text = ((host or "") + " " + (urlparse(url).path or "") + " " + (urlparse(url).query or "")).lower()
    return any(b in text for b in BRAND_KEYWORDS)

def digit_count(url: str):
    return sum(c.isdigit() for c in url)

def url_length(url: str):
    return len(url)

def url_entropy(url: str):
    if not url: return 0.0
    counts = Counter(url)
    total = len(url)
    return -sum((c/total) * math.log2(c/total) for c in counts.values())

def fetch_html(url):
    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
        r = requests.get(url, timeout=5, headers=headers)
        return r.text
    except Exception as e:
        print(f"‚ö†Ô∏è Could not fetch HTML: {e}")
        return ""

def extract_html_features(html):
    hrefs = re.findall(r'href=[\"\'](.*?)[\"\']', html or '', flags=re.IGNORECASE)
    forms = re.findall(r'<form[^>]+action=[\"\'](.*?)[\"\']', html or '', flags=re.IGNORECASE)
    imgs = re.findall(r'<img[^>]+src=[\"\'](.*?)[\"\']', html or '', flags=re.IGNORECASE)
    scripts = re.findall(r'<script[^>]+src=[\"\'](.*?)[\"\']', html or '', flags=re.IGNORECASE)
    links_tag = re.findall(r'<link[^>]+href=[\"\'](.*?)[\"\']', html or '', flags=re.IGNORECASE)
    meta_keywords = re.findall(r'<meta[^>]+name=[\"\']keywords[\"\'][^>]+content=[\"\'](.*?)[\"\']', html or '', flags=re.IGNORECASE)
    return {'hrefs': hrefs, 'forms': forms, 'imgs': imgs, 'scripts': scripts, 'links_tag': links_tag, 'meta_keywords': meta_keywords}

def abnormal_links(hrefs):
    return any(h.strip().lower().startswith(('javascript:','mailto:','data:')) for h in hrefs)

def forms_action_abnormal(forms, host):
    for a in forms:
        if a and host not in a and not a.startswith('/') and not a.startswith('#'):
            return True
    return False

def anchors_point_elsewhere(hrefs, host):
    count = sum(1 for h in hrefs if host and host not in h and h.startswith('http'))
    total = max(1, len(hrefs))
    return (count / total) > 0.5

def meta_keyword_mismatch(meta_keywords, host):
    if not meta_keywords: return False
    for kw in meta_keywords:
        if host and host.split('.')[0] not in kw:
            return True
    return False

print("‚úÖ Feature extraction functions defined!")

‚úÖ Feature extraction functions defined!


#Veirify Model

In [None]:
print("üîç Verifying model status...")

def verify_model():
    if hasattr(model, 'predict'):
        print("‚úÖ Model has predict method")
    else:
        print("‚ùå Model missing predict method")

    if scaler is not None:
        print("‚úÖ Scaler: Loaded")
    else:
        print("‚ùå Scaler: Not loaded")

    if tokenizer is not None:
        print("‚úÖ Tokenizer: Loaded")
    else:
        print("‚ùå Tokenizer: Not loaded")

    if le is not None:
        print("‚úÖ LabelEncoder: Loaded")
        print(f"   Classes: {le.classes_}")
    else:
        print("‚ùå LabelEncoder: Not loaded")

    # Check model type more accurately
    if hasattr(model, '_is_graph_network') or hasattr(model, 'layers'):
        print("üìä Model Type: BiLSTM Model (TensorFlow/Keras)")
        print(f"üìê Model input layers: {[layer.name for layer in model.inputs]}")
    else:
        print("üìä Model Type: Fallback Model")

verify_model()

# Test prediction with PROPER input for BiLSTM model
print("\nüß™ Testing prediction with sample URL...")
test_url = "https://www.google.com"

try:
    # Check if this is a TensorFlow model
    if hasattr(model, '_is_graph_network') or hasattr(model, 'layers'):
        print("üéØ Using BiLSTM model prediction...")

        # Extract features like in the analyze function
        host, scheme = parse_host_and_scheme(test_url)
        print(f"üîç Extracted host: {host}")

        # Prepare structural features
        structural_features = [
            int(is_ip_host(host)),
            count_subdomains(host),
            int(has_double_slash_in_path(test_url)),
            int(has_tld_in_path(test_url)),
            int(has_symbols_in_domain(host)),
            int(domain_prefix_suffix_like_brand(host)),
            int(brand_in_path_or_subdomain(host, test_url)),
            len(test_url),
            1 if scheme == 'https' else 0,
            digit_count(test_url),
            url_entropy(test_url)
        ]

        print(f"üìä Structural features: {structural_features}")

        # Scale the features
        struct_feat = scaler.transform([structural_features])
        print(f"üìà Scaled features shape: {struct_feat.shape}")

        # Prepare sequence features
        if tokenizer:
            sequences = tokenizer.texts_to_sequences([test_url])
            print(f"üî§ Tokenized sequences: {sequences}")
            seq = pad_sequences(sequences, maxlen=maxlen)
            print(f"üìè Padded sequence shape: {seq.shape}")
        else:
            print("‚ùå Tokenizer not available")
            seq = np.zeros((1, maxlen))  # Fallback

        # Make prediction
        print("ü§ñ Making prediction...")
        prediction = model.predict([seq, struct_feat], verbose=0)
        print(f"üìà Prediction shape: {prediction.shape}")
        print(f"üéØ Prediction values: {prediction}")

        # Decode prediction
        predicted_class_idx = np.argmax(prediction[0])
        predicted_class = le.inverse_transform([predicted_class_idx])[0]
        confidence = np.max(prediction[0])

        class_names = {0: 'Safe', 1: 'Phishing'}
        human_readable_class = class_names.get(predicted_class_idx, predicted_class)

        print(f"üîÆ Predicted: {human_readable_class}")
        print(f"üìä Confidence: {confidence:.4f}")
        print(f"üî¢ Raw probabilities: Safe={prediction[0][0]:.4f}, Phishing={prediction[0][1]:.4f}")

    else:
        print("üéØ Using fallback model prediction...")
        if hasattr(model, 'predict_proper'):
            prediction = model.predict_proper(test_url)
        else:
            safe_prob = 0.8 if 'google' in test_url else 0.4
            phishing_prob = 1 - safe_prob
            prediction = np.array([[safe_prob, phishing_prob]])

        print(f"üìà Prediction values: {prediction}")
        safe_prob = prediction[0][0]
        phishing_prob = prediction[0][1]
        label = "Safe" if safe_prob > phishing_prob else "Phishing"
        print(f"üîÆ Predicted: {label}")
        print(f"üìä Probabilities: Safe={safe_prob:.4f}, Phishing={phishing_prob:.4f}")

except Exception as e:
    print(f"‚ùå Prediction test failed: {e}")
    import traceback
    print(f"üîç Detailed error: {traceback.format_exc()}")

üîç Verifying model status...
‚úÖ Model has predict method
‚úÖ Scaler: Loaded
‚úÖ Tokenizer: Loaded
‚úÖ LabelEncoder: Loaded
   Classes: [0 1]
üìä Model Type: BiLSTM Model (TensorFlow/Keras)
üìê Model input layers: ['input_layer_1', 'input_layer_2']

üß™ Testing prediction with sample URL...
üéØ Using BiLSTM model prediction...
üîç Extracted host: www.google.com
üìä Structural features: [0, 1, 0, 0, 0, 0, 1, 22, 1, 0, 3.6635327548042547]
üìà Scaled features shape: (1, 11)
üî§ Tokenized sequences: [[10, 3, 3, 7, 5, 22, 2, 2, 16, 16, 16, 9, 20, 6, 6, 20, 15, 4, 9, 11, 6, 12]]
üìè Padded sequence shape: (1, 50)
ü§ñ Making prediction...
üìà Prediction shape: (1, 2)
üéØ Prediction values: [[0.8994778 0.1005222]]
üîÆ Predicted: Safe
üìä Confidence: 0.8995
üî¢ Raw probabilities: Safe=0.8995, Phishing=0.1005


#Load data from phishtank

In [None]:
import gzip
import csv
import io
import requests

def load_phishtank_database():
    print("[INFO] Downloading PhishTank dataset...")
    url = "http://data.phishtank.com/data/online-valid.csv.gz"
    r = requests.get(url, timeout=15)
    r.raise_for_status()

    data = gzip.decompress(r.content)
    csv_data = csv.DictReader(io.StringIO(data.decode()))

    urls = {row['url'] for row in csv_data}
    print(f"[INFO] Loaded {len(urls)} phishing URLs from PhishTank.")
    return urls

phishtank_cache = load_phishtank_database()


[INFO] Downloading PhishTank dataset...
[INFO] Loaded 49525 phishing URLs from PhishTank.


#load data and setup cache interval openphish

In [None]:
OPENPHISH_FEED_URL = "https://openphish.com/feed.txt"
_openphish_cache = None
_openphish_last_loaded = None

def load_openphish_database(feed_url=OPENPHISH_FEED_URL, timeout=20):

    global _openphish_cache, _openphish_last_loaded
    try:
        print(f"[INFO] Downloading OpenPhish feed from {feed_url} ...")
        r = requests.get(feed_url, timeout=timeout)
        r.raise_for_status()
        text = r.text
        urls = set()
        for line in text.splitlines():
            u = line.strip()
            if not u:
                continue
            u_norm = normalize_url_for_lookup(u)
            urls.add(u_norm)
        _openphish_cache = urls
        _openphish_last_loaded = time.time()
        print(f"[INFO] Loaded {len(urls)} URLs from OpenPhish.")
        return urls
    except Exception as e:
        print(f"[ERROR] Failed to load OpenPhish feed: {e}")
        return _openphish_cache if _openphish_cache is not None else set()

def normalize_url_for_lookup(url):

    u = url.strip().lower()
    if '#' in u:
        u = u.split('#', 1)[0]
    if u.endswith('/'):
        u = u[:-1]
    return u

def ensure_openphish_loaded(force_reload=False):

    global _openphish_cache, _openphish_last_loaded
    if _openphish_cache is None or force_reload:
        load_openphish_database()
    return _openphish_cache

def refresh_openphish_cache_interval(hours=24):
    global _openphish_last_loaded
    if _openphish_last_loaded is None:
        load_openphish_database()
        return
    age = (time.time() - _openphish_last_loaded) / 3600.0
    if age >= hours:
        print(f"[INFO] OpenPhish cache older than {hours}h (age={age:.2f}h). Reloading...")
        load_openphish_database()

#Analysis extraction data

In [None]:
import tldextract
import datetime
import whois
import re
import math
import requests

def digit_count(url):
    return sum(c.isdigit() for c in url)

def url_length(url):
    return len(url)

def url_entropy(url):
    # Shannon entropy
    prob = [float(url.count(c)) / len(url) for c in set(url)]
    return -sum(p * math.log2(p) for p in prob)

def is_shortened_url(url):
    shortened_domains = ["bit.ly", "tinyurl.com", "goo.gl", "t.co", "ow.ly"]
    for d in shortened_domains:
        if d in url:
            return True
    return False

def suspicious_tld(host):
    suspicious = ["xyz", "top", "club", "online"]
    ext = tldextract.extract(host).suffix
    return ext in suspicious

def domain_age_months(host):
    try:
        w = whois.whois(host)
        creation_date = w.creation_date
        if isinstance(creation_date, list):
            creation_date = creation_date[0]
        if creation_date is None:
            return 0
        delta = datetime.datetime.now() - creation_date
        return delta.days // 30
    except:
        return 0

def homoglyph_in_domain(host):
    pattern = r"[0-9@!$]"
    return bool(re.search(pattern, host))


def in_phishtank(url):

    normalized_url = url.strip().lower()
    return normalized_url in phishtank_cache

def in_openphish(url):

    ensure_openphish_loaded()
    if not _openphish_cache:
        return False
    u_norm = normalize_url_for_lookup(url)
    return u_norm in _openphish_cache




def phishing_score(url, html):
    host, scheme = parse_host_and_scheme(url)
    features = extract_html_features(html)
    score = 0
    reasons = []

    # ---- Rule-based scoring ----
    if is_ip_host(host):
        score += 2; reasons.append("Host is an IP address")
    if count_subdomains(host) > 2:
        score += 1; reasons.append("Too many subdomains")
    if has_symbols_in_domain(host):
        score += 1; reasons.append("Suspicious symbols in domain")
    if domain_prefix_suffix_like_brand(host):
        score += 2; reasons.append("Domain mimics brand with hyphens")
    if brand_in_path_or_subdomain(host, url):
        score += 1; reasons.append("Brand keywords in path or subdomain")
    if has_double_slash_in_path(url):
        score += 1; reasons.append("Double slash in path")
    if has_tld_in_path(url):
        score += 1; reasons.append("TLD in path")
    if abnormal_links(features['hrefs']):
        score += 1; reasons.append("Suspicious links found")
    if forms_action_abnormal(features['forms'], host):
        score += 2; reasons.append("Suspicious form actions")
    if anchors_point_elsewhere(features['hrefs'], host):
        score += 1; reasons.append("Many anchors point elsewhere")
    if meta_keyword_mismatch(features['meta_keywords'], host):
        score += 1; reasons.append("Meta keywords mismatch")

    # ---- URL-based scoring ----
    dcount = digit_count(url)
    ulen = url_length(url)
    uentropy = url_entropy(url)

    if dcount > 5:
        score += 1; reasons.append(f"Too many digits ({dcount})")
    if ulen > 75:
        score += 1; reasons.append(f"URL too long ({ulen} chars)")
    if uentropy > 4.0:
        score += 1; reasons.append(f"High URL entropy ({uentropy:.2f})")

    if is_shortened_url(url):
        score += 1; reasons.append("Uses URL shortening service")
    if suspicious_tld(host):
        score += 1; reasons.append("Suspicious TLD")
    if domain_age_months(host) < 6:
        score += 1; reasons.append("Domain is newly registered (<6 months)")
    if homoglyph_in_domain(host):
        score += 2; reasons.append("Homoglyph/symbol characters in domain")

    # ---- ‡∏ï‡∏£‡∏ß‡∏à‡∏™‡∏≠‡∏ö‡∏Å‡∏±‡∏ö PhishTank dataset ----
    in_phishtank_flag = in_phishtank(url)
    if in_phishtank_flag:
        score += 5
        reasons.append("URL found in PhishTank blacklist")

    try:
        in_openphish_flag = in_openphish(url)
    except Exception as e:
        print(f"[WARN] OpenPhish lookup failed: {e}")
        in_openphish_flag = False

    if in_openphish_flag:
        score += 5
        reasons.append("URL found in OpenPhish feed")

    # update features
    features.update({
        "digit_count": dcount,
        "url_length": ulen,
        "url_entropy": uentropy,
        "in_phishtank": in_phishtank_flag,
        "in_openphish": in_openphish_flag
    })

    return score, reasons, features, host, scheme




def predict_url(url):
    host, scheme = parse_host_and_scheme(url)

    # Store URL for demo model
    if hasattr(model, 'last_url'):
        model.last_url = url

    if scaler is None or tokenizer is None or le is None:
        pred = model.predict([None])[0]
        label = "Likely Safe" if pred[0] > pred[1] else "Likely Phishing"
        return label, pred

    # Actual prediction with real model
    try:
        struct_feat = scaler.transform([[
            int(is_ip_host(host)),
            count_subdomains(host),
            int(has_double_slash_in_path(url)),
            int(has_tld_in_path(url)),
            int(has_symbols_in_domain(host)),
            int(domain_prefix_suffix_like_brand(host)),
            int(brand_in_path_or_subdomain(host, url)),
            len(url),
            1 if scheme == 'https' else 0,
            digit_count(url),
            url_entropy(url)
        ]])

        seq = pad_sequences(tokenizer.texts_to_sequences([url]), maxlen=maxlen)
        pred = model.predict([seq, struct_feat])[0]
        label = le.inverse_transform([np.argmax(pred)])[0]
        return label, pred

    except Exception as e:
        print(f"‚ö†Ô∏è Prediction error: {e}")
        pred = model.predict([None])[0]
        label = "Likely Safe" if pred[0] > pred[1] else "Likely Phishing"
        return label, pred

print("‚úÖ Analysis functions defined!")

‚úÖ Analysis functions defined!


#Set API Endpoints

In [None]:
from fastapi import FastAPI
from pydantic import BaseModel
import numpy as np
import json

app = FastAPI()

class URLRequest(BaseModel):
    url: str
    call_llm: bool = True

def clean_numpy(obj):
    if isinstance(obj, dict):
        return {k: clean_numpy(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [clean_numpy(x) for x in obj]
    elif isinstance(obj, (np.integer, np.int64)):
        return int(obj)
    elif isinstance(obj, (np.floating, np.float64)):
        return float(obj)
    else:
        return obj

@app.post("/analyze")
def analyze(request: URLRequest):
    url = request.url
    print(f"üîç Analyzing URL: {url}")

    html = fetch_html(url)
    score, reasons, features, host, scheme = phishing_score(url, html)

    # BiLSTM prediction
    bilstm_label, bilstm_prob_array = predict_url(url)
    label_idx = np.argmax(bilstm_prob_array)
    bilstm_prob = float(bilstm_prob_array[label_idx])
    score = int(score)

    llm_result = None
    if request.call_llm and client:
        prompt = f"""
Analyze this URL for phishing potential:

URL: {url}
Host: {host}
Scheme: {scheme}

AI Prediction: {bilstm_label} (confidence={bilstm_prob:.2f})
Rule-based Risk Score: {score}/15

Triggered Alerts:
- {"\n- ".join(reasons)}

Technical Features:
- Digit count: {features.get('digit_count')}
- URL length: {features.get('url_length')}
- URL entropy: {features.get('url_entropy'):.2f}
- External links: {len(features.get('hrefs', []))}
- Images: {len(features.get('imgs', []))}
- Scripts: {len(features.get('scripts', []))}
- Forms: {len(features.get('forms', []))}

Provide a concise analysis (2-3 sentences) and final verdict as 'Likely Phishing' or 'Likely Safe'.
Return JSON format:
{{
    "verdict": "...",
    "reason_list": ["...","..."],
    "summary": "..."
}}
"""
        try:
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role":"user", "content": prompt}],
                temperature=0
            )
            raw_text = response.choices[0].message.content.strip()

            # Clean JSON response
            if raw_text.startswith("```json"):
                raw_text = raw_text[7:-3].strip()
            elif raw_text.startswith("```"):
                raw_text = raw_text[3:-3].strip()

            llm_result = json.loads(raw_text)

        except Exception as e:
            llm_result = {
                "verdict": "Analysis Error",
                "reason_list": [f"LLM processing failed: {str(e)}"],
                "summary": "Could not complete AI analysis"
            }
    elif request.call_llm and not client:
        llm_result = {
            "verdict": "No API Key",
            "reason_list": ["OpenAI API key not provided"],
            "summary": "LLM analysis disabled. Please provide an OpenAI API key."
        }

    response = {
        "url": url,
        "score": score,
        "reasons": reasons,
        "features": features,
        "bilstm_label": bilstm_label,
        "bilstm_prob": bilstm_prob,
        "llm_result": llm_result,
        "host": host,
        "scheme": scheme,
        "in_phishtank": features.get("in_phishtank", False),
    }

    # convert numpy type to Python type recursive
    response_clean = clean_numpy(response)
    return response_clean


@app.get("/")
def root():
    return {"message": "Phishing URL Analyzer API is running!", "status": "active"}


@app.get("/test")
def test_analysis():
    test_urls = ["https://www.google.com", "https://www.github.com", "http://example.com"]
    results = []
    for test_url in test_urls:
        try:
            result = analyze(URLRequest(url=test_url, call_llm=False))
            results.append({
                "url": test_url,
                "bilstm_label": result["bilstm_label"],
                "score": result["score"]
            })
        except Exception as e:
            results.append({"url": test_url, "error": f"Analysis failed: {str(e)}"})
    return {
        "test_results": results,
        "message": "API is working!",
        "web_interface": f"{colab_public_url}/web"
    }

print("‚úÖ API endpoints defined!")


‚úÖ API endpoints defined!


#Create web interface in "/web"

In [16]:
@app.get("/web", response_class=HTMLResponse)
def web_interface():
    return """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Phishing URL Analyzer</title>
<style>
* { margin:0; padding:0; box-sizing:border-box; }
body { font-family:'Segoe UI',Tahoma,Verdana,sans-serif; background:linear-gradient(135deg,#667eea,#764ba2); min-height:100vh; padding:20px; }
.container { max-width:900px; margin:0 auto; background:white; border-radius:15px; box-shadow:0 20px 40px rgba(0,0,0,0.1); overflow:hidden; }
.header { background:linear-gradient(135deg,#007BFF,#0056b3); color:white; padding:30px; text-align:center; }
.header h1 { font-size:2.2em; margin-bottom:10px; }
.demo-notice { background:#fff3cd; color:#856404; padding:10px; border-radius:5px; margin-top:10px; }
.input-section { padding:30px; background:#f8f9fa; border-bottom:1px solid #e9ecef; }
.input-group { display:flex; gap:10px; }
#urlInput { flex:1; padding:15px; border:2px solid #e9ecef; border-radius:8px; font-size:16px; }
button { padding:15px 25px; background:#007BFF; color:white; border:none; border-radius:8px; font-size:16px; cursor:pointer; }
button:hover { background:#0056b3; }
.loading { display:none; text-align:center; padding:20px; color:#007BFF; }
.spinner { border:4px solid #f3f3f3; border-top:4px solid #007BFF; border-radius:50%; width:40px; height:40px; animation:spin 1s linear infinite; margin:0 auto 10px; }
@keyframes spin { 0%{transform:rotate(0);}100%{transform:rotate(360deg);} }
.result-section { padding:30px; }
.result-card { background:white; border-radius:10px; padding:25px; margin-bottom:20px; box-shadow:0 5px 15px rgba(0,0,0,0.1); border-left:5px solid #007BFF; }
.section-title { font-size:1.3em; font-weight:bold; color:#333; margin-bottom:10px; }
.reason-list, .feature-list { list-style:none; padding:0; }
.reason-list li, .feature-list li { padding:6px 0; border-bottom:1px solid #f8f9fa; }
.error-message { background:#f8d7da; color:#721c24; padding:15px; border-radius:8px; margin:20px 0; }
.url-display { background:#e7f3ff; padding:15px; border-radius:8px; margin-bottom:20px; word-break:break-all; }

/* üé® Gradient risk bar styles */
.probability-bar {
  background: #e9ecef;
  border-radius: 10px;
  height: 25px;
  position: relative;
  overflow: hidden;
}

.probability-fill {
  height: 100%;
  transition: width .5s, background-color .5s;
  border-radius: 10px 0 0 10px;
}

.probability-labels {
  display: flex;
  justify-content: space-between;
  font-size: 0.9em;
  color: #555;
  margin-top: 5px;
}
</style>
</head>
<body>
<div class="container">
    <div class="header">
        <h1>üîç Phishing URL Analyzer</h1>
        <p>Analyze URLs for potential phishing threats using AI and rules</p>
        <div class="demo-notice"><strong>Demo Mode:</strong> Upload model files for real predictions.</div>
    </div>
    <div class="input-section">
        <div class="input-group">
            <input type="text" id="urlInput" placeholder="Enter URL (https://example.com)">
            <button onclick="analyzeUrl()">Analyze URL</button>
        </div>
        <div id="loading" class="loading">
            <div class="spinner"></div>
            <p>Analyzing URL...</p>
        </div>
    </div>
    <div id="resultSection" class="result-section">
        <div class="result-card">
            <div class="section-title">üéØ How to Use</div>
            <p>Enter a URL to analyze its phishing potential using AI + rules.</p>
            <p><strong>Examples:</strong> https://www.google.com, https://github.com</p>
        </div>
    </div>
</div>

<script>
function analyzeUrl() {
    const url = document.getElementById('urlInput').value.trim();
    const loading = document.getElementById('loading');
    const resultSection = document.getElementById('resultSection');
    const button = document.querySelector('button');

    if (!url) return alert('Please enter a URL.');
    if (!url.startsWith('http')) return alert('URL must start with http:// or https://');

    loading.style.display = 'block';
    resultSection.innerHTML = '';
    button.disabled = true;

    fetch('/analyze', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ url: url, call_llm: true })
    })
    .then(r => r.json())
    .then(data => displayResults(data))
    .catch(err => {
        resultSection.innerHTML = `<div class="error-message"><strong>Error:</strong> ${err}</div>`;
    })
    .finally(() => { loading.style.display = 'none'; button.disabled = false; });
}

function getColor(prob) {
    if (prob < 0.65) return "#28a745";   // Green (Safe)
    if (prob < 0.75) return "#ffc107";   // Yellow
    if (prob < 0.85) return "#fd7e14";   // Orange
    return "#dc3545";                    // Red (Phishing)
}


function displayResults(data) {
    const resultSection = document.getElementById('resultSection');

    const probNum = Number(data.bilstm_prob) || 0;
    const prob = (probNum * 100).toFixed(1);
    const score = Number(data.score) || 0;
    const risk = score < 3 ? 'Low' : score < 7 ? 'Medium' : 'High';
    const llm = data.llm_result;

    const html = `
        <div class="result-card">
            <div class="section-title">üìã URL Info</div>
            <div class="url-display"><strong>URL:</strong> ${data.url}</div>
            <p><strong>Host:</strong> ${data.host || 'N/A'}</p>
            <p><strong>Scheme:</strong> ${data.scheme || 'N/A'}</p>
        </div>

        <div class="result-card">
            <div class="section-title">ü§ñ AI Prediction</div>
            <div class="probability-bar">
                <div class="probability-fill" style="width:${prob}%; background-color:${getColor(probNum)}"></div>
            </div>
            <div class="probability-labels">
                <span>Safe</span><span>Phishing</span>
            </div>
        </div>

        <div class="result-card">
            <div class="section-title">‚öñÔ∏è Rule-based Analysis</div>
            <p><strong>Risk Level:</strong> ${risk} (${score}/15)</p>
            ${data.reasons && data.reasons.length ?
                `<ul class="reason-list">${data.reasons.map(r=>`<li>${r}</li>`).join('')}</ul>` :
                '<p>‚úÖ No suspicious indicators found.</p>'}
        </div>

        <div class="result-card">
            <div class="section-title">üìä Extracted Features</div>
            <ul class="feature-list">
                <li><strong>URL Length:</strong> ${data.features?.url_length || 0}</li>
                <li><strong>Digit Count:</strong> ${data.features?.digit_count || 0}</li>
                <li><strong>Entropy:</strong> ${(data.features?.url_entropy||0).toFixed(2)}</li>
                <li><strong>Links:</strong> ${data.features?.hrefs?.length || 0}</li>
                <li><strong>Images:</strong> ${data.features?.imgs?.length || 0}</li>
                <li><strong>Scripts:</strong> ${data.features?.scripts?.length || 0}</li>
                <li><strong>Forms:</strong> ${data.features?.forms?.length || 0}</li>
            </ul>
        </div>

        ${llm ? `
        <div class="result-card">
            <div class="section-title">üß† LLM Analysis</div>
            <p><strong>Verdict:</strong> ${llm.verdict}</p>
            <p><strong>Summary:</strong> ${llm.summary}</p>
            ${llm.reason_list?.length ?
                `<ul class="reason-list">${llm.reason_list.map(r=>`<li>${r}</li>`).join('')}</ul>` : ''}
        </div>` : ''}

        <!-- ‚úÖ ‡∏à‡∏∏‡∏î‡πÅ‡∏™‡∏î‡∏á‡∏ú‡∏• PhishTank -->
        <div id="phishTankResult" class="result-card"></div>
    `;

    resultSection.innerHTML = html;

    // ‚úÖ ‡∏ï‡∏£‡∏ß‡∏à PhishTank ‡πÅ‡∏•‡∏∞‡∏≠‡∏±‡∏õ‡πÄ‡∏î‡∏ï‡∏Ç‡πâ‡∏≠‡∏Ñ‡∏ß‡∏≤‡∏°
    if (data.in_phishtank) {
        document.getElementById("phishTankResult").innerHTML =
            "üß† PhishTank Status: <span style='color:red;'>‚ö†Ô∏è Found in PhishTank</span>";
    } else {
        document.getElementById("phishTankResult").innerHTML =
            "üß† PhishTank Status: <span style='color:green;'>‚úÖ Not listed in PhishTank</span>";
    }
}


document.getElementById('urlInput').addEventListener('keypress', e => {
    if (e.key === 'Enter') analyzeUrl();
});
</script>
</body>
</html>
"""


#Start ngrok server

In [None]:
!pip install pyngrok --quiet

In [None]:
!ngrok authtoken #authToken

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [None]:
print("üöÄ Starting Server with Ngrok...")


from pyngrok import ngrok
import threading
import time
import requests

ngrok.kill()

public_url = ngrok.connect(8000, bind_tls=True)
print(f"üåê Ngrok Public URL: {public_url}")

def run_server():
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")

# Start server in background
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()

print("‚è≥ Waiting for server to start...")
time.sleep(8)

print("\n" + "="*60)
print("üéØ PHISHING URL ANALYZER - READY!")
print("="*60)
print(f"üåê Web Interface: {public_url}/web")
print(f"üîß API Test: {public_url}/test")
print("üí° Open the Web Interface URL above in a NEW TAB!")
print("="*60)

üöÄ Starting Server with Ngrok...


INFO:     Started server process [8327]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


üåê Ngrok Public URL: NgrokTunnel: "https://unprivileged-multinucleolate-dahlia.ngrok-free.dev" -> "http://localhost:8000"
‚è≥ Waiting for server to start...

üéØ PHISHING URL ANALYZER - READY!
üåê Web Interface: NgrokTunnel: "https://unprivileged-multinucleolate-dahlia.ngrok-free.dev" -> "http://localhost:8000"/web
üîß API Test: NgrokTunnel: "https://unprivileged-multinucleolate-dahlia.ngrok-free.dev" -> "http://localhost:8000"/test
üí° Open the Web Interface URL above in a NEW TAB!


#Test ngrok server

In [20]:
print("üß™ Testing with Ngrok...")

def test_with_ngrok():
    try:
        # Get the ngrok URL
        from pyngrok import ngrok
        tunnels = ngrok.get_tunnels()
        if tunnels:
            public_url = tunnels[0].public_url
            print(f"üåê Using Ngrok URL: {public_url}")
        else:
            print("‚ùå No ngrok tunnel found")
            return False

        # Test basic connectivity
        print("üîç Testing server connectivity...")
        response = requests.get(f"{public_url}/", timeout=15)
        if response.status_code == 200:
            print("‚úÖ Server is running via Ngrok!")
        else:
            print(f"‚ùå Server error: {response.status_code}")
            return False

        # Test analysis
        test_url = "https://www.google.com/"
        print(f"üîç Testing analysis for: {test_url}")

        response = requests.post(
            f"{public_url}/analyze",
            json={"url": test_url, "call_llm": False},
            timeout=30
        )

        if response.status_code == 200:
            results = response.json()
            print("‚úÖ Analysis successful!")
            print(f"   üìä Result: {results['bilstm_label']}")
            print(f"   üéØ Confidence: {results['bilstm_prob']:.2%}")
            print(f"   ‚öñÔ∏è Risk Score: {results['score']}/15")
            print(f"\nüéâ System is working! Open the web interface:")
            print(f"üëâ {public_url}/web")
            return True
        else:
            print(f"‚ùå Analysis failed: {response.status_code}")
            print(f"Response: {response.text}")
            return False

    except Exception as e:
        print(f"‚ùå Test failed: {e}")
        return False

# Run test
test_with_ngrok()

üß™ Testing with Ngrok...
üåê Using Ngrok URL: https://unprivileged-multinucleolate-dahlia.ngrok-free.dev
üîç Testing server connectivity...
INFO:     34.26.122.64:0 - "GET / HTTP/1.1" 200 OK
‚úÖ Server is running via Ngrok!
üîç Testing analysis for: https://www.google.com/
üîç Analyzing URL: https://www.google.com/
[1m1/1[0m [32m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[37m[0m [1m0s[0m 33ms/step
INFO:     34.26.122.64:0 - "POST /analyze HTTP/1.1" 200 OK
‚úÖ Analysis successful!
   üìä Result: 0
   üéØ Confidence: 98.77%
   ‚öñÔ∏è Risk Score: 2/15

üéâ System is working! Open the web interface:
üëâ https://unprivileged-multinucleolate-dahlia.ngrok-free.dev/web


True