<a href="https://colab.research.google.com/github/sameerraj09/Capstone_Project/blob/main/DataCollection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**DATA COLLECTION**

In [None]:
import re
import requests
import tldextract
import numpy as np
import pandas as pd
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from googlesearch import search
from Levenshtein import distance as levenshtein_distance
from waybackpy import WaybackMachineCDXServerAPI
import whois

def get_wayback_snapshot(url):
    """Fetch the latest archived snapshot URL from the Wayback Machine."""
    try:
        wayback = WaybackMachineCDXServerAPI(url)
        snapshot = wayback.newest()
        return snapshot.archive_url if snapshot else None
    except:
        return None

def extract_numerical_features(url):
    try:
        parsed_url = urlparse(url)
        domain = parsed_url.netloc
        ext = tldextract.extract(url)

        url_length = len(url)
        domain_length = len(domain)
        tld_length = len(ext.suffix)

        letter_ratio = sum(c.isalpha() for c in url) / url_length if url_length > 0 else 0
        digit_ratio = sum(c.isdigit() for c in url) / url_length if url_length > 0 else 0
        special_char_ratio = sum(not c.isalnum() for c in url) / url_length if url_length > 0 else 0

        return [url_length, domain_length, tld_length, letter_ratio, digit_ratio, special_char_ratio]
    except:
        return None

def is_https(url):
    try:
        return 1 if urlparse(url).scheme == "https" else -1
    except:
        return None

def is_domain_ip(url):
    try:
        return 1 if re.match(r"\d+\.\d+\.\d+\.\d+", urlparse(url).netloc) else -1
    except:
        return None

def extract_page_features(url):
    features = {}

    snapshot_url = get_wayback_snapshot(url) or url  # Use Wayback snapshot if available
    try:
        response = requests.get(snapshot_url, timeout=5, headers={"User-Agent": "Mozilla/5.0"})
        if response.status_code != 200:
            return None  # Skip URL if it's inaccessible

        soup = BeautifulSoup(response.text, 'html.parser')

        features["HasTitle"] = 1 if soup.title else -1
        features["HasDescription"] = 1 if soup.find("meta", attrs={"name": "description"}) else -1
        features["HasFavicon"] = 1 if soup.find("link", rel="icon") else -1
        features["HasSubmitButton"] = 1 if soup.find("input", {"type": "submit"}) else -1
        features["HasPasswordField"] = 1 if soup.find("input", {"type": "password"}) else -1
        features["NoOfImage"] = len(soup.find_all("img"))
        features["NoOfJS"] = len(soup.find_all("script"))
        features["NoOfCSS"] = len(soup.find_all("link", {"rel": "stylesheet"}))

        # Count internal and external links
        links = [a.get("href", "") for a in soup.find_all("a", href=True)]
        domain = tldextract.extract(url).registered_domain
        features["NoOfSelfRef"] = sum(1 for link in links if tldextract.extract(link).registered_domain == domain)
        features["NoOfExternalRef"] = sum(1 for link in links if tldextract.extract(link).registered_domain != domain)

        # Largest Line Length
        features["LargestLineLength"] = max((len(line) for line in response.text.split("\n")), default=0)

        # JavaScript analysis
        script_text = response.text.lower()
        features["RightClick"] = 1 if "event.button==2" in script_text else -1
        features["popUpWindow"] = 1 if "window.open" in script_text else -1
        features["Iframe"] = 1 if "<iframe" in script_text else -1
        features["Redirect"] = 1 if "window.location" in script_text or "meta http-equiv=\"refresh\"" in script_text else -1
        features["on_mouseover"] = 1 if "onmouseover" in script_text else -1

        # Check for obfuscation
        features["HasObfuscation"] = 1 if re.search(r"eval\(unescape|base64", script_text) else -1

        # Check for copyright
        features["HasCopyrightInfo"] = 1 if any(
            term in script_text for term in ["©", "copyright", "all rights reserved"]
        ) else -1

        # Check Google indexing
        try:
            features["Google_Index"] = 1 if list(search(url, num_results=1)) else -1
        except:
            features["Google_Index"] = -1

    except:
        return None  # Skip this URL if any error occurs

    return features

def extract_features_from_urls(urls):
    data = []

    for url in urls:
        numerical_features = extract_numerical_features(url)
        if numerical_features is None:
            continue  # Skip this URL if numerical features couldn't be extracted

        is_https_val = is_https(url)
        is_domain_ip_val = is_domain_ip(url)
        if is_https_val is None or is_domain_ip_val is None:
            continue  # Skip if either of these fails

        page_features = extract_page_features(url)
        if page_features is None:
            continue  # Skip if webpage features couldn't be extracted

        # Combine all feature values
        row = numerical_features + [is_https_val, is_domain_ip_val] + list(page_features.values())
        data.append(row)

    # Column names
    columns = [
        "URLLength", "DomainLength", "TLDLength", "LetterRatioInURL", "DigitRatioInURL", "SpacialCharRatioInURL",
        "IsHTTPS", "IsDomainIP", "HasTitle", "HasDescription", "HasFavicon", "HasSubmitButton", "HasPasswordField",
        "NoOfImage", "NoOfJS", "NoOfCSS", "NoOfSelfRef", "NoOfExternalRef", "LargestLineLength", "RightClick",
        "popUpWindow", "Iframe", "Redirect", "on_mouseover", "HasObfuscation", "HasCopyrightInfo", "Google_Index"
    ]

    return pd.DataFrame(data, columns=columns) if data else None

# Example execution
input_file = "data.xlsx"
output_file = "phishing_features.xlsx"

df_urls = pd.read_csv(input_file)
urls = df_urls.iloc[:, 0].dropna().tolist()

df = extract_features_from_urls(urls)
if df is not None:
    df.to_excel(output_file, index=False)
    print(f"Feature extraction completed! Data saved to {output_file}")
else:
    print("No valid URLs processed. Please check your input data.")

In [None]:
!pip install tldextract

Collecting tldextract
  Downloading tldextract-5.1.3-py3-none-any.whl.metadata (11 kB)
Collecting requests-file>=1.4 (from tldextract)
  Downloading requests_file-2.1.0-py2.py3-none-any.whl.metadata (1.7 kB)
Downloading tldextract-5.1.3-py3-none-any.whl (104 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m104.9/104.9 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading requests_file-2.1.0-py2.py3-none-any.whl (4.2 kB)
Installing collected packages: requests-file, tldextract
Successfully installed requests-file-2.1.0 tldextract-5.1.3


In [None]:
!pip install Levenshtein

Collecting Levenshtein
  Downloading levenshtein-0.27.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.6 kB)
Collecting rapidfuzz<4.0.0,>=3.9.0 (from Levenshtein)
  Downloading rapidfuzz-3.13.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Downloading levenshtein-0.27.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (161 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m161.7/161.7 kB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading rapidfuzz-3.13.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m48.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz, Levenshtein
Successfully installed Levenshtein-0.27.1 rapidfuzz-3.13.0


In [None]:
!pip install waybackpy

Collecting waybackpy
  Downloading waybackpy-3.0.6-py3-none-any.whl.metadata (9.9 kB)
Downloading waybackpy-3.0.6-py3-none-any.whl (34 kB)
Installing collected packages: waybackpy
Successfully installed waybackpy-3.0.6


In [None]:
!pip install whois

Collecting whois
  Downloading whois-1.20240129.2-py3-none-any.whl.metadata (1.3 kB)
Downloading whois-1.20240129.2-py3-none-any.whl (61 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/61.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.8/61.8 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: whois
Successfully installed whois-1.20240129.2


In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Lasso, Ridge
from sklearn.metrics import accuracy_score

# Load dataset
def load_data(file_path):
    df = pd.read_excel(file_path)
    return df

# Preprocessing: Handle missing values, normalize, and separate features/labels
def preprocess_data(df):
    df = df.dropna()  # Drop rows with missing values

    X = df.drop(columns=["label"])  # Features
    y = df["label"]  # Target variable

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    return X, X_scaled, y, scaler

# Perform Lasso and Ridge regression
def feature_importance_lasso_ridge(X, X_scaled, y):
    X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)

    # Lasso Regression
    lasso = Lasso(alpha=0.01)
    lasso.fit(X_train, y_train)

    # Ridge Regression
    ridge = Ridge(alpha=0.01)
    ridge.fit(X_train, y_train)

    # Get feature importance
    feature_names = X.columns
    lasso_importance = np.abs(lasso.coef_)
    ridge_importance = np.abs(ridge.coef_)

    # Print sorted importance
    lasso_features = sorted(zip(feature_names, lasso_importance), key=lambda x: x[1], reverse=True)
    ridge_features = sorted(zip(feature_names, ridge_importance), key=lambda x: x[1], reverse=True)

    print("Lasso Feature Importance:")
    for feature, importance in lasso_features:
        print(f"{feature}: {importance:.5f}")

    print("\nRidge Feature Importance:")
    for feature, importance in ridge_features:
        print(f"{feature}: {importance:.5f}")

    return lasso, ridge

# Main function
def main():

    df = pd.read_excel("testdata.xlsx")
    X, X_scaled, y, scaler = preprocess_data(df)
    lasso, ridge = feature_importance_lasso_ridge(X, X_scaled, y)

if __name__ == "__main__":
    main()


Lasso Feature Importance:
URLSimilarityIndex: 0.23357
IsHTTPS: 0.10304
HasSocialNet: 0.07442
HasCopyrightInfo: 0.06628
HasDescription: 0.04068
HasSubmitButton: 0.02653
DomainTitleMatchScore: 0.01616
HasFavicon: 0.01410
SpacialCharRatioInURL: 0.00939
NoOfQMarkInURL: 0.00891
URLLength: 0.00502
HasHiddenFields: 0.00422
HasTitle: 0.00311
Robots: 0.00219
NoOfJS: 0.00105
DomainLength: 0.00000
CharContinuationRate: 0.00000
URLCharProb: 0.00000
LetterRatioInURL: 0.00000
DegitRatioInURL: 0.00000
LineOfCode: 0.00000
IsResponsive: 0.00000
NoOfiFrame: 0.00000
HasExternalFormSubmit: 0.00000
NoOfImage: 0.00000
NoOfSelfRef: 0.00000
NoOfExternalRef: 0.00000

Ridge Feature Importance:
URLSimilarityIndex: 0.23547
IsHTTPS: 0.10850
HasSocialNet: 0.06880
HasCopyrightInfo: 0.06196
HasDescription: 0.04016
SpacialCharRatioInURL: 0.02912
LetterRatioInURL: 0.02872
HasSubmitButton: 0.02557
NoOfQMarkInURL: 0.02192
DomainLength: 0.02122
URLLength: 0.02007
HasFavicon: 0.01834
DomainTitleMatchScore: 0.01683
DegitRat

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Lasso, Ridge
from sklearn.metrics import accuracy_score

# Load dataset
df = pd.read_csv("captsonedatafinal.csv")  # Replace with your actual file

# Select important features based on Lasso and Ridge results
selected_features = [
    "URLSimilarityIndex", "IsHTTPS", "HasSocialNet", "HasCopyrightInfo",
    "HasDescription", "SpacialCharRatioInURL", "HasSubmitButton", "NoOfQMarkInURL",
    "DomainTitleMatchScore", "HasFavicon", "URLLength", "LetterRatioInURL"
]

# Extract features and target variable
X = df[selected_features]
y = df["label"]  # Assuming 'label' is the target column

# Normalize features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Split data
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)

# Train Lasso Model
lasso = Lasso(alpha=0.01)  # Adjust alpha if needed
lasso.fit(X_train, y_train)

# Train Ridge Model
ridge = Ridge(alpha=1.0)
ridge.fit(X_train, y_train)

# Evaluate models
y_pred_lasso = np.round(lasso.predict(X_test))
y_pred_ridge = np.round(ridge.predict(X_test))

print("Lasso Accuracy:", accuracy_score(y_test, y_pred_lasso))
print("Ridge Accuracy:", accuracy_score(y_test, y_pred_ridge))


Lasso Accuracy: 0.9980915625861447
Ridge Accuracy: 0.9981127674462987


In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.linear_model import Lasso, Ridge
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.preprocessing import StandardScaler

# Load dataset
df = pd.read_csv("captsonedatafinal.csv")
X = df.drop(columns=["label"])  # Features
y = df["label"]  # Target

# Normalize features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Define models
models = {
    "Lasso": Lasso(alpha=0.01),
    "Ridge": Ridge(alpha=1.0),
    "RandomForest": RandomForestClassifier(n_estimators=100, random_state=42),
    "XGBoost": XGBClassifier(use_label_encoder=False, eval_metric='logloss')
}

# Perform cross-validation
kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

for name, model in models.items():
    scores = cross_val_score(model, X_scaled, y, cv=kf, scoring='accuracy')
    print(f"{name} Accuracy: {np.mean(scores):.4f} (+/- {np.std(scores):.4f})")


Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/sklearn/metrics/_scorer.py", line 140, in __call__
    score = scorer._score(
            ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/sklearn/metrics/_scorer.py", line 388, in _score
    return self._sign * self._score_func(y_true, y_pred, **scoring_kwargs)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/sklearn/utils/_param_validation.py", line 216, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/sklearn/metrics/_classification.py", line 227, in accuracy_score
    y_type, y_true, y_pred = _check_targets(y_true, y_pred)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/sklearn/metrics/_classification.py", line 107, in _check_targets
    raise ValueError(
ValueError: Classificatio

Lasso Accuracy: nan (+/- nan)


Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/sklearn/metrics/_scorer.py", line 140, in __call__
    score = scorer._score(
            ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/sklearn/metrics/_scorer.py", line 388, in _score
    return self._sign * self._score_func(y_true, y_pred, **scoring_kwargs)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/sklearn/utils/_param_validation.py", line 216, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/sklearn/metrics/_classification.py", line 227, in accuracy_score
    y_type, y_true, y_pred = _check_targets(y_true, y_pred)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/sklearn/metrics/_classification.py", line 107, in _check_targets
    raise ValueError(
ValueError: Classificatio

Ridge Accuracy: nan (+/- nan)
RandomForest Accuracy: 1.0000 (+/- 0.0000)


Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.



XGBoost Accuracy: 1.0000 (+/- 0.0000)


In [None]:
import re
import socket
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from tld import get_tld
import pandas as pd
import concurrent.futures
from functools import partial
from tqdm.notebook import tqdm
import time
import os

class URLFeatureExtractor:
    def __init__(self, url, timeout=10):
        self.url = url
        self.timeout = timeout
        self.parsed_url = self.safe_parse(url)
        self.domain = self.parsed_url.netloc if self.parsed_url else None
        self.soup = None
        self.page_content = None
        self.response = None
        self.error = None

        try:
            headers = {'User-Agent': 'Mozilla/5.0'}
            self.response = requests.get(
                url,
                headers=headers,
                timeout=self.timeout,
                allow_redirects=True
            )
            self.page_content = self.response.text
            self.soup = BeautifulSoup(self.page_content, 'html.parser')
        except Exception as e:
            self.error = str(e)

    def safe_parse(self, url):
        try:
            return urlparse(url)
        except:
            return None

    # Fast numerical features
    def get_url_length(self):
        return len(self.url) if self.url else 0

    def get_domain_length(self):
        return len(self.domain) if self.domain else 0

    def get_tld_length(self):
        try:
            tld = get_tld(self.url, fail_silently=True)
            return len(tld) if tld else 0
        except:
            return 0

    def get_letter_ratio_in_url(self):
        letters = sum(c.isalpha() for c in self.url)
        return letters / len(self.url) if self.url else 0

    def get_digit_ratio_in_url(self):
        digits = sum(c.isdigit() for c in self.url)
        return digits / len(self.url) if self.url else 0

    def get_special_char_ratio_in_url(self):
        special = sum(not c.isalnum() for c in self.url)
        return special / len(self.url) if self.url else 0

    # Page content features
    def get_largest_line_length(self):
        if not self.page_content:
            return 0
        lines = self.page_content.split('\n')
        return max(len(line) for line in lines) if lines else 0

    def get_no_of_images(self):
        return len(self.soup.find_all('img')) if self.soup else 0

    def get_no_of_js(self):
        return len(self.soup.find_all('script')) if self.soup else 0

    def get_no_of_css(self):
        return len(self.soup.find_all('link', {'rel': 'stylesheet'})) if self.soup else 0

    def get_no_of_self_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        self_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if abs_url.startswith(base_url):
                    self_ref += 1
        return self_ref

    def get_no_of_external_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        external_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if not abs_url.startswith(base_url) and urlparse(abs_url).netloc:
                    external_ref += 1
        return external_ref

    # Security/behavior features
    def is_https(self):
        if not self.parsed_url:
            return 0
        return 1 if self.parsed_url.scheme == 'https' else (-1 if self.parsed_url.scheme == 'http' else 0)

    def has_obfuscation(self):
        if not self.page_content:
            return 0
        patterns = [
            r'%[0-9a-fA-F]{2}',
            r'\\x[0-9a-fA-F]{2}',
            r'&#x[0-9a-fA-F]+;',
            r'javascript:',
            r'eval\s*\(',
            r'document\.write',
            r'String\.fromCharCode'
        ]
        return 1 if any(re.search(pattern, self.page_content) for pattern in patterns) else -1

    def has_title(self):
        if not self.soup:
            return 0
        title = self.soup.title
        return 1 if title and title.string and title.string.strip() else -1

    def has_description(self):
        if not self.soup:
            return 0
        meta = self.soup.find('meta', attrs={'name': 'description'})
        return 1 if meta and meta.get('content', '').strip() else -1

    def has_password_field(self):
        if not self.soup:
            return 0
        password_fields = self.soup.find_all('input', {'type': 'password'})
        return 1 if password_fields else -1

    def is_domain_ip(self):
        if not self.domain:
            return 0
        try:
            socket.inet_aton(self.domain.split(':')[0])
            return 1
        except (socket.error, ValueError):
            return -1
        except:
            return 0

    def has_copyright_info(self):
        if not self.soup:
            return 0
        copyright_texts = self.soup.find_all(string=re.compile(r'copyright|©', re.I))
        return 1 if copyright_texts else -1

    def has_right_click_disabled(self):
        if not self.page_content:
            return 0
        patterns = [
            r'oncontextmenu\s*=\s*["\']return false["\']',
            r'document\.oncontextmenu\s*=\s*function\(\)\s*{\s*return false',
            r'event\.button\s*==\s*2'
        ]
        return 1 if any(re.search(pattern, self.page_content, re.I) for pattern in patterns) else -1

    def has_popup_window(self):
        if not self.page_content:
            return 0
        return 1 if re.search(r'window\.open\s*\(|alert\s*\(|confirm\s*\(|prompt\s*\(', self.page_content, re.I) else -1

    def has_iframe(self):
        if not self.soup:
            return 0
        iframes = self.soup.find_all('iframe')
        return 1 if iframes else -1

    def is_abnormal_url(self):
        if not self.url:
            return 0
        abnormal_patterns = [
            r'@',
            r'//\w+@',
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
            r'https?://[^/]+/',
            r'\.(exe|zip|rar|js|jar|dll|bat|cmd|msi)$'
        ]
        return 1 if any(re.search(pattern, self.url, re.I) for pattern in abnormal_patterns) else -1

    def has_redirect(self):
        if not self.response:
            return 0
        return 1 if len(self.response.history) > 0 else -1

    def extract_all_features(self):
        features = {
            # Basic URL features
            'url_length': self.get_url_length(),
            'domain_length': self.get_domain_length(),
            'tld_length': self.get_tld_length(),
            'letter_ratio': self.get_letter_ratio_in_url(),
            'digit_ratio': self.get_digit_ratio_in_url(),
            'special_char_ratio': self.get_special_char_ratio_in_url(),

            # Page content features
            'largest_line_length': self.get_largest_line_length(),
            'num_images': self.get_no_of_images(),
            'num_js': self.get_no_of_js(),
            'num_css': self.get_no_of_css(),
            'num_self_ref': self.get_no_of_self_ref(),
            'num_external_ref': self.get_no_of_external_ref(),

            # Security/behavior features
            'is_https': self.is_https(),
            'has_obfuscation': self.has_obfuscation(),
            'has_title': self.has_title(),
            'has_description': self.has_description(),
            'has_password_field': self.has_password_field(),
            'is_domain_ip': self.is_domain_ip(),
            'has_copyright': self.has_copyright_info(),
            'right_click_disabled': self.has_right_click_disabled(),
            'has_popup': self.has_popup_window(),
            'has_iframe': self.has_iframe(),
            'is_abnormal_url': self.is_abnormal_url(),
            'has_redirect': self.has_redirect(),

            # Status
            'error': self.error
        }
        return features

def process_single_url(url, timeout=10):
    try:
        extractor = URLFeatureExtractor(url, timeout)
        features = extractor.extract_all_features()
        features['url'] = url
        return features
    except Exception as e:
        return {'url': url, 'error': str(e)}

def process_urls_colab(input_file, output_file, max_workers=20, timeout=10, retries=2):
    """Colab-optimized URL processor with:
    - Parallel processing
    - Automatic retries
    - Progress tracking
    - Error handling
    """
    try:
        # Read input file
        df = pd.read_excel("testdata.xlsx")
        if 'url' not in df.columns:
            raise ValueError("Input file must contain 'url' column")

        print("Colab Resource Info:")
        !nvidia-smi  # GPU info
        !free -h     # RAM info

        urls = df['url'].tolist()
        results = []

        # Processing with retries
        for attempt in range(retries + 1):
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Only process URLs that haven't succeeded yet
                remaining_urls = [url for url in urls if url not in [r.get('url') for r in results]]
                futures = {executor.submit(process_single_url, url, timeout): url
                          for url in remaining_urls}

                try:
                    for future in tqdm(concurrent.futures.as_completed(futures),
                                     total=len(futures), desc=f"Attempt {attempt+1}"):
                        url = futures[future]
                        try:
                            results.append(future.result())
                        except Exception as e:
                            print(f"\nError processing {url}: {e}")
                            if attempt == retries:  # Final attempt
                                results.append({'url': url, 'error': str(e)})
                except Exception as e:
                    print(f"\nBatch processing error (attempt {attempt+1}): {e}")
                    if attempt == retries:
                        raise
                    max_workers = max(1, max_workers // 2)  # Reduce workers on failure
                    print(f"Reducing workers to {max_workers} for next attempt")
                    time.sleep(5)  # Cool-down period

        # Merge results with original data
        output_df = pd.merge(df, pd.DataFrame(results), on='url', how='left')
        output_df.to_excel(output_file, index=False)
        print(f"\nSuccessfully processed {len([r for r in results if r.get('error') is None])}/{len(urls)} URLs")
        print(f"Results saved to {output_file}")

    except Exception as e:
        print(f"\nFatal error: {e}")
        raise

# Example usage in Colab:
# process_urls_colab("input_urls.xlsx", "output_features.xlsx", max_workers=15)

In [None]:
import re
import requests
import tldextract
import numpy as np
import pandas as pd
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from googlesearch import search
from Levenshtein import distance as levenshtein_distance
from waybackpy import WaybackMachineCDXServerAPI
import whois

def get_wayback_snapshot(url):
    """Fetch the latest archived snapshot URL from the Wayback Machine."""
    try:
        wayback = WaybackMachineCDXServerAPI(url)
        snapshot = wayback.newest()
        return snapshot.archive_url if snapshot else None
    except:
        return None

def get_url_similarity_index(url):
    try:
        ext = tldextract.extract(url)
        domain = ext.domain
        registered_domain = ext.registered_domain
        return 1 - (levenshtein_distance(domain, registered_domain) / max(len(domain), len(registered_domain)))
    except:
        return 0

def extract_numerical_features(url):
    try:
        parsed_url = urlparse(url)
        domain = parsed_url.netloc
        ext = tldextract.extract(url)

        url_length = len(url)
        domain_length = len(domain)
        tld_length = len(ext.suffix)

        letter_ratio = sum(c.isalpha() for c in url) / url_length if url_length > 0 else 0
        digit_ratio = sum(c.isdigit() for c in url) / url_length if url_length > 0 else 0
        special_char_ratio = sum(not c.isalnum() for c in url) / url_length if url_length > 0 else 0

        url_similarity = get_url_similarity_index(url)

        return [url_length, domain_length, tld_length, letter_ratio, digit_ratio, special_char_ratio, url_similarity]
    except:
        return None

def is_https(url):
    return 1 if urlparse(url).scheme == "https" else -1

def is_domain_ip(url):
    return 1 if re.match(r"\d+\.\d+\.\d+\.\d+", urlparse(url).netloc) else -1

def extract_page_features(url):
    features = {}
    snapshot_url = get_wayback_snapshot(url) or url
    try:
        response = requests.get(snapshot_url, timeout=5, headers={"User-Agent": "Mozilla/5.0"})
        if response.status_code != 200:
            return None

        soup = BeautifulSoup(response.text, 'html.parser')
        features["HasTitle"] = 1 if soup.title else -1
        features["HasDescription"] = 1 if soup.find("meta", attrs={"name": "description"}) else -1
        features["HasFavicon"] = 1 if soup.find("link", rel="icon") else -1
        features["HasSubmitButton"] = 1 if soup.find("input", {"type": "submit"}) else -1
        features["HasPasswordField"] = 1 if soup.find("input", {"type": "password"}) else -1
        features["NoOfImage"] = len(soup.find_all("img"))
        features["NoOfJS"] = len(soup.find_all("script"))
        features["NoOfCSS"] = len(soup.find_all("link", {"rel": "stylesheet"}))

        links = [a.get("href", "") for a in soup.find_all("a", href=True)]
        domain = tldextract.extract(url).registered_domain
        features["NoOfSelfRef"] = sum(1 for link in links if tldextract.extract(link).registered_domain == domain)
        features["NoOfExternalRef"] = sum(1 for link in links if tldextract.extract(link).registered_domain != domain)

        features["LargestLineLength"] = max((len(line) for line in response.text.split("\n")), default=0)

        script_text = response.text.lower()
        features["RightClick"] = 1 if "event.button==2" in script_text else -1
        features["popUpWindow"] = 1 if "window.open" in script_text else -1
        features["Iframe"] = 1 if "<iframe" in script_text else -1
        features["Redirect"] = 1 if "window.location" in script_text or "meta http-equiv=\"refresh\"" in script_text else -1
        features["on_mouseover"] = 1 if "onmouseover" in script_text else -1
        features["HasObfuscation"] = 1 if re.search(r"eval\(unescape|base64", script_text) else -1
        features["HasCopyrightInfo"] = 1 if any(term in script_text for term in ["©", "copyright", "all rights reserved"]) else -1
        features["Google_Index"] = 1 if list(search(url, num_results=1)) else -1
    except:
        return None
    return features

def extract_features_from_urls(urls):
    data = []
    for url in urls:
        numerical_features = extract_numerical_features(url)
        if numerical_features is None:
            continue
        is_https_val = is_https(url)
        is_domain_ip_val = is_domain_ip(url)
        if is_https_val is None or is_domain_ip_val is None:
            continue
        page_features = extract_page_features(url)
        if page_features is None:
            continue

        row = [url] + numerical_features + [is_https_val, is_domain_ip_val] + list(page_features.values())
        data.append(row)

    columns = [
        "URL", "URLLength", "DomainLength", "TLDLength", "LetterRatioInURL", "DigitRatioInURL", "SpacialCharRatioInURL", "URLSimilarityIndex",
        "IsHTTPS", "IsDomainIP", "HasTitle", "HasDescription", "HasFavicon", "HasSubmitButton", "HasPasswordField", "NoOfImage", "NoOfJS", "NoOfCSS", "NoOfSelfRef", "NoOfExternalRef", "LargestLineLength",
        "RightClick", "popUpWindow", "Iframe", "Redirect", "on_mouseover", "HasObfuscation", "HasCopyrightInfo", "Google_Index"
    ]
    return pd.DataFrame(data, columns=columns) if data else None

input_file = "data.xlsx"
output_file = "phishing_features.xlsx"
df_urls = pd.read_excel(input_file)
urls = df_urls.iloc[:, 0].dropna().tolist()
df = extract_features_from_urls(urls)
if df is not None:
    df.to_excel(output_file, index=False)
    print(f"Feature extraction completed! Data saved to {output_file}")
else:
    print("No valid URLs processed. Please check your input data.")



No valid URLs processed. Please check your input data.


In [None]:
!pip install whois
import whois

# Optional: Add your own API keys if required for page rank or traffic data


def extract_numerical_features(url):
    parsed_url = urlparse(url)
    domain = parsed_url.netloc
    ext = tldextract.extract(url)

    url_length = len(url)
    domain_length = len(domain)
    tld_length = len(ext.suffix)

    letter_ratio = sum(c.isalpha() for c in url) / url_length if url_length > 0 else 0
    digit_ratio = sum(c.isdigit() for c in url) / url_length if url_length > 0 else 0
    special_char_ratio = sum(not c.isalnum() for c in url) / url_length if url_length > 0 else 0

    return [url_length, domain_length, tld_length, letter_ratio, digit_ratio, special_char_ratio]

def is_https(url):
    return 1 if urlparse(url).scheme == "https" else -1

def is_domain_ip(url):
    return 1 if re.match(r"\d+\.\d+\.\d+\.\d+", urlparse(url).netloc) else -1

def get_domain_similarity(url):
    parsed = urlparse(url)
    domain = parsed.netloc
    ext = tldextract.extract(domain)
    subdomain = ext.subdomain
    registered_domain = ext.registered_domain
    return 1 - (levenshtein_distance(subdomain, registered_domain) / max(len(subdomain), len(registered_domain))) if registered_domain and subdomain else 0

def extract_page_rank_and_traffic(domain):
    # Simulated values: Replace with real API if available
    page_rank = np.random.uniform(0, 1)
    web_traffic = np.random.randint(1000, 100000)
    return page_rank, web_traffic

def extract_page_features(url):
    features = {}
    try:
        response = requests.get(url, timeout=5, headers={"User-Agent": "Mozilla/5.0"})
        if response.status_code != 200:
            return None

        soup = BeautifulSoup(response.text, 'html.parser')

        features["HasTitle"] = 1 if soup.title else -1
        features["HasDescription"] = 1 if soup.find("meta", attrs={"name": "description"}) else -1
        features["HasFavicon"] = 1 if soup.find("link", rel="icon") else -1
        features["HasSubmitButton"] = 1 if soup.find("input", {"type": "submit"}) else -1
        features["HasPasswordField"] = 1 if soup.find("input", {"type": "password"}) else -1
        features["HasSocialNet"] = 1 if any(soup.find("a", href=re.compile(s)) for s in ["facebook", "twitter", "instagram", "linkedin"]) else -1

        features["NoOfImage"] = len(soup.find_all("img"))
        features["NoOfJS"] = len(soup.find_all("script"))
        features["NoOfCSS"] = len(soup.find_all("link", {"rel": "stylesheet"}))

        links = [a.get("href", "") for a in soup.find_all("a", href=True)]
        domain = tldextract.extract(url).registered_domain
        features["NoOfSelfRef"] = sum(1 for link in links if tldextract.extract(link).registered_domain == domain)
        features["NoOfExternalRef"] = sum(1 for link in links if tldextract.extract(link).registered_domain != domain)

        features["LargestLineLength"] = max((len(line) for line in response.text.split("\n")), default=0)

        script_text = response.text.lower()
        features["RightClick"] = 1 if "event.button==2" in script_text else -1
        features["popUpWindow"] = 1 if "window.open" in script_text else -1
        features["Iframe"] = 1 if "<iframe" in script_text else -1
        features["Redirect"] = 1 if "window.location" in script_text or "meta http-equiv=\"refresh\"" in script_text else -1
        features["on_mouseover"] = 1 if "onmouseover" in script_text else -1
        features["HasObfuscation"] = 1 if re.search(r"eval\\(unescape|base64", script_text) else -1
        features["HasCopyrightInfo"] = 1 if any(term in script_text for term in ["©", "copyright", "all rights reserved"]) else -1

        try:
            features["Google_Index"] = 1 if list(search(url, num_results=1)) else -1
        except:
            features["Google_Index"] = -1

        features["Abnormal_URL"] = 1 if len(urlparse(url).path) > 50 else -1
        features["Statistical_report"] = 1 if "phish" in url.lower() else -1

        page_rank, web_traffic = extract_page_rank_and_traffic(domain)
        features["Page_Rank"] = page_rank
        features["web_traffic"] = web_traffic

    except:
        return None

    return features

def extract_features_from_urls(urls):
    data = []
    for url in urls:
        numerical_features = extract_numerical_features(url)
        if numerical_features is None:
            continue

        is_https_val = is_https(url)
        is_domain_ip_val = is_domain_ip(url)
        similarity_index = get_domain_similarity(url)

        page_features = extract_page_features(url)
        if page_features is None:
            continue

        row = [url] + numerical_features + [similarity_index] + [is_https_val] + list(page_features.values()) + [is_domain_ip_val]
        data.append(row)

    columns = [
        "URL", "URLLength", "DomainLength", "TLDLength", "LetterRatioInURL", "DigitRatioInURL", "SpacialCharRatioInURL",
        "URLSimilarityIndex", "IsHTTPS",
        "HasTitle", "HasDescription", "HasFavicon", "HasSubmitButton", "HasPasswordField", "HasSocialNet",
        "NoOfImage", "NoOfJS", "NoOfCSS", "NoOfSelfRef", "NoOfExternalRef", "LargestLineLength",
        "RightClick", "popUpWindow", "Iframe", "Redirect", "on_mouseover", "HasObfuscation", "HasCopyrightInfo",
        "Google_Index", "Abnormal_URL", "Statistical_report", "Page_Rank", "web_traffic", "IsDomainIP"
    ]

    return pd.DataFrame(data, columns=columns) if data else None

input_file = "active_phishing.csv"
output_file = "phishing_features_updated.xlsx"

df_urls = pd.read_csv(input_file)
urls = df_urls.iloc[:, 0].dropna().tolist()

df = extract_features_from_urls(urls)
if df is not None:
    df.to_excel(output_file, index=False)
    print(f"Feature extraction completed! Data saved to {output_file}")
else:
    print("No valid URLs processed. Please check your input data.")

Collecting whois
  Downloading whois-1.20240129.2-py3-none-any.whl.metadata (1.3 kB)
Downloading whois-1.20240129.2-py3-none-any.whl (61 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/61.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.8/61.8 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: whois
Successfully installed whois-1.20240129.2


In [None]:
import re
import socket
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from tld import get_tld
import pandas as pd
import concurrent.futures
from functools import partial
from tqdm.notebook import tqdm
import time
import os

class URLFeatureExtractor:
    def __init__(self, url, timeout=10):
        self.url = url
        self.timeout = timeout
        self.parsed_url = self.safe_parse(url)
        self.domain = self.parsed_url.netloc if self.parsed_url else None
        self.soup = None
        self.page_content = None
        self.response = None
        self.error = None

        try:
            headers = {'User-Agent': 'Mozilla/5.0'}
            self.response = requests.get(
                url,
                headers=headers,
                timeout=self.timeout,
                allow_redirects=True
            )
            self.page_content = self.response.text
            self.soup = BeautifulSoup(self.page_content, 'html.parser')
        except Exception as e:
            self.error = str(e)

    def safe_parse(self, url):
        try:
            return urlparse(url)
        except:
            return None

    # Fast numerical features
    def get_url_length(self):
        return len(self.url) if self.url else 0

    def get_domain_length(self):
        return len(self.domain) if self.domain else 0

    def get_tld_length(self):
        try:
            tld = get_tld(self.url, fail_silently=True)
            return len(tld) if tld else 0
        except:
            return 0

    def get_letter_ratio_in_url(self):
        letters = sum(c.isalpha() for c in self.url)
        return letters / len(self.url) if self.url else 0

    def get_digit_ratio_in_url(self):
        digits = sum(c.isdigit() for c in self.url)
        return digits / len(self.url) if self.url else 0

    def get_special_char_ratio_in_url(self):
        special = sum(not c.isalnum() for c in self.url)
        return special / len(self.url) if self.url else 0

    # Page content features
    def get_largest_line_length(self):
        if not self.page_content:
            return 0
        lines = self.page_content.split('\n')
        return max(len(line) for line in lines) if lines else 0

    def get_no_of_images(self):
        return len(self.soup.find_all('img')) if self.soup else 0

    def get_no_of_js(self):
        return len(self.soup.find_all('script')) if self.soup else 0

    def get_no_of_css(self):
        return len(self.soup.find_all('link', {'rel': 'stylesheet'})) if self.soup else 0

    def get_no_of_self_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        self_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if abs_url.startswith(base_url):
                    self_ref += 1
        return self_ref

    def get_no_of_external_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        external_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if not abs_url.startswith(base_url) and urlparse(abs_url).netloc:
                    external_ref += 1
        return external_ref

    # Security/behavior features
    def is_https(self):
        if not self.parsed_url:
            return 0
        return 1 if self.parsed_url.scheme == 'https' else (-1 if self.parsed_url.scheme == 'http' else 0)

    def has_obfuscation(self):
        if not self.page_content:
            return 0
        patterns = [
            r'%[0-9a-fA-F]{2}',
            r'\\x[0-9a-fA-F]{2}',
            r'&#x[0-9a-fA-F]+;',
            r'javascript:',
            r'eval\s*\(',
            r'document\.write',
            r'String\.fromCharCode'
        ]
        return 1 if any(re.search(pattern, self.page_content) for pattern in patterns) else -1

    def has_title(self):
        if not self.soup:
            return 0
        title = self.soup.title
        return 1 if title and title.string and title.string.strip() else -1

    def has_description(self):
        if not self.soup:
            return 0
        meta = self.soup.find('meta', attrs={'name': 'description'})
        return 1 if meta and meta.get('content', '').strip() else -1

    def has_password_field(self):
        if not self.soup:
            return 0
        password_fields = self.soup.find_all('input', {'type': 'password'})
        return 1 if password_fields else -1

    def is_domain_ip(self):
        if not self.domain:
            return 0
        try:
            socket.inet_aton(self.domain.split(':')[0])
            return 1
        except (socket.error, ValueError):
            return -1
        except:
            return 0

    def has_copyright_info(self):
        if not self.soup:
            return 0
        copyright_texts = self.soup.find_all(string=re.compile(r'copyright|©', re.I))
        return 1 if copyright_texts else -1

    def has_right_click_disabled(self):
        if not self.page_content:
            return 0
        patterns = [
            r'oncontextmenu\s*=\s*["\']return false["\']',
            r'document\.oncontextmenu\s*=\s*function\(\)\s*{\s*return false',
            r'event\.button\s*==\s*2'
        ]
        return 1 if any(re.search(pattern, self.page_content, re.I) for pattern in patterns) else -1

    def has_popup_window(self):
        if not self.page_content:
            return 0
        return 1 if re.search(r'window\.open\s*\(|alert\s*\(|confirm\s*\(|prompt\s*\(', self.page_content, re.I) else -1

    def has_iframe(self):
        if not self.soup:
            return 0
        iframes = self.soup.find_all('iframe')
        return 1 if iframes else -1

    def is_abnormal_url(self):
        if not self.url:
            return 0
        abnormal_patterns = [
            r'@',
            r'//\w+@',
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
            r'https?://[^/]+/',
            r'\.(exe|zip|rar|js|jar|dll|bat|cmd|msi)$'
        ]
        return 1 if any(re.search(pattern, self.url, re.I) for pattern in abnormal_patterns) else -1

    def has_redirect(self):
        if not self.response:
            return 0
        return 1 if len(self.response.history) > 0 else -1

    def extract_all_features(self):
        features = {
            # Basic URL features
            'url_length': self.get_url_length(),
            'domain_length': self.get_domain_length(),
            'tld_length': self.get_tld_length(),
            'letter_ratio': self.get_letter_ratio_in_url(),
            'digit_ratio': self.get_digit_ratio_in_url(),
            'special_char_ratio': self.get_special_char_ratio_in_url(),

            # Page content features
            'largest_line_length': self.get_largest_line_length(),
            'num_images': self.get_no_of_images(),
            'num_js': self.get_no_of_js(),
            'num_css': self.get_no_of_css(),
            'num_self_ref': self.get_no_of_self_ref(),
            'num_external_ref': self.get_no_of_external_ref(),

            # Security/behavior features
            'is_https': self.is_https(),
            'has_obfuscation': self.has_obfuscation(),
            'has_title': self.has_title(),
            'has_description': self.has_description(),
            'has_password_field': self.has_password_field(),
            'is_domain_ip': self.is_domain_ip(),
            'has_copyright': self.has_copyright_info(),
            'right_click_disabled': self.has_right_click_disabled(),
            'has_popup': self.has_popup_window(),
            'has_iframe': self.has_iframe(),
            'is_abnormal_url': self.is_abnormal_url(),
            'has_redirect': self.has_redirect(),

            # Status
            'error': self.error
        }
        return features

def process_single_url(url, timeout=10):
    try:
        extractor = URLFeatureExtractor(url, timeout)
        features = extractor.extract_all_features()
        features['url'] = url
        return features
    except Exception as e:
        return {'url': url, 'error': str(e)}

def process_urls_colab(input_file, output_file, max_workers=20, timeout=10, retries=2):
    """Colab-optimized URL processor with:
    - Parallel processing
    - Automatic retries
    - Progress tracking
    - Error handling
    """
    try:
        # Read input file
        df = pd.read_excel("testdata")
        if 'url' not in df.columns:
            raise ValueError("Input file must contain 'url' column")

        print("Colab Resource Info:")
        !nvidia-smi  # GPU info
        !free -h     # RAM info

        urls = df['url'].tolist()
        results = []

        # Processing with retries
        for attempt in range(retries + 1):
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Only process URLs that haven't succeeded yet
                remaining_urls = [url for url in urls if url not in [r.get('url') for r in results]]
                futures = {executor.submit(process_single_url, url, timeout): url
                          for url in remaining_urls}

                try:
                    for future in tqdm(concurrent.futures.as_completed(futures),
                                     total=len(futures), desc=f"Attempt {attempt+1}"):
                        url = futures[future]
                        try:
                            results.append(future.result())
                        except Exception as e:
                            print(f"\nError processing {url}: {e}")
                            if attempt == retries:  # Final attempt
                                results.append({'url': url, 'error': str(e)})
                except Exception as e:
                    print(f"\nBatch processing error (attempt {attempt+1}): {e}")
                    if attempt == retries:
                        raise
                    max_workers = max(1, max_workers // 2)  # Reduce workers on failure
                    print(f"Reducing workers to {max_workers} for next attempt")
                    time.sleep(5)  # Cool-down period

        # Merge results with original data
        output_df = pd.merge(df, pd.DataFrame(results), on='url', how='left')
        output_df.to_excel(output_file, index=False)
        print(f"\nSuccessfully processed {len([r for r in results if r.get('error') is None])}/{len(urls)} URLs")
        print(f"Results saved to {output_file}")

    except Exception as e:
        print(f"\nFatal error: {e}")
        raise

# Example usage in Colab:
# process_urls_colab("input_urls.xlsx", "output_features.xlsx", max_workers=15)

In [None]:
!pip install tld

Collecting tld
  Downloading tld-0.13-py2.py3-none-any.whl.metadata (9.4 kB)
Downloading tld-0.13-py2.py3-none-any.whl (263 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m263.8/263.8 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tld
Successfully installed tld-0.13


In [None]:
import re
import socket
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from tld import get_tld
import pandas as pd
import concurrent.futures
from functools import partial
from tqdm.notebook import tqdm
import time
import os

class URLFeatureExtractor:
    def __init__(self, url, timeout=10):
        self.url = url
        self.timeout = timeout
        self.parsed_url = self.safe_parse(url)
        self.domain = self.parsed_url.netloc if self.parsed_url else None
        self.soup = None
        self.page_content = None
        self.response = None
        self.error = None

        try:
            headers = {'User-Agent': 'Mozilla/5.0'}
            self.response = requests.get(
                url,
                headers=headers,
                timeout=self.timeout,
                allow_redirects=True
            )
            self.page_content = self.response.text
            self.soup = BeautifulSoup(self.page_content, 'html.parser')
        except Exception as e:
            self.error = str(e)

    def safe_parse(self, url):
        try:
            return urlparse(url)
        except:
            return None

    # Fast numerical features
    def get_url_length(self):
        return len(self.url) if self.url else 0

    def get_domain_length(self):
        return len(self.domain) if self.domain else 0

    def get_tld_length(self):
        try:
            tld = get_tld(self.url, fail_silently=True)
            return len(tld) if tld else 0
        except:
            return 0

    def get_letter_ratio_in_url(self):
        letters = sum(c.isalpha() for c in self.url)
        return letters / len(self.url) if self.url else 0

    def get_digit_ratio_in_url(self):
        digits = sum(c.isdigit() for c in self.url)
        return digits / len(self.url) if self.url else 0

    def get_special_char_ratio_in_url(self):
        special = sum(not c.isalnum() for c in self.url)
        return special / len(self.url) if self.url else 0

    # Page content features
    def get_largest_line_length(self):
        if not self.page_content:
            return 0
        lines = self.page_content.split('\n')
        return max(len(line) for line in lines) if lines else 0

    def get_no_of_images(self):
        return len(self.soup.find_all('img')) if self.soup else 0

    def get_no_of_js(self):
        return len(self.soup.find_all('script')) if self.soup else 0

    def get_no_of_css(self):
        return len(self.soup.find_all('link', {'rel': 'stylesheet'})) if self.soup else 0

    def get_no_of_self_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        self_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if abs_url.startswith(base_url):
                    self_ref += 1
        return self_ref

    def get_no_of_external_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        external_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if not abs_url.startswith(base_url) and urlparse(abs_url).netloc:
                    external_ref += 1
        return external_ref

    # Security/behavior features
    def is_https(self):
        if not self.parsed_url:
            return 0
        return 1 if self.parsed_url.scheme == 'https' else (-1 if self.parsed_url.scheme == 'http' else 0)

    def has_obfuscation(self):
        if not self.page_content:
            return 0
        patterns = [
            r'%[0-9a-fA-F]{2}',
            r'\\x[0-9a-fA-F]{2}',
            r'&#x[0-9a-fA-F]+;',
            r'javascript:',
            r'eval\s*\(',
            r'document\.write',
            r'String\.fromCharCode'
        ]
        return 1 if any(re.search(pattern, self.page_content) for pattern in patterns) else -1

    def has_title(self):
        if not self.soup:
            return 0
        title = self.soup.title
        return 1 if title and title.string and title.string.strip() else -1

    def has_description(self):
        if not self.soup:
            return 0
        meta = self.soup.find('meta', attrs={'name': 'description'})
        return 1 if meta and meta.get('content', '').strip() else -1

    def has_password_field(self):
        if not self.soup:
            return 0
        password_fields = self.soup.find_all('input', {'type': 'password'})
        return 1 if password_fields else -1

    def is_domain_ip(self):
        if not self.domain:
            return 0
        try:
            socket.inet_aton(self.domain.split(':')[0])
            return 1
        except (socket.error, ValueError):
            return -1
        except:
            return 0

    def has_copyright_info(self):
        if not self.soup:
            return 0
        copyright_texts = self.soup.find_all(string=re.compile(r'copyright|©', re.I))
        return 1 if copyright_texts else -1

    def has_right_click_disabled(self):
        if not self.page_content:
            return 0
        patterns = [
            r'oncontextmenu\s*=\s*["\']return false["\']',
            r'document\.oncontextmenu\s*=\s*function\(\)\s*{\s*return false',
            r'event\.button\s*==\s*2'
        ]
        return 1 if any(re.search(pattern, self.page_content, re.I) for pattern in patterns) else -1

    def has_popup_window(self):
        if not self.page_content:
            return 0
        return 1 if re.search(r'window\.open\s*\(|alert\s*\(|confirm\s*\(|prompt\s*\(', self.page_content, re.I) else -1

    def has_iframe(self):
        if not self.soup:
            return 0
        iframes = self.soup.find_all('iframe')
        return 1 if iframes else -1

    def is_abnormal_url(self):
        if not self.url:
            return 0
        abnormal_patterns = [
            r'@',
            r'//\w+@',
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
            r'https?://[^/]+/',
            r'\.(exe|zip|rar|js|jar|dll|bat|cmd|msi)$'
        ]
        return 1 if any(re.search(pattern, self.url, re.I) for pattern in abnormal_patterns) else -1

    def has_redirect(self):
        if not self.response:
            return 0
        return 1 if len(self.response.history) > 0 else -1

    def extract_all_features(self):
        features = {
            # Basic URL features
            'url_length': self.get_url_length(),
            'domain_length': self.get_domain_length(),
            'tld_length': self.get_tld_length(),
            'letter_ratio': self.get_letter_ratio_in_url(),
            'digit_ratio': self.get_digit_ratio_in_url(),
            'special_char_ratio': self.get_special_char_ratio_in_url(),

            # Page content features
            'largest_line_length': self.get_largest_line_length(),
            'num_images': self.get_no_of_images(),
            'num_js': self.get_no_of_js(),
            'num_css': self.get_no_of_css(),
            'num_self_ref': self.get_no_of_self_ref(),
            'num_external_ref': self.get_no_of_external_ref(),

            # Security/behavior features
            'is_https': self.is_https(),
            'has_obfuscation': self.has_obfuscation(),
            'has_title': self.has_title(),
            'has_description': self.has_description(),
            'has_password_field': self.has_password_field(),
            'is_domain_ip': self.is_domain_ip(),
            'has_copyright': self.has_copyright_info(),
            'right_click_disabled': self.has_right_click_disabled(),
            'has_popup': self.has_popup_window(),
            'has_iframe': self.has_iframe(),
            'is_abnormal_url': self.is_abnormal_url(),
            'has_redirect': self.has_redirect(),

            # Status
            'error': self.error
        }
        return features

def process_single_url(url, timeout=10):
    try:
        extractor = URLFeatureExtractor(url, timeout)
        features = extractor.extract_all_features()
        features['url'] = url
        return features
    except Exception as e:
        return {'url': url, 'error': str(e)}

def process_urls_colab(input_file, output_file, max_workers=20, timeout=10, retries=2):
    """Colab-optimized URL processor with:
    - Parallel processing
    - Automatic retries
    - Progress tracking
    - Error handling
    """
    try:
        # Read input file
        df = pd.read_excel(input_file)
        if 'url' not in df.columns:
            raise ValueError("Input file must contain 'url' column")

        print("Colab Resource Info:")
        !nvidia-smi  # GPU info
        !free -h     # RAM info

        urls = df['url'].tolist()
        results = []

        # Processing with retries
        for attempt in range(retries + 1):
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Only process URLs that haven't succeeded yet
                remaining_urls = [url for url in urls if url not in [r.get('url') for r in results]]
                futures = {executor.submit(process_single_url, url, timeout): url
                          for url in remaining_urls}

                try:
                    for future in tqdm(concurrent.futures.as_completed(futures),
                                     total=len(futures), desc=f"Attempt {attempt+1}"):
                        url = futures[future]
                        try:
                            results.append(future.result())
                        except Exception as e:
                            print(f"\nError processing {url}: {e}")
                            if attempt == retries:  # Final attempt
                                results.append({'url': url, 'error': str(e)})
                except Exception as e:
                    print(f"\nBatch processing error (attempt {attempt+1}): {e}")
                    if attempt == retries:
                        raise
                    max_workers = max(1, max_workers // 2)  # Reduce workers on failure
                    print(f"Reducing workers to {max_workers} for next attempt")
                    time.sleep(5)  # Cool-down period

        # Merge results with original data
        output_df = pd.merge(df, pd.DataFrame(results), on='url', how='left')
        output_df.to_excel(output_file, index=False)
        print(f"\nSuccessfully processed {len([r for r in results if r.get('error') is None])}/{len(urls)} URLs")
        print(f"Results saved to {output_file}")

    except Exception as e:
        print(f"\nFatal error: {e}")
        raise

# Example usage in Colab:
# process_urls_colab("input_urls.xlsx", "output_features.xlsx", max_workers=15)

In [None]:
process_urls_colab("testdata.xlsx", "output.xlsx", max_workers=15)

Colab Resource Info:
/bin/bash: line 1: nvidia-smi: command not found
               total        used        free      shared  buff/cache   available
Mem:            12Gi       1.2Gi       8.2Gi       2.0Mi       3.3Gi        11Gi
Swap:             0B          0B          0B


Attempt 1:   0%|          | 0/200 [00:00<?, ?it/s]

Attempt 2: 0it [00:00, ?it/s]

Attempt 3: 0it [00:00, ?it/s]


Successfully processed 171/200 URLs
Results saved to output.xlsx


In [16]:
import re
import socket
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from tld import get_tld
import pandas as pd
import concurrent.futures
from functools import partial
from tqdm.notebook import tqdm
import time

class URLFeatureExtractor:
    def __init__(self, url, timeout=10):
        self.url = url
        self.timeout = timeout
        self.parsed_url = self.safe_parse(url)
        self.domain = self.parsed_url.netloc if self.parsed_url else None
        self.soup = None
        self.page_content = None
        self.response = None
        self.error = None

        try:
            headers = {'User-Agent': 'Mozilla/5.0'}
            self.response = requests.get(
                url,
                headers=headers,
                timeout=self.timeout,
                allow_redirects=True
            )
            self.page_content = self.response.text
            self.soup = BeautifulSoup(self.page_content, 'html.parser')
        except Exception as e:
            self.error = str(e)

    def safe_parse(self, url):
        try:
            return urlparse(url)
        except:
            return None

    # Basic URL features
    def get_url_length(self):
        return len(self.url) if self.url else 0

    def get_domain_length(self):
        return len(self.domain) if self.domain else 0

    def get_tld_length(self):
        try:
            tld = get_tld(self.url, fail_silently=True)
            return len(tld) if tld else 0
        except:
            return 0

    def get_letter_ratio_in_url(self):
        letters = sum(c.isalpha() for c in self.url)
        return letters / len(self.url) if self.url else 0

    def get_digit_ratio_in_url(self):
        digits = sum(c.isdigit() for c in self.url)
        return digits / len(self.url) if self.url else 0

    def get_special_char_ratio_in_url(self):
        special = sum(not c.isalnum() for c in self.url)
        return special / len(self.url) if self.url else 0

    # Page content features
    def get_largest_line_length(self):
        if not self.page_content:
            return 0
        lines = self.page_content.split('\n')
        return max(len(line) for line in lines) if lines else 0

    def get_no_of_images(self):
        return len(self.soup.find_all('img')) if self.soup else 0

    def get_no_of_js(self):
        return len(self.soup.find_all('script')) if self.soup else 0

    def get_no_of_css(self):
        return len(self.soup.find_all('link', {'rel': 'stylesheet'})) if self.soup else 0

    def get_no_of_self_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        self_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if abs_url.startswith(base_url):
                    self_ref += 1
        return self_ref

    def get_no_of_external_ref(self):
        if not self.soup or not self.parsed_url:
            return 0
        base_url = f"{self.parsed_url.scheme}://{self.parsed_url.netloc}"
        external_ref = 0

        for tag in self.soup.find_all(['a', 'link', 'script', 'img']):
            url = tag.get('href', '') or tag.get('src', '')
            if url:
                abs_url = urljoin(base_url, url)
                if not abs_url.startswith(base_url) and urlparse(abs_url).netloc:
                    external_ref += 1
        return external_ref

    # Security/behavior features
    def is_https(self):
        if not self.parsed_url:
            return 0
        return 1 if self.parsed_url.scheme == 'https' else (-1 if self.parsed_url.scheme == 'http' else 0)

    def has_obfuscation(self):
        if not self.page_content:
            return 0
        patterns = [
            r'%[0-9a-fA-F]{2}',
            r'\\x[0-9a-fA-F]{2}',
            r'&#x[0-9a-fA-F]+;',
            r'javascript:',
            r'eval\s*\(',
            r'document\.write',
            r'String\.fromCharCode'
        ]
        return 1 if any(re.search(pattern, self.page_content) for pattern in patterns) else -1

    def has_title(self):
        if not self.soup:
            return 0
        title = self.soup.title
        return 1 if title and title.string and title.string.strip() else -1

    def has_description(self):
        if not self.soup:
            return 0
        meta = self.soup.find('meta', attrs={'name': 'description'})
        return 1 if meta and meta.get('content', '').strip() else -1

    def has_submit_button(self):
        if not self.soup:
            return 0
        buttons = self.soup.find_all('input', {'type': 'submit'}) + self.soup.find_all('button')
        return 1 if buttons else -1

    def has_password_field(self):
        if not self.soup:
            return 0
        password_fields = self.soup.find_all('input', {'type': 'password'})
        return 1 if password_fields else -1

    def has_social_net(self):
        if not self.soup:
            return 0
        social_keywords = ['facebook', 'twitter', 'linkedin', 'instagram', 'youtube', 'pinterest']
        for keyword in social_keywords:
            if self.soup.find_all(href=re.compile(keyword, re.I)):
                return 1
        return -1

    def has_favicon(self):
        if not self.soup:
            return 0
        favicon = self.soup.find('link', rel=re.compile('icon', re.I))
        return 1 if favicon else -1

    def is_domain_ip(self):
        if not self.domain:
            return 0
        try:
            socket.inet_aton(self.domain.split(':')[0])
            return 1
        except (socket.error, ValueError):
            return -1
        except:
            return 0

    def has_copyright_info(self):
        if not self.soup:
            return 0
        copyright_texts = self.soup.find_all(string=re.compile(r'copyright|©', re.I))
        return 1 if copyright_texts else -1

    def has_right_click_disabled(self):
        if not self.page_content:
            return 0
        patterns = [
            r'oncontextmenu\s*=\s*["\']return false["\']',
            r'document\.oncontextmenu\s*=\s*function\(\)\s*{\s*return false',
            r'event\.button\s*==\s*2'
        ]
        return 1 if any(re.search(pattern, self.page_content, re.I) for pattern in patterns) else -1

    def has_popup_window(self):
        if not self.page_content:
            return 0
        return 1 if re.search(r'window\.open\s*\(|alert\s*\(|confirm\s*\(|prompt\s*\(', self.page_content, re.I) else -1

    def has_iframe(self):
        if not self.soup:
            return 0
        iframes = self.soup.find_all('iframe')
        return 1 if iframes else -1

    def is_abnormal_url(self):
        if not self.url:
            return 0
        abnormal_patterns = [
            r'@',
            r'//\w+@',
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
            r'https?://[^/]+/',
            r'\.(exe|zip|rar|js|jar|dll|bat|cmd|msi)$'
        ]
        return 1 if any(re.search(pattern, self.url, re.I) for pattern in abnormal_patterns) else -1

    def has_redirect(self):
        if not self.response:
            return 0
        return 1 if len(self.response.history) > 0 else -1

    def has_on_mouseover(self):
        if not self.page_content:
            return 0
        return 1 if re.search(r'onmouseover\s*=', self.page_content, re.I) else -1

    def extract_all_features(self):
        features = {
            'URL': self.url,
            'URLLength': self.get_url_length(),
            'DomainLength': self.get_domain_length(),
            'TLDLength': self.get_tld_length(),
            'LetterRatioInURL': self.get_letter_ratio_in_url(),
            'DigitRatioInURL': self.get_digit_ratio_in_url(),
            'SpacialCharRatioInURL': self.get_special_char_ratio_in_url(),
            'LargestLineLength': self.get_largest_line_length(),
            'NoOfImage': self.get_no_of_images(),
            'NoOfJS': self.get_no_of_js(),
            'NoOfCSS': self.get_no_of_css(),
            'NoOfSelfRef': self.get_no_of_self_ref(),
            'NoOfExternalRef': self.get_no_of_external_ref(),
            'IsHTTPS': self.is_https(),
            'HasObfuscation': self.has_obfuscation(),
            'HasTitle': self.has_title(),
            'HasDescription': self.has_description(),
            'HasSubmitButton': self.has_submit_button(),
            'HasPasswordField': self.has_password_field(),
            'HasSocialNet': self.has_social_net(),
            'HasFavicon': self.has_favicon(),
            'IsDomainIP': self.is_domain_ip(),
            'HasCopyrightInfo': self.has_copyright_info(),
            'RightClick': self.has_right_click_disabled(),
            'popUpWindow': self.has_popup_window(),
            'Iframe': self.has_iframe(),
            'Abnormal_URL': self.is_abnormal_url(),
            'Redirect': self.has_redirect(),
            'on_mouseover': self.has_on_mouseover(),
            'error': self.error
        }
        return features

def process_single_url(url, timeout=10):
    try:
        extractor = URLFeatureExtractor(url, timeout)
        features = extractor.extract_all_features()
        return features
    except Exception as e:
        return {'URL': url, 'error': str(e)}

def process_urls_colab(input_file, output_file, max_workers=20, timeout=10, retries=2):
    """Colab-optimized URL processor with all requested features"""
    try:
        df = pd.read_excel(input_file)
        if 'URL' not in df.columns:
            raise ValueError("Input file must contain 'URL' column")

        print("Colab Resource Info:")
        !nvidia-smi
        !free -h

        urls = df['URL'].tolist()
        results = []

        for attempt in range(retries + 1):
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                remaining_urls = [url for url in urls if url not in [r.get('URL') for r in results]]
                futures = {executor.submit(process_single_url, url, timeout): url
                          for url in remaining_urls}

                try:
                    for future in tqdm(concurrent.futures.as_completed(futures),
                                     total=len(futures), desc=f"Attempt {attempt+1}"):
                        url = futures[future]
                        try:
                            results.append(future.result())
                        except Exception as e:
                            print(f"\nError processing {url}: {e}")
                            if attempt == retries:
                                results.append({'URL': url, 'error': str(e)})
                except Exception as e:
                    print(f"\nBatch processing error (attempt {attempt+1}): {e}")
                    if attempt == retries:
                        raise
                    max_workers = max(1, max_workers // 2)
                    print(f"Reducing workers to {max_workers} for next attempt")
                    time.sleep(5)

        output_df = pd.merge(df, pd.DataFrame(results), on='URL', how='left')
        output_df.to_excel(output_file, index=False)
        print(f"\nSuccessfully processed {len([r for r in results if r.get('error') is None])}/{len(urls)} URLs")
        print(f"Results saved to {output_file}")

    except Exception as e:
        print(f"\nFatal error: {e}")
        raise

# Example usage:
# process_urls_colab("input.xlsx", "output.xlsx", max_workers=20)

In [None]:
process_urls_colab("legimate_final.xlsx", "output.xlsx", max_workers=20)

Colab Resource Info:
/bin/bash: line 1: nvidia-smi: command not found
               total        used        free      shared  buff/cache   available
Mem:            12Gi       1.2Gi       8.9Gi       2.0Mi       2.6Gi        11Gi
Swap:             0B          0B          0B


Attempt 1:   0%|          | 0/65223 [00:00<?, ?it/s]


Assuming this really is an XML document, what you're doing might work, but you should know that using an XML parser will be more reliable. To parse this document as XML, make sure you have the Python package 'lxml' installed, and pass the keyword argument `features="xml"` into the BeautifulSoup constructor.




  self.soup = BeautifulSoup(self.page_content, 'html.parser')


Attempt 2: 0it [00:00, ?it/s]

Attempt 3: 0it [00:00, ?it/s]


Successfully processed 53208/65223 URLs
Results saved to output.xlsx


In [17]:
process_urls_colab("legimate_final.xlsx", "output.xlsx", max_workers=20)

Colab Resource Info:
/bin/bash: line 1: nvidia-smi: command not found
               total        used        free      shared  buff/cache   available
Mem:            12Gi       2.0Gi       8.0Gi       2.0Mi       2.7Gi        10Gi
Swap:             0B          0B          0B


Attempt 1:   0%|          | 0/69998 [00:00<?, ?it/s]


Assuming this really is an XML document, what you're doing might work, but you should know that using an XML parser will be more reliable. To parse this document as XML, make sure you have the Python package 'lxml' installed, and pass the keyword argument `features="xml"` into the BeautifulSoup constructor.




  self.soup = BeautifulSoup(self.page_content, 'html.parser')


Attempt 2: 0it [00:00, ?it/s]

Attempt 3: 0it [00:00, ?it/s]


Successfully processed 5725/69998 URLs
Results saved to output.xlsx
