## 1. Inicializar Spark y preparar Spark SQL

En esta sección:
- Importamos las librerías necesarias.
- Creamos una `SparkSession`.
- Dejamos lista la sesión para trabajar con **DataFrames** y **Spark SQL** (consultas SQL sobre vistas temporales).

In [None]:
# 1. Importar librerías y crear SparkSession orientada a Spark SQL

from pyspark.sql import SparkSession, functions as F
import os, sys

# Aseguramos que Spark use el mismo intérprete de Python que este notebook
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Creamos/obtenemos una sesión de Spark
spark = (
    SparkSession.builder
    .appName("AnalisisDiscursoPresidencialSparkSQL")
    .config("spark.pyspark.python", sys.executable)
    .config("spark.pyspark.driver.python", sys.executable)
    .getOrCreate()
)

print("Versión de Spark:", spark.version)

## 2. Cargar el archivo de texto como DataFrame y vista SQL

En esta sección:
- Definimos la ruta del archivo `05 - discurso.txt`.
- Cargamos el archivo usando `spark.read.text` en un DataFrame.
- Registramos una vista temporal para poder consultarlo con Spark SQL.
- Vemos algunas líneas para verificar la lectura.

In [None]:
# 2. Cargar el archivo de discurso como DataFrame y crear vista SQL

# Ruta del archivo de texto.
# Si el notebook está en la misma carpeta que el archivo (M8/S2),
# esta ruta relativa debería funcionar directamente (en Colab suele ser /content/...).
file_path = "/content/05 - discurso.txt"

# Cargamos el archivo como un DataFrame de una sola columna llamada "value"
lines_df = spark.read.text(file_path)

# Renombramos la columna para que sea más descriptiva
lines_df = lines_df.withColumnRenamed("value", "line")

# Registramos una vista temporal para usar Spark SQL
lines_df.createOrReplaceTempView("discurso")

print("Número aproximado de líneas:", lines_df.count())
print("\nPrimeras líneas del discurso:\n")
for row in lines_df.limit(5).collect():
    print(row["line"])

## 3. Limpieza de texto y definición de stopwords

En esta sección:
- Pasamos el texto a minúsculas.
- Eliminamos signos de puntuación y caracteres no alfabéticos.
- Separamos el texto en palabras.
- Definimos una lista de **stopwords** (palabras sin mucho significado para el análisis: artículos, pronombres, etc.).
- Generamos un DataFrame de palabras limpias y lo registramos como vista para usarlo desde **Spark SQL**.

In [None]:
# 3. Funciones de limpieza, lista de stopwords y generación de DataFrame de palabras

import re

# Lista de palabras vacías (stopwords) en español.
# Se puede ampliar según necesidad.
stopwords = {
    "a", "e", "i", "o", "u",
    "el", "la", "los", "las", "un", "una", "unos", "unas",
    "y", "o", "u", "de", "del", "al", "en", "con", "por", "para",
    "como", "que", "se", "su", "sus", "es", "son", "ser", "fue", "fueron",
    "este", "esta", "estos", "estas", "ese", "esa", "esos", "esas",
    "yo", "tú", "tu", "usted", "ustedes", "nosotros", "nosotras",
    "ellos", "ellas",
    "mi", "mis", "nuestro", "nuestra", "nuestros", "nuestras",
    "su", "sus",
    "un", "una", "al", "del",
    "pero", "también", "ya", "muy", "más", "menos",
    "que", "porque", "cuando", "donde",
    "hoy", "ayer", "mañana",
    "es", "son", "era", "eran", "ser", "estar", "está", "están"
}

def limpiar_y_dividir(linea):
    """Convierte la línea a minúsculas, elimina puntuación y la separa en palabras."""
    # Pasamos a minúsculas
    linea = linea.lower()
    # Reemplazamos cualquier caracter que no sea letra (incluye tildes y ñ) por un espacio
    # Conservamos letras a-z, áéíóúñü
    linea = re.sub(r"[^a-záéíóúñü]+", " ", linea)
    # Dividimos en palabras por espacios
    palabras = linea.split()
    return palabras

# Probamos la función de limpieza con una línea de ejemplo
ejemplo = "¡Queridos compatriotas chilenos! Hoy me dirijo a ustedes..."
print("Línea original:", ejemplo)
print("Palabras limpias:", limpiar_y_dividir(ejemplo))

# Usamos la función en el DataFrame para obtener un DataFrame de palabras
lines_con_palabras_df = lines_df.withColumn(
    "palabras",
    F.udf(limpiar_y_dividir, "array<string>")(F.col("line"))
)

# Explosionamos el array de palabras para tener una fila por palabra
palabras_df = lines_con_palabras_df.select(F.explode("palabras").alias("word"))

# Registramos la vista de palabras para consultar con Spark SQL
palabras_df.createOrReplaceTempView("palabras")

palabras_df.show(10)

## 4. Pipeline con Spark SQL: contar palabras

Ahora realizamos el conteo de palabras usando **consultas Spark SQL** sobre la vista `palabras`:

1. Filtramos **stopwords** directamente en la consulta SQL.
2. Agrupamos por palabra (`GROUP BY`).
3. Contamos cuántas veces aparece cada palabra (`COUNT(*)`).
4. Ordenamos por frecuencia de mayor a menor (`ORDER BY`).
5. Obtenemos las **10 palabras más frecuentes** usando `LIMIT 10`.

In [None]:
# 4. Construir el pipeline de conteo de palabras usando Spark SQL

# Generamos una lista de stopwords en formato SQL ('palabra1','palabra2',...)
stopwords_list = sorted(list(stopwords))
stopwords_sql_list = ",".join(f"'{w}'" for w in stopwords_list)

consulta_top_10 = f"""
SELECT
    word,
    COUNT(*) AS frecuencia
FROM palabras
WHERE word NOT IN ({stopwords_sql_list})
GROUP BY word
ORDER BY frecuencia DESC
LIMIT 10
"""

top_10_palabras_df = spark.sql(consulta_top_10)

top_10_palabras_df.show()

## 5. Mostrar los 10 principales términos formateados

En esta sección simplemente mostramos los resultados de forma más amigable:
- Palabra.
- Cantidad de apariciones.

Usaremos `collect()` para traer los resultados a Python y formatearlos.

In [None]:
# 5. Imprimir los 10 términos más frecuentes de forma legible

print("Top 10 palabras más frecuentes (sin stopwords):\n")
for row in top_10_palabras_df.collect():
    palabra = row["word"]
    frecuencia = row["frecuencia"]
    print(f"{palabra:15s} -> {frecuencia}")

## 6. Cierre de la sesión de Spark

Al finalizar el análisis, es buena práctica detener la sesión de Spark.

In [None]:
# 6. Detener la sesión de Spark

spark.stop()