<a href="https://colab.research.google.com/github/swetha-006/AI_Projects/blob/main/Email_Classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Phrase lists grouped by threat category. Every phrase is lowercase because
# it is compared against lowercased email text. Category order and phrase
# order are significant: they determine the order of reported matches.
THREAT_KEYWORDS = {
    # Requests to log in, verify, or reset credentials.
    "credential_theft": [
        "verify your account",
        "confirm your identity",
        "account verification",
        "login required",
        "password expired",
        "reset your password",
        "account locked",
        "unusual sign-in",
        "security alert",
    ],
    # Artificial deadlines and pressure wording.
    "urgency_pressure": [
        "urgent",
        "immediate action required",
        "act now",
        "within 24 hours",
        "within 48 hours",
        "final notice",
        "permanently locked",
        "expires",
    ],
    # Names of commonly spoofed consumer services.
    "brand_impersonation": [
        "apple id",
        "icloud",
        "microsoft 365",
        "office 365",
        "paypal",
        "amazon",
        "google account",
    ],
    # Inheritance / remittance style scams.
    "advance_fee_fraud": [
        "dear beneficiary",
        "inheritance",
        "contract funds",
        "remittance notification",
        "funds transfer",
        "offshore account",
        "world bank",
        "imf",
        "million united states dollars",
        "payment file",
        "bank working days",
    ],
    # Requests for personal details or documents.
    "pii_harvesting": [
        "full name",
        "age",
        "bank account",
        "reconfirm your details",
        "provide us with the below details",
        "kindly reconfirm",
        "send copies of documents",
    ],
    # Stock phrasing typical of mass-produced scam mail (misspellings are
    # intentional: they are the text being searched for).
    "low_quality_language": [
        "dear customer",
        "for your safety",
        "kindly",
        "we therefore advice",
        "dubious officials",
    ],
    # Fake transaction confirmations and approvals.
    "transaction_verification": [
        "money transfer has been initiated",
        "verify your request",
        "review the specifics",
        "transaction pending",
        "unauthorized transaction",
        "payment approval required",
        "confirm transaction",
        "wire transfer",
        "fund transfer",
    ],
    # Messages posing as a security product or team.
    "security_vendor_impersonation": [
        "check point",
        "source: check point",
        "security notification",
        "security alert system",
        "threat prevention team",
    ],
    # Meeting / calendar invitations used as a lure.
    "meeting_link_abuse": [
        "join with google meet",
        "meeting link",
        "calendar invite",
        "virtual meeting",
    ],
    # Fake billing problems and service-suspension threats.
    "billing_fraud": [
        "payment declined",
        "billing address does not match",
        "update payment",
        "restore your service",
        "payment required",
        "service suspended",
        "account termination",
        "permanent termination of service",
    ],
}


# ==========================================================
# AGENTIC RAG EMAIL THREAT DETECTION (REAL-TIME VERSION)
# ==========================================================

import re
import hashlib
from datetime import datetime
from typing import Dict
import warnings
warnings.filterwarnings("ignore")

# ---------------- ML / NLP ----------------
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier

# ---------------- EMBEDDINGS ----------------
from sentence_transformers import SentenceTransformer

# ---------------- CONFIG ----------------
class Config:
    """Tunable constants for combining agent scores into a final verdict."""

    # Score cut-offs on the 0..1 risk scale. SAFE_THRESHOLD is declared but
    # not read by the code visible in this file.
    SAFE_THRESHOLD = 0.25
    SUSPICIOUS_THRESHOLD = 0.45   # lowered
    MALICIOUS_THRESHOLD = 0.70

    # Per-agent weights for the weighted sum; the rule-based agent is
    # weighted highest because rules matter more for email.
    WEIGHTS = dict(
        rule=0.40,
        ml=0.20,
        rag=0.20,
        reputation=0.20,
    )


# Shared module-level settings instance.
config = Config()

# ==========================================================
# PREPROCESSOR
# ==========================================================

class EmailPreprocessor:
    """Turn raw email text into the feature dictionary used by the agents."""

    URL_REGEX = re.compile(r"https?://\S+|www\.\S+")
    EMAIL_REGEX = re.compile(r"[\w\.-]+@[\w\.-]+\.\w+")

    # Substrings whose presence sets the ``urgency`` flag.
    URGENCY_MARKERS = (
        "urgent", "immediately", "within 24 hours",
        "within 48 hours", "final notice", "expires",
    )

    @staticmethod
    def _contains_phrase(phrase: str, text: str) -> bool:
        """Return True if *phrase* occurs in *text* as a whole word/phrase.

        A plain ``phrase in text`` test lets short keywords such as "age"
        fire inside unrelated words ("message", "page"). The lookarounds
        require that the match is not directly preceded or followed by a
        letter or digit (``[^\\W_]`` is "word character except underscore").
        """
        pattern = rf"(?<![^\W_]){re.escape(phrase)}(?![^\W_])"
        return re.search(pattern, text) is not None

    def extract(self, text: str) -> Dict:
        """Extract features from one email.

        Args:
            text: Raw email content.

        Returns:
            A dict with keys:
              ``raw``              the input text unchanged,
              ``lower``            the lowercased text,
              ``emails``           email addresses found in the lowercased text,
              ``urls``             URLs found in the lowercased text,
              ``keyword_hits``     number of THREAT_KEYWORDS phrases matched,
              ``matched_keywords`` the matched phrases, in dictionary order,
              ``urgency``          1 if any urgency marker is present, else 0.
        """
        text_l = text.lower()
        emails = self.EMAIL_REGEX.findall(text_l)
        urls = self.URL_REGEX.findall(text_l)

        # A phrase listed under several categories is counted once per
        # listing, as before; only the matching rule is stricter.
        matched = [
            phrase
            for phrases in THREAT_KEYWORDS.values()
            for phrase in phrases
            if self._contains_phrase(phrase, text_l)
        ]

        # Urgency stays a substring test so inflected forms such as
        # "urgently" still raise the flag.
        urgency_flag = any(marker in text_l for marker in self.URGENCY_MARKERS)

        return {
            "raw": text,
            "lower": text_l,
            "emails": emails,
            "urls": urls,
            "keyword_hits": len(matched),
            "matched_keywords": matched,
            "urgency": int(urgency_flag)
        }


# ==========================================================
# DOMAIN & BRAND DETECTOR (CRITICAL FIX)
# ==========================================================

class DomainBrandDetector:
    """Score an email for brand / authority impersonation.

    ``LEGIT_BRANDS`` maps a lowercase brand name to the domains that brand
    is expected to send from.
    """

    LEGIT_BRANDS = {

        # ===============================
        # TECH / IDENTITY PROVIDERS
        # ===============================
        "microsoft": ["microsoft.com", "outlook.com", "office.com", "live.com"],
        "google": ["google.com", "gmail.com", "googlemail.com"],
        "apple": ["apple.com", "icloud.com"],
        "amazon": ["amazon.com", "amazon.in", "amazon.co.uk"],
        "facebook": ["facebook.com", "meta.com"],
        "instagram": ["instagram.com"],
        "linkedin": ["linkedin.com"],
        "twitter": ["twitter.com", "x.com"],

        # ===============================
        # FINANCIAL / PAYMENTS (VERY HIGH RISK)
        # ===============================
        "paypal": ["paypal.com"],
        "visa": ["visa.com"],
        "mastercard": ["mastercard.com"],
        "american express": ["americanexpress.com"],
        "stripe": ["stripe.com"],
        "square": ["squareup.com"],
        "western union": ["westernunion.com"],
        "moneygram": ["moneygram.com"],
        "world bank": ["worldbank.org"],
        "imf": ["imf.org"],

        # ===============================
        # BANKS (GLOBAL + INDIA)
        # ===============================
        "chase": ["chase.com"],
        "bank of america": ["bankofamerica.com"],
        "wells fargo": ["wellsfargo.com"],
        "citibank": ["citi.com"],
        "hsbc": ["hsbc.com"],
        "sbi": ["sbi.co.in"],
        "hdfc": ["hdfcbank.com"],
        "icici": ["icicibank.com"],
        "axis": ["axisbank.com"],

        # ===============================
        # ISPs / UTILITIES (BILLING PHISHING)
        # ===============================
        "xfinity": ["xfinity.com", "comcast.com"],
        "verizon": ["verizon.com"],
        "att": ["att.com"],
        "vodafone": ["vodafone.com"],
        "jio": ["jio.com"],
        "airtel": ["airtel.in"],

        # ===============================
        # SHIPPING / LOGISTICS
        # ===============================
        "fedex": ["fedex.com"],
        "ups": ["ups.com"],
        "dhl": ["dhl.com"],
        "usps": ["usps.com"],
        "india post": ["indiapost.gov.in"],

        # ===============================
        # GOVERNMENT / OFFICIAL (EXTREME RISK)
        # ===============================
        "irs": ["irs.gov"],
        "hmrc": ["gov.uk"],
        "income tax": ["incometax.gov.in"],
        "uidai": ["uidai.gov.in"],
        "passport office": ["passportindia.gov.in"]
    }

    @staticmethod
    def _mentions_brand(brand: str, text: str) -> bool:
        """Return True if *brand* appears in *text* as a standalone token.

        Plain substring matching reports "irs" inside "first", "att" inside
        "attached", "ups" inside "groups" and "chase" inside "purchase".
        The lookarounds reject a match that is directly preceded or followed
        by a letter or digit (``[^\\W_]`` is "word character except
        underscore"), so "paypal.com" and "paypal_support" still count.
        """
        pattern = rf"(?<![^\W_]){re.escape(brand)}(?![^\W_])"
        return re.search(pattern, text) is not None

    @staticmethod
    def _is_legit_domain(domain: str, legit_domains) -> bool:
        """Return True if *domain* equals, or is a subdomain of, a listed domain.

        Subdomains must be accepted: e.g. "hmrc.gov.uk" is covered by the
        listed "gov.uk", and "mail.paypal.com" by "paypal.com". The leading
        dot in the suffix test prevents "evilpaypal.com" from passing.
        """
        return any(
            domain == legit or domain.endswith("." + legit)
            for legit in legit_domains
        )

    def analyze(self, features: Dict) -> Dict:
        """Compute an impersonation risk score from extracted features.

        Args:
            features: Dict with at least ``"lower"`` (lowercased email text)
                and ``"emails"`` (list of addresses found in the text).

        Returns:
            Dict with ``risk_score`` (0..1, rounded to 3 places),
            ``confidence`` (0.95 when the score is at least 0.8, else 0.6)
            and ``evidences`` (list of human-readable reasons).
        """
        risk = 0.0
        evidences = []

        text = features["lower"]

        # Only the first address found in the text is treated as the sender.
        emails = features["emails"]
        sender_domain = emails[0].split("@")[-1] if emails else None

        # ==========================
        # 1. Brand impersonation (even WITHOUT email)
        # ==========================
        for brand, legit_domains in self.LEGIT_BRANDS.items():
            if not self._mentions_brand(brand, text):
                continue

            risk = max(risk, 0.6)
            evidences.append(f"Brand mentioned: {brand}")

            # Email-domain mismatch (if an address is present)
            if sender_domain is not None and not self._is_legit_domain(
                sender_domain, legit_domains
            ):
                risk = max(risk, 0.95)
                evidences.append(f"Brand impersonation: {brand}")
                evidences.append(f"Suspicious sender domain: {sender_domain}")

        # ==========================
        # 2. Legit service abused for phishing (BEC pattern)
        # ==========================
        legit_services = ["google meet", "calendar invite", "teams meeting"]
        danger_triggers = ["verify", "money transfer", "payment", "urgent", "approval"]

        if any(svc in text for svc in legit_services):
            if any(trg in text for trg in danger_triggers):
                risk = max(risk, 0.85)
                evidences.append("Legitimate service abused for phishing")

        # ==========================
        # 3. Financial authority impersonation
        # ==========================
        if any(k in text for k in ["world bank", "imf", "remittance", "fund transfer"]):
            risk = max(risk, 0.9)
            evidences.append("Financial authority impersonation")

        return {
            "risk_score": round(risk, 3),
            "confidence": 0.95 if risk >= 0.8 else 0.6,
            "evidences": evidences
        }


# ==========================================================
# RULE-BASED AGENT
# ==========================================================

class RuleAgent:
    """Heuristic scorer that adds fixed penalties for known phishing signals."""

    # Phrases that each add 0.3 to the score when present.
    _HIGH_RISK_PHRASES = (
        "verify now",
        "reset your password",
        "account will be permanently locked",
        "confirm your identity",
    )

    def analyze(self, features: Dict) -> Dict:
        """Score one email from its extracted features.

        Reads ``keyword_hits``, ``urgency``, ``urls`` and ``lower`` from
        *features*. Returns ``risk_score`` (the penalty total capped at 1.0)
        and ``confidence`` (0.5 plus the uncapped total, capped at 0.9).
        """
        body = features["lower"]
        hits = features["keyword_hits"]

        # Penalties are collected in a fixed order and then added one by one.
        penalties = []

        # Keyword density: only the highest tier reached applies.
        if hits >= 6:
            penalties.append(0.6)
        elif hits >= 3:
            penalties.append(0.4)

        # Urgency pressure
        if features["urgency"]:
            penalties.append(0.25)

        # Any URL in the message
        if features["urls"]:
            penalties.append(0.2)

        # Financial transaction panic
        if "money transfer" in body:
            penalties.append(0.6)
        if "verify your request" in body:
            penalties.append(0.5)

        # Meeting link combined with a verification request (BEC indicator)
        if "google meet" in body and "verify" in body:
            penalties.append(0.4)

        # Fake security authority
        if "check point" in body:
            penalties.append(0.5)

        # High-risk phrases, 0.3 apiece
        penalties.extend(0.3 for phrase in self._HIGH_RISK_PHRASES if phrase in body)

        total = 0.0
        for penalty in penalties:
            total += penalty

        return {
            "risk_score": min(total, 1.0),
            "confidence": min(0.9, 0.5 + total),
        }


# ==========================================================
# ML AGENT (LIGHTWEIGHT, REALISTIC)
# ==========================================================

class MLAgent:
    """TF-IDF + random-forest scorer fitted on a small built-in sample set."""

    def __init__(self):
        """Create the vectorizer and forest, then fit them immediately."""
        self.vec = TfidfVectorizer(stop_words="english", max_features=1000)
        self.model = RandomForestClassifier(n_estimators=100, random_state=42)
        self._train()

    def _train(self):
        """Fit the vectorizer and the classifier on the hard-coded samples."""
        # (text, label) pairs; label 1 marks a risky sample, 0 a benign one.
        samples = [
            ("urgent password reset click link", 1),
            ("verify account immediately", 1),
            ("invoice attached download", 1),
            ("team meeting tomorrow", 0),
            ("project update shared", 0),
            ("security alert microsoft account", 1),
        ]
        corpus = [sentence for sentence, _ in samples]
        targets = [flag for _, flag in samples]

        matrix = self.vec.fit_transform(corpus)
        self.model.fit(matrix, targets)

    def analyze(self, text: str) -> Dict:
        """Return the predicted probability of class 1 for *text*.

        ``confidence`` is the distance of that probability from 0.5,
        rescaled to the 0..1 range.
        """
        vector = self.vec.transform([text])
        risky_prob = self.model.predict_proba(vector)[0][1]
        return {
            "risk_score": risky_prob,
            "confidence": abs(risky_prob - 0.5) * 2,
        }

# ==========================================================
# RAG AGENT (SEMANTIC MATCH)
# ==========================================================

class RAGAgent:
    """Scores text by embedding similarity to a few known threat descriptions."""

    def __init__(self):
        """Load the sentence-embedding model and pre-encode the references."""
        self.embedder = SentenceTransformer("all-MiniLM-L6-v2")
        self.threats = [
            "phishing emails impersonate Microsoft password expiry",
            "urgent account suspension scams",
            "credential harvesting email attacks"
        ]
        # Encoded once, with normalisation requested from the embedder.
        self.embeddings = self.embedder.encode(
            self.threats,
            normalize_embeddings=True,
        )

    def analyze(self, text: str) -> Dict:
        """Return the highest dot product between *text* and any reference.

        The same value is reported as both ``risk_score`` and ``confidence``.
        """
        query_vec = self.embedder.encode([text], normalize_embeddings=True)
        similarities = query_vec @ self.embeddings.T
        best = float(similarities.max())
        return {"risk_score": best, "confidence": best}

# ==========================================================
# ORCHESTRATOR
# ==========================================================

class EmailThreatClassifier:
    """Run every agent on an email and combine their scores into a verdict."""

    def __init__(self):
        """Instantiate the preprocessor and the four scoring agents."""
        self.pre = EmailPreprocessor()
        self.rule = RuleAgent()
        self.ml = MLAgent()
        self.rag = RAGAgent()
        self.domain = DomainBrandDetector()

    @staticmethod
    def _label_for(score: float) -> str:
        """Map a final risk score to MALICIOUS / SUSPICIOUS / SAFE."""
        if score >= config.MALICIOUS_THRESHOLD:
            return "MALICIOUS"
        if score >= config.SUSPICIOUS_THRESHOLD:
            return "SUSPICIOUS"
        return "SAFE"

    def classify(self, email: str) -> Dict:
        """Classify one email.

        Args:
            email: Raw email content.

        Returns:
            Dict with ``classification``, ``risk_score``, ``timestamp``
            (ISO format, local time), ``email_id`` (first 12 hex characters
            of the SHA-256 of the text), ``matched_keywords`` (at most 8)
            and ``evidence`` (list of reasons).
        """
        f = self.pre.extract(email)

        r1 = self.rule.analyze(f)
        r2 = self.ml.analyze(email)
        r3 = self.rag.analyze(email)
        r4 = self.domain.analyze(f)

        # Work on a copy so the detector's own result is never mutated.
        evidences = list(r4.get("evidences", []))
        email_id = hashlib.sha256(email.encode()).hexdigest()[:12]
        matched_keywords = f["matched_keywords"][:8]

        # Short-circuit when the domain detector demands a block. The result
        # carries the same keys as the normal path so callers can read
        # either result the same way.
        if r4.get("hard_block"):
            return {
                "classification": "MALICIOUS",
                "risk_score": 0.99,
                "timestamp": datetime.now().isoformat(),
                "email_id": email_id,
                "matched_keywords": matched_keywords,
                "evidence": evidences
            }

        final_score = (
            r1["risk_score"] * config.WEIGHTS["rule"] +
            r2["risk_score"] * config.WEIGHTS["ml"] +
            r3["risk_score"] * config.WEIGHTS["rag"] +
            r4["risk_score"] * config.WEIGHTS["reputation"]
        )

        # Legit service used in a suspicious context: force a high score
        # regardless of the weighted average.
        abuse_note = "Legitimate service abused for phishing"
        uses_service = any(
            svc in f["lower"] for svc in ["google meet", "calendar invite"]
        )
        has_trigger = any(
            k in f["lower"] for k in ["verify", "money transfer", "urgent"]
        )
        if uses_service and has_trigger:
            final_score = max(final_score, 0.85)
            # The domain detector reports the same finding under a broader
            # condition, so avoid listing it twice.
            if abuse_note not in evidences:
                evidences.append(abuse_note)

        # The label is chosen AFTER the possible score override above.
        return {
            "classification": self._label_for(final_score),
            "risk_score": round(final_score, 3),
            "timestamp": datetime.now().isoformat(),
            "email_id": email_id,
            "matched_keywords": matched_keywords,
            "evidence": evidences
        }


# ==========================================================
# INTERACTIVE REAL-TIME CLASSIFIER
# ==========================================================

# Build the classifier once; constructing it creates every agent.
classifier = EmailThreatClassifier()

print("✅ Real-time Email Threat Classifier Ready")
print("Type email content. Type 'exit' to quit.\n")

# Read-classify-print loop; ends on 'exit', end of input, or Ctrl-C.
while True:
    try:
        text = input("📧 Email > ")
    except (EOFError, KeyboardInterrupt):
        # Closed stdin or an interrupt ends the session without a traceback.
        print()
        break

    command = text.strip()
    if command.lower() == "exit":
        break
    if not command:
        # Nothing to classify on a blank line; prompt again.
        continue

    result = classifier.classify(text)
    print("\n🔍 RESULT")
    print("Classification:", result["classification"])
    print("Risk Score:", result["risk_score"])
    if result["evidence"]:
        print("Evidence:", ", ".join(result["evidence"]))
    print("-"*50)

Loading weights:   0%|          | 0/103 [00:00<?, ?it/s]

BertModel LOAD REPORT from: sentence-transformers/all-MiniLM-L6-v2
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


✅ Real-time Email Threat Classifier Ready
Type email content. Type 'exit' to quit.

üìß Email > S  O service@paypal.com <service@paypal.com> To: Bilingdepartments1@gkjyryfjy876.onmicrosoft.com  ALERT: This message originated outside of the Fortinet network. BE CAUTIOUS before clicking any link or opening an attachment.  Hello, Bllingdepartments1@gkjyrytiy876. onmicrosoft. com  Tuesday, December 3, 2024 at 9:51AM  A small reminder from Brian Oistad  Payment request details  Amount requested $2,185.96 USD  Note from Brian Oistad: Don't recognize the seller? Please contact Pay (888) 632-2513 (Toll Free). If you have any iss bin%2P3Fcmd%3D_prq626id%3D5KxkddVVQGooiL52V15- KjUda8Z5UR.0.PSOPw%22%7D7D&fiowContextData=mv8t-5G-zgC- 2911 (Toll Free). If you do not reach out, we wi  Transaction ID U-3GB115879W233070N  Transaction date December 3, 2024  https://www.paypal.com/signin/? returnUri=%2Fmyaccount62Ftransfer2FpayRequest62FU-38S42484X966233392FU-3GB 116879W233070N3FclassicUr302FUS%2Fcg-