# Challenge

El archivo [SOLUTION.md](SOLUTION.md) contiene la explicaci√≥n detallada del paso apaso de la soluci√≥n

## Ambiente de desarrollo

Se utiliza make para desarrollar el proyecto. Puedes ver las opciones disponibles en el archivo [Makefile](Makefile).


In [None]:
import os
import sys
import re
import json
import emoji
import random
import orjson
import unicodedata
import polars as pl
import pandas as pd
from collections import Counter
from typing import List, Iterable

# Local libraries - ensure project root is in sys.path before local imports
project_root = os.path.abspath(os.path.join(os.getcwd(), ".."))
if project_root not in sys.path:
    sys.path.append(project_root)

from src.common.utils import twitter_schema  # noqa: E402
from src.common.performance import profile_performance, profile_detailed  # noqa: E402

file_path = "../farmers-protest-tweets-2021-2-4.json"

## Data Analisis

Se realiza un analisis de los datos para comprender su estructura y posibles problemas de calidad. ver detalle en la secci√≥n 2 del archivo [SOLUTION.md](../SOLUTION.md) 

In [None]:
def analyze_twitter_nuances(file_path, sample_size=20000):
    print(f"--- üïµÔ∏è‚Äç‚ôÇÔ∏è Deep Dive Analysis: {file_path} ---")

    data = []
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            for i, line in enumerate(f):
                if i >= sample_size:
                    break
                try:
                    data.append(json.loads(line))
                except json.JSONDecodeError:
                    continue
    except FileNotFoundError:
        print("Error: Archivo no encontrado.")
        return

    df = pd.DataFrame(data)

    # ==========================================
    # 1. AN√ÅLISIS DE TRUNCAMIENTO (Extended Mode)
    # ==========================================
    print("\n[1] AN√ÅLISIS DE TRUNCAMIENTO")

    # Verificar si existen claves nativas de la API v1.1
    has_truncated_key = "truncated" in df.columns
    has_extended_tweet = "extended_tweet" in df.columns

    if has_truncated_key:
        truncated_count = df["truncated"].sum() if df["truncated"].dtype == bool else 0
        print(
            f" - Tweets marcados como 'truncated': {truncated_count} ({truncated_count / len(df):.2%})"
        )

        if has_extended_tweet:
            extended_count = df["extended_tweet"].notnull().sum()
            print(f" - Tweets con objeto 'extended_tweet' disponible: {extended_count}")
    else:
        print(
            " - La clave 'truncated' NO existe en este dataset (probablemente ya fue procesado/aplanado)."
        )

    # Verificar visualmente si el contenido parece cortado
    # Los tweets truncados suelen terminar en "..." o un enlace t.co
    df["ends_with_ellipsis"] = df["content"].astype(str).str.strip().str.endswith("‚Ä¶")
    suspicious_truncation = df["ends_with_ellipsis"].sum()
    print(f" - Tweets que terminan visualmente en '‚Ä¶': {suspicious_truncation}")

    # ==========================================
    # 2. AN√ÅLISIS DE RETWEETS (Duplicidad)
    # ==========================================
    print("\n[2] AN√ÅLISIS DE RETWEETS")

    # Detectar RTs
    # Opci√≥n A: Clave 'retweeted_status' (Standard API)
    if "retweeted_status" in df.columns:
        rts_count = df["retweeted_status"].notnull().sum()
        print(f" - Detectados por metadato 'retweeted_status': {rts_count}")

    # Opci√≥n B: Texto empieza con "RT @"
    df["is_rt_text"] = df["content"].astype(str).str.startswith("RT @")
    rts_text_count = df["is_rt_text"].sum()
    print(
        f" - Detectados por texto ('RT @...'): {rts_text_count} ({rts_text_count / len(df):.2%})"
    )

    if rts_text_count > 0:
        print(
            "   -> CONCLUSI√ìN: Los Retweets est√°n presentes. Q2 y Q3 estar√°n inflados por repetici√≥n."
        )

    # ==========================================
    # 3. MENCIONES: TEXTO VS METADATA (Q3)
    # ==========================================
    print("\n[3] COMPARATIVA DE MENCIONES (Q3)")

    # Funci√≥n para extraer menciones con Regex (enfoque ingenuo)
    def extract_regex_mentions(text):
        return set(re.findall(r"@(\w+)", str(text)))

    # Funci√≥n para extraer menciones de Metadata (enfoque robusto)
    def extract_meta_mentions(mentions_list):
        if not isinstance(mentions_list, list):
            return set()
        return set(
            m.get("username")
            for m in mentions_list
            if isinstance(m, dict) and m.get("username")
        )

    # Aplicar comparativa en una muestra peque√±a para velocidad
    sample_df = df.head(1000).copy()

    sample_df["regex_mentions"] = sample_df["content"].apply(extract_regex_mentions)
    sample_df["meta_mentions"].fillna("", inplace=True)
    sample_df["meta_mentions"] = sample_df["mentionedUsers"].apply(
        extract_meta_mentions
    )

    # Buscar discrepancias
    # Casos donde Metadata tiene ALGO pero Regex NO (Menciones invisibles/Reply)
    sample_df["hidden_mentions"] = sample_df.apply(
        lambda x: x["meta_mentions"] - x["regex_mentions"], axis=1
    )
    hidden_count = sample_df[sample_df["hidden_mentions"].astype(bool)].shape[0]

    # Casos donde Regex tiene ALGO pero Metadata NO (Falsos positivos, emails, usuarios suspendidos)
    sample_df["fake_mentions"] = sample_df.apply(
        lambda x: x["regex_mentions"] - x["meta_mentions"], axis=1
    )
    fake_count = sample_df[sample_df["fake_mentions"].astype(bool)].shape[0]

    print(f"An√°lisis sobre {len(sample_df)} registros:")
    print(
        f" - Casos donde Metadata detecta usuarios que Regex NO ve (Hidden/Reply): {hidden_count}"
    )
    if hidden_count > 0:
        example = sample_df[sample_df["hidden_mentions"].astype(bool)].iloc[0]
        print(
            f"   Ejemplo Hidden -> Texto: '{example['content'][:50]}...' | Meta: {example['meta_mentions']}"
        )

    print(
        f" - Casos donde Regex detecta '@' que NO son usuarios v√°lidos en Metadata: {fake_count}"
    )
    if fake_count > 0:
        example = sample_df[sample_df["fake_mentions"].astype(bool)].iloc[0]
        print(
            f"   Ejemplo Falso Positivo -> Texto: '{example['content'][:50]}...' | Regex: {example['regex_mentions']}"
        )


def advanced_analysis(file_path):
    print(f"--- üî¨ An√°lisis Forense Avanzado: {file_path} ---")

    usernames_raw = []
    usernames_normalized = []

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    tweet = json.loads(line)
                    u = tweet.get("user", {}).get("username")
                    if u:
                        usernames_raw.append(u)
                        # Normalizaci√≥n NFKC + Lowercase
                        usernames_normalized.append(
                            unicodedata.normalize("NFKC", u).lower()
                        )
                except Exception:
                    continue
    except FileNotFoundError:
        print("Archivo no encontrado")
        return

    # 1. CHECK DE CASE SENSITIVITY Y UNICODE
    unique_raw = len(set(usernames_raw))
    unique_norm = len(set(usernames_normalized))

    print("\n[1] INTEGRIDAD DE ENTIDADES (Usernames)")
    print(f" - Usuarios √∫nicos (Crudo): {unique_raw}")
    print(f" - Usuarios √∫nicos (Normalizado + Lower): {unique_norm}")
    diff = unique_raw - unique_norm
    if diff > 0:
        print(
            f" ‚ö†Ô∏è ALERTA: Se detectaron {diff} duplicados por falta de normalizaci√≥n/may√∫sculas."
        )
        print(" -> ACCI√ìN: Es MANDATORIO aplicar .lower() y unicodedata.")
    else:
        print(
            " -> OK: No se detectaron colisiones, pero es buena pr√°ctica implementarlo."
        )

    # 2. DETECCI√ìN DE BOTS (OUTLIERS Q1)
    print("\n[2] DISTRIBUCI√ìN DE ACTIVIDAD (Q1)")
    counts = Counter(usernames_raw)
    top_5 = counts.most_common(5)

    df_activity = pd.Series(list(counts.values()))
    p99 = df_activity.quantile(0.99)
    max_tweets = df_activity.max()

    print(f" - Top 5 Usuarios m√°s activos:\n   {top_5}")
    print(f" - El 99% de usuarios tiene menos de {p99:.0f} tweets.")
    print(f" - El usuario #1 tiene {max_tweets} tweets.")

    if max_tweets > (p99 * 10):
        print(
            f" ‚ö†Ô∏è ALERTA: El usuario top tiene una actividad {max_tweets / p99:.1f}x mayor al promedio."
        )
        print(
            " -> OBSERVACI√ìN: Probable Bot. Documentar en SOLUTION.md que esto sesga los resultados."
        )

    # 3. EMPATES (TIE-BREAKING)
    print("\n[3] RIESGO DE EMPATES EN EL CORTE")
    # Ver si hay muchos usuarios con el mismo conteo en el borde del top 10
    counts_values = list(counts.values())
    counts_freq = Counter(counts_values)

    # Imaginemos que el corte del top 10 es alrededor de X tweets
    sorted_counts = sorted(counts_values, reverse=True)
    if len(sorted_counts) > 10:
        val_at_10 = sorted_counts[9]  # El valor del d√©cimo lugar
        users_at_cutoff = counts_freq[val_at_10]
        print(f" - Valor de corte (Puesto #10): {val_at_10} tweets")
        print(f" - Cu√°ntos usuarios tienen exactamente ese valor: {users_at_cutoff}")

        if users_at_cutoff > 1:
            print(" ‚ö†Ô∏è ALERTA CR√çTICA: Hay EMPATE en el puesto #10.")
            print(
                " -> ACCI√ìN: Tu c√≥digo DEBE tener un criterio de desempate (ej: alfab√©tico) o los tests fallar√°n aleatoriamente."
            )


def discover_anomalies(file_path, sample_size=15000):
    print(f"--- Iniciando An√°lisis Profundo: {file_path} (n={sample_size}) ---")

    data = []
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            for i, line in enumerate(f):
                if i < sample_size:
                    data.append(line)
                else:
                    r = random.randint(0, i)
                    if r < sample_size:
                        data[r] = line
    except FileNotFoundError:
        print(f"Error: El archivo {file_path} no existe.")
        return

    parsed = []
    corrupt_lines = 0
    for line in data:
        try:
            parsed.append(json.loads(line))
        except Exception:
            corrupt_lines += 1

    if corrupt_lines:
        print(f"[ALERTA] Se detectaron {corrupt_lines} l√≠neas corruptas en la muestra.")

    # Normalizaci√≥n para an√°lisis
    df_anom = pd.json_normalize(parsed)

    print("\n1. INTEGRIDAD DE COLUMNAS CLAVE")
    columns_to_check = ["date", "content", "mentionedUsers", "user.username", "user.id"]
    for col in columns_to_check:
        if col in df_anom.columns:
            types = df_anom[col].apply(lambda x: type(x).__name__).value_counts()
            nulls = df_anom[col].isnull().sum()
            print(f"- '{col}': {len(types)} tipos detectados. Nulos: {nulls}")
        else:
            print(f"- [ERROR] Columna '{col}' NO encontrada.")

    print("\n2. AN√ÅLISIS DE USUARIOS (Q1 & Q3)")
    # Verificar si un username tiene m√∫ltiples IDs (cambio de handle)
    user_consistency = df_anom.groupby("user.username")["user.id"].nunique()
    inconsistent = user_consistency[user_consistency > 1]
    print(f"- Usuarios con m√°s de un ID: {len(inconsistent)}")

    # Calcular potencial de String Interning
    total_names = len(df_anom["user.username"])
    unique_names = df_anom["user.username"].nunique()
    print(
        f"- Ratio de Repetici√≥n de Usernames: {total_names / unique_names:.2f}x (Alto ratio justifica sys.intern)"
    )

    print("\n3. AN√ÅLISIS DE MENCIONES (Q3)")
    # Validar estructura interna de mentionedUsers
    mentions_data = df_anom["mentionedUsers"].dropna()
    has_nested_nulls = mentions_data.apply(
        lambda x: any(m.get("username") is None for m in x if isinstance(m, dict))
    ).sum()
    print(f"- Registros con listas de menciones v√°lidas: {len(mentions_data)}")
    print(f"- Listas con objetos internos nulos: {has_nested_nulls}")

    print("\n4. AN√ÅLISIS DE EMOJIS COMPLEJOS (Q2)")
    all_emojis_anom = []
    complex_count_anom = 0
    for txt in df_anom["content"].dropna():
        # emoji_list devuelve informaci√≥n detallada de cada emoji
        found = emoji.emoji_list(txt)
        for e in found:
            char = e["emoji"]
            all_emojis_anom.append(char)
            # Si tiene m√°s de un componente unicode o caracteres especiales de uni√≥n
            if len(char) > 1 or "\u200d" in char:
                complex_count_anom += 1

    print(f"- Total emojis detectados: {len(all_emojis_anom)}")
    print(f"- Emojis complejos (ZWJ/Multi-char): {complex_count_anom}")
    if all_emojis_anom:
        print(f"- Top 3 Emojis en muestra: {Counter(all_emojis_anom).most_common(3)}")

    print("\n5. VALORES EXTREMOS Y FECHAS")
    df_anom["date_parsed"] = pd.to_datetime(df_anom["date"], errors="coerce")
    print(f"- Rango temporal: {df_anom['date_parsed'].min()} a {df_anom['date_parsed'].max()}")
    print(f"- Tweets por fuera de 2021: {len(df_anom[df_anom['date_parsed'].dt.year != 2021])}")

analyze_twitter_nuances(file_path)
advanced_analysis(file_path)
discover_anomalies(file_path)

## Definici√≥n de estaratefias de optimizaci√≥n

Depues de analizar los datos y el c√≥digo,ver detalle en la secci√≥n 3 del archivo [SOLUTION.md](../SOLUTION.md) 



## Calidad de software

Se proponen realizar test basados en los resultados del analisis de los datos. ver detalle en la secci√≥n 4 del archivo [SOLUTION.md](../SOLUTION.md. 

## C√≥digo

El proceso de optimizaci√≥n de el procesamiento de datos empieza desde el proceso de lectura

del archivo.


In [None]:
def read_standard_json(file_path: str) -> List[dict]:
    """Lee y devuelve una lista completa de diccionarios (Standard)."""
    data = []
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            try:
                data.append(json.loads(line))
            except json.JSONDecodeError:
                continue
    return data


def read_streaming_orjson(file_path: str) -> Iterable[dict]:
    """Devuelve un generador (Lazy) de diccionarios usando orjson."""
    with open(file_path, "rb") as f:
        for line in f:
            try:
                yield orjson.loads(line)
            except orjson.JSONDecodeError:
                continue


def read_chunks_orjson(file_path: str, chunk_size: int = 5000) -> Iterable[List[dict]]:
    """Lee y entrega bloques de 5000 registros sin funciones extra."""
    with open(file_path, "rb") as f:
        chunk = []
        for line in f:
            try:
                chunk.append(orjson.loads(line))
                if len(chunk) == chunk_size:
                    yield chunk
                    chunk = []
            except orjson.JSONDecodeError:
                continue
        if chunk:
            yield chunk


def read_full_mem_json(file_path: str) -> List[dict]:
    """Lee todo el archivo en memoria y devuelve la lista de diccionarios."""
    with open(file_path, "r", encoding="utf-8") as f:
        lines = f.readlines()
    return [json.loads(line) for line in lines if line.strip()]


def run_benchmarks(file_path: str):
    print(f"üöÄ INICIANDO BENCHMARK INTEGRAL DESACOPLADO: {file_path}\n")

    strategies = [
        ("Standard JSON", read_standard_json),
        ("Streaming Orjson", read_streaming_orjson),
        ("Chunks Orjson", read_chunks_orjson),
        (
            "Polars read_ndjson (Eager)",
            lambda f: pl.read_ndjson(f, infer_schema_length=None, ignore_errors=True),
        ),
        (
            "Polars scan_ndjson (Lazy)",
            lambda f: pl.scan_ndjson(f, infer_schema_length=None, ignore_errors=True),
        ),
        (
            "Polars read_ndjson (Eager) + schema",
            lambda f: pl.read_ndjson(f, schema=twitter_schema, ignore_errors=True),
        ),
        (
            "Polars scan_ndjson (Lazy) + schema",
            lambda f: pl.scan_ndjson(f, schema=twitter_schema, ignore_errors=True),
        ),
        ("Full Memory (readlines)", read_full_mem_json),
        ("Pandas read_json", lambda f: pd.read_json(f, lines=True)),
    ]

    for name, func in strategies:
        print(f"\n{'#' * 70}")
        print(f"### ESTRATEGIA: {name}")
        print(f"{'#' * 70}")

        # Medici√≥n 1: Rendimiento Real (Tiempo Wall-clock + Pico RAM)
        # Esto es lo que realmente importa para la eficiencia base
        perf_monitor = profile_performance(func)
        perf_monitor(file_path)

        # Medici√≥n 2: An√°lisis T√©cnico (Opcional - Reporte cProfile)
        # Se ejecuta aparte para no influir en los tiempos reales de arriba
        detailed_monitor = profile_detailed(func)
        detailed_monitor(file_path)


run_benchmarks(file_path)

Despues de haber definido los tipos de lectura de archivos que se utilizar√°n para cada proceso se procede a experimentar con el rendimiento de las funcions que ayudaran en la soluci√≥n de las preguntas, para esto se definen funciones qeu permiten realizar las preubas m√°s facilmente, sin embargo debido  a latencia qeu produce la ejecuci√≥n en JUpiter notebooks se deja este analisis en el archivo [benchmark.py](../benchmark.py)