As described in: Lin, M.-S., et al.: Malicious URL filtering- a big data application. IEEE Interna-
tional Conference on Big Data (2013) we are going to extract features from URLs

In [541]:
import re
from urllib.parse import urlparse
from sklearn import *
import numpy as np
import pandas as pd
import json
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score
import joblib

Step 1: Lexical features extraction: split URLs into components, apply a sliding window to the domain, and use a bag-of-words model to describe each component.

In [542]:
def extract_lexical_features(url):
    # Ensure the URL has a scheme for proper parsing, crucial for IPv6 addresses
    if not urlparse(url).scheme:
        url = 'http://' + url  # Prepend with a default scheme if missing
    
    try:
        # Split URL into components
        parsed_url = urlparse(url)
        domain = parsed_url.netloc
        path = parsed_url.path
        query = parsed_url.query

        # Remove common prefixes from domain
        domain = domain.replace('www.', '')

        # Calculate features
        features = {
            'domain': domain,
            'domain_length': len(domain),
            'path_length': len(path),
            'query_length': len(query),
            'num_path_components': len(path.split('/')) - 1,  # Subtracting 1 because the leading '/' results in an empty string at the start
            'num_query_components': len(query.split('&')) if query else 0,  # Only count if there's a query
        }

        # Additional processing to identify specific features, e.g., presence of digits in domain
        features['has_digits_in_domain'] = any(char.isdigit() for char in domain)
        
        return features
    except ValueError as e:
        # Handle specific errors, e.g., invalid IPv6 URL
        print(f"Error processing URL {url}: {e}")
        return {}

Step 2: descriptive features extraction - this function will further split the path component, remove common prefixes and TLDs, and calc stats

In [543]:
def extract_descriptive_features(url):
    parsed_url = urlparse(url)
    domain = parsed_url.netloc.replace('www.', '')  # Remove common prefix
    path = parsed_url.path
    query = parsed_url.query
    
    # Further split the path
    path_components = path.split('/')
    filename = path_components[-1] if '.' in path_components[-1] else None
    file_extension = filename.split('.')[-1] if filename else None
    
    # Calculate statistics
    features = {
        'domain_length': len(domain),
        'path_length': len(path),
        'query_length': len(query),
        'num_path_components': len(path_components),
        'filename': filename,
        'file_extension': file_extension,
        'is_ip_address': bool(re.match(r'^\d{1,3}(\.\d{1,3}){3}$', domain)),
        'executable_extension': file_extension in ['exe', 'bin', 'bat']
    }
    
    return features

Step 3: integrate and test

In [544]:
#CAUTION: neutered phishing URL: urls = ['{http colon slash slash}clt1658125{dot}benchurl{dot}com']
urls = ['https://www.example.org/bin.exe?arg=value', 'http://blog.example.com:443/executable.exe?arg=test123']

for url in urls:
    print(f"URL: {url}")
    lexical_features = extract_lexical_features(url)
    descriptive_features = extract_descriptive_features(url)
    print("Lexical Features:", lexical_features)
    print("Descriptive Features:", descriptive_features)
    print("\n")

URL: https://www.example.org/bin.exe?arg=value
Lexical Features: {'domain': 'example.org', 'domain_length': 11, 'path_length': 8, 'query_length': 9, 'num_path_components': 1, 'num_query_components': 1, 'has_digits_in_domain': False}
Descriptive Features: {'domain_length': 11, 'path_length': 8, 'query_length': 9, 'num_path_components': 2, 'filename': 'bin.exe', 'file_extension': 'exe', 'is_ip_address': False, 'executable_extension': True}


URL: http://blog.example.com:443/executable.exe?arg=test123
Lexical Features: {'domain': 'blog.example.com:443', 'domain_length': 20, 'path_length': 15, 'query_length': 11, 'num_path_components': 1, 'num_query_components': 1, 'has_digits_in_domain': True}
Descriptive Features: {'domain_length': 20, 'path_length': 15, 'query_length': 11, 'num_path_components': 2, 'filename': 'executable.exe', 'file_extension': 'exe', 'is_ip_address': False, 'executable_extension': True}




In [545]:
df = pd.read_csv('malicious_phish-kaggle-thishusseinali.csv', names=['URL', 'Classification'])

In [546]:
# Apply lexical feature extraction
df['Lexical_Features'] = df['URL'].apply(lambda x: extract_lexical_features(x))

# Apply descriptive feature extraction
df['Descriptive_Features'] = df['URL'].apply(lambda x: extract_descriptive_features(x))

# Print the first row as an example (you can change the index to print another row)
print(df.iloc[1])

Error processing URL http://RybjUxÙãl5»7ÆE%ÝÔk+h|U+ýk©ìÉ½Æq]âF·õÁ¢w)ëA·ç°{t*m!¦2: Invalid IPv6 URL
Error processing URL http://ÆeF§÷%¶¿Õ½9¿b@Ö¸ÚZE¤ÒC¢ÄÅª2åç-]W³fU¤Jgkz.ø¿nJçåæuøD%@ðûÇùM¹uË: Invalid IPv6 URL
Error processing URL http://Ó6¸RTÃu~æÙg0>÷mÖiÓ=;XZ\%êýÜÉfn&\°%7õÉ"ieÖ1ÄÁêFÐò<$cï6t[0ò2"/Æa^2âpù/ýãÇ$E¬R«È²ú[Ì¶p¥qÒ°i°^ò[»³»]±9êdÓS¿Ë]ùþ5j¿·ªocÂplà7ÊÏJ§¢#3ðDCDõ²çÇGÝ.Vò=¿QB§Ä'`ÊáZÉê ÔîÆm®ÍÝQÓ(z;¹Áê¬âytÖÙ®ëNP²ÜEQ: Invalid IPv6 URL
Error processing URL http://µÔA¨!ÝÛ=]º£¦Pôwr72-ÕY5Äòè7¬-³]×)&¡e¸¢À6RD­NvY¨Ð«Ñ3Â¸%Qñ+ÛÈ¸$¶gz{þ: Invalid IPv6 URL
Error processing URL http://¨RÊÃûaCóÞit×ßÂe-DÖØ+9YèÌçÏ¯·"0£ÙÕ.0ößF«7¹NRÙ{ccÉÄãéçx[Ä6a5Ñ³LÖíÜÉÀ£Òma¥yRX*0ÅÝ7×ÊÁÌo«Õs¶0kdèÑ&Ä"Ï¨mZ'àDM×ñXÚÒK"päî±h¬cAÊeK@4r"^'ÓFþ1*ËË PÞô;õ$úàÑ@þ=êWÑ"Ãhñ®ç^«Ýó^çRúUJ.<6CyÜFØrÿV2ôæýZãiiIb;¨Ëµu^ÍVy)­è»âýº+SÖáÃì?å6åÔ/: Invalid IPv6 URL
Error processing URL ht

In [547]:
# Select the row index you're interested in
row_index = 1  # For example, to print the second row

# Convert the dictionary to a JSON string for pretty printing
lexical_features_str = json.dumps(df.at[row_index, 'Lexical_Features'], indent=4)
descriptive_features_str = json.dumps(df.at[row_index, 'Descriptive_Features'], indent=4)

# Print the features
print(f"Lexical Features for row {row_index}:\n{lexical_features_str}\n")
print(f"Descriptive Features for row {row_index}:\n{descriptive_features_str}\n")

Lexical Features for row 1:
{
    "domain": "br-icloud.com.br",
    "domain_length": 16,
    "path_length": 0,
    "query_length": 0,
    "num_path_components": 0,
    "num_query_components": 0,
    "has_digits_in_domain": false
}

Descriptive Features for row 1:
{
    "domain_length": 0,
    "path_length": 16,
    "query_length": 0,
    "num_path_components": 1,
    "filename": "br-icloud.com.br",
    "file_extension": "br",
    "is_ip_address": false,
    "executable_extension": false
}



In [548]:
chunk_size = 5000  # Adjust based on your system's capabilities

In [549]:
# Initialize an empty DataFrame to hold the results
df_final = pd.DataFrame()

# Process in chunks
for start in range(0, df.shape[0], chunk_size):
    end = min(start + chunk_size, df.shape[0])
    df_chunk = df.iloc[start:end].copy()
    df_chunk.reset_index(drop=True, inplace=True)
    
    # Normalize Lexical Features
    lexical_features_df = pd.json_normalize(df_chunk['Lexical_Features'])
    lexical_features_df.columns = ['Lexical_' + str(col) for col in lexical_features_df.columns]
    
    # Normalize Descriptive Features
    descriptive_features_df = pd.json_normalize(df_chunk['Descriptive_Features'])
    descriptive_features_df.columns = ['Descriptive_' + str(col) for col in descriptive_features_df.columns]
    
    # Concatenate normalized features with the chunk
    df_chunk = pd.concat([df_chunk, lexical_features_df, descriptive_features_df], axis=1)
    
    # Directly append the processed chunk to the final DataFrame with ignore_index=True
    df_final = pd.concat([df_final, df_chunk], axis=0) #ignore_index=True

# Optionally, drop the original columns containing dictionaries
df_final.drop(['Lexical_Features', 'Descriptive_Features'], axis=1, inplace=True)
#print(df_final)

In [550]:
# Drop row 1 (Note: Python uses 0-based indexing, so row 1 is the second row)
df_final = df_final.drop(1)

df_final = df_final.drop(['URL'], axis=1)
        # Proceed with retraining your model here

df_final_columns = df_final.columns.tolist()
with open('model_columns.txt', 'w') as f:
    f.write('\n'.join(df_final_columns))

# drop rows with missing values
df_final = df_final.dropna()

# Identify categorical features. This is a basic approach and might need adjustment based on your dataset.
categorical_cols = df_final.select_dtypes(include=['object', 'category']).columns

# Convert categorical features to numerical values
for col in categorical_cols:
    # Skip the target column 'Classification'
    if col == 'Classification':
        continue
    le = LabelEncoder()
    df_final[col] = le.fit_transform(df_final[col])
    # save the encoder
    joblib.dump(le, 'categorical_feature_encoder.joblib')

# Split the DataFrame into X (features) and y (target)
X = df_final.drop('Classification', axis=1)
y = df_final['Classification']

# Convert 'Classification' to numerical values if it's categorical
if y.dtype == 'object' or y.dtype.name == 'category':
    le = LabelEncoder()
    y = le.fit_transform(y)

# Split data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and train a decision tree classifier
dTree = DecisionTreeClassifier(random_state=42)
dTree.fit(X_train, y_train)

print(f"Mapping for Classification: {dict(zip(le.classes_, le.transform(le.classes_)))}")

Mapping for Classification: {'benign': 0, 'defacement': 1, 'malware': 2, 'phishing': 3}


In [551]:
# predict test set results
y_pred = dTree.predict(X_test)

# use accuracy_score to test the model
accuracy = accuracy_score(y_test, y_pred) # as in M5_Decision Trees-1.ipynb
print(f"Accuracy of the Decision Tree model: {accuracy:.2f}")

Accuracy of the Decision Tree model: 0.94


In [552]:
# encoder saved from before - training phase
encoder = joblib.load('categorical_feature_encoder.joblib')

def predict_url_classification(url, dTree, df_final_columns):
    # Extract features
    lexical_features = extract_lexical_features(url)
    descriptive_features = extract_descriptive_features(url)
    
    # Combine features
    all_features = {**lexical_features, **descriptive_features}
    
    # Create a DataFrame for the features
    features_df = pd.DataFrame([all_features])
    
    # Ensure the DataFrame matches the training data structure
    # Add missing columns with default values
    for col in df_final_columns:
        if col not in features_df.columns:
            features_df[col] = 0  # Or another appropriate default value
    
    # Reorder columns to match the training data
    features_df = features_df[df_final_columns]
    
    # Drop columns that are not features (e.g., 'URL', 'Classification' if they were included)
    features_to_drop = ['URL', 'Classification']  # Adjust based on your actual data
    features_df = features_df.drop(columns=[col for col in features_to_drop if col in features_df.columns], errors='ignore')
    
    # Predict the classification
    prediction = dTree.predict(features_df)
    
    return prediction[0]  # Assuming binary classification for simplicity

# Use with caution if you are pasting in real malicious domains
# CAUTION # url = "c/l?u=10C78AC0&e=17AD89B&c=194D0D&t=0&email=WqS0CM9o%2BpbtiwumbI%2Fj2w%3D%3D&seq=1"
url = 'https://accounts.google.com/v3/'
classification = predict_url_classification(url, dTree, df_final_columns)
print(f"Classes: 'benign': 0, 'defacement': 1, 'malware': 2, 'phishing': 3")
print(f"Class of URL: {classification}")

Classes: 'benign': 0, 'defacement': 1, 'malware': 2, 'phishing': 3
Class of URL: 2
