In [1]:
# -*- coding: utf-8 -*-
import os
import re
import ipaddress
from urllib.parse import urlparse
import warnings
warnings.filterwarnings('ignore', category=UserWarning, module='sklearn')

import time
from datetime import datetime
import joblib
import numpy as np
import pandas as pd

# === PATHS / FILES ===
# NOTE(review): in the cells shown, rutaBase is only written to the timing CSV as
# 'modelo_path'; leer_modelos() loads the .pkl files by bare file name instead.
rutaBase = '/003_xgboost_phishing_count_vector_9000/'  # folder intended to contain the .pkl artifacts
input_csv = 'correos_normalizados.csv'                              # input CSV with columns: subject, body
output_pred_csv = 'predicciones.csv'                   # CSV with results (subject, body, label, probability)
output_time_csv = 'tiempos.csv'                        # CSV with aggregated timings (1 row)

# === COLUMN NAMES IN THE INPUT CSV ===
SUBJECT_COL = 'subject'
BODY_COL = 'body'


In [2]:
def leer_modelos(base_dir: str = ""):
    """Load the persisted model, vectorizer and scaler.

    Args:
        base_dir: Directory that holds ``model.pkl``, ``vectorizer.pkl`` and
            ``scaler.pkl``. The default (empty string) resolves the bare file
            names against the current working directory.

    Returns:
        Tuple ``(model, vectorizer, scaler)``, in that order.

    Raises:
        RuntimeError: If any artifact cannot be loaded. The underlying
            exception is chained as ``__cause__`` so the real reason
            (missing file, version mismatch, ...) stays in the traceback.
    """
    # SECURITY: joblib.load unpickles its input, which can execute arbitrary
    # code. Only point this at artifacts from a trusted source.
    try:
        model, vectorizer, scaler = [
            joblib.load(os.path.join(base_dir, file_name))
            for file_name in ('model.pkl', 'vectorizer.pkl', 'scaler.pkl')
        ]
        return model, vectorizer, scaler
    except Exception as e:
        raise RuntimeError(f"Fallo al cargar artefactos: {e}") from e

# Load the artifacts once and bind them as notebook-level names:
# preparar_batch_features() reads `vectorizer` and `scaler`, and
# predecir_batch() reads `model`, as globals.
model, vectorizer, scaler = leer_modelos()
print("✅ Modelos cargados correctamente.")


✅ Modelos cargados correctamente.


In [3]:
# Keyword lexicon used by extract_phishing_features().
# Layout: language -> category -> list of lowercase keywords/phrases.
# Each category yields one feature named '<category>_words', computed as the
# number of list entries that occur as a *substring* of the lowercased text
# (so e.g. both 'suspend' and 'suspended' can count for the same word).
# NOTE(review): these lists presumably mirror the ones used at training time;
# confirm before editing, since changes would shift the feature values.
PHISHING_KEYWORDS = {
    'english': {
        'urgency': ['urgent', 'expire', 'suspend', 'immediate', 'limited time', 'act now', 
                    'hurry', 'deadline', 'critical', 'important notice'],
        'action': ['click here', 'verify', 'confirm', 'update', 'validate', 'secure',
                   'restore', 'unlock', 'activate', 'claim'],
        'money': ['account', 'payment', 'billing', 'credit', 'debit', 'bank', 'paypal',
                  'refund', 'prize', 'winner', 'lottery', 'tax'],
        'threat': ['suspended', 'blocked', 'restricted', 'locked', 'illegal', 'unauthorized',
                   'breach', 'compromised', 'violation', 'terminate']
    },
    'spanish': {
        'urgency': ['urgente', 'expira', 'suspender', 'inmediato', 'tiempo limitado', 
                    'actúa ahora', 'apresúrate', 'fecha límite', 'crítico', 'aviso importante'],
        'action': ['haga clic aquí', 'verificar', 'confirmar', 'actualizar', 'validar',
                   'asegurar', 'restaurar', 'desbloquear', 'activar', 'reclamar'],
        'money': ['cuenta', 'pago', 'facturación', 'crédito', 'débito', 'banco', 'paypal',
                  'reembolso', 'premio', 'ganador', 'lotería', 'impuesto'],
        'threat': ['suspendida', 'bloqueada', 'restringida', 'bloqueado', 'ilegal',
                   'no autorizado', 'violación', 'comprometido', 'infracción', 'terminar']
    }
}

def detect_language(text: str) -> str:
    """Guess whether ``text`` is Spanish or English from common stop words.

    Only the first 50 whitespace-separated tokens (lowercased) are inspected.
    Each token that is a Spanish marker word scores one point for Spanish, and
    likewise for English; repeated tokens count every time.

    Returns:
        ``'spanish'`` when Spanish strictly outscores English, otherwise
        ``'english'`` (ties, empty and falsy input included).
    """
    spanish_markers = frozenset(('el', 'la', 'de', 'que', 'y', 'en', 'un', 'para'))
    english_markers = frozenset(('the', 'be', 'to', 'of', 'and', 'a', 'in', 'that'))

    es_hits = 0
    en_hits = 0
    for token in (text or "").lower().split()[:50]:
        if token in spanish_markers:
            es_hits += 1
        if token in english_markers:
            en_hits += 1

    return 'spanish' if es_hits > en_hits else 'english'

def phishing_preprocessing(text: str) -> str:
    """Normalize raw email text before it is handed to the vectorizer.

    Steps:
      1. Missing values (``None`` / NaN) become the empty string.
      2. Every URL match (scheme + host part, see ``url_pattern``) and every
         email address is padded with a space on each side so it stands apart
         from adjacent text.
      3. The whole text is lowercased.
      4. Runs of three or more underscores are replaced by a space.
      5. Whitespace is collapsed to single spaces and the result is stripped.

    The padding is done in place with ``re.sub`` rather than through numbered
    placeholders. Numbered placeholders collide: restoring ``url1`` also
    rewrote the prefix of ``url10`` once a message had 11+ URLs, and a literal
    ``url0`` / ``email0`` token in the message was overwritten with a URL or
    address.

    Args:
        text: Raw text; any non-missing value is converted with ``str()``.

    Returns:
        The normalized, lowercased text.
    """
    if pd.isna(text):
        return ""
    text = str(text)

    url_pattern = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'

    def _pad(match):
        # Surround the matched span with spaces; extra spaces are collapsed below.
        return f" {match.group(0)} "

    # URLs first, then emails, matching the original processing order.
    text = re.sub(url_pattern, _pad, text)
    text = re.sub(email_pattern, _pad, text)

    text = text.lower()

    text = re.sub(r'_{3,}', ' ', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def extract_url_features(text: str) -> dict:
    """Count URL-based heuristics over every URL found in ``text``.

    URLs are located in the lowercased text with a pattern that matches the
    scheme plus host-like characters only (letters, digits, ``_``, ``-``,
    ``.`` and percent-escapes); a match therefore stops at the first ``/``,
    ``:``, ``?`` or ``@`` after ``://``.

    Returns:
        Dict with integer counters:
          url_count           number of URL matches
          shortened_url       matches containing a known shortener string
          has_ip              matches whose hostname parses as an IP address
          suspicious_domain   hostname ends in .ru/.tk/.cn or has >= 3 hyphens
          long_url            matches longer than 75 characters
          has_at_symbol       matches containing '@'
          multiple_subdomains matches with more than 3 dots
          https_count         matches starting with 'https'
          http_count          other matches starting with 'http:'

    Notes:
        * The URL pattern cannot match '@', so ``has_at_symbol`` stays 0 with
          the pattern as written.
        * Shortener detection is a substring test, so e.g. 't.co' also hits
          'microsoft.com'.
        * NOTE(review): these quirks presumably match what the scaler/model
          saw at training time; confirm before changing any heuristic.
    """
    url_regex = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
    shortener_markers = ('bit.ly', 'tinyurl', 'goo.gl', 'ow.ly', 'short.link', 't.co')
    risky_tlds = ('.ru', '.tk', '.cn')  # optional heuristic

    counts = dict.fromkeys(
        (
            'url_count', 'shortened_url', 'has_ip', 'suspicious_domain', 'long_url',
            'has_at_symbol', 'multiple_subdomains', 'https_count', 'http_count',
        ),
        0,
    )

    found = re.findall(url_regex, (text or "").lower())
    counts['url_count'] = len(found)

    for link in found:
        if any(marker in link for marker in shortener_markers):
            counts['shortened_url'] += 1

        # Host-based checks are best-effort: any parsing failure is ignored.
        try:
            hostname = urlparse(link).hostname
            if hostname:
                try:
                    ipaddress.ip_address(hostname)
                except ValueError:
                    pass  # not an IP literal
                else:
                    counts['has_ip'] += 1
                # Very basic heuristic: risky TLD or many hyphens.
                if hostname.endswith(risky_tlds) or hostname.count('-') >= 3:
                    counts['suspicious_domain'] += 1
        except Exception:
            pass

        if len(link) > 75:
            counts['long_url'] += 1
        if '@' in link:
            counts['has_at_symbol'] += 1
        if link.count('.') > 3:
            counts['multiple_subdomains'] += 1

        if link.startswith('https'):
            counts['https_count'] += 1
        elif link.startswith('http:'):
            counts['http_count'] += 1

    return counts

def extract_phishing_features(text: str, language='english') -> dict:
    """Compute keyword, punctuation and email-address heuristics for ``text``.

    Args:
        text: Raw message text; falsy values are treated as "".
        language: Key into ``PHISHING_KEYWORDS``; unknown values fall back to
            the English lists.

    Returns:
        Dict containing, in this order:
          '<category>_words'  one entry per keyword category: how many of the
                              category's keywords occur as substrings of the
                              lowercased text
          exclamation_count   number of '!'
          question_count      number of '?'
          uppercase_words     words made of 2+ ASCII capitals (original case)
          special_chars       characters from a fixed punctuation set
          email_count         email addresses found
          suspicious_email    addresses whose domain is not one of the listed
                              brand domains but resembles one (0->o or 1->l
                              swap, or the brand name appears in the domain)
    """
    raw = text or ""
    lowered = raw.lower()
    keyword_groups = PHISHING_KEYWORDS.get(language, PHISHING_KEYWORDS['english'])

    feats = {
        f'{group}_words': sum(term in lowered for term in terms)
        for group, terms in keyword_groups.items()
    }
    feats['exclamation_count'] = lowered.count('!')
    feats['question_count'] = lowered.count('?')
    feats['uppercase_words'] = len(re.findall(r'\b[A-Z]{2,}\b', raw))
    feats['special_chars'] = len(re.findall(r'[!@#$%^&*(),.?":{}|<>]', raw))

    addresses = re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', raw)
    feats['email_count'] = len(addresses)

    brand_domains = (
        'paypal.com', 'amazon.com', 'google.com', 'microsoft.com', 'apple.com',
        'facebook.com', 'chase.com', 'netflix.com',
    )

    def _imitates_brand(domain):
        # True when the domain differs from some brand domain yet resembles it.
        for brand in brand_domains:
            if domain == brand:
                continue
            if (
                domain.replace('0', 'o') == brand
                or domain.replace('1', 'l') == brand
                or brand.split('.')[0] in domain
            ):
                return True
        return False

    flagged = 0
    for address in addresses:
        if _imitates_brand(address.split('@')[-1].lower()):
            flagged += 1
    feats['suspicious_email'] = flagged
    return feats

# Column order of the hand-crafted features. preparar_batch_features() builds
# each row in exactly this order before calling scaler.transform, so the
# sequence is load-bearing.
FEATURE_ORDER = [
    # URL counters (extract_url_features)
    'url_count',
    'shortened_url',
    'has_ip',
    'suspicious_domain',
    'long_url',
    'has_at_symbol',
    'multiple_subdomains',
    'https_count',
    'http_count',
    # Keyword-category counters (extract_phishing_features)
    'urgency_words',
    'action_words',
    'money_words',
    'threat_words',
    # Punctuation / casing counters
    'exclamation_count',
    'question_count',
    'uppercase_words',
    'special_chars',
    # Email-address counters
    'email_count',
    'suspicious_email',
]


In [4]:
def preparar_batch_features(texts: list[str]) -> tuple:
    """Build the two feature blocks for a batch of messages.

    Args:
        texts: Raw message texts (falsy entries are treated as "" for the
            hand-crafted features).

    Returns:
        ``(text_matrix, extra_scaled)`` where
          - text_matrix is whatever ``vectorizer.transform`` returns for the
            preprocessed texts, and
          - extra_scaled is the result of ``scaler.transform`` on the
            hand-crafted features, with columns ordered as ``FEATURE_ORDER``.
    """
    # Bag-of-words block: the vectorizer sees the normalized text.
    cleaned_docs = []
    for raw in texts:
        cleaned_docs.append(phishing_preprocessing(raw))
    text_matrix = vectorizer.transform(cleaned_docs)

    # Hand-crafted block: computed from the original (non-normalized) text.
    rows = []
    for raw in texts:
        source = raw or ""
        lang = detect_language(source)
        merged = dict(extract_url_features(source))
        # Keys from the phishing features win on a name clash.
        merged.update(extract_phishing_features(source, language=lang))
        rows.append([merged.get(column, 0) for column in FEATURE_ORDER])

    # A DataFrame with named columns is passed to the scaler.
    feature_frame = pd.DataFrame(rows, columns=FEATURE_ORDER)
    return text_matrix, scaler.transform(feature_frame)

def predecir_batch(texts: list[str]) -> tuple[np.ndarray, np.ndarray]:
    """Score a batch of messages with the loaded model.

    Args:
        texts: Raw message texts.

    Returns:
        ``(labels, probs)``:
          - labels: the model's predicted label per message
          - probs: score for class 1 per message, taken from
            ``predict_proba[:, 1]`` when available; otherwise a sigmoid of
            ``decision_function``; otherwise the labels cast to float.
        For an empty ``texts`` both arrays are empty (int and float dtype).
    """
    # An empty batch (e.g. a CSV with only a header row) is answered directly:
    # the caller already treats n == 0 as valid, while the downstream
    # transformers/model are not expected to accept zero-sample input.
    if len(texts) == 0:
        return np.zeros(0, dtype=int), np.zeros(0, dtype=float)

    X_text, X_extra = preparar_batch_features(texts)

    # np.hstack needs dense input, so a sparse text matrix is densified first.
    # NOTE(review): this allocates n_messages x vocabulary floats; switching to
    # a sparse stack may change how the model treats zero entries, so it is
    # deliberately left dense.
    X_dense = X_text.toarray() if hasattr(X_text, 'toarray') else X_text
    X_comb = np.hstack([X_dense, X_extra])

    labels = model.predict(X_comb)
    if hasattr(model, "predict_proba"):
        probs = model.predict_proba(X_comb)[:, 1]
    elif hasattr(model, "decision_function"):
        # No predict_proba: squash the decision scores with a plain sigmoid.
        scores = model.decision_function(X_comb)
        probs = 1 / (1 + np.exp(-scores))
    else:
        probs = labels.astype(float)
    return labels, probs


In [5]:
# === Load the input CSV ===
# Fails fast with a ValueError when either required column is missing.
df = pd.read_csv(input_csv)
if SUBJECT_COL not in df.columns or BODY_COL not in df.columns:
    raise ValueError(f"El CSV debe contener las columnas '{SUBJECT_COL}' y '{BODY_COL}'.")

# Assemble the final text to score: "<subject> <body>", with missing cells
# replaced by "" so the concatenation never produces NaN.
subjects = df[SUBJECT_COL].fillna("").astype(str)
bodies = df[BODY_COL].fillna("").astype(str)
texts = (subjects + " " + bodies).tolist()

# === Measure total prediction time ===
# The timed span covers feature preparation and model inference (everything
# inside predecir_batch); CSV reading and writing are excluded.
t0 = time.perf_counter()
labels, probs = predecir_batch(texts)
t1 = time.perf_counter()

# === Build the predictions output ===
# Only the subject/body columns are carried over, plus the two result columns.
out = df[[SUBJECT_COL, BODY_COL]].copy()
out['label'] = (labels.astype(int))           # 1=malicious, 0=legitimate
out['probabilidad'] = probs
out.to_csv(output_pred_csv, index=False, encoding='utf-8')
print(f"✅ Archivo de predicciones guardado en: {output_pred_csv}")

# === Timing CSV (aggregated, one row) ===
n = len(texts)
elapsed = t1 - t0
avg = (elapsed / n) if n else 0.0  # avoid division by zero on an empty CSV
# NOTE(review): 'modelo_path' records rutaBase, but in the cells shown the
# artifacts are loaded by bare file name, not from rutaBase — confirm.
time_row = {
    'fecha': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'n_correos': n,
    'segundos_totales': round(elapsed, 6),
    'segundos_promedio_por_correo': round(avg, 6),
    'modelo_path': rutaBase
}
pd.DataFrame([time_row]).to_csv(output_time_csv, index=False, encoding='utf-8')
print(f"⏱️  Archivo de tiempos guardado en: {output_time_csv}")
print(f"→ Segundos totales: {elapsed:.4f}  |  Promedio por correo: {avg:.6f}")


✅ Archivo de predicciones guardado en: predicciones.csv
⏱️  Archivo de tiempos guardado en: tiempos.csv
→ Segundos totales: 2.7532  |  Promedio por correo: 0.001335
