In [4]:
# Random Forest Classifier from Scratch for Malicious URL Detection
# No external libraries used — aligns with improving ML transparency in security

# ---------------------------
# Section 1: Dataset Loading and Preprocessing
# ---------------------------

def read_csv_manual(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    header = lines[0].strip().split(',')
    data = [line.strip().split(',') for line in lines[1:] if len(line.strip().split(',')) == len(header)]
    return header, data

header, data = read_csv_manual("malicious_phish.csv")

print("First 5 rows of the dataset:")
for row in data[:5]:
    print(row)

# Extract URLs and labels
url_index = header.index("url")
type_index = header.index("type")

# Count of each type
print("\nType counts:")
type_counts = {}
for row in data:
    if len(row) > type_index:
        label = row[type_index]
        type_counts[label] = type_counts.get(label, 0) + 1
for k, v in type_counts.items():
    print(f"{k}: {v} URLs")

urls = [row[url_index] for row in data]
labels = [0 if row[type_index] == 'benign' else 1 for row in data if len(row) > type_index]



First 5 rows of the dataset:
['br-icloud.com.br', 'phishing']
['mp3raid.com/music/krizz_kaliko.html', 'benign']
['bopsecrets.org/rexroth/cr/1.htm', 'benign']
['http://www.garage-pirenne.be/index.php?option=com_content&view=article&id=70&vsig70_0=15', 'defacement']
['http://adventure-nicaragua.net/index.php?option=com_mailto&tmpl=component&link=aHR0cDovL2FkdmVudHVyZS1uaWNhcmFndWEubmV0L2luZGV4LnBocD9vcHRpb249Y29tX2NvbnRlbnQmdmlldz1hcnRpY2xlJmlkPTQ3OmFib3V0JmNhdGlkPTM2OmRlbW8tYXJ0aWNsZXMmSXRlbWlkPTU0', 'defacement']

Type counts:
phishing: 93818 URLs
benign: 427883 URLs
defacement: 96062 URLs
malware: 32520 URLs
s/: 1 URLs


In [5]:

# ---------------------------
# WHOIS-based domain feature: domain expiration in days
# ---------------------------
import whois
from datetime import datetime

def domain_expiration_days(url):
    try:
        domain = re.findall(r"https?://([^/]+)", url)[0]
        info = whois.whois(domain)
        expiration_date = info.expiration_date

        if isinstance(expiration_date, list):
            expiration_date = expiration_date[0]
        if expiration_date is None:
            return 0  # suspicious
        delta = (expiration_date - datetime.utcnow()).days
        return max(delta, 0)
    except Exception:
        return 0  # fallback for failed lookups


In [6]:

# ---------------------------
# Section 2: Feature Extraction
# ---------------------------

def extract_features(url):
    def count_digits(s):
        return sum(1 for c in s if '0' <= c <= '9')

    def starts_with_https(s):
        return 1 if s.startswith("https") else 0

    def has_ip(s):
        import re
        return 1 if re.match(r"http[s]?://(?:\d{1,3}\.){3}\d{1,3}", s) else 0

    return [
        domain_expiration_days(url),  # WHOIS feature
        len(url),
        url.count('.'),
        url.count('-'),
        url.count('_'),
        url.count('/'),
        url.count('?'),
        url.count('='),
        url.count('@'),
        count_digits(url),
        has_ip(url),
        starts_with_https(url)
    ]

X = [extract_features(url) for url in urls]
# Ensure y is properly defined alongside X
y = labels

# Normalize features manually
# Global normalization stats
means = []
stds = []

def compute_normalization_stats(X):
    global means, stds
    n = len(X[0])
    means = [sum(row[i] for row in X) / len(X) for i in range(n)]
    stds = [
        (sum((row[i] - means[i]) ** 2 for row in X) / len(X)) ** 0.5
        for i in range(n)
    ]

def normalize_features(X):
    global means, stds
    n = len(X[0])
    return [
        [(row[i] - means[i]) / stds[i] if stds[i] != 0 else 0 for i in range(n)]
        for row in X
    ]

compute_normalization_stats(X)
X = normalize_features(X)

In [7]:
# Use the full dataset for training/testing
import time

def train_test_split_manual(X, y, test_ratio=0.3):
    combined = list(zip(X, y))
    import random
    random.seed(42)
    random.shuffle(combined)
    split_idx = int(len(combined) * (1 - test_ratio))
    train = combined[:split_idx]
    test = combined[split_idx:]
    X_train, y_train = zip(*train)
    X_test, y_test = zip(*test)
    return list(X_train), list(X_test), list(y_train), list(y_test)

X_train, X_test, y_train, y_test = train_test_split_manual(X, y, test_ratio=0.3)


In [8]:
# ---------------------------
# Section 4: Random Forest Core Code
# ---------------------------

class DecisionTreeNode:
    def __init__(self, feature_index=None, threshold=None, left=None, right=None, value=None):
        self.feature_index = feature_index
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

def gini_index(groups):
    total = sum(len(group) for group in groups)
    gini = 0.0
    for group in groups:
        size = len(group)
        if size == 0:
            continue
        score = 0.0
        labels = [row[-1] for row in group]
        for class_val in [0, 1]:
            proportion = labels.count(class_val) / size
            score += proportion * proportion
        gini += (1 - score) * (size / total)
    return gini

def split_data(index, threshold, dataset):
    left, right = [], []
    for row in dataset:
        if row[0][index] < threshold:
            left.append(row)
        else:
            right.append(row)
    return left, right

def best_split(dataset):
    best_idx, best_thresh, best_score = None, None, float('inf')
    best_groups = None
    n_features = len(dataset[0][0])
    for i in range(n_features):
        thresholds = set(row[0][i] for row in dataset)
        for t in thresholds:
            groups = split_data(i, t, dataset)
            gini = gini_index(groups)
            if gini < best_score:
                best_idx, best_thresh, best_score, best_groups = i, t, gini, groups
    return best_idx, best_thresh, best_groups

def build_tree(dataset, max_depth, min_size, depth=0):
    labels = [row[1] for row in dataset]
    if labels.count(labels[0]) == len(labels):
        return DecisionTreeNode(value=labels[0])
    if depth >= max_depth or len(dataset) <= min_size:
        return DecisionTreeNode(value=max(set(labels), key=labels.count))
    index, threshold, (left, right) = best_split(dataset)
    if not left or not right:
        return DecisionTreeNode(value=max(set(labels), key=labels.count))
    left_node = build_tree(left, max_depth, min_size, depth + 1)
    right_node = build_tree(right, max_depth, min_size, depth + 1)
    return DecisionTreeNode(index, threshold, left_node, right_node)

def predict_tree(node, row):
    if node.value is not None:
        return node.value
    if row[node.feature_index] < node.threshold:
        return predict_tree(node.left, row)
    else:
        return predict_tree(node.right, row)

class RandomForest:
    def __init__(self, n_trees=5, max_depth=10, min_size=10):
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.min_size = min_size
        self.trees = []

    def subsample(self, data):
        import random
        return [random.choice(data) for _ in range(len(data))]

    def fit(self, X, y):
        dataset = list(zip(X, y))
        self.trees = []
        for _ in range(self.n_trees):
            sample = self.subsample(dataset)
            tree = build_tree(sample, self.max_depth, self.min_size)
            self.trees.append(tree)

    def predict(self, row):
        predictions = [predict_tree(tree, row) for tree in self.trees]
        return max(set(predictions), key=predictions.count)

    def predict_all(self, X):
        return [self.predict(row) for row in X]


In [9]:
# ---------------------------
# Section 5: Train, Evaluate, and Predict Custom URL
# ---------------------------

rf = RandomForest(n_trees=5, max_depth=10, min_size=5)
start_time = time.time()
rf.fit(X_train, y_train)
y_pred = rf.predict_all(X_test)

def compute_metrics(y_true, y_pred):
    tp = sum((yt == 1 and yp == 1) for yt, yp in zip(y_true, y_pred))
    tn = sum((yt == 0 and yp == 0) for yt, yp in zip(y_true, y_pred))
    fp = sum((yt == 0 and yp == 1) for yt, yp in zip(y_true, y_pred))
    fn = sum((yt == 1 and yp == 0) for yt, yp in zip(y_true, y_pred))
    accuracy = (tp + tn) / len(y_true)
    precision = tp / (tp + fp) if (tp + fp) != 0 else 0
    recall = tp / (tp + fn) if (tp + fn) != 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) != 0 else 0
    return accuracy, precision, recall, f1

acc, prec, rec, f1 = compute_metrics(y_test, y_pred)
print("Accuracy:", acc)
print("Precision:", prec)
print("Recall:", rec)
print("F1 Score:", f1)
print("Training and testing time:", round(time.time() - start_time, 2), "seconds")


Accuracy: 0.88210840347334
Precision: 0.8338093129094825
Recall: 0.8183739544923104
F1 Score: 0.8260195320478392
Training and testing time: 6640.41 seconds


In [11]:
# Custom input prediction loop with continue confirmation
print("Enter URLs one by one. Type 'exit' to stop:")
while True:
    url = input("Enter URL: ").strip()
    if url.lower() == 'exit':
        break
    if url:
        features = extract_features(url)
        norm_features = normalize_features([features])[0]
        prediction = rf.predict(norm_features)
        result = "Malicious" if prediction == 1 else "Benign"
        print(f"URL: {url} => Prediction: {result}")
        cont = input("Check another? (y/n): ").strip().lower()
        if cont != 'y':
            print("Exiting URL checker.")
            break



Enter URLs one by one. Type 'exit' to stop:


Enter URL:  www.google.com/


URL: www.google.com/ => Prediction: Benign


Check another? (y/n):  y
Enter URL:  www.g00gle.com


URL: www.g00gle.com => Prediction: Malicious


Check another? (y/n):  y
Enter URL:  www.csueastbay.edu/canvaslogin


URL: www.csueastbay.edu/canvaslogin => Prediction: Benign


Check another? (y/n):  n


Exiting URL checker.
