In [1]:
!pip install pyspark==3.4.1 spark-nlp==5.3.2 requests beautifulsoup4

Collecting pyspark==3.4.1
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting spark-nlp==5.3.2
  Downloading spark_nlp-5.3.2-py2.py3-none-any.whl.metadata (57 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.1/57.1 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
Collecting py4j==0.10.9.7 (from pyspark==3.4.1)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading spark_nlp-5.3.2-py2.py3-none-any.whl (564 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m565.0/565.0 kB[0m [31m36.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m15.9 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
 

In [2]:
from pyspark.sql.functions import concat_ws, col
from pyspark.ml.clustering import LDA
from pyspark.ml import Pipeline
from google.colab import files
import pandas as pd
import numpy as np
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml.feature import CountVectorizer
import os
import sparknlp
from pyspark.sql import SparkSession
import urllib.request
from pyspark.ml.feature import IDF


In [3]:
# Instalo Java 8

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Inicio sesion de Spark

spark = sparknlp.start()
print(f"Versión Spark: {spark.version}")


Versión Spark: 3.4.1


In [4]:
# Cargo el archivo desde un repositorio de github para no tener que cargarlo manualmente cada vez que abro Colab

url_archivo = "https://github.com/juandimeglio25/TP_Prog/raw/refs/heads/main/bbc-news-data.csv"
archivo_local = "bbc_news.csv"

if not os.path.exists(archivo_local):
    print(">>> Descargando dataset...")
    urllib.request.urlretrieve(url_archivo, archivo_local)
else:
    print(">>> Archivo local detectado.")

df_raw = spark.read \
    .option("header", True) \
    .option("delimiter", ";") \
    .option("inferSchema", True) \
    .option("multiLine", True) \
    .option("quote", '"') \
    .option("escape", '"') \
    .csv(archivo_local)

df = df_raw.filter(col("content").isNotNull())

df = df.withColumn("texto_completo", concat_ws(" ", col("title"), col("content")))

print("Esquema final y muestra:")
df.printSchema()
df.select("title", "texto_completo").show(2, truncate=80)

>>> Descargando dataset...
Esquema final y muestra:
root
 |-- category: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- title: string (nullable = true)
 |-- content: string (nullable = true)
 |-- texto_completo: string (nullable = false)

+---------------------------------+--------------------------------------------------------------------------------+
|                            title|                                                                  texto_completo|
+---------------------------------+--------------------------------------------------------------------------------+
|Ad sales boost Time Warner profit|Ad sales boost Time Warner profit  Quarterly profits at US media giant TimeWa...|
| Dollar gains on Greenspan speech|Dollar gains on Greenspan speech  The dollar has hit its highest level agains...|
+---------------------------------+--------------------------------------------------------------------------------+
only showing top 2 rows



In [5]:
# EMpiezo con el pipeline de procesamiento: Convierto texto a formato documento
document_assembler = DocumentAssembler() \
    .setInputCol("texto_completo") \
    .setOutputCol("document")

# Tokenizo
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# Normalizo
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized") \
    .setLowercase(True)

# Elimino Stopwords (palabras vacías) y adicionales para que no manchen el modelo, como pueden ser algunos verbos o pronombres
palabras_basura = ["mr", "said", "would", "year", "also", "new", "one","our", "ours", "ourselves", "yours", "my", "me","going", "gone", "went", "goes", "two","ba","bn", "last", "first","we", "m", "make", "play", "win", "get", "use", "go", "take", "say", "tell", "come", "see", "us"]

stopwords_cleaner = StopWordsCleaner.pretrained("stopwords_en", "en") \
    .setInputCols(["normalized"]) \
    .setOutputCol("clean_tokens") \
    .setCaseSensitive(False) \
    .setStopWords(StopWordsCleaner.loadDefaultStopWords("english") + palabras_basura)

# Lematizo
lemmatizer = LemmatizerModel.pretrained("lemma_antbnc", "en") \
    .setInputCols(["clean_tokens"]) \
    .setOutputCol("lemma")

# Finisher (agarro los resultados)
finisher = Finisher() \
    .setInputCols(["lemma"]) \
    .setOutputCols(["tokens_finales"]) \
    .setCleanAnnotations(False)

# Con esto ejecuto el pipeline
nlp_pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    normalizer,
    stopwords_cleaner,
    lemmatizer,
    finisher
])

stopwords_en download started this may take some time.
Approximate size to download 2.9 KB
[OK!]
lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [6]:
# fit() entrena el pipeline
model_nlp = nlp_pipeline.fit(df)

# transform() aplica la lógica a los datos
processed_df = model_nlp.transform(df)

# Verifico que la lematización funcionó (usando la corrección de .result)
processed_df.select(
    col("title"),
    col("lemma.result").alias("ejemplo_lemas")
).show(5, truncate=80)

+---------------------------------+--------------------------------------------------------------------------------+
|                            title|                                                                   ejemplo_lemas|
+---------------------------------+--------------------------------------------------------------------------------+
|Ad sales boost Time Warner profit|[ad, sales, boost, time, warner, profit, quarterly, profit, media, giant, tim...|
| Dollar gains on Greenspan speech|[dollar, gain, greenspan, speech, dollar, hit, high, level, euro, almost, thr...|
|Yukos unit buyer faces loan claim|[yukos, unit, buyer, face, loan, claim, owner, embattled, russian, oil, giant...|
|High fuel prices hit BA's profits|[high, fuel, price, hit, ba, profit, british, airway, blame, high, fuel, pric...|
|Pernod takeover talk lifts Domecq|[pernod, takeover, talk, lift, domecq, share, uk, drink, food, firm, ally, do...|
+---------------------------------+-----------------------------

In [7]:
# Calculo la TF
cv = CountVectorizer(
    inputCol="tokens_finales",
    outputCol="features",
    vocabSize=1000,
    minDF=5.0
)

cv_model = cv.fit(processed_df)
df_vectorizado = cv_model.transform(processed_df)

# Cacheo para optimizar
df_vectorizado.cache()

# Agrego una 'seed' para que los resultados no cambien cada vez que se ejecuta
lda = LDA(k=5, maxIter=10, seed=1234)
model_lda = lda.fit(df_vectorizado)


In [8]:
vocab = cv_model.vocabulary

# Indices de los términos más importantes por tema
topics = model_lda.describeTopics(maxTermsPerTopic=10)
topics_rdd = topics.rdd

# Función auxiliar para mapear índices numéricos a palabras reales
def map_indices_to_words(row):
    topic_indices = row['termIndices']
    terms = [vocab[idx] for idx in topic_indices]
    return (row['topic'], terms)

print("RESULTADOS DEL ANÁLISIS DE TÓPICOS:")
results = topics_rdd.map(map_indices_to_words).collect()

for topic, words in results:
    print(f"Tema {topic}: {', '.join(words)}")

RESULTADOS DEL ANÁLISIS DE TÓPICOS:
Tema 0: bank, rise, market, growth, rate, fall, party, economy, figure, december
Tema 1: game, show, time, good, take, well, make, people, second, world
Tema 2: people, firm, use, company, phone, music, mobile, technology, user, service
Tema 3: film, good, award, star, win, england, year, world, include, club
Tema 4: government, minister, plan, country, company, tell, firm, uk, report, deal


In [9]:
# TF-IDF
idf = IDF(inputCol="features", outputCol="tfidf_features")

idf_model = idf.fit(df_vectorizado)

df_tfidf = idf_model.transform(df_vectorizado)
print(">>> Vectores TF-IDF generados:")
df_tfidf.select("title", "tfidf_features").show(5, truncate=50)

>>> Vectores TF-IDF generados:
+---------------------------------+--------------------------------------------------+
|                            title|                                    tfidf_features|
+---------------------------------+--------------------------------------------------+
|Ad sales boost Time Warner profit|(1000,[2,9,10,11,15,21,27,29,30,31,35,37,39,41,...|
| Dollar gains on Greenspan speech|(1000,[2,4,7,10,13,15,19,21,29,33,35,37,39,42,4...|
|Yukos unit buyer faces loan claim|(1000,[11,14,16,21,28,71,72,78,97,98,113,117,12...|
|High fuel prices hit BA's profits|(1000,[1,4,6,8,10,13,14,25,29,30,35,41,42,44,45...|
|Pernod takeover talk lifts Domecq|(1000,[2,10,13,15,18,28,35,41,45,46,49,60,72,73...|
+---------------------------------+--------------------------------------------------+
only showing top 5 rows



In [10]:
# Entreno LDA usando los vectores TF-IDF en lugar de los conteos simples
lda_tfidf = LDA(k=5, maxIter=10, featuresCol="tfidf_features")
model_lda_tfidf = lda_tfidf.fit(df_tfidf)


In [12]:
#Comparo conteo simple con TF-IDF

print(">>> Evaluando Modelo 1 (Conteo Simple):")
ll_counts = model_lda.logLikelihood(df_vectorizado)
lp_counts = model_lda.logPerplexity(df_vectorizado)
print(f"Log Likelihood: {ll_counts}")
print(f"Log Perplexity: {lp_counts}")

print("-" * 30)


# Entreno un nuevo modelo LDA específico para TF-IDF
lda_tfidf = LDA(k=5, maxIter=10, featuresCol="tfidf_features")
model_lda_tfidf = lda_tfidf.fit(df_tfidf)

print(">>> Evaluando Modelo 2 (TF-IDF):")
ll_tfidf = model_lda_tfidf.logLikelihood(df_tfidf)
lp_tfidf = model_lda_tfidf.logPerplexity(df_tfidf)

print(f"Log Likelihood (TF-IDF): {ll_tfidf}")
print(f"Log Perplexity (TF-IDF): {lp_tfidf}")

>>> Evaluando Modelo 1 (Conteo Simple):
Log Likelihood: -1784641.015852126
Log Perplexity: 6.525338822248846
------------------------------
>>> Evaluando Modelo 2 (TF-IDF):
Log Likelihood (TF-IDF): -4218207.366975071
Log Perplexity (TF-IDF): 6.6120093212875295
