### MULTI AGENT - GRAPH


This notebook is designed to be run in **Kaggle notebooks** to ensure reproducibility of the experiment.

Environment Setup (Required)

1. Set up `HF_TOKEN` as a Kaggle secret, or set it directly in the first code cell of the notebook below.
2. Enable internet access  
3. Select a GPU runtime
4. Upload the datasets as Kaggle input and update the paths in `CONFIG['datasets']` in the code (dataset: https://www.kaggle.com/datasets/tercasaskova/phishing-emails)


In [None]:
# Provide the Hugging Face access token through the HF_TOKEN environment
# variable; main() reads it with os.getenv("HF_TOKEN").
# NOTE: this assignment overwrites any HF_TOKEN already present in the
# environment, and main() raises RuntimeError while the value is left empty.
import os
os.environ['HF_TOKEN'] = ''  # Replace with actual token

In [None]:
"""
Multi-Agent Phishing Email Detector
Optimized with clear roles, weighted voting, and performance monitoring
Compatible with Kaggle/Colab environments
"""

import os
import re
import time
import pandas as pd
import numpy as np
from typing import List, Dict, Any, Tuple
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from urllib.parse import urlparse
from huggingface_hub import InferenceClient
import random


In [None]:
# CONFIGURATION
# Central settings dictionary read by the agents, the data loader and main().

CONFIG = {
    # Hugging Face model id used by each agent role.
    'models': {
        'email_agent': 'Qwen/Qwen2.5-3B-Instruct', # can be changed to other models from HF or run locally through Ollama
        'url_agent': 'google/gemma-2-2b-it',
        'pattern_agent': 'meta-llama/Llama-3.2-3B-Instruct',
        'judge_agent': 'Qwen/Qwen2.5-7B-Instruct',
    },
    # CSV path per dataset name; main() iterates over these entries.
    'datasets': {
        'aigen': '/kaggle/input/phishing-emails/aigen.csv', # import datasets to Kaggle input
        'enron': '/kaggle/input/phishing-emails/enron.csv',
        'trec': '/kaggle/input/phishing-emails/trec.csv'
    },
    # Rows sampled per dataset (keys must match 'datasets').
    'sample_sizes': { # if access to GPU is limited, reduce sample sizes; set to None to load the full dataset
        'aigen': 50,
        'enron': 50,
        'trec': 50
    },
    'balanced_sampling': True, # whether to balance phishing and legitimate emails in samples
    # NOTE(review): the next four keys are not read by any code visible in this
    # notebook - confirm whether they are still needed.
    'save_errors': True,
    'checkpoint_every': 100,
    'batch_size': 10,
    'experiment_name': 'multi_agent_lightweight',
    # Retry policy used by LightweightAgent.analyze.
    'max_retries': 3,
    'retry_delay': 2,
}

In [None]:
# TOOLS
def email_analysis_tool(email_text: str) -> dict:
    """Collect simple lexical phishing indicators from an email body.

    Returns a dict with three keys:
      * 'suspicious_words' - watch-list phrases found in the lower-cased text
        (substring match, reported in watch-list order),
      * 'excessive_punctuation' - number of runs of two or more '!' characters,
      * 'all_caps' - number of runs of 4+ uppercase ASCII letters delimited by
        word boundaries.
    """
    watch_list = (
        "urgent", "verify", "password", "login", "account suspended",
        "click here", "confirm", "expire", "suspended", "immediately",
        "reward", "winner", "prize", "congratulations", "claim",
    )

    # Lower-case once so every phrase is compared against the same text.
    lowered = email_text.lower()
    hits = []
    for phrase in watch_list:
        if phrase in lowered:
            hits.append(phrase)

    bang_runs = re.findall(r"[!]{2,}", email_text)
    shouted_words = re.findall(r"\b[A-Z]{4,}\b", email_text)

    return {
        'suspicious_words': hits,
        'excessive_punctuation': len(bang_runs),
        'all_caps': len(shouted_words),
    }

def url_extraction_tool(email_text: str) -> dict:
    """Extract http(s) URLs from an email and flag simple risk indicators.

    Returns a dict with:
      * 'urls' - one entry per parseable URL with the keys 'url', 'is_ip'
        (hostname consists only of digits and dots), 'suspicious_length'
        (URL longer than 75 characters) and 'odd_subdomains' (more than three
        dots in the hostname),
      * 'total_urls' - number of URL-like strings found, including any that
        could not be parsed.
    """
    urls = re.findall(r'(https?://[^\s]+)', email_text)
    url_indicators = []
    for url in urls:
        try:
            hostname = urlparse(url).hostname or ""
        except ValueError:
            # urlparse rejects some malformed URLs (e.g. an unbalanced IPv6
            # bracket); skip those instead of aborting the whole email.
            continue

        # str.isdigit() is False for an empty string, so a URL without a
        # hostname is no longer reported as an IP address.
        host_without_dots = hostname.replace('.', '')
        url_indicators.append({
            "url": url,
            "is_ip": host_without_dots.isdigit(),
            "suspicious_length": len(url) > 75,
            "odd_subdomains": hostname.count(".") > 3,
        })
    return {"urls": url_indicators, "total_urls": len(urls)}

# HF AGENT - substitute for more complex frameworks such as crewai, langchain, etc.
class LightweightAgent:
    """Single-role LLM agent backed by the Hugging Face chat-completion API.

    Attributes:
        role: Human-readable role name, used in failure messages.
        model_name: Hugging Face model id passed to every request.
        client: InferenceClient used to send chat-completion requests.
        max_retries: Number of attempts per prompt (from CONFIG['max_retries']).
        retry_delay: Base delay in seconds between attempts
            (from CONFIG['retry_delay']).
    """

    # Returned when every attempt fails. Despite the wording of its REASON
    # line this is a fixed PHISHING verdict, not a random one.
    FALLBACK_RESPONSE = "DECISION: PHISHING\nCONFIDENCE: 80\nREASON: fallback random"

    def __init__(self, role: str, model_name: str, hf_token: str):
        self.role = role
        self.model_name = model_name
        self.client = InferenceClient(token=hf_token)
        self.max_retries = CONFIG['max_retries']
        self.retry_delay = CONFIG['retry_delay']

    def analyze(self, prompt: str, max_tokens: int = 256) -> str:
        """Send *prompt* as a single user message and return the model's reply.

        Failed attempts are retried up to ``max_retries`` times with a linearly
        growing delay (``retry_delay * attempt_number``). If every attempt
        fails, the failure is reported on stdout and FALLBACK_RESPONSE is
        returned so that a batch run is not aborted.
        """
        last_error = None
        for attempt in range(self.max_retries):
            try:
                response = self.client.chat_completion(  # HF chat completion API
                    model=self.model_name,
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=max_tokens,
                    temperature=0.2,
                )
                # Guard against a missing message body so callers always get a str.
                return response.choices[0].message.content or ""
            except Exception as e:
                # Deliberately broad: any API/transport failure is retried.
                last_error = e
                # No point in waiting after the final attempt.
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay * (attempt + 1))

        print(
            f"[{self.role}] {self.model_name}: all {self.max_retries} attempts "
            f"failed ({last_error!r}); using fallback response"
        )
        return self.FALLBACK_RESPONSE

# MULTI-AGENT DETECTOR
class MultiAgentDetector:
    """Phishing detector built from three specialist agents and a judge agent.

    The email, URL and pattern agents each produce a free-text analysis; the
    judge agent reads the three analyses and issues the final verdict, which
    is parsed into 'phishing_email' or 'safe_email'.
    """

    def __init__(self, hf_token: str):
        self.email_agent = LightweightAgent("Email Analyst", CONFIG['models']['email_agent'], hf_token)
        self.url_agent = LightweightAgent("URL Analyst", CONFIG['models']['url_agent'], hf_token)
        self.pattern_agent = LightweightAgent("Pattern Analyst", CONFIG['models']['pattern_agent'], hf_token)
        self.judge_agent = LightweightAgent("Final Judge", CONFIG['models']['judge_agent'], hf_token)

    def analyze_email(self, email_text: str) -> Dict[str, Any]:
        """Run all agents on one email and return the parsed judge verdict.

        Returns a dict with the keys 'prediction', 'confidence' and 'reasoning'
        (see ``_parse_response``).
        """
        # Rows read from CSV can hold NaN/None instead of text; treat those as
        # an empty email rather than crashing on slicing or .lower().
        if not isinstance(email_text, str):
            is_missing = email_text is None or email_text != email_text  # NaN != NaN
            email_text = "" if is_missing else str(email_text)

        # Only the first 800 characters are sent to the models; the rule-based
        # tools still inspect the full text.
        snippet = email_text[:800]

        email_data = email_analysis_tool(email_text)
        email_prompt = f"Email:\n{snippet}\nIndicators:{email_data}\nBrief phishing analysis:"
        email_analysis = self.email_agent.analyze(email_prompt)

        url_data = url_extraction_tool(email_text)
        url_prompt = f"Email:\n{snippet}\nURLs:{url_data}\nBrief URL phishing analysis:"
        url_analysis = self.url_agent.analyze(url_prompt)

        pattern_prompt = f"Email:\n{snippet}\nAnalyze for phishing patterns & spoofing:"
        pattern_analysis = self.pattern_agent.analyze(pattern_prompt)

        # Final judgment based on the three specialist analyses.
        judge_prompt = f"""DECIDE PHISHING OR SAFE 
Email analysis: {email_analysis}
URL analysis: {url_analysis}
Pattern analysis: {pattern_analysis}
Respond EXACTLY:
DECISION: PHISHING or SAFE
CONFIDENCE: 0-100
REASON: one sentence"""

        judge_response = self.judge_agent.analyze(judge_prompt)
        return self._parse_response(judge_response)

    def _parse_response(self, text: str) -> Dict[str, Any]:
        """Extract decision, confidence and reasoning from the judge's reply.

        Matching is case-insensitive and tolerates markdown emphasis around the
        field labels (e.g. ``**DECISION:** PHISHING``).

        Returns:
            A dict with 'prediction' ('phishing_email' or 'safe_email';
            anything that is not an explicit PHISHING decision counts as safe),
            'confidence' (0.0-1.0, defaulting to 0.8 when absent) and
            'reasoning' (first line after REASON, or "No reasoning").
        """
        text = text or ""
        d = re.search(r"DECISION\**\s*:\s*\**\s*(PHISHING|SAFE)", text, re.IGNORECASE)
        c = re.search(r"CONFIDENCE\**\s*:\s*\**\s*(\d+)", text, re.IGNORECASE)
        r = re.search(r"REASON\**\s*:\s*\**\s*(.+?)(?:\n|$)", text, re.IGNORECASE)

        decision = "phishing_email" if d and d.group(1).upper() == "PHISHING" else "safe_email"
        # Models occasionally report values above 100; cap at 1.0.
        confidence = min(int(c.group(1)) / 100.0, 1.0) if c else 0.8
        reason = r.group(1).strip() if r else "No reasoning"
        return {'prediction': decision, 'confidence': confidence, 'reasoning': reason}

    def analyze_batch(self, emails: List[str]) -> List[Dict]:
        """Analyze emails sequentially and return one result dict per email."""
        results = []
        for email in tqdm(emails, desc="Processing Emails"):
            results.append(self.analyze_email(email))
            time.sleep(0.5)  # small pause to reduce HF API overload
        return results



In [None]:
# DATA LOADING - reading datasets with optional balanced sampling - in our case always true
def load_dataset(filepath: str, sample_size: int = None, balanced: bool = True) -> pd.DataFrame:
    """Read a CSV dataset and optionally down-sample it.

    Args:
        filepath: Path of the CSV file to read.
        sample_size: Target number of rows. A falsy value (None or 0), or a
            value not smaller than the dataset, returns the full dataset.
        balanced: When True, draw ``sample_size // n_classes`` rows from each
            distinct value of the 'label' column (fewer if a class is smaller).
            When False, draw ``sample_size`` rows from the whole frame.

    Returns:
        The (possibly sampled) DataFrame. Sampled frames get a fresh 0..n-1
        index; sampling uses a fixed random_state of 42.
    """
    df = pd.read_csv(filepath)
    if not sample_size or sample_size >= len(df):
        return df

    if not balanced:
        return df.sample(sample_size, random_state=42).reset_index(drop=True)

    n_per_class = sample_size // df['label'].nunique()
    # Iterate over the groups instead of using groupby(...).apply(...): apply
    # is deprecated for functions that need the grouping column, and newer
    # pandas versions exclude it, which would drop 'label' from the result.
    sampled_parts = [
        group.sample(min(len(group), n_per_class), random_state=42)
        for _, group in df.groupby('label')
    ]
    return pd.concat(sampled_parts).reset_index(drop=True)

# EVALUATOR - computes metrics and prints results => crucial for model assessment
class Evaluator:
    """Runs a detector over a labelled dataset and reports classification metrics."""

    def __init__(self, detector: "MultiAgentDetector"):
        # Any object exposing analyze_batch(list_of_emails) -> list of result
        # dicts with 'prediction' and 'confidence' keys works here.
        self.detector = detector

    def evaluate(self, df: pd.DataFrame, dataset_name: str) -> Dict:
        """Classify every row of *df* and return the computed metrics.

        *df* must provide a 'message' column (email text) and a 'label' column
        whose values are compared directly with the detector's predictions.
        """
        emails = df['message'].tolist()
        # tolist() yields plain Python values, so comparisons below produce
        # built-in bools instead of NumPy scalars.
        labels = df['label'].tolist()
        batch_results = self.detector.analyze_batch(emails)

        results = [
            {
                'true_label': label,
                'prediction': res['prediction'],
                'confidence': res['confidence'],
                'correct': bool(res['prediction'] == label),
            }
            for label, res in zip(labels, batch_results)
        ]
        return self._compute_metrics(results, dataset_name)

    def _compute_metrics(self, results: List[Dict], dataset_name: str) -> Dict:
        """Compute accuracy, precision, recall and F1 and print a summary line.

        'phishing_email' is treated as the positive class. Every ratio falls
        back to 0 when its denominator is zero.
        """
        correct = sum(bool(r['correct']) for r in results)
        total = len(results)

        tp = sum(1 for r in results if r['true_label'] == "phishing_email" and r['prediction'] == "phishing_email")
        fp = sum(1 for r in results if r['true_label'] == "safe_email" and r['prediction'] == "phishing_email")
        fn = sum(1 for r in results if r['true_label'] == "phishing_email" and r['prediction'] == "safe_email")

        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
        acc = correct / total if total > 0 else 0

        print(f"\nDataset: {dataset_name} | Accuracy: {acc:.2%} | F1: {f1:.2%} | Total: {total}")
        return {
            'accuracy': acc,
            'f1_score': f1,
            'precision': precision,
            'recall': recall,
            'total': total,
            'correct': correct
        }

In [None]:

def main():
    """Evaluate the multi-agent detector on every configured dataset.

    Reads the Hugging Face token from the HF_TOKEN environment variable and
    raises RuntimeError when it is missing or empty. Returns a dict mapping
    each dataset name in CONFIG['datasets'] to its metrics dict.
    """
    token = os.environ.get("HF_TOKEN")
    if not token:
        raise RuntimeError("HF_TOKEN env variable not set!")

    evaluator = Evaluator(MultiAgentDetector(token))

    metrics_by_dataset = {}
    for name, csv_path in CONFIG['datasets'].items():
        frame = load_dataset(
            csv_path,
            sample_size=CONFIG['sample_sizes'][name],
            balanced=CONFIG['balanced_sampling'],
        )
        metrics_by_dataset[name] = evaluator.evaluate(frame, name)

    return metrics_by_dataset

# Entry point: run the full evaluation and keep the per-dataset metrics
# returned by main() in `results`.
if __name__=="__main__":
    results = main()