#ZeroScam



## Instalo librerias

In [None]:
!pip install chromadb fastapi pyngrok uvicorn
!pip install datasets trl
!pip uninstall -y bitsandbytes
!pip install --upgrade bitsandbytes
!pip install --upgrade transformers accelerate
!nvidia-smi
!pip install accelerate
!accelerate config
!pip install flash-attn --no-build-isolation
!pip install pytesseract
!pip install --upgrade python-telegram-bot
!pip install dotenv
!apt-get update
!apt-get install -y tesseract-ocr
!pip install pytesseract
!tesseract -v

In [None]:
import os
import json
import re
import sys
import threading
import logging
import requests
from dotenv import load_dotenv

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from sentence_transformers import SentenceTransformer
from bitsandbytes import BitsAndBytesConfig

from fastapi import FastAPI
import uvicorn
from pyngrok import ngrok

import pytesseract
from PIL import Image, ImageEnhance
import spacy

import nest_asyncio
nest_asyncio.apply()

In [None]:
# CONFIGURACIÓN: Montar Google Drive y Variables
drive.mount('/content/drive')
BASE_DIR = "/content/drive/My Drive/Trabajo Final Bootcamp"
NORMATIVA_DIR = os.path.join(BASE_DIR, "normativa")
EMBEDDINGS_BACKUP_PATH = os.path.join(BASE_DIR, "embeddings.json")

if not os.path.exists(NORMATIVA_DIR):
    raise FileNotFoundError(f"La carpeta {NORMATIVA_DIR} no existe. Verifica la ruta o crea la carpeta.")

In [None]:
# Login en Hugging Face
notebook_login()

In [None]:
# CARGA DE MODELOS Y CONFIGURACIÓN DE DISPOSITIVO
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"✅ Usando dispositivo: {device}")

model_name = "CasiAC/deepseek-r1-8b-ciberseguridad"
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4"
)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto",
    trust_remote_code=True
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token
embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
print("✅ Modelo cargado 🚀")

In [None]:
# CONFIGURACIÓN DE CHROMADB Y FASTAPI
app = FastAPI()

# Configuración de ngrok para exponer el puerto 8000
ngrok.set_auth_token("2sN2ljFFRN4UpJk7VPL6jPiHVJL_6FafRKrvugJysTGGRV1KB")
public_url = ngrok.connect(8000)
print(f"🔗 ChromaDB API accesible en: {public_url}")

client = chromadb.Client()
collection = client.create_collection(
    name="test",
    metadata={"hnsw:search_ef": 100, "hnsw:construction_ef": 1000}
)

@app.get("/")
def read_root():
    return {"message": "Chroma API está en funcionamiento"}

@app.get("/collections")
def get_collections():
    return {"collections": client.list_collections()}

@app.get("/collections/{collection_name}")
def get_collection(collection_name: str):
    return {"collection": client.get_collection(name=collection_name)}

@app.post("/collections/{collection_name}/add")
def add_to_collection(collection_name: str, item: dict):
    coll = client.get_collection(name=collection_name)
    coll.add(
        documents=[item["document"]],
        metadatas=[item.get("metadata", {})],
        ids=[item.get("id", "default_id")]
    )
    return {"message": f"Elemento añadido a la colección {collection_name}"}

@app.get("/health")
def health_check():
    return {"status": "OK"}

def start_server():
    uvicorn.run(app, host="0.0.0.0", port=8000)

server_thread = threading.Thread(target=start_server, daemon=True)
server_thread.start()

In [None]:
# FUNCIONES DE INDEXACIÓN Y GENERACIÓN DE CONTEXTO (RAG)
def cargar_documentos_y_embeddings():
    embeddings_dict = {}
    for archivo in os.listdir(NORMATIVA_DIR):
        ruta_json = os.path.join(NORMATIVA_DIR, archivo)
        with open(ruta_json, "r", encoding="utf-8") as f:
            documentos = json.load(f)

        for section in documentos.get("sections", []):
            doc_id = f"{archivo}_p{section['page']}"
            content = section["content"]
            embedding = embedding_model.encode(content, convert_to_tensor=True).cpu().numpy().tolist()
            embeddings_dict[doc_id] = embedding

            collection.add(
                documents=[content],
                metadatas=[{"title": documentos.get("title", "Desconocido"), "page": section["page"]}],
                embeddings=[embedding],
                ids=[doc_id]
            )

    with open(EMBEDDINGS_BACKUP_PATH, "w", encoding="utf-8") as f:
        json.dump(embeddings_dict, f, ensure_ascii=False, indent=4)
    print(f"✅ {len(embeddings_dict)} documentos indexados y guardados en ChromaDB 🚀")
    return embeddings_dict

embeddings_dict = cargar_documentos_y_embeddings()

def obtener_contexto(pregunta, n_docs=3):
    embedding_pregunta = embedding_model.encode([pregunta], convert_to_tensor=True).cpu().numpy().tolist()
    resultados = collection.query(query_embeddings=embedding_pregunta, n_results=n_docs)
    documents = resultados.get('documents', [])
    if not documents:
        return "No se encontraron documentos relevantes."

    documentos_convertidos = [
        " ".join(map(str, doc)) if isinstance(doc, list) else str(doc)
        for doc in documents
    ]
    return "\n".join(documentos_convertidos)

def generar_respuesta_rag(pregunta, max_tokens=300, temperatura=0.1):
    contexto = obtener_contexto(pregunta)
    entrada = f"Contexto: {contexto}\nPregunta: {pregunta}\nRespuesta:"
    inputs = tokenizer(entrada, return_tensors="pt").to(device)
    output = model.generate(
        **inputs,
        max_new_tokens=max_tokens,
        temperature=temperatura,
        top_p=0.9,
        repetition_penalty=1.05
    )
    return tokenizer.decode(output[0], skip_special_tokens=True)

In [None]:
# FUNCIONES DE OCR Y CONSULTA A VIRUSTOTAL
pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract'
nlp = spacy.load("en_core_web_sm")

API_KEY = "06858db9f480b4aba21a5831457a9b919b1f9014e6f8872ee1f4f7d1a029197c"
HEADERS = {"x-apikey": API_KEY}

def preprocesar_imagen(imagen):
    """Convierte la imagen a escala de grises y mejora el contraste."""
    try:
        image = Image.open(imagen).convert("L")
        enhancer = ImageEnhance.Contrast(image)
        return enhancer.enhance(2.0)
    except Exception as e:
        return None

def extraer_texto_img(imagen):
    """Extrae y limpia el texto de una imagen."""
    image = preprocesar_imagen(imagen)
    if image is None:
        return "Error al procesar la imagen"
    texto_extraido = pytesseract.image_to_string(image)
    return limpiar_texto(texto_extraido)

def consultar_ip(ip):
    url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip}"
    response = requests.get(url, headers=HEADERS)
    if response.status_code == 200:
        data = response.json()
        stats = data["data"]["attributes"]["last_analysis_stats"]
        malicious = stats.get("malicious", 0)
        if malicious > 0:
            veredicto = f"❌ La IP {ip} ha sido reportada como maliciosa en {malicious} análisis."
        else:
            veredicto = f"✅ La IP {ip} parece segura."
        return {"IP": ip, "Veredicto": veredicto, "Análisis": stats}
    return {"error": f"Error en la consulta: {response.status_code}"}

def consultar_url(url):
    scan_url = "https://www.virustotal.com/api/v3/urls"
    response = requests.post(scan_url, headers=HEADERS, data={"url": url})
    if response.status_code == 200:
        analysis_id = response.json()["data"]["id"]
        result_url = f"https://www.virustotal.com/api/v3/analyses/{analysis_id}"
        result_response = requests.get(result_url, headers=HEADERS)
        if result_response.status_code == 200:
            data = result_response.json()
            stats = data["data"]["attributes"]["stats"]
            malicious = stats.get("malicious", 0)
            if malicious > 0:
                veredicto = f"❌ La URL {url} ha sido marcada como maliciosa en {malicious} análisis."
            else:
                veredicto = f"✅ La URL {url} parece segura."
            return {"URL": url, "Veredicto": veredicto, "Análisis": stats}
    return {"error": f"Error en la consulta: {response.status_code}"}

def limpiar_texto(texto):
    texto = texto.strip()
    texto = re.sub(r'\s+', ' ', texto)
    return texto.replace("\n", " ").replace("\r", "")

def analizar_con_modelo(texto_extraido):
    """Analiza el mensaje para detectar señales de phishing utilizando el modelo.
       Responde siempre en español.
    """
    texto_limpio = limpiar_texto(texto_extraido)
    system_prompt = """
      Eres un asistente altamente especializado en ciberseguridad. Tu tarea principal es analizar mensajes y detectar intentos de phishing con precisión.
      🔹 REGLAS ESTRICTAS:
      1. No inventes información. Basa tu respuesta ÚNICAMENTE en el texto proporcionado.
      2. Sé conciso y preciso.
      3. La respuesta debe comenzar con "Phishing" o "Not Phishing", seguido de una breve explicación.
      4. Responde siempre en español.
    """
    prompt_modelo = f"""
      Analiza el siguiente mensaje y determina si se trata de un intento de phishing.
      MENSAJE A EVALUAR:
      {texto_extraido}
    """
    inputs = tokenizer(system_prompt + prompt_modelo, return_tensors="pt", padding=True, truncation=True).to(device)
    outputs = model.generate(
        inputs.input_ids,
        attention_mask=inputs.attention_mask,
        max_new_tokens=300,
        temperature=0.15,
        do_sample=True,
        top_k=10,
        top_p=0.7,
        repetition_penalty=1.2,
        pad_token_id=tokenizer.eos_token_id,
    )
    response = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    return response

def analizar_prompt(prompt):
    """
    Detecta si el prompt contiene:
      - Una imagen (por extensión) para procesar OCR y análisis de phishing.
      - Una IP o URL para consulta en VirusTotal.
    Devuelve un resultado especial (no None) si se cumple alguno de estos casos.
    """
    if isinstance(prompt, str) and prompt.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.heic')):
        print(f"🔍 Detectada imagen: {prompt}")
        texto_extraido = extraer_texto_img(prompt)
        print(f"Texto extraído: {texto_extraido}")
        return analizar_con_modelo(texto_extraido)

    ip_pattern = r"\b(?:\d{1,3}\.){3}\d{1,3}\b"
    url_pattern = r'(https?://[^\s]+|www\.[^\s]+)'
    if re.search(ip_pattern, prompt):
        ip = re.search(ip_pattern, prompt).group()
        print(f"🔍 Detectada IP: {ip}")
        return consultar_ip(ip)
    if re.search(url_pattern, prompt):
        url = re.search(url_pattern, prompt).group()
        print(f"🔍 Detectada URL: {url}")
        return consultar_url(url)
    return None

In [None]:
# FUNCIÓN UNIFICADA DE GENERACIÓN DE RESPUESTA
def generate_response(prompt):
    """
    Función unificada que:
      1. Verifica si el prompt es especial (imagen, IP, URL) mediante analizar_prompt().
      2. Si no es especial, utiliza la generación basada en contexto (RAG).
      3. Y si no se encuentra relación en el contexto (por ejemplo, se obtiene "No se encontraron documentos relevantes"),
         se usa una generación “por defecto”.
    """
    # Paso 1: Verificar si el prompt es especial
    resultado_api = analizar_prompt(prompt)
    if resultado_api is not None:
        return json.dumps(resultado_api, indent=4, ensure_ascii=False)

    # Paso 2: Consultar contexto en ChromaDB para RAG
    contexto = obtener_contexto(prompt)
    if contexto.strip().lower().startswith("no se encontraron documentos"):
        # Paso 3: Generación por defecto si no hay contexto relevante
        inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True).to(device)
        outputs = model.generate(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=200,
            temperature=0.2,
            do_sample=True,
            top_k=50,
            top_p=0.9,
            repetition_penalty=1.5,
            pad_token_id=tokenizer.eos_token_id,
        )
        return tokenizer.decode(outputs[0], skip_special_tokens=True)
    else:
        # Si se encontró contexto, usar generación basada en RAG
        return generar_respuesta_rag(prompt)


In [None]:
# PRUEBA DE LA FUNCIÓN UNIFICADA
# ---------------------------------
prompt_usuario = 'image6.png'
respuesta_usuario = generate_response(prompt_usuario)
print(f"\n🔹 Respuesta:\n{respuesta_usuario}")

In [None]:
# CONFIGURACIÓN DEL BOT DE TELEGRAM
# ---------------------------------
load_dotenv()

logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)

from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext

async def start(update: Update, context: CallbackContext) -> None:
    await update.message.reply_text("¡Hola! Soy tu especialista en ciberseguridad. ¿En qué te puedo ayudar?")

async def handle_message(update: Update, context: CallbackContext) -> None:
    user_input = update.message.text
    username = update.message.from_user.username
    logger.info(f"Mensaje recibido de {username}: {user_input}")
    response = generate_response(user_input)
    await update.message.reply_text(response)

def main():
    TELEGRAM_TOKEN = "7047664203:AAEa-JEcZQpv-tDCIdV6ZE_odp4lPTH0Bd8"
    if not TELEGRAM_TOKEN:
        logger.error("El token de Telegram no está configurado.")
        sys.exit(1)

    app_telegram = Application.builder().token(TELEGRAM_TOKEN).build()
    app_telegram.add_handler(CommandHandler("start", start))
    app_telegram.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
    logger.info("Bot iniciado y ejecutándose...")
    app_telegram.run_polling()

if __name__ == "__main__":
    main()

In [None]:
import os
import json
import re
import sys
import threading
import logging
import requests
from dotenv import load_dotenv

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from sentence_transformers import SentenceTransformer
from bitsandbytes import BitsAndBytesConfig

from fastapi import FastAPI
import uvicorn
from pyngrok import ngrok

import pytesseract
from PIL import Image, ImageEnhance
import spacy

import nest_asyncio
nest_asyncio.apply()

# ---------------------------------
# CONFIGURACIÓN: Montar Google Drive y Variables
# ---------------------------------
drive.mount('/content/drive')
BASE_DIR = "/content/drive/My Drive/Trabajo Final Bootcamp"
NORMATIVA_DIR = os.path.join(BASE_DIR, "normativa")
EMBEDDINGS_BACKUP_PATH = os.path.join(BASE_DIR, "embeddings.json")

if not os.path.exists(NORMATIVA_DIR):
    raise FileNotFoundError(f"La carpeta {NORMATIVA_DIR} no existe. Verifica la ruta o crea la carpeta.")

notebook_login()  # Login en Hugging Face (ejecutar solo una vez)

# ---------------------------------
# CARGA DE MODELOS Y CONFIGURACIÓN DE DISPOSITIVO
# ---------------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"✅ Usando dispositivo: {device}")

model_name = "CasiAC/deepseek-r1-8b-ciberseguridad"
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4"
)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto",
    trust_remote_code=True
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token
embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
print("✅ Modelo cargado 🚀")

# ---------------------------------
# CONFIGURACIÓN DE CHROMADB Y FASTAPI
# ---------------------------------
app = FastAPI()

# Configuración de ngrok para exponer el puerto 8000
ngrok.set_auth_token("2sN2ljFFRN4UpJk7VPL6jPiHVJL_6FafRKrvugJysTGGRV1KB")
public_url = ngrok.connect(8000)
print(f"🔗 ChromaDB API accesible en: {public_url}")

client = chromadb.Client()
collection = client.create_collection(
    name="test",
    metadata={"hnsw:search_ef": 100, "hnsw:construction_ef": 1000}
)

@app.get("/")
def read_root():
    return {"message": "Chroma API está en funcionamiento"}

@app.get("/collections")
def get_collections():
    return {"collections": client.list_collections()}

@app.get("/collections/{collection_name}")
def get_collection(collection_name: str):
    return {"collection": client.get_collection(name=collection_name)}

@app.post("/collections/{collection_name}/add")
def add_to_collection(collection_name: str, item: dict):
    coll = client.get_collection(name=collection_name)
    coll.add(
        documents=[item["document"]],
        metadatas=[item.get("metadata", {})],
        ids=[item.get("id", "default_id")]
    )
    return {"message": f"Elemento añadido a la colección {collection_name}"}

@app.get("/health")
def health_check():
    return {"status": "OK"}

def start_server():
    uvicorn.run(app, host="0.0.0.0", port=8000)

server_thread = threading.Thread(target=start_server, daemon=True)
server_thread.start()

# ---------------------------------
# FUNCIONES DE INDEXACIÓN Y GENERACIÓN DE CONTEXTO (RAG)
# ---------------------------------
def cargar_documentos_y_embeddings():
    embeddings_dict = {}
    for archivo in os.listdir(NORMATIVA_DIR):
        ruta_json = os.path.join(NORMATIVA_DIR, archivo)
        with open(ruta_json, "r", encoding="utf-8") as f:
            documentos = json.load(f)

        for section in documentos.get("sections", []):
            doc_id = f"{archivo}_p{section['page']}"
            content = section["content"]
            embedding = embedding_model.encode(content, convert_to_tensor=True).cpu().numpy().tolist()
            embeddings_dict[doc_id] = embedding

            collection.add(
                documents=[content],
                metadatas=[{"title": documentos.get("title", "Desconocido"), "page": section["page"]}],
                embeddings=[embedding],
                ids=[doc_id]
            )

    with open(EMBEDDINGS_BACKUP_PATH, "w", encoding="utf-8") as f:
        json.dump(embeddings_dict, f, ensure_ascii=False, indent=4)
    print(f"✅ {len(embeddings_dict)} documentos indexados y guardados en ChromaDB 🚀")
    return embeddings_dict

embeddings_dict = cargar_documentos_y_embeddings()

def obtener_contexto(pregunta, n_docs=3):
    embedding_pregunta = embedding_model.encode([pregunta], convert_to_tensor=True).cpu().numpy().tolist()
    resultados = collection.query(query_embeddings=embedding_pregunta, n_results=n_docs)
    documents = resultados.get('documents', [])
    if not documents:
        return "No se encontraron documentos relevantes."

    documentos_convertidos = [
        " ".join(map(str, doc)) if isinstance(doc, list) else str(doc)
        for doc in documents
    ]
    return "\n".join(documentos_convertidos)

def generar_respuesta_rag(pregunta, max_tokens=300, temperatura=0.1):
    contexto = obtener_contexto(pregunta)
    entrada = f"Contexto: {contexto}\nPregunta: {pregunta}\nRespuesta:"
    inputs = tokenizer(entrada, return_tensors="pt").to(device)
    output = model.generate(
        **inputs,
        max_new_tokens=max_tokens,
        temperature=temperatura,
        top_p=0.9,
        repetition_penalty=1.05
    )
    return tokenizer.decode(output[0], skip_special_tokens=True)

# ---------------------------------
# FUNCIONES DE OCR Y CONSULTA A VIRUSTOTAL
# ---------------------------------
pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract'
nlp = spacy.load("en_core_web_sm")

API_KEY = "06858db9f480b4aba21a5831457a9b919b1f9014e6f8872ee1f4f7d1a029197c"
HEADERS = {"x-apikey": API_KEY}

def preprocesar_imagen(imagen):
    """Convierte la imagen a escala de grises y mejora el contraste."""
    try:
        image = Image.open(imagen).convert("L")
        enhancer = ImageEnhance.Contrast(image)
        return enhancer.enhance(2.0)
    except Exception as e:
        return None

def extraer_texto_img(imagen):
    """Extrae y limpia el texto de una imagen."""
    image = preprocesar_imagen(imagen)
    if image is None:
        return "Error al procesar la imagen"
    texto_extraido = pytesseract.image_to_string(image)
    return limpiar_texto(texto_extraido)

def consultar_ip(ip):
    url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip}"
    response = requests.get(url, headers=HEADERS)
    if response.status_code == 200:
        data = response.json()
        stats = data["data"]["attributes"]["last_analysis_stats"]
        malicious = stats.get("malicious", 0)
        if malicious > 0:
            veredicto = f"❌ La IP {ip} ha sido reportada como maliciosa en {malicious} análisis."
        else:
            veredicto = f"✅ La IP {ip} parece segura."
        return {"IP": ip, "Veredicto": veredicto, "Análisis": stats}
    return {"error": f"Error en la consulta: {response.status_code}"}

def consultar_url(url):
    scan_url = "https://www.virustotal.com/api/v3/urls"
    response = requests.post(scan_url, headers=HEADERS, data={"url": url})
    if response.status_code == 200:
        analysis_id = response.json()["data"]["id"]
        result_url = f"https://www.virustotal.com/api/v3/analyses/{analysis_id}"
        result_response = requests.get(result_url, headers=HEADERS)
        if result_response.status_code == 200:
            data = result_response.json()
            stats = data["data"]["attributes"]["stats"]
            malicious = stats.get("malicious", 0)
            if malicious > 0:
                veredicto = f"❌ La URL {url} ha sido marcada como maliciosa en {malicious} análisis."
            else:
                veredicto = f"✅ La URL {url} parece segura."
            return {"URL": url, "Veredicto": veredicto, "Análisis": stats}
    return {"error": f"Error en la consulta: {response.status_code}"}

def limpiar_texto(texto):
    texto = texto.strip()
    texto = re.sub(r'\s+', ' ', texto)
    return texto.replace("\n", " ").replace("\r", "")

def analizar_con_modelo(texto_extraido):
    """Analiza el mensaje para detectar señales de phishing utilizando el modelo.
       Responde siempre en español.
    """
    texto_limpio = limpiar_texto(texto_extraido)
    system_prompt = """
      Eres un asistente altamente especializado en ciberseguridad. Tu tarea principal es analizar mensajes y detectar intentos de phishing con precisión.
      🔹 REGLAS ESTRICTAS:
      1. No inventes información. Basa tu respuesta ÚNICAMENTE en el texto proporcionado.
      2. Sé conciso y preciso.
      3. La respuesta debe comenzar con "Phishing" o "Not Phishing", seguido de una breve explicación.
      4. Responde siempre en español.
    """
    prompt_modelo = f"""
      Analiza el siguiente mensaje y determina si se trata de un intento de phishing.
      MENSAJE A EVALUAR:
      {texto_extraido}
    """
    inputs = tokenizer(system_prompt + prompt_modelo, return_tensors="pt", padding=True, truncation=True).to(device)
    outputs = model.generate(
        inputs.input_ids,
        attention_mask=inputs.attention_mask,
        max_new_tokens=300,
        temperature=0.15,
        do_sample=True,
        top_k=10,
        top_p=0.7,
        repetition_penalty=1.2,
        pad_token_id=tokenizer.eos_token_id,
    )
    response = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    return response

def analizar_prompt(prompt):
    """
    Detecta si el prompt contiene:
      - Una imagen (por extensión) para procesar OCR y análisis de phishing.
      - Una IP o URL para consulta en VirusTotal.
    Devuelve un resultado especial (no None) si se cumple alguno de estos casos.
    """
    if isinstance(prompt, str) and prompt.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.heic')):
        print(f"🔍 Detectada imagen: {prompt}")
        texto_extraido = extraer_texto_img(prompt)
        print(f"Texto extraído: {texto_extraido}")
        return analizar_con_modelo(texto_extraido)

    ip_pattern = r"\b(?:\d{1,3}\.){3}\d{1,3}\b"
    url_pattern = r'(https?://[^\s]+|www\.[^\s]+)'
    if re.search(ip_pattern, prompt):
        ip = re.search(ip_pattern, prompt).group()
        print(f"🔍 Detectada IP: {ip}")
        return consultar_ip(ip)
    if re.search(url_pattern, prompt):
        url = re.search(url_pattern, prompt).group()
        print(f"🔍 Detectada URL: {url}")
        return consultar_url(url)
    return None

# ---------------------------------
# FUNCIÓN UNIFICADA DE GENERACIÓN DE RESPUESTA
# ---------------------------------
def generate_response(prompt):
    """
    Función unificada que:
      1. Verifica si el prompt es especial (imagen, IP, URL) mediante analizar_prompt().
      2. Si no es especial, utiliza la generación basada en contexto (RAG).
      3. Y si no se encuentra relación en el contexto (por ejemplo, se obtiene "No se encontraron documentos relevantes"),
         se usa una generación “por defecto”.
    """
    # Paso 1: Verificar si el prompt es especial
    resultado_api = analizar_prompt(prompt)
    if resultado_api is not None:
        return json.dumps(resultado_api, indent=4, ensure_ascii=False)

    # Paso 2: Consultar contexto en ChromaDB para RAG
    contexto = obtener_contexto(prompt)
    if contexto.strip().lower().startswith("no se encontraron documentos"):
        # Paso 3: Generación por defecto si no hay contexto relevante
        inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True).to(device)
        outputs = model.generate(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=200,
            temperature=0.2,
            do_sample=True,
            top_k=50,
            top_p=0.9,
            repetition_penalty=1.5,
            pad_token_id=tokenizer.eos_token_id,
        )
        return tokenizer.decode(outputs[0], skip_special_tokens=True)
    else:
        # Si se encontró contexto, usar generación basada en RAG
        return generar_respuesta_rag(prompt)

# ---------------------------------
# PRUEBA DE LA FUNCIÓN UNIFICADA
# ---------------------------------
prompt_usuario = 'image6.png'
respuesta_usuario = generate_response(prompt_usuario)
print(f"\n🔹 Respuesta:\n{respuesta_usuario}")

# ---------------------------------
# CONFIGURACIÓN DEL BOT DE TELEGRAM
# ---------------------------------
load_dotenv()

logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)

from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext

async def start(update: Update, context: CallbackContext) -> None:
    await update.message.reply_text("¡Hola! Soy tu especialista en ciberseguridad. ¿En qué te puedo ayudar?")

async def handle_message(update: Update, context: CallbackContext) -> None:
    user_input = update.message.text
    username = update.message.from_user.username
    logger.info(f"Mensaje recibido de {username}: {user_input}")
    response = generate_response(user_input)
    await update.message.reply_text(response)

def main():
    TELEGRAM_TOKEN = "7047664203:AAEa-JEcZQpv-tDCIdV6ZE_odp4lPTH0Bd8"
    if not TELEGRAM_TOKEN:
        logger.error("El token de Telegram no está configurado.")
        sys.exit(1)

    app_telegram = Application.builder().token(TELEGRAM_TOKEN).build()
    app_telegram.add_handler(CommandHandler("start", start))
    app_telegram.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
    logger.info("Bot iniciado y ejecutándose...")
    app_telegram.run_polling()

if __name__ == "__main__":
    main()


0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Connecting to security.ubuntu.com] [Waiting for headers] [Connected to r2u.stat.illinois.edu (19                                                                                                    Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
0% [Waiting for headers] [Connecting to security.ubuntu.com] [2 InRelease 3,632 B/3,632 B 100%] [Wai0% [Waiting for headers] [Connecting to security.ubuntu.com] [Waiting for headers] [Waiting for head                                                                                                    Get:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
0% [3 InRelease 5,484 B/128 kB 4%] [Connecting to security.ubuntu.com] [Waiting for headers] [Waitin                                                                                                    Get:4 https://developer.download.nvidia.com/compute

In [None]:
import os
import json
import torch
import requests
import chromadb
from pyngrok import ngrok
from fastapi import FastAPI
from google.colab import drive
from huggingface_hub import notebook_login
from sentence_transformers import SentenceTransformer
import uvicorn
import threading
from datasets import load_dataset, Dataset, DatasetDict
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    BitsAndBytesConfig
)
from peft import LoraConfig, prepare_model_for_kbit_training, get_peft_model
from trl import SFTTrainer
import random
import pytesseract
import spacy
import re
from PIL import Image
from google.colab import files
import sys
import logging
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
from dotenv import load_dotenv
from PIL import Image, ImageEnhance
import re
import requests

In [None]:
# 🚀 Montar Google Drive para cargar JSON
drive.mount('/content/drive')
BASE_DIR = "/content/drive/My Drive/Trabajo Final Bootcamp"
NORMATIVA_DIR = os.path.join(BASE_DIR, "normativa")
EMBEDDINGS_BACKUP_PATH = os.path.join(BASE_DIR, "embeddings.json")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Verificar que la carpeta de normativa exista
if not os.path.exists(NORMATIVA_DIR):
    raise FileNotFoundError(f"La carpeta {NORMATIVA_DIR} no existe. Verifica la ruta o crea la carpeta.")

In [None]:
# Login en Hugging Face (ejecutar solo una vez)
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

## Cargo los modelos

In [None]:
# 🚀 Configurar GPU A100
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"✅ Usando dispositivo: {device}")

# 🚀 Cargar Modelo y Tokenizador
model_name = "CasiAC/deepseek-r1-8b-ciberseguridad"
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4"
)
model = AutoModelForCausalLM.from_pretrained(model_name, quantization_config=bnb_config, device_map="auto", trust_remote_code=True)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token
embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
print("✅ Modelo cargado 🚀")

✅ Usando dispositivo: cuda


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

✅ Modelo cargado 🚀


#Configurar ChromaDB en un contenedor Docker

In [None]:
# Iniciar el servidor FastAPI
app = FastAPI()

In [None]:
# Configuración de ngrok para exponer el puerto 8000
ngrok.set_auth_token("2sN2ljFFRN4UpJk7VPL6jPiHVJL_6FafRKrvugJysTGGRV1KB")
public_url = ngrok.connect(8000)
print(f"🔗 ChromaDB API accesible en: {public_url}")

🔗 ChromaDB API accesible en: NgrokTunnel: "https://74dc-35-188-84-114.ngrok-free.app" -> "http://localhost:8000"


In [None]:
# --- Crear cliente y colección en ChromaDB ---
client = chromadb.Client()

In [None]:
# Se unifica la variable como "collection" para mayor consistencia
collection = client.create_collection(name="test", metadata={"hnsw:search_ef": 100, "hnsw:construction_ef": 1000})

In [None]:
# --- Definición de endpoints de FastAPI ---
@app.get("/")
def read_root():
    return {"message": "Chroma API is running!"}

@app.get("/collections")
def get_collections():
    collections = client.list_collections()
    return {"collections": collections}

@app.get("/collections/{collection_name}")
def get_collection(collection_name: str):
    coll = client.get_collection(name=collection_name)
    return {"collection": coll}

@app.post("/collections/{collection_name}/add")
def add_to_collection(collection_name: str, item: dict):
    coll = client.get_collection(name=collection_name)
    coll.add(
        documents=[item["document"]],
        metadatas=[item.get("metadata", {})],
        ids=[item.get("id", "default_id")]
    )
    return {"message": f"Item added to collection {collection_name}"}

@app.get("/health")
def health_check():
    return {"status": "OK"}

In [None]:
# Iniciar el servidor FastAPI en un hilo de fondo para no bloquear Colab
def start_server():
    uvicorn.run(app, host="0.0.0.0", port=8000)

server_thread = threading.Thread(target=start_server, daemon=True)
server_thread.start()

## Cargar documentos JSON y Generar Embeddings

In [None]:
# 🚀 Cargar Documentos y Guardar Embeddings en ChromaDB
def cargar_documentos_y_embeddings():
    embeddings_dict = {}
    for archivo in os.listdir(NORMATIVA_DIR):
        ruta_json = os.path.join(NORMATIVA_DIR, archivo)
        with open(ruta_json, "r", encoding="utf-8") as f:
            documentos = json.load(f)

        for section in documentos.get("sections", []):
            doc_id = f"{archivo}_p{section['page']}"
            content = section["content"]
            embedding = embedding_model.encode(content, convert_to_tensor=True).cpu().numpy().tolist()
            embeddings_dict[doc_id] = embedding

            # Enviar embedding a ChromaDB
            collection.add(
                documents=[content],
                metadatas=[{"title": documentos.get("title", "Desconocido"), "page": section["page"]}],
                embeddings=[embedding],
                ids=[doc_id]
            )

    # Guardar copia en Drive
    with open(EMBEDDINGS_BACKUP_PATH, "w", encoding="utf-8") as f:
        json.dump(embeddings_dict, f, ensure_ascii=False, indent=4)
    print(f"✅ {len(embeddings_dict)} documentos indexados y guardados en ChromaDB 🚀")
    return embeddings_dict

embeddings_dict = cargar_documentos_y_embeddings()

✅ 215 documentos indexados y guardados en ChromaDB 🚀


# Implementar el RAG

In [None]:
# 🚀 Función para Obtener Contexto desde ChromaDB
def obtener_contexto(pregunta, n_docs=3):
    embedding_pregunta = embedding_model.encode([pregunta], convert_to_tensor=True).cpu().numpy().tolist()
    resultados = collection.query(
        query_embeddings=embedding_pregunta,
        n_results=n_docs
    )
    documents = resultados.get('documents', [])
    if not documents:
        return "No se encontraron documentos relevantes."

    # Convertir cada documento a cadena: si es una lista, unir sus elementos; si no, convertir a string
    documentos_convertidos = []
    for doc in documents:
        if isinstance(doc, list):
            documentos_convertidos.append(" ".join(map(str, doc)))
        else:
            documentos_convertidos.append(str(doc))

    return "\n".join(documentos_convertidos)


In [None]:
# 🚀 Función para Generar Respuesta
def generar_respuesta(pregunta, max_tokens=300, temperatura=0.1):
    contexto = obtener_contexto(pregunta)
    entrada = f"Contexto: {contexto}\nPregunta: {pregunta}\nRespuesta:"
    inputs = tokenizer(entrada, return_tensors="pt").to(device)
    output = model.generate(**inputs, max_new_tokens=max_tokens, temperature=temperatura, top_p=0.9, repetition_penalty=1.05)
    respuesta = tokenizer.decode(output[0], skip_special_tokens=True)
    return respuesta


In [None]:
# 🚀 Probar el modelo con el RAG
pregunta = "¿Cómo mejorar la seguridad en mi empresa?"
respuesta = generar_respuesta(pregunta)
print("🛡️ Respuesta Generada:")
print(respuesta)

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


🛡️ Respuesta Generada:
Contexto: ISO 27001:2022 IMPLEMENTATION GUIDE
14
SECCIÓN 4: 
CONTEXTO DE LA 
ORGANIZACIÓN
Contexto interno
Los siguientes son ejemplos de las áreas que pueden tenerse 
en cuenta al evaluar las cuestiones internas que pueden influir 
en los riesgos del SGSI:
•  Madurez: ¿Es una empresa ágil con un lienzo en blanco en 
el que trabajar, o una institución con procesos y controles 
de seguridad establecidos?
•  Cultura organizativa: ¿Es su organización relajada en 
cuanto a cómo, cuándo y dónde trabaja la gente, o 
extremadamente reglamentada? 
•  Gestión: ¿Existen canales y procesos de comunicación 
claros entre los principales responsables de la toma de 
decisiones y el resto de la organización?
•  Tamaño de los recursos: ¿Trabaja con un equipo de 
seguridad de la información o lo hace todo una persona?
•  Madurez de los recursos: ¿Los recursos disponibles están 
informados, plenamente formados, son fiables y constantes, 
o el personal carece de experiencia y c

In [None]:
# Configuración Tesseract OCR
pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract'
nlp = spacy.load("en_core_web_sm")

In [None]:

#Pipeline modelo + OCR

# 🔹 API Key de VirusTotal
API_KEY = "06858db9f480b4aba21a5831457a9b919b1f9014e6f8872ee1f4f7d1a029197c"
HEADERS = {"x-apikey": API_KEY}

def preprocesar_imagen(imagen):
    """Convierte la imagen a escala de grises y mejora el contraste."""
    try:
        image = Image.open(imagen)
        image = image.convert("L")
        enhancer = ImageEnhance.Contrast(image)
        image = enhancer.enhance(2.0)
        return image
    except Exception as e:
        return None

def extraer_texto_img(imagen):
    """Extrae texto de una imagen tras preprocesarla."""
    try:
        image = preprocesar_imagen(imagen)
        if image is None:
            return "Error al procesar la imagen"
        # Extraer texto con pytesseract
        texto_extraido = pytesseract.image_to_string(image)
        return limpiar_texto(texto_extraido)
    except Exception as e:
        return f"Error al procesar la imagen: {str(e)}"

def consultar_ip(ip):
    """Consulta una IP en VirusTotal y evalúa si es segura o maliciosa."""
    url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip}"
    response = requests.get(url, headers=HEADERS)

    if response.status_code == 200:
        data = response.json()
        stats = data["data"]["attributes"]["last_analysis_stats"]
        malicious = stats.get("malicious", 0)
        harmless = stats.get("harmless", 0)

        if malicious > 0:
            veredicto = f"❌ La IP {ip} ha sido reportada como **maliciosa** en {malicious} análisis."
        else:
            veredicto = f"✅ La IP {ip} parece **segura**, sin reportes de actividad maliciosa."

        return {
            "IP": ip,
            "Veredicto": veredicto,
            "Análisis": stats
        }
    return {"error": f"Error en la consulta: {response.status_code}"}

def consultar_url(url):
    """Consulta una URL en VirusTotal y evalúa si es segura o maliciosa."""
    scan_url = "https://www.virustotal.com/api/v3/urls"
    response = requests.post(scan_url, headers=HEADERS, data={"url": url})

    if response.status_code == 200:
        analysis_id = response.json()["data"]["id"]
        result_url = f"https://www.virustotal.com/api/v3/analyses/{analysis_id}"
        result_response = requests.get(result_url, headers=HEADERS)

        if result_response.status_code == 200:
            data = result_response.json()
            stats = data["data"]["attributes"]["stats"]
            malicious = stats.get("malicious", 0)
            harmless = stats.get("harmless", 0)

            if malicious > 0:
                veredicto = f"❌ La URL {url} ha sido **marcada como maliciosa** en {malicious} análisis."
            else:
                veredicto = f"✅ La URL {url} parece **segura**, sin reportes de actividad maliciosa."

            return {
                "URL": url,
                "Veredicto": veredicto,
                "Análisis": stats
            }

    return {"error": f"Error en la consulta: {response.status_code}"}

def analizar_prompt(prompt):
    """Detecta si el prompt contiene una IP o URL y consulta VirusTotal si es necesario."""

    if isinstance(prompt, str) and (prompt.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.heic'))):
        print(f"🔍 Detectada imagen: {prompt}")
        texto_extraido = extraer_texto_img(prompt)
        print(f"Texto extraído de la imagen: {texto_extraido}")

        return analizar_con_modelo(texto_extraido)

    ip_pattern = r"\b(?:\d{1,3}\.){3}\d{1,3}\b"
    url_pattern = r'(https?://[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}(?:/[^ \n]*)?|www\.[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}(?:/[^ \n]*)?|[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}(?:/[^ \n]*)?)'

    ip_match = re.search(ip_pattern, prompt)
    url_match = re.search(url_pattern, prompt)

    if ip_match:
        ip = ip_match.group()
        print(f"🔍 Detectada IP en el prompt: {ip}")
        return consultar_ip(ip)

    if url_match:
        url = url_match.group()
        print(f"🔍 Detectada URL en el prompt: {url}")
        return consultar_url(url)

    return None  # No se detectó ninguna IP o URL

def generar_respuesta(prompt):
    """Genera una respuesta con el modelo o consulta VirusTotal si es necesario."""

    resultado_api = analizar_prompt(prompt)

    if resultado_api:
        return json.dumps(resultado_api, indent=4, ensure_ascii=False)  # Respuesta en JSON

    # Tokenizar el prompt y generar la respuesta
    inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True).to(device)

    outputs = model.generate(
        inputs.input_ids,
        attention_mask=inputs.attention_mask,
        max_new_tokens=300,
        temperature=0.3,
        do_sample=True,
        top_k=50,
        top_p=0.9,
        repetition_penalty=1.5,
        pad_token_id=tokenizer.eos_token_id,
    )

    # Decodificar y limpiar la salida
    response = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    return response.replace(prompt, "").strip()

def analizar_con_modelo(texto_extraido):
    """Pasa el texto al modelo para que lo analice en busca de señales claras de phishing."""

    texto_limpio = limpiar_texto(texto_extraido)

    system_prompt = """
      You are a highly specialized AI in cybersecurity. Your primary task is to analyze messages and detect phishing attempts with precision.

      🔹 STRICT RULES:
      1. Do not invent information. You must base your response ONLY on the given text.
      2. Do not provide general explanations unless explicitly asked.
      3. Be concise and precise. Your response must be short and strictly relevant.
      4. Use formal cybersecurity language and avoid assumptions.
      5. Output must always start with either "Phishing" or "Not Phishing", followed by a brief explanation of why.

      🔹 HOW TO ANALYZE A MESSAGE FOR PHISHING:
      - Urgency: Does the message pressure the user to act quickly?
      - Suspicious Links: Does it contain shortened or untrusted links?
      - Requests for Personal Information: Is the user asked to provide passwords or sensitive data?
      - Errors or Inconsistencies: Are there grammar mistakes, unnatural tone, or unusual sender details?

      🔹 RESPONSE FORMAT (STRICT):
      ```
      Phishing or Not Phishing
      Explanation: [Concise justification, mentioning phishing indicators if present]
      ```
      ---
    """

    prompt_modelo = f"""
  	  Analyze the following message and determine whether it is a **phishing attempt** based on the criteria defined in the system instructions.

      REMEMBER: Your response must strictly follow the required format. Do not repeat instructions or add unnecessary details.

      MESSAGE TO EVALUATE:
      {texto_extraido}
    """

    # Tokenizamos el prompt y lo pasamos al modelo
    inputs = tokenizer(system_prompt + prompt_modelo, return_tensors="pt", padding=True, truncation=True).to(device)

    outputs = model.generate(
        inputs.input_ids,
        attention_mask=inputs.attention_mask,
        max_new_tokens=300,  # Limitar los tokens de la respuesta
        temperature=0.15,  # Controlar la aleatoriedad
        do_sample=True,  # Sin aleatorización
        top_k=10,  # Menos diversidad
        top_p=0.7,
        repetition_penalty=1.2,
        pad_token_id=tokenizer.eos_token_id,
    )

    # Decodificar y limpiar la salida
    response = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()

    if "</think>" in response:
        user_answer = response.split("</think>", 1)[-1].strip()

    return user_answer



def limpiar_texto(texto):
    """Limpia el texto extraído de caracteres no deseados."""
    texto = texto.strip()
    texto = re.sub(r'\s+', ' ', texto)  # Reemplazar espacios extra
    texto = texto.replace("\n", " ").replace("\r", "")  # Eliminar saltos de línea
    return texto




prompt_usuario_4 = 'image6.png'
respuesta_4 = generar_respuesta(prompt_usuario_4)
print(f"\n🔹 Respuesta para imagen:\n{respuesta_4}")



🔍 Detectada imagen: image6.png
Texto extraído de la imagen: Sent on: Friday, June 23, 2023 11:31:12 AM To: Subject: YOUR ACCOUNT IS AT RISK!! Dear Valued User , We received a request from you to terminate your Office 365 email due to a dual college/universities account. This process has begun by our administrator. If you did not authorize this action and you have no knowledge of it, you are advised to re-verify your account. Please give us 24 hours to terminate your account if you initiated the request. Failure to re-verify will result in the closure of your account and you will lose all of my files on these 365 accounts. If this request was made accidentally and you have no knowledge of it, you are advised to copy and paste the URL Below into the address bar of your web browser to fill in the form. cutt.ly/OwtNi6KO Failure to Verify will result in the closure of your account. lowa State University IT Helpdesk All Right Reserved.

🔹 Respuesta para imagen:
"Not Phishing  \nExplanation: 

In [None]:
# Función para generar respuestas
def generate_response(text):
    inputs = tokenizer(text, return_tensors="pt").to(device)
    output = model.generate(**inputs, max_new_tokens=200, temperature=0.2, do_sample=True)
    return tokenizer.decode(output[0], skip_special_tokens=True)

In [None]:
import nest_asyncio
nest_asyncio.apply()

In [None]:
# Cargar variables de entorno
load_dotenv()

# Configurar logging
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)

# Handlers de Telegram
async def start(update: Update, context: CallbackContext) -> None:
    await update.message.reply_text("¡Hola! Soy tu especialista en ciberseguridad. ¿En qué te puedo ayudar?")

async def handle_message(update: Update, context: CallbackContext) -> None:
    user_input = update.message.text
    username = update.message.from_user.username
    logger.info(f"Mensaje recibido de {username}: {user_input}")

    response = generate_response(user_input)  # Usa la función importada
    await update.message.reply_text(response)

# Función principal
def main():
    TELEGRAM_TOKEN = "7047664203:AAEa-JEcZQpv-tDCIdV6ZE_odp4lPTH0Bd8"
    if not TELEGRAM_TOKEN:
        logger.error("El token de Telegram no está configurado.")
        sys.exit(1)

    app = Application.builder().token(TELEGRAM_TOKEN).build()
    app.add_handler(CommandHandler("start", start))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    logger.info("Bot iniciado y ejecutándose...")
    app.run_polling()

if __name__ == "__main__":
    main()

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


RuntimeError: Cannot close a running event loop

In [None]:
main()