Parte 1 – Filtrado y preprocesamiento

Cargue la información del archivo large_even.json en una lista, muestre la cantidad de
registros total (deben ser 746, 909). Este es nuestro tráfico inicial!

In [2]:
import json


total_records = 0
dns_events = []
all_events=[]


with open('large_eve.json', 'r') as file:
    for line in file:
        total_records += 1
        try:
            
            data = json.loads(line)
            all_events.append(data)
            if data.get('event_type') == 'dns':
                dns_events.append(data)
        
            # print(data)
        except json.JSONDecodeError as e:
            print(f"Error en la línea: {line}")
            print(f"Detalles del error: {e}")
print(f"Total de registros en el archivo: {total_records}")

Total de registros en el archivo: 746909


Debido a que estamos buscando dominios web, del total de registros, solamente estamos
interesados en los registros DNS. Cargue únicamente aquellos registros que sean DNS.

In [3]:

# Guardar los eventos DNS en un nuevo archivo JSON
with open('dns_events.json', 'w') as outfile:
    json.dump(dns_events, outfile, indent=4)

print(f"{len(dns_events)} eventos DNS guardados en 'dns_events.json'.")

15749 eventos DNS guardados en 'dns_events.json'.


Muestre la información de 2 registros cualesquiera

In [4]:
print(all_events[12])
print(all_events[20])

{'timestamp': '2017-07-22T17:33:17.786254-0500', 'flow_id': 1135816796360543, 'pcap_cnt': 30365, 'event_type': 'http', 'vlan': 140, 'src_ip': '192.168.204.59', 'src_port': 38012, 'dest_ip': '192.168.24.201', 'dest_port': 80, 'proto': 'TCP', 'tx_id': 1, 'http': {'hostname': '192.168.24.201', 'url': '/phppgadmin/browser.php', 'http_user_agent': 'Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.15) Gecko/2009102814 Ubuntu/8.10 (intrepid) Firefox/3.0.15', 'http_content_type': 'text/html', 'http_refer': 'http://192.168.24.201/phppgadmin/', 'http_method': 'GET', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 1125}}
{'timestamp': '2017-07-22T17:33:18.003156-0500', 'flow_id': 1135816796360543, 'pcap_cnt': 32000, 'event_type': 'http', 'vlan': 140, 'src_ip': '192.168.204.59', 'src_port': 38012, 'dest_ip': '192.168.24.201', 'dest_port': 80, 'proto': 'TCP', 'tx_id': 5, 'http': {'hostname': '192.168.24.201', 'url': '/phppgadmin/xloadtree/xloadtree2.js', 'http_user_agent': 'Mozilla/5.0 (X11; U; Li

Debido a que la data consiste en estructuras JSON anidadas, utilice la característica
json_normalize para normalizar la información y asignarla en un dataframe. Muestre el shape
del dataframe

In [5]:
from pandas import json_normalize

# Normalizar la data JSON anidada
df = json_normalize(all_events)

# Mostrar shape del DataFrame
print("Shape del DataFrame:", df.shape)

Shape del DataFrame: (746909, 199)


Como estamos buscando dominios DGA, debemos filtrar los registros DNS para aquellos
registros tipo A (son aquellos que mantienen una dirección IP asociada a un dominio)

In [6]:
import json

# Cargar los eventos DNS guardados
with open('dns_events.json', 'r') as f:
    dns_events = json.load(f)

# Filtrar consultas DNS donde rrtype sea 'A'
dns_rrtype_a = [
    event for event in dns_events
    if event.get("dns", {}).get("rrtype") == "A"
]

# Mostrar cantidad de registros con rrtype A
print("Cantidad de consultas DNS tipo A:", len(dns_rrtype_a))


Cantidad de consultas DNS tipo A: 2849


Filtre los dominios únicos

In [7]:
# Extraer los dominios (rrname) de los eventos DNS tipo A
dominios = [event["dns"]["rrname"] for event in dns_rrtype_a if "rrname" in event["dns"]]

# Obtener dominios únicos
dominios_unicos = set(dominios)

# Mostrar cantidad y ejemplo
print(f"Cantidad de dominios únicos: {len(dominios_unicos)}")
print("Ejemplo de dominios únicos:")
for dominio in list(dominios_unicos)[:10]:
    print("-", dominio)


Cantidad de dominios únicos: 177
Ejemplo de dominios únicos:
- 
- www.arduino.cc
- www.freepbx.org
- tools.google.com.ad.aol.aoltw.net
- ueip.vmware.com
- secure.informaction.com.home
- widgets.alexa.com
- gg.arrancar.org
- www.sql-ledger.org
- mirrors.tummy.com


Escriba una función que obtenga el TLD para un dominio. Por ejemplo, para
api.wunderground.com el TLD es wunderground.com, para
safebrowsing.clients.google.com.home, el TLD es home. Utilice un LLM para ayudarle a
escribir esta función, verifique que obtiene correctamente el TLD, incluya el prompt utilizado
en su notebook.

In [8]:
# 1. Función para extraer el TLD
def obtener_tld(dominio):
    """
    Extrae el TLD personalizado de un dominio.
    - Si termina en '.home', retorna 'home'.
    - Si tiene múltiples subdominios, retorna los dos últimos segmentos (como dominio.tld).
    """
    partes = dominio.strip('.').split('.')
    if len(partes) == 1:
        return partes[0]
    elif partes[-1] == "home":
        return "home"
    else:
        return ".".join(partes[-2:])  # dominio.tld

# 2. Extraer los dominios de los registros DNS tipo A
dominios = [event["dns"]["rrname"] for event in dns_rrtype_a if "rrname" in event["dns"]]

# 3. Obtener TLDs aplicando la función
tlds = [obtener_tld(d) for d in dominios]

# 4. Mostrar algunos TLDs únicos y su cantidad
tlds_unicos = set(tlds)
print(f"Cantidad de TLDs únicos: {len(tlds_unicos)}")

print("Ejemplo de TLDs únicos:")
for t in list(tlds_unicos)[:10]:
    print("-", t)


Cantidad de TLDs únicos: 106
Ejemplo de TLDs únicos:
- 
- youtube.com
- tummy.com
- nvidia.com
- 22.201:
- mozilla.com
- usf.edu
- sql-ledger.org
- bluehost.com
- umoss.org


Utilice Gemini para clasificar los dominios como DGA (1) o legítimos (0).

In [None]:
import json
import google.generativeai as genai

with open('dns_events.json', 'r') as f:
    dns_events = json.load(f)

dominios = [
    event['dns']['rrname']
    for event in dns_events
    if event.get('dns', {}).get('rrtype') == 'A' and 'rrname' in event['dns']
]


dominios_unicos = list(set(dominios))

genai.configure(api_key="GEMINI KEY")

model = genai.GenerativeModel(model_name="models/gemini-1.5-pro-latest")


prompt = (
    "Actúa como un detector de dominios DGA.\n"
    "Devuelve 1 si es DGA, 0 si es legítimo.\n"
    "Formato JSON por dominio.\n"
    "Ejemplos:\n"
    "- google.com → 0\n"
    "- asdkl123zx.biz → 1\n"
    "Ahora clasifica y dime cuantos registros DGA unicos hay:\n" +
    "\n".join(f"- {dominio}" for dominio in dominios_unicos)
)


response = model.generate_content(prompt)
print(response.text)


```json
{
  "www.arduino.cc": 0,
  "www.freepbx.org": 0,
  "tools.google.com.ad.aol.aoltw.net": 0,
  "ueip.vmware.com": 0,
  "secure.informaction.com.home": 0,
  "widgets.alexa.com": 0,
  "gg.arrancar.org": 0,
  "www.sql-ledger.org": 0,
  "mirrors.tummy.com": 0,
  "aolmtcmxm04.office.aol.com.ad.aol.aoltw.net": 1,
  "safebrowsing.clients.google.com.localdomain": 0,
  "safebrowsing.clients.google.com.stayonline.net": 0,
  "192.168.26-27.0": 1,
  "repo.genomics.upenn.edu": 0,
  "192.168.22.201:.stayonline.net": 1,
  "hpca-tier2.office.aol.com": 1,
  "AOLDTCMA04.ad.aol.aoltw.net": 1,
  "www.acunetix.com": 0,
  "192.168.21.1201": 1,
  "www.securityfocus.com": 0,
  "1922.168.22.254": 1,
  "www.metasploit.com.office.aol.com": 0,
  "www.stopbadware.org": 0,
  "clients1.google.com.ad.aol.aoltw.net": 0,
  "whitecell.localdomain": 0,
  "www.sql-ledger.org.hsd1.pa.comcast.net": 0,
  "safebrowsing.clients.google.com.hsd1.pa.comcast.net": 0,
  "ky.hec.net": 0,
  "secure.informaction.com": 0,
  "mirr

devuelva 0 si el TLD se encuentra en la lista y 1 si no está. Utilice un LLM para crear dicha
función, verifique que no se carga la lista cada vez que se busca un TLD

PROMPT
Escribe una función en Python que reciba un TLD como string (por ejemplo "google.com", "suspicious.biz", etc.) y verifique si está presente en una lista de 1 millón de dominios populares (desde un archivo CSV). La función debe devolver 0 si el TLD está en la lista y 1 si no está. La lista solo debe cargarse una vez, aunque la función se llame muchas veces.


In [30]:
import pandas as pd
import google.generativeai as genai
import json
import time

# === 1. Leer tu archivo CSV ===
dominios_df = pd.read_csv('top-1m.csv')

# === 2. Extraer los dominios (máximo 30-50 por prompt, dependiendo del modelo) ===
dominios = dominios_df['Dominio'].tolist()

# === 3. Configurar Gemini ===
genai.configure(api_key="AIzaSyBIjVW_T1yEKT-5SQKxes-8jxJqLeV389A")  # Reemplaza con tu clave real

model = genai.GenerativeModel("models/gemini-1.5-pro-latest")

# === Leer los primeros 1000 dominios del top-1m ===
df_top = pd.read_csv('top-1m.csv', header=None, names=['rank', 'domain'])
dominios = df_top['domain'].head(1000).tolist()

# === Función para clasificar con Gemini ===
def clasificar_con_gemini(lote):
    prompt = (
        "Actúa como un clasificador de dominios generados algorítmicamente (DGA).\n"
        "Devuelve 1 si un dominio parece generado automáticamente o sospechoso, y 0 si es legítimo.\n"
        "Responde en formato JSON:\n"
        "[{\"dominio\": \"abc123.biz\", \"clasificacion\": 1}, ...]\n\n"
        "Dominios a clasificar:\n" +
        "\n".join(f"- {d}" for d in lote)
    )
    
    try:
        response = model.generate_content(prompt)
        raw = response.text.strip().replace("```json", "").replace("```", "").replace("→", ":").replace("'", '"')
        if raw.startswith("["):
            return json.loads(raw)
        else:
            print("⚠️ Respuesta no válida.")
            return []
    except Exception as e:
        print("❗ Error:", e)
        time.sleep(10)
        return []

# === Clasificar por bloques ===
resultados = []
batch_size = 20

for i in range(0, len(dominios), batch_size):
    lote = dominios[i:i + batch_size]
    print(f"🔄 Clasificando dominios {i+1} a {i+len(lote)}...")
    res = clasificar_con_gemini(lote)
    if isinstance(res, list):
        resultados.extend(res)
    time.sleep(1.5)

# === Convertir en DataFrame y guardar ===
df_resultado = pd.DataFrame(resultados)
df_resultado.columns = ['Dominio', 'DGA']

# Guardar sospechosos
df_sospechosos = df_resultado[df_resultado['DGA'] == 1]
df_sospechosos.to_csv('sospechosos_detectados_por_gemini.csv', index=False)

print("\n✅ Clasificación terminada.")
print(f"Se detectaron {len(df_sospechosos)} dominios sospechosos.")

  df_top = pd.read_csv('top-1m.csv', header=None, names=['rank', 'domain'])


🔄 Clasificando dominios 1 a 20...
❗ Error: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 25
}
]


KeyboardInterrupt: 

In [None]:
# === Funciones previas de soporte ===

_top1m_set = None

def cargar_top1m():
    global _top1m_set
    if _top1m_set is None:
        df = pd.read_csv('top-1m.csv', header=None, names=['rank', 'domain'])
        _top1m_set = set(df['domain'])

def verificar_tld(tld):
    cargar_top1m()
    return 0 if tld in _top1m_set else 1

def extraer_tld(dominio):
    partes = dominio.strip().split('.')
    if len(partes) <= 2:
        return dominio
    else:
        return ".".join(partes[-2:])

# === Leer el archivo generado por Gemini ===
df_sospechosos = pd.read_csv('sospechosos_detectados_por_gemini.csv')

# === Extraer TLD y verificar si está en top-1m ===
df_sospechosos['tld'] = df_sospechosos['Dominio'].apply(extraer_tld)
df_sospechosos['en_top_1m'] = df_sospechosos['tld'].apply(verificar_tld)

# === Filtrar los que SÍ están en el top-1m ===
df_filtrados = df_sospechosos[df_sospechosos['en_top_1m'] == 0]

# === Mostrar y guardar resultados ===
print(f"✅ Se encontraron {len(df_filtrados)} dominios sospechosos que SÍ están en el top-1m.")
df_filtrados.to_csv('sospechosos_en_top1m.csv', index=False)


Finalmente, para confirmar los dominios maliciosos podemos buscar la fecha de creación del
TLD. Cree una función qué en base al TLD, devuelva la fecha de creación de este. Utilice un
LLM para escribir dicha función, incluya el prompt utilizado en su notebook


Prompt
Escribe una función en Python que reciba un TLD (por ejemplo, 'google.com', 'example.biz', etc.) y devuelva la fecha de creación del dominio usando WHOIS. Si no puede encontrar la fecha, devuelve 'Desconocida'. Usa la biblioteca `whois` de Python.


In [None]:
import whois

def obtener_fecha_creacion(tld):
    """
    Devuelve la fecha de creación del TLD usando WHOIS.
    Si no se puede obtener, retorna 'Desconocida'.
    """
    try:
        info = whois.whois(tld)
        fecha = info.creation_date
        # A veces WHOIS devuelve una lista
        if isinstance(fecha, list):
            return fecha[0].strftime("%Y-%m-%d")
        elif fecha:
            return fecha.strftime("%Y-%m-%d")
        else:
            return "Desconocida"
    except Exception as e:
        return "Desconocida"

# Aplicar sobre tu archivo de sospechosos
df = pd.read_csv('sospechosos_en_top1m.csv')
df['fecha_creacion'] = df['tld'].apply(obtener_fecha_creacion)
df.to_csv('sospechosos_con_fecha.csv', index=False)
