

 Phishing Detection - Feature Extraction


In [1]:

!pip install python-whois tldextract ipwhois imagehash pillow requests beautifulsoup4

Collecting python-whois
  Downloading python_whois-0.9.5-py3-none-any.whl.metadata (2.6 kB)
Collecting tldextract
  Downloading tldextract-5.3.0-py3-none-any.whl.metadata (11 kB)
Collecting ipwhois
  Downloading ipwhois-1.3.0-py2.py3-none-any.whl.metadata (21 kB)
Collecting imagehash
  Downloading ImageHash-4.3.2-py2.py3-none-any.whl.metadata (8.4 kB)
Collecting requests-file>=1.4 (from tldextract)
  Downloading requests_file-2.1.0-py2.py3-none-any.whl.metadata (1.7 kB)
Collecting dnspython (from ipwhois)
  Downloading dnspython-2.8.0-py3-none-any.whl.metadata (5.7 kB)
Downloading python_whois-0.9.5-py3-none-any.whl (104 kB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m104.2/104.2 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading tldextract-5.3.0-py3-none-any.whl (107 kB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚

In [2]:

import pandas as pd, numpy as np, re, math, hashlib, socket, ssl, io
import tldextract
from urllib.parse import urlparse, urljoin
from datetime import datetime
import requests
from bs4 import BeautifulSoup
import imagehash
from PIL import Image
import whois
from ipwhois import IPWhois
import socket
import requests
from urllib.parse import urljoin
import time

from google.colab import files

In [4]:
# Module-level memo used by get_asn_info(): maps an IP string to the result
# dict that was produced for it, so each IP is looked up at most once per kernel.
_rdap_cache = {}

def _query_json(url, timeout=10):
    """GET `url` and return the decoded JSON body, or None on any failure.

    Network errors, non-2xx statuses and undecodable bodies are all reported
    as None so callers can treat the lookup as best-effort.
    """
    request_headers = {"User-Agent":"feature-extractor/1.0"}
    try:
        response = requests.get(url, timeout=timeout, headers=request_headers)
        response.raise_for_status()
        payload = response.json()
    except Exception:
        return None
    return payload

def _asn_result(asn=None, rir=None, name=None, country=None, error=None):
    """Build the fixed-shape result dict returned by get_asn_info()."""
    return {"asn": asn, "rir": rir, "name": name, "country": country, "error": error}


def _rdap_lookup(ip):
    """Query the IANA RDAP endpoint for `ip` and follow its links (uncached)."""
    iana_json = _query_json(f"https://rdap.iana.org/ip/{ip}")
    if not iana_json:
        return _asn_result(error="iana lookup failed")

    links = iana_json.get("links") or []
    if not links:
        return _asn_result(error="no rdap links from iana")

    last_exc = None
    for link in links:
        rdap_url = link.get("href")
        if not rdap_url:
            continue

        rdap_json = _query_json(rdap_url)
        if not rdap_json:
            # The link may be a service base URL rather than a full query
            # URL, so retry with "ip/<ip>" appended.
            base = rdap_url if rdap_url.endswith("/") else rdap_url + "/"
            rdap_json = _query_json(urljoin(base, f"ip/{ip}"))
            if not rdap_json:
                last_exc = f"failed rdap at {rdap_url}"
                continue

        name = rdap_json.get("name") or rdap_json.get("handle")
        country = rdap_json.get("country") or None

        asn = str(rdap_json["asn"]) if rdap_json.get("asn") else None
        # Fall back to the network handle/name when no explicit "asn" field
        # is present; this value is not always numeric.
        network = rdap_json.get("network") or rdap_json.get("object") or rdap_json
        if isinstance(network, dict) and not asn:
            asn = network.get("handle") or network.get("name") or None

        return _asn_result(asn=asn, rir=rdap_url, name=name, country=country)

    # Every linked RDAP server failed (or no link carried an href).
    return _asn_result(error=last_exc or "rdap failed")


def get_asn_info(ip):
    """Look up network ownership information for an IP address via RDAP.

    Parameters
    ----------
    ip : str or None
        IP address to look up. A falsy value short-circuits with an error.

    Returns
    -------
    dict
        Always has the keys ``asn``, ``rir``, ``name``, ``country`` and
        ``error``. ``rir`` holds the RDAP URL that answered; ``error`` is None
        on success and a short message otherwise. This function never raises.

    Notes
    -----
    Results, including failures, are memoised in ``_rdap_cache`` per IP.
    The previous version called itself from inside the link loop before the
    cache was populated, which recursed until ``RecursionError`` and issued
    HTTP requests at every level; that call has been removed.
    """
    if not ip:
        return _asn_result(error="no ip")

    if ip in _rdap_cache:
        return _rdap_cache[ip]

    try:
        out = _rdap_lookup(ip)
    except Exception as e:
        out = _asn_result(error=str(e))

    _rdap_cache[ip] = out
    return out


In [12]:
# ==============================
# Data Collection / Upload / Fetch Layer
# ==============================
# Builds `df`, the input table for the rest of the notebook, either from files
# uploaded through Colab (option 1) or from public URL feeds (option 2).

import pandas as pd

# User choice
print("Choose data source:")
print("1 - Upload files manually")
print("2 - Fetch data automatically from web")
choice = input("Enter 1 or 2: ").strip()

df = pd.DataFrame()  # Empty DataFrame to store final dataset

if choice == "1":
    # ---------- Upload Files ----------
    from google.colab import files
    uploaded = files.upload()  # Multiple file upload

    frames = []
    for filename in uploaded.keys():
        print(f"Loading file: {filename}")
        lowered = filename.lower()  # accept .CSV / .XLSX as well
        if lowered.endswith(".csv"):
            temp_df = pd.read_csv(filename)
        elif lowered.endswith((".xls", ".xlsx")):
            temp_df = pd.read_excel(filename)
        else:
            print(f"Skipping unsupported file type: {filename}")
            continue
        frames.append(temp_df)

    # Concatenate once instead of growing the frame inside the loop.
    if frames:
        df = pd.concat(frames, ignore_index=True)

    print("Combined uploaded dataset shape:", df.shape)

elif choice == "2":
    # ---------- Fetch Data from Web ----------
    import requests

    try:
        # Phishing URLs
        phish_resp = requests.get("https://openphish.com/feed.txt", timeout=10)
        # Without this, an HTTP error page would be ingested as "URLs".
        phish_resp.raise_for_status()
        phishing_urls = [line.strip() for line in phish_resp.text.splitlines() if line.strip()]
        df_phish = pd.DataFrame(phishing_urls, columns=["url"])
        df_phish["label"] = "phishing"
    except Exception as e:
        print("Error fetching phishing URLs:", e)
        df_phish = pd.DataFrame(columns=["url","label"])

    try:
        # Legitimate URLs (Top 500)
        legit_resp = requests.get("https://tranco-list.eu/top-1m.csv", timeout=10)
        legit_resp.raise_for_status()
        legit_lines = legit_resp.text.splitlines()[:500]
        # Each line is "<rank>,<domain>"; keep only the domain part.
        legit_urls = [line.split(",", 1)[1].strip() for line in legit_lines if "," in line]
        df_legit = pd.DataFrame(["http://" + u for u in legit_urls], columns=["url"])
        df_legit["label"] = "legitimate"
    except Exception as e:
        print("Error fetching legitimate URLs:", e)
        df_legit = pd.DataFrame(columns=["url","label"])

    # Combine
    df = pd.concat([df_phish, df_legit], ignore_index=True)
    print("Fetched dataset shape:", df.shape)

else:
    print("Invalid choice! Please run again and select 1 or 2.")

# Display top rows
df.head()


Choose data source:
1 - Upload files manually
2 - Fetch data automatically from web
Enter 1 or 2: 1


Saving PS-02  Phishing Detection CSE_Domains_Dataset_for_Stage_1.xlsx to PS-02  Phishing Detection CSE_Domains_Dataset_for_Stage_1 (1).xlsx
Loading file: PS-02  Phishing Detection CSE_Domains_Dataset_for_Stage_1 (1).xlsx
Combined uploaded dataset shape: (29, 4)


Unnamed: 0,S. No,Sector,Organisation Name,Whitelisted Domains
0,1,BFSI,State Bank of India (SBI),onlinesbi.sbi
1,2,,,sbi.co.in
2,3,,,sbicard.com
3,4,,,yonobusiness.sbi
4,5,,,sbiepay.sbi


In [13]:

# A value counts as URL-like when, taken as a whole, it is an http(s)/www URL,
# a dotted hostname ending in an alphabetic TLD, or a dotted-quad IPv4 address
# (each optionally followed by a port/path/query). Plain decimals such as
# "3.14" deliberately do not match.
_URL_LIKE_RE = re.compile(
    r"^(?:"
    r"(?:https?://|www\.)\S+"
    r"|(?:[A-Za-z0-9-]+\.)+[A-Za-z]{2,}(?:[:/?#]\S*)?"
    r"|\d{1,3}(?:\.\d{1,3}){3}(?:[:/?#]\S*)?"
    r")$",
    re.IGNORECASE,
)


def auto_detect_url_column(df, provided=None):
    """Pick the column of `df` that holds URLs or domains.

    Resolution order:
      1. `provided`, when it names an existing column.
      2. The first column whose name contains "url", "link", "website" or "site".
      3. The first column where more than 5 of the first 50 non-null values
         look like a URL, hostname or IPv4 address.

    Raises
    ------
    KeyError
        If no column qualifies.
    """
    if provided and provided in df.columns:
        return provided

    name_hints = ("url", "link", "website", "site")
    for column in df.columns:
        # str() so integer or other non-string column labels do not crash.
        if any(hint in str(column).lower() for hint in name_hints):
            return column

    for column in df.columns:
        # Drop missing values BEFORE stringifying; afterwards NaN is the text "nan".
        sample = df[column].dropna().astype(str).head(50).tolist()
        url_like = sum(1 for value in sample if _URL_LIKE_RE.match(value.strip()))
        if url_like > 5:
            return column

    raise KeyError("No URL-like column found.")

# Ask for the "Whitelisted Domains" column first; auto_detect_url_column
# falls back to name/content detection when `df` has no such column.
URL_COLUMN = auto_detect_url_column(df, "Whitelisted Domains")
print("Using URL column:", URL_COLUMN)

Using URL column: Whitelisted Domains


In [14]:

def _as_text(url):
    """Return `url` as text; missing values (None / float NaN) become ''.

    Without this, str(None) == "None" and str(nan) == "nan" would be counted
    as four and three letters respectively.
    """
    if isinstance(url, str):
        return url
    if url is None or (isinstance(url, float) and math.isnan(url)):
        return ""
    return str(url)


def url_length(url):
    """Length of `url`, or NaN when it is not a string."""
    return len(url) if isinstance(url, str) else np.nan


def count_chars(url, ch):
    """Occurrences of `ch` in `url`; 0 when `url` is not a string."""
    return url.count(ch) if isinstance(url, str) else 0


def count_digits(url):
    """Number of digit characters in `url` (0 for missing values)."""
    return sum(c.isdigit() for c in _as_text(url))


def count_letters(url):
    """Number of alphabetic characters in `url` (0 for missing values)."""
    return sum(c.isalpha() for c in _as_text(url))


def count_hyphens(url):
    """Number of '-' characters in `url`."""
    return count_chars(url, '-')


def count_dots(url):
    """Number of '.' characters in `url`."""
    return count_chars(url, '.')


def count_special_chars(url):
    """Number of characters outside A-Z, a-z and 0-9 (0 for missing values)."""
    return len(re.findall(r'[^A-Za-z0-9]', _as_text(url)))

def shannon_entropy(s):
    """Shannon entropy, in bits per character, of the text form of `s`.

    The input is passed through str() first; an empty string yields 0.0.
    """
    text = str(s)
    size = len(text)
    if size == 0:
        return 0.0

    acc = 0
    # dict.fromkeys keeps one entry per distinct character, in first-seen order.
    for symbol in dict.fromkeys(text):
        p = float(text.count(symbol)) / size
        acc += p * math.log(p, 2)
    return -acc

def has_suspicious_keyword(url):
    """Return 1 if the lower-cased text of `url` contains a watch-list word, else 0."""
    watch_list = ('login','signin','secure','update','verify','account','bank','ebay','paypal','click')
    haystack = str(url).lower()
    for word in watch_list:
        if word in haystack:
            return 1
    return 0

def num_subdomains(url):
    """Number of dot-separated labels in the subdomain part of `url`.

    Returns 0 when there is no subdomain or the URL cannot be parsed.
    """
    try:
        ext = tldextract.extract(url)
        return len(ext.subdomain.split('.')) if ext.subdomain else 0
    except Exception:
        # `except Exception` (not a bare except) so Ctrl-C / kernel interrupt
        # still stops a long-running extraction loop.
        return 0

def top_domain_under_public_suffix(url):
    """Return tldextract's `top_domain_under_public_suffix` for `url`, or None on failure."""
    try:
        return tldextract.extract(url).top_domain_under_public_suffix
    except Exception:
        # Not a bare except: keyboard/kernel interrupts must propagate.
        return None

def tld_suffix(url):
    """Return tldextract's `suffix` for `url`, or '' on failure."""
    try:
        return tldextract.extract(url).suffix
    except Exception:
        # Not a bare except: keyboard/kernel interrupts must propagate.
        return ''

In [15]:


def get_rdap_info(url):
    """
    Resolve the domain of `url` to an IP and summarise that IP's RDAP record.

    NOTE: everything here is IP-network-level information obtained through
    IPWhois, not a domain registration record. `domain_age_days` is therefore
    the age of the IP network's "registration" event and `registrar` is the
    comma-joined list of RDAP entity handles for that network. The key names
    are kept because the rest of the notebook reads them.

    Returns a dict with keys `domain_age_days`, `registrar`, `rdap_source`.
    On any failure the first two are None and `rdap_source` is
    "error: <message>"; this function never raises.
    """
    try:
        # Resolve domain -> IP
        domain = tldextract.extract(url).top_domain_under_public_suffix
        if not domain:
            raise ValueError("no registrable domain in input")
        ip = socket.gethostbyname(domain)

        # Query RDAP for IP info
        rdap = IPWhois(ip).lookup_rdap(asn_methods=["whois"])

        # `network` / `events` can be present but None, so `.get(key, default)`
        # alone is not enough.
        network = rdap.get("network") or {}
        created = None
        for event in network.get("events") or []:
            # ipwhois reports events as {"action", "timestamp", "actor"};
            # the older key names are still accepted as a fallback.
            action = event.get("action") or event.get("event_action")
            if action in ("registration", "registered"):
                created = event.get("timestamp") or event.get("event_date")
                break

        domain_age_days = None
        if created:
            created_dt = datetime.fromisoformat(created.replace("Z", "+00:00"))
            # Use the timestamp's own tzinfo so aware/naive values are never mixed.
            domain_age_days = (datetime.now(created_dt.tzinfo) - created_dt).days

        # Entity handles attached to the network
        entities = rdap.get("entities") or []
        registrar = ",".join(str(handle) for handle in entities) if entities else None

        return {
            "domain_age_days": domain_age_days,
            "registrar": registrar,
            "rdap_source": "ipwhois"
        }

    except Exception as e:
        return {
            "domain_age_days": None,
            "registrar": None,
            "rdap_source": f"error: {e}"
        }

In [16]:
#  SSL / DNS Features
def resolve_ip(domain):
    """Resolve `domain` to an IPv4 address string, or None when it cannot be resolved.

    Empty/None input returns None immediately instead of being handed to the
    resolver.
    """
    if not domain:
        return None
    try:
        return socket.gethostbyname(domain)
    except Exception:
        # Not a bare except: keyboard/kernel interrupts must propagate.
        return None

def ip_whois_country(ip):
    """Return the `country` field of the RDAP network record for `ip`, or None.

    A missing `ip` returns None without issuing a lookup; any lookup failure
    also yields None.
    """
    if not ip:
        return None
    try:
        res = IPWhois(ip).lookup_rdap(depth=1)
        # `network` may be present but None.
        return (res.get('network') or {}).get('country')
    except Exception:
        # Not a bare except: keyboard/kernel interrupts must propagate.
        return None

def ssl_valid(domain):
    """Return 1 if a verified TLS handshake with `domain`:443 yields a certificate, else 0.

    The default SSL context verifies the chain and hostname, so an invalid or
    mismatched certificate fails the handshake and is reported as 0, as are
    connection errors and timeouts (4 s).
    """
    if not domain:
        return 0
    try:
        ctx = ssl.create_default_context()
        # create_connection applies the timeout to the connect itself and the
        # nested `with` blocks close both sockets on every exit path.
        with socket.create_connection((domain, 443), timeout=4.0) as raw:
            with ctx.wrap_socket(raw, server_hostname=domain) as tls:
                return 1 if tls.getpeercert() else 0
    except Exception:
        # Not a bare except: keyboard/kernel interrupts must propagate.
        return 0

def ssl_days_remaining(domain):
    """Whole days until the TLS certificate of `domain`:443 expires.

    Returns NaN whenever the value cannot be determined (no domain, connection
    or handshake failure, certificate without a `notAfter` field). A negative
    number means the certificate has already expired.
    """
    if not domain:
        return np.nan
    try:
        ctx = ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=4.0) as raw:
            with ctx.wrap_socket(raw, server_hostname=domain) as tls:
                cert = tls.getpeercert()

        not_after = cert.get('notAfter') if cert else None
        if not not_after:
            # Previously this path fell off the end and returned None.
            return np.nan

        # `notAfter` is expressed in GMT; comparing epoch seconds avoids mixing
        # it with the machine's local clock time.
        expires_at = ssl.cert_time_to_seconds(not_after)
        return int((expires_at - time.time()) // 86400)
    except Exception:
        # Not a bare except: keyboard/kernel interrupts must propagate.
        return np.nan

In [17]:
#  Content / Visual Features
def safe_request_get(url,timeout=6):
    """GET `url` (following redirects) and return the Response, or None on any error.

    "http://" is prepended unless `url` already starts with an http:// or
    https:// scheme. The check looks for the full scheme prefix, so hosts
    whose name merely begins with "http" (e.g. "httpbin.org") are handled.
    """
    try:
        has_scheme = url.lower().startswith(("http://", "https://"))
        target = url if has_scheme else "http://" + url
        return requests.get(target, timeout=timeout, allow_redirects=True)
    except Exception:
        # Best-effort fetch; keyboard/kernel interrupts still propagate.
        return None

def fetch_page(url):
    """Fetch `url` and return `(response, html_text)`, or `(None, None)` if the request failed.

    The check is `is None` rather than truthiness: a requests Response is
    falsy for 4xx/5xx statuses, which previously made error pages look like
    failed requests and hid their status code from callers.
    """
    r=safe_request_get(url)
    if r is None:
        return (None,None)
    return (r,r.text)

def count_meta_tags(html):
    """Number of <meta> elements in `html`; 0 for empty input or on parse failure."""
    if not html:
        return 0
    try:
        return len(BeautifulSoup(html,'html.parser').find_all('meta'))
    except Exception:
        # Not a bare except: keyboard/kernel interrupts must propagate.
        return 0

def has_iframe(html):
    """Return 1 if `html` contains an <iframe> element, else 0 (also 0 on empty input or parse failure)."""
    if not html:
        return 0
    try:
        return int(BeautifulSoup(html,'html.parser').find('iframe') is not None)
    except Exception:
        # Not a bare except: keyboard/kernel interrupts must propagate.
        return 0

def count_forms(html):
    """Number of <form> elements in `html`; 0 for empty input or on parse failure."""
    if not html:
        return 0
    try:
        return len(BeautifulSoup(html,'html.parser').find_all('form'))
    except Exception:
        # Not a bare except: keyboard/kernel interrupts must propagate.
        return 0

def has_password_input(html):
    """Return 1 if `html` has an <input> whose type is "password" (case-insensitive), else 0.

    Empty input and parse failures yield 0.
    """
    if not html:
        return 0
    try:
        inputs = BeautifulSoup(html,'html.parser').find_all('input',attrs={'type':True})
        return int(any(inp.get('type','').lower()=='password' for inp in inputs))
    except Exception:
        # Not a bare except: keyboard/kernel interrupts must propagate.
        return 0

def count_external_links(html,base_domain):
    """Count absolute http(s) links in `html` whose host is outside `base_domain`.

    A link is internal when its hostname equals `base_domain` or is a
    subdomain of it. The comparison is made on the parsed hostname, so a
    foreign URL that merely mentions the base domain in its path or query is
    still counted as external.

    Returns 0 when `html` or `base_domain` is empty, or on parse failure.
    """
    if not html or not base_domain:
        return 0
    try:
        base = base_domain.lower().strip('.')
        external = 0
        for anchor in BeautifulSoup(html,'html.parser').find_all('a',href=True):
            href = anchor['href']
            if not href.lower().startswith(('http://','https://')):
                continue  # relative links and other schemes are not counted
            host = (urlparse(href).hostname or '').lower()
            if host == base or host.endswith('.' + base):
                continue
            external += 1
        return external
    except Exception:
        # Not a bare except: keyboard/kernel interrupts must propagate.
        return 0

def favicon_hash(url):
    """Return the average-hash string of the page's favicon, or None.

    The page at `url` is fetched, the first <link> whose `rel` contains
    "icon" is located, and its image is downloaded and hashed with
    imagehash.average_hash. None is returned when the page or icon cannot be
    fetched, no icon link exists, or the image cannot be decoded.
    """
    try:
        r=safe_request_get(url)
        if r is None:
            return None
        soup=BeautifulSoup(r.text,'html.parser')
        icon=soup.find('link',rel=lambda x: x and 'icon' in x.lower())
        if icon is None or not icon.get('href'):
            return None

        # Resolve the icon against the URL that was actually fetched. The raw
        # input is often a bare domain with no scheme, and urljoin() on that
        # would produce a host-less path that can never be downloaded.
        base_url=getattr(r,'url',None) or url
        icon_url=urljoin(base_url,icon['href'])

        ir=safe_request_get(icon_url)
        if ir is None or not ir.content:
            return None
        img=Image.open(io.BytesIO(ir.content)).convert('RGB')
        return str(imagehash.average_hash(img))
    except Exception:
        # Not a bare except: keyboard/kernel interrupts must propagate.
        return None

# Placeholders
def logo_similarity_stub(url):
    """
    Placeholder for logo similarity: returns whatever favicon_hash(url) returns.

    The value is a fingerprint, NOT a similarity score. It is meant to be
    compared later against known-good favicon hashes (for example by Hamming
    distance). Any exception from favicon_hash is reported as None.
    """
    try:
        fingerprint = favicon_hash(url)
    except Exception:
        fingerprint = None
    return fingerprint

def text_similarity_stub(url, reference_texts=None):
    """
    Placeholder text-similarity score for the page at `url`.

    - Without `reference_texts`: the fraction (0..1) of a fixed list of
      banking/credential keywords that occur in the page's visible text.
    - With `reference_texts` (list of strings): the maximum TF-IDF cosine
      similarity between the page text and the references, when sklearn can
      be imported; otherwise the keyword fraction.

    Returns:
      - float between 0 and 1, or None if the page has no HTML or anything fails.
    """
    try:
        _response, markup = fetch_page(url)
        if not markup:
            return None

        # Reduce the page to its visible text.
        document = BeautifulSoup(markup, 'html.parser')
        for hidden in document(['script', 'style', 'noscript']):
            hidden.decompose()
        visible_text = ' '.join(document.stripped_strings).lower()

        # Keyword heuristic: always computable, used alone or as the fallback.
        vocabulary = ('login', 'signin', 'secure', 'verify', 'account', 'password',
                      'bank', 'transfer', 'credit', 'debit', 'otp', 'reset')
        hits = 0
        for term in vocabulary:
            if term in visible_text:
                hits += 1
        heuristic = float(hits / len(vocabulary))

        if not reference_texts:
            return heuristic

        # sklearn is optional; without it only the heuristic is available.
        try:
            from sklearn.feature_extraction.text import TfidfVectorizer
            from sklearn.metrics.pairwise import cosine_similarity
        except Exception:
            return heuristic

        # Row 0 is the page, the remaining rows are the references.
        corpus = [visible_text]
        corpus.extend(t.lower() for t in reference_texts)
        vectorizer = TfidfVectorizer(stop_words='english', max_features=10000)
        matrix = vectorizer.fit_transform(corpus)
        scores = cosine_similarity(matrix[0:1], matrix[1:]).flatten()
        if len(scores):
            return float(scores.max())
        return heuristic

    except Exception:
        return None

In [19]:
# --- Run pipeline ---
# For every value in df[URL_COLUMN], compute lexical, RDAP/DNS, SSL and page
# content features, then write the input table plus features to OUTPUT_PATH.
OUTPUT_PATH = "final_dataset.csv"

# Every feature key the loop below produces. Rows that fail part-way are
# padded with None for these keys so all rows share the same schema.
FEATURE_COLUMNS = [
    'url_length','num_dots','num_hyphens','num_digits','num_letters',
    'num_special_chars','entropy','suspicious_keyword','num_subdomains',
    'top_domain_under_public_suffix','tld',
    'registrar','domain_age_days','rdap_source',
    'ip','ip_country','asn','asn_rir','asn_name','asn_country','asn_error',
    'ssl_valid','ssl_days_remaining',
    'status_code','num_redirects','content_length','meta_tags',
    'has_iframe','external_links','has_password_input','form_count',
    'favicon_hash','sha1','md5','logo_similarity','text_similarity'
]

total_rows = len(df)
rows = []
for i, raw_url in enumerate(df[URL_COLUMN].astype(str)):
    row = {'url': raw_url.strip()}
    url = row['url']

    try:
        # --- Lexical Features ---
        row['url_length']        = url_length(url)
        row['num_dots']          = count_dots(url)
        row['num_hyphens']       = count_hyphens(url)
        row['num_digits']        = count_digits(url)
        row['num_letters']       = count_letters(url)
        row['num_special_chars'] = count_special_chars(url)
        row['entropy']           = shannon_entropy(url)
        row['suspicious_keyword']= has_suspicious_keyword(url)
        row['num_subdomains']    = num_subdomains(url)
        row['top_domain_under_public_suffix'] = top_domain_under_public_suffix(url)
        row['tld']               = tld_suffix(url)

        domain = row['top_domain_under_public_suffix']

        # --- RDAP summary for the resolved IP ---
        rdap_data = get_rdap_info(domain) or {}
        row['registrar']        = rdap_data.get('registrar')
        row['domain_age_days']  = rdap_data.get('domain_age_days')
        row['rdap_source']      = rdap_data.get('rdap_source')

        # --- DNS / IP ---
        ip = resolve_ip(domain)
        row['ip'] = ip
        row['ip_country'] = ip_whois_country(ip)

        # RDAP/ASN lookup via get_asn_info
        asn_info = get_asn_info(ip)
        row['asn'] = asn_info.get('asn')
        row['asn_rir'] = asn_info.get('rir')
        row['asn_name'] = asn_info.get('name')
        row['asn_country'] = asn_info.get('country')
        row['asn_error'] = asn_info.get('error')   # kept for debugging

        # --- SSL ---
        row['ssl_valid']          = ssl_valid(domain)
        row['ssl_days_remaining'] = ssl_days_remaining(domain)

        # --- HTTP / Content ---
        resp, html = fetch_page(url)
        # Compare with None explicitly: a requests Response is falsy for
        # 4xx/5xx statuses, which would otherwise hide the status code.
        got_response = resp is not None
        row['status_code']       = resp.status_code if got_response else None
        row['num_redirects']     = len(resp.history) if got_response and hasattr(resp, 'history') else 0
        row['content_length']    = len(resp.content) if got_response and resp.content else 0
        row['meta_tags']         = count_meta_tags(html)
        row['has_iframe']        = has_iframe(html)
        row['external_links']    = count_external_links(html, domain)
        row['has_password_input']= has_password_input(html)
        row['form_count']        = count_forms(html)
        row['favicon_hash']      = favicon_hash(url)

        # --- Hashes ---
        row['sha1'] = hashlib.sha1(url.encode()).hexdigest()
        row['md5']  = hashlib.md5(url.encode()).hexdigest()

        # --- Placeholders ---
        row['logo_similarity'] = logo_similarity_stub(url)
        row['text_similarity'] = text_similarity_stub(url)

    except Exception as e:
        print("Error on URL", url, ":", e)
        # Keep whatever was computed; fill the rest with None.
        for col in FEATURE_COLUMNS:
            row.setdefault(col, None)

    rows.append(row)
    if (i+1) % 10 == 0 or (i+1) == total_rows:
        print(f"Processed {i+1}/{total_rows} rows")

# --- Save results ---
features = pd.DataFrame(rows)
out = pd.concat([df.reset_index(drop=True), features.reset_index(drop=True)], axis=1)
out.to_csv(OUTPUT_PATH, index=False)
print("Saved features to", OUTPUT_PATH)
out.head()


Processed 10/29 rows
Processed 20/29 rows




Processed 29/29 rows
Saved features to final_dataset.csv


Unnamed: 0,S. No,Sector,Organisation Name,Whitelisted Domains,url,url_length,num_dots,num_hyphens,num_digits,num_letters,...,meta_tags,has_iframe,external_links,has_password_input,form_count,favicon_hash,sha1,md5,logo_similarity,text_similarity
0,1,BFSI,State Bank of India (SBI),onlinesbi.sbi,onlinesbi.sbi,13,1,0,0,12,...,6,0,74,0,1,,20068c4cfeec0839de4bcbbfe9e3f27c8613b30a,9569114d03419d20412df8c18278b703,,0.583333
1,2,,,sbi.co.in,sbi.co.in,9,2,0,0,7,...,0,0,0,0,0,,9b18b673fe5573149e617767e94d822afee9de70,3e8407c29a5dc6261d67eb80dd55c7b7,,
2,3,,,sbicard.com,sbicard.com,11,1,0,0,10,...,0,0,0,0,0,,4c58e00fce12dd8a4f8b1db819880663225dc2f6,c58fa33686936800ead8ab76a4f7a4ed,,
3,4,,,yonobusiness.sbi,yonobusiness.sbi,16,1,0,0,15,...,4,0,0,0,0,,3d57ba3bc5d57089e22490975c2eb16b553d9ea6,7fd28b32d79607b6245bdead7f2228d6,,0.0
4,5,,,sbiepay.sbi,sbiepay.sbi,11,1,0,0,10,...,0,0,0,0,0,,edfafea5fdf06ae8d3cfba0a8c68aad17b42ba30,7374781714cfd67dbd8bab0d4aef7a0b,,


In [21]:
from google.colab import files
# Download the CSV written by the pipeline cell. OUTPUT_PATH is used when it is
# defined so the two cells cannot drift apart; the literal default covers the
# case where this cell is run in a fresh kernel against an existing file.
files.download(globals().get("OUTPUT_PATH", "final_dataset.csv"))



<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>