# PREPROCESS

In [1]:
import pandas as pd
from pathlib import Path
import dns.resolver
import requests

In [2]:
# Dataset locations. Every path hangs off DATA_DIR, which is relative to the
# notebook's working directory.
DATA_DIR = Path('../data')
# Raw collections: one folder per source (phishing vs. legitimate URLs).
PHISH_DIR = DATA_DIR / 'phishtank'
LEGIT_DIR = DATA_DIR / 'common_crawl'
# Output folder for the feature tables written later in the notebook.
PROCESSED_DIR = DATA_DIR / 'processed'
# Both sources store their rows in a file with the same name.
PHISH_DATA = PHISH_DIR / 'collected_data.csv'
LEGIT_DATA = LEGIT_DIR / 'collected_data.csv'

In [5]:
# Load the collected URL tables: phishing rows and legitimate ("valid") rows.
df_phish = pd.read_csv(PHISH_DATA)
df_valid = pd.read_csv(LEGIT_DATA)

In [3]:
# Previously processed phishing features; later cells relabel, extend and re-save this frame.
df_processed = pd.read_csv(PROCESSED_DIR / 'phish_data.csv')

# Reapplying Rules
Some address-based indicators were updated after the data was collected, so the updated rules are reapplied to the stored URLs here.

In [4]:
from urllib.parse import urlparse, urljoin
import whois
import tldextract

In [5]:
def get_whois(url):
    """Look up the WHOIS record for the lower-cased ``url``.

    Returns the object produced by ``whois.whois``, or ``False`` when the
    lookup (or the lower-casing) raises any ``Exception``.
    """
    try:
        record = whois.whois(url.lower())
    except Exception:
        # Best-effort lookup: callers test the result for truthiness.
        return False
    else:
        return record

# TLDs that is_statistical_report treats as a phishing indicator.
# Each entry carries a leading dot because the lookup compares against ".<suffix>".
# NOTE(review): the original label here was just "CloudFlare" — presumably the
# source of this list; confirm and link the report.
top_phishing_tlds = [
    # Cheap and Open TLDs
    ".xyz", ".top", ".club", ".online", ".shop", ".site", ".vip", ".buzz",

    # Freenom TLDs (Free Domains)
    ".tk", ".ml", ".ga", ".cf", ".gq",

    # Geographic and Niche TLDs less commonly used for legitimate purposes
    ".ly", ".to", ".ru", ".cn", ".su"
]

def is_statistical_report(url):
    """Flag URLs whose top-level domain is in ``top_phishing_tlds``.

    Only the TLD is inspected; no IP or per-domain reputation lookup happens.

    Args:
        url: URL (or bare host) to classify.

    Returns:
        -1 (phishing) when the TLD is listed, otherwise 1 (legitimate).
    """
    suffix = tldextract.extract(url).suffix
    if not suffix:
        # No recognised public suffix (e.g. an IP address): nothing to match.
        return 1

    # The public suffix can span several labels ("com.cn"); the TLD is the
    # last one. Comparing the whole suffix would miss ".cn" in that case.
    tld = "." + suffix.rsplit(".", 1)[-1].lower()
    return -1 if tld in top_phishing_tlds else 1

def core_domain(url):
    """Return ``"<domain>.<suffix>"`` for ``url`` as split by tldextract.

    Any subdomain, scheme, path or query is discarded.
    """
    parts = tldextract.extract(url)
    return ".".join((parts.domain, parts.suffix))

def domain_name(url):
    """Return only the domain label of ``url`` (no subdomain, no suffix), per tldextract."""
    return tldextract.extract(url).domain

# Hosts of URL-shortening services, written as "<domain>.<suffix>".
# is_shortening_service compares core_domain(url) against these entries exactly.
# NOTE(review): "yt.be" looks like it was meant to be YouTube's "youtu.be" — confirm.
url_shortening_services = [
    # Legitimate Shortening Services
    "bit.ly", "tinyurl.com", "t.co", "is.gd", "ow.ly",
    "buff.ly", "rebrand.ly", "sh.st", "adf.ly", "bl.ink",
    "clck.ru", "mcaf.ee", "tiny.cc", "fb.me", "amzn.to",
    "lnkd.in", "yt.be", "wp.me", "git.io", "nyti.ms",
    "es.pn", "cnn.it",

    # Known Suspicious or Exploited Shorteners
    "goo.gl", "cut.ly", "rb.gy", "soo.gd", "t.ly",
    "v.gd", "qr.ae", "x.co", "zl.gg", "tr.im",
    "linktr.ee", "phurl.me", "short.cm", "cutt.ly"
]
def is_shortening_service(url):
    """Return -1 when the URL's core domain is a listed shortener, otherwise 1."""
    return -1 if core_domain(url) in url_shortening_services else 1

def is_url_long(url):
    """Classify a URL by its character count.

    Returns 1 (legitimate) below 54 characters, 0 (suspicious) from 54 to 75
    inclusive, and -1 (phishing) above 75.
    """
    n_chars = len(url)

    # Guard clauses from the longest bucket down.
    if n_chars > 75:
        return -1  # Phishing
    if n_chars >= 54:
        return 0  # Suspicious
    return 1  # Legitimate

def is_double(url):
    """Detect a '//' that occurs after the scheme separator.

    Returns -1 when the last '//' in ``url`` sits beyond the position allowed
    for its scheme (6 for http, 7 for https), otherwise 1. URLs with any other
    scheme always yield 1.
    """
    scheme_limits = {"http": 6, "https": 7}
    limit_position = scheme_limits.get(urlparse(url).scheme)
    if limit_position is None:
        return 1

    # Only the right-most '//' matters: the scheme's own '//' is within the limit.
    return -1 if url.rfind("//") > limit_position else 1
    
def is_dns_record(url, timeout=5):
    """Report whether the URL's registered domain resolves.

    The lookup uses "<domain>.<suffix>" from tldextract (subdomains are not
    queried) and tries an A record first, then AAAA.

    Args:
        url: URL whose domain is resolved.
        timeout: value applied to both the resolver's per-query timeout and
            its overall lifetime.

    Returns:
        1 as soon as either query returns a non-empty answer; -1 on NXDOMAIN,
        timeout, missing nameservers, or when neither record type has an answer.
    """
    parts = tldextract.extract(url)
    domain = f"{parts.domain}.{parts.suffix}"

    resolver = dns.resolver.Resolver()
    resolver.timeout = timeout
    resolver.lifetime = timeout  # Caps the whole resolution, not just one query

    for record_type in ('A', 'AAAA'):
        try:
            if resolver.resolve(domain, record_type):
                return 1
        except dns.resolver.NoAnswer:
            # The name exists but has no record of this type: try the next one.
            continue
        except dns.resolver.NXDOMAIN:
            return -1
        except dns.exception.Timeout:
            return -1
        except dns.resolver.NoNameservers:
            return -1

    return -1
    
def is_having_sub_domain(url):
    """Score a URL by how many dot-separated labels its subdomain has.

    Returns 1 (legitimate) for zero or one label, 0 (suspicious) for exactly
    two, and -1 for three or more.
    """
    subdomain = tldextract.extract(url).subdomain
    # N labels are joined by N - 1 dots; an empty subdomain has no labels.
    depth = subdomain.count('.') + 1 if subdomain else 0

    if depth <= 1:
        return 1  # Legitimate
    if depth == 2:
        return 0  # Suspicious
    return -1
    
def is_abnormal_url(url):
    """Check whether the URL's host matches a domain name in its WHOIS record.

    Args:
        url: URL to classify.

    Returns:
        1 when "<domain>.<suffix>" of ``url`` equals (case-insensitively) a
        ``domain_name`` entry of the WHOIS record; -1 when there is no match,
        the record has no ``domain_name``, or the lookup fails.
    """
    ext = tldextract.extract(url)
    host_name = f"{ext.domain}.{ext.suffix}".lower()

    # get_whois swallows lookup errors and returns False, so one unregistered
    # or unreachable domain no longer aborts a whole DataFrame.apply run.
    record = get_whois(url)
    if not record or 'domain_name' not in record:
        return -1

    domain_names = record['domain_name']
    if isinstance(domain_names, str):
        domain_names = [domain_names]
    elif not isinstance(domain_names, list):
        # Covers a None value or any other unexpected shape.
        return -1

    # Skip non-string entries instead of crashing on .lower().
    matched = any(
        isinstance(name, str) and name.lower() == host_name
        for name in domain_names
    )
    return 1 if matched else -1

In [None]:
def is_url_online(url):
    """Return True when a GET of ``url`` (10 s timeout) answers with a 2xx status.

    Any other status is reported on stdout and yields False; a
    ``requests.RequestException`` yields False silently.
    """
    try:
        response = requests.get(url, timeout=10)
    except requests.RequestException:
        return False

    if 200 <= response.status_code < 300:
        return True  # URL is online

    print(f"URL is not online, or is responding with an error code: {response.status_code}")
    return False

In [None]:
from tqdm import tqdm

# Registers DataFrame/Series.progress_apply, a progress-bar variant of apply.
tqdm.pandas()

# Probe every phishing URL and keep only the rows whose site still responds.
online_mask = df_phish['website_url'].progress_apply(is_url_online)
df_phish = df_phish[online_mask]

In [None]:
# Rows left in df_phish after the liveness filter applied above.
print(f"Number of phishing URLs after filtering: {len(df_phish)}")

In [None]:
# Reassign rather than mutate in place: df_phish is a boolean-filtered slice,
# and in-place edits or column assignment on it can raise
# SettingWithCopyWarning. The explicit copy gives each frame its own data.
df_phish = df_phish.drop_duplicates(subset=['website_url']).copy()
df_valid = df_valid.drop_duplicates(subset=['website_url']).copy()

# (output column, extractor, show a progress bar?)
# The network-bound extractors (DNS, WHOIS) run through progress_apply.
# 'shortining_service' keeps its spelling: it is the column name written to the processed CSVs.
url_features = [
    ('statistical_report', is_statistical_report, False),
    ('shortining_service', is_shortening_service, False),
    ('url_length', is_url_long, False),
    ('having_sub_domain', is_having_sub_domain, False),
    ('double_slash_redirecting', is_double, False),
    ('dnsrecord', is_dns_record, True),
    ('abnormal_url', is_abnormal_url, True),
]

# Same order as before: each feature is computed for phishing rows, then legitimate rows.
for column, extractor, show_progress in url_features:
    for frame in (df_phish, df_valid):
        urls = frame['website_url']
        frame[column] = urls.progress_apply(extractor) if show_progress else urls.apply(extractor)

# Ground-truth labels: -1 = phishing, 1 = legitimate.
df_phish['result'] = -1
df_valid['result'] = 1

In [None]:
# Persist both feature tables without the index column.
# NOTE: phish_data.csv is the same file the df_processed cell reads, so this overwrites it.
df_phish.to_csv(PROCESSED_DIR / 'phish_data.csv', index=False)
df_valid.to_csv(PROCESSED_DIR / 'valid_data.csv', index=False)

# Stats

In [7]:
from tabulate import tabulate

def create_table(data, headers):
    """Print ``data`` (a list of rows) as a "fancy_grid" table under ``headers``."""
    rendered = tabulate(data, headers=headers, tablefmt="fancy_grid")
    print(rendered)

In [8]:
def statistics(df):
    """Print per-feature value counts and phishing-detection metrics.

    Every column except ``result`` is treated as a feature with values
    -1 (phishing), 0 (suspicious) or 1 (legitimate) and is scored against the
    ``result`` label, with phishing as the positive class. A suspicious (0)
    prediction always counts as an error: a false positive on a legitimate
    row, a false negative on a phishing row.

    ``df`` is not modified. Columns are coerced to numbers in local copies
    only; writing the coerced values back would replace non-numeric columns
    such as ``website_url`` with NaN.

    Args:
        df: DataFrame holding the feature columns and a ``result`` column.

    Returns:
        None. The table is printed through ``create_table``; precision, recall
        and accuracy are shown as percentages (0-100) to match the headers.
    """
    # The labels are the same for every feature, so coerce them once.
    labels = pd.to_numeric(df['result'], errors='coerce')
    actual_phish = labels == -1
    actual_legit = labels == 1

    data = []
    for col in df.columns:
        if col == 'result':
            continue

        # Local coerced copy; non-numeric entries become NaN and match no class.
        values = pd.to_numeric(df[col], errors='coerce')

        # Count after coercion so numeric strings are tallied too.
        value_counts = values.value_counts().reindex([-1, 0, 1], fill_value=0)
        phishing = value_counts.get(-1, 0)
        suspicious = value_counts.get(0, 0)
        legitimate = value_counts.get(1, 0)

        says_phish = values == -1
        says_suspicious = values == 0
        says_legit = values == 1

        TP = (says_phish & actual_phish).sum()
        FP = ((says_phish | says_suspicious) & actual_legit).sum()
        TN = (says_legit & actual_legit).sum()
        FN = ((says_legit | says_suspicious) & actual_phish).sum()

        total = TP + FP + TN + FN
        precision = 100 * TP / (TP + FP) if (TP + FP) > 0 else 0
        recall = 100 * TP / (TP + FN) if (TP + FN) > 0 else 0
        accuracy = 100 * (TP + TN) / total if total > 0 else 0

        data.append([col, phishing, suspicious, legitimate, precision, recall, accuracy])

    headers = ["Feature","Phishing (-1)", "Suspicious (0)","Legitimate (1)", "Precision (%)", "Recall (%)", "Accuracy (%)"]
    create_table(data, headers)

In [None]:
# Per-feature counts and precision/recall/accuracy for the phishing frame.
statistics(df_phish)


In [None]:
# df_processed is loaded from phish_data.csv, so every row gets the phishing label (-1) before scoring.
df_processed['result'] = -1
statistics(df_processed)

╒════════════════════════════╤═════════════════╤══════════════════╤══════════════════╤═════════════════╤══════════════╤════════════════╕
│ Feature                    │   Phishing (-1) │   Suspicious (0) │   Legitimate (1) │   Precision (%) │   Recall (%) │   Accuracy (%) │
╞════════════════════════════╪═════════════════╪══════════════════╪══════════════════╪═════════════════╪══════════════╪════════════════╡
│ website_url                │               0 │                0 │                0 │               0 │   0          │     0          │
├────────────────────────────┼─────────────────┼──────────────────┼──────────────────┼─────────────────┼──────────────┼────────────────┤
│ having_ip_address          │               0 │                0 │             2338 │               0 │   0          │     0          │
├────────────────────────────┼─────────────────┼──────────────────┼──────────────────┼─────────────────┼──────────────┼────────────────┤
│ url_length                 │           

In [13]:
# Recompute the TLD-based indicator for the stored URLs with the current is_statistical_report rule.
df_processed['statistical_report'] =  df_processed['website_url'].apply(is_statistical_report)

In [20]:
def check_for_subdomain(url):
    """Report whether tldextract finds a subdomain in ``url``.

    Returns:
        True when the subdomain part is non-empty, otherwise False. An
        explicit False (rather than an implicit None) keeps a column built
        from this function boolean.
    """
    return bool(tldextract.extract(url).subdomain)

In [21]:
# NOTE: despite its name, 'subdomain_count' holds a per-URL truthy flag from
# check_for_subdomain, not a count; summing the column counts the flagged URLs.
df_processed['subdomain_count'] = df_processed['website_url'].apply(check_for_subdomain)
true_count = df_processed['subdomain_count'].sum()
print(f"Number of URLs with subdomains: {true_count}")

Number of URLs with subdomains: 1888


In [14]:
# Write the updated frame back over the file it was loaded from.
df_processed.to_csv(PROCESSED_DIR / 'phish_data.csv', index=False)

# Combine

In [None]:
# Stack phishing and legitimate rows into one table with a fresh 0..n-1 index.
df_combined = pd.concat([df_phish, df_valid], ignore_index=True)
# Scored while website_url is still present, so the URL column appears as a row in the table.
statistics(df_combined)
# The raw URL is an identifier, not a feature; remove it from the combined table.
df_combined.drop(columns=['website_url'], inplace=True)

# Feature Selection

In [None]:
# Columns to exclude from the feature set.
# NOTE(review): nothing visible in this notebook applies this set yet, and
# 'website_url' has already been dropped from df_combined in the Combine cell.
drop_features = {
    'website_url',          # NOT NEEDED
    'sslfinal_state',       # BAD
    'having_ip_address',    # USELESS
    'port'                  # USELESS
}

In [1]:
import json
import os

import requests

# Read the Similarweb key from the environment instead of committing it to the
# notebook, as get_open_page_rank does for its own key. Set SIMILARWEB_API_KEY
# before running; the key that used to be hard-coded here should be revoked.
api_key = os.getenv('SIMILARWEB_API_KEY')

def get_digital_rank(domain):
    """Fetch the Similarweb "similar rank" for ``domain``.

    Args:
        domain: Host name; a leading "www." is stripped before the lookup.

    Returns:
        The decoded JSON body on HTTP 200, the raw response text for any other
        status, or None when the request (or JSON decoding) raises.
    """
    try:
        if domain.startswith("www."):
            domain = domain[4:]

        url = f"https://api.similarweb.com/v1/similar-rank/{domain}/rank"
        # Pass the key as a query parameter so it is URL-encoded, and bound the
        # wait so an unresponsive API cannot hang the notebook.
        response = requests.get(url, params={"api_key": api_key}, timeout=10)

        if response.status_code == 200:
            return response.json()
        else:
            return response.text
    except Exception as exc:
        # Report the type of the error that was raised, not the literal "Exception".
        print(f"An error occurred while fetching the digital rank for {domain} {type(exc).__name__}")
        return None


print(get_digital_rank("contec.com"))

{'meta': {'request': {'domain': 'contec.com', 'limit': 10, 'format': 'json'}, 'status': 'Success', 'last_updated': '2024-10-31'}, 'similar_rank': {'rank': 219447}}


In [7]:
from urllib.parse import urlparse

# Demo: rebuild a URL from scheme, host and path only, discarding the query string.
url = "https://brausuario.seuprogramabra.biz.ua/html/?cliente=bWlndWVsbWdsQHVvbC5jb20uYnI=&amp;key=29dea2cacd927a9c3b30e2f027fd0564&amp"
parsed_url = urlparse(url)
full_url = "{0.scheme}://{0.netloc}{0.path}".format(parsed_url)
print(full_url)

https://brausuario.seuprogramabra.biz.ua/html/


In [12]:
import os
def get_open_page_rank(url):
    """Query Open PageRank for the host of ``url``.

    The API key is read from the OPEN_PAGE_RANK_API_KEY environment variable.

    Args:
        url: Full URL; only its network location (``netloc``) is sent.

    Returns:
        The decoded JSON body on HTTP 200, otherwise None. None is also
        returned when anything raises (best-effort lookup).
    """
    try:
        domain = urlparse(url).netloc

        # Separate name for the endpoint so the caller's URL is not overwritten.
        endpoint = "https://openpagerank.com/api/v1.0/getPageRank"
        params = {
            "domains[]": domain
        }
        headers = {
            "API-OPR": os.getenv('OPEN_PAGE_RANK_API_KEY')
        }

        # Bound the wait so an unresponsive API cannot hang the notebook;
        # a timeout is handled below like any other failure.
        response = requests.get(endpoint, params=params, headers=headers, timeout=10)

        if response.status_code == 200:
            return response.json()
        return None
    except Exception:
        return None

print(get_open_page_rank("https://help.contec.com/pc-helper/api-tool-wdm/en/mergedProjects/CAIO/sample_program/VC/Ao/AoGeneratingRingMemory.htm"))

{'status_code': 200, 'response': [{'status_code': 200, 'error': '', 'page_rank_integer': 2, 'page_rank_decimal': 2.28, 'rank': '49033383', 'domain': 'help.contec.com'}], 'last_updated': '26th Oct 2024'}
