In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# 1. Crear sesión de Spark
spark = SparkSession.builder.appName("AmazonReviewsPipeline").getOrCreate()

# 2. Cargar el CSV desde DBFS
file_path = "/FileStore/tables/Datafiniti_Amazon_Consumer_Reviews_of_Amazon_Products_May19.csv"
data = spark.read.csv(file_path, header=True, inferSchema=True)

# 3. Seleccionar y renombrar columnas
data = data.selectExpr("`reviews.text` as texto", "`reviews.rating` as rating") \
           .dropna(subset=["texto", "rating"])

# 4. Crear columna binaria: 1 (positivo), 0 (negativo)
data = data.withColumn("etiqueta", when(col("rating") >= 4, 1).when(col("rating") <= 2, 0)) \
           .dropna(subset=["etiqueta"])

# 5. Limpiar el texto
data = data.withColumn("texto_limpio", lower(col("texto")))
data = data.withColumn("texto_limpio", regexp_replace(col("texto_limpio"), "[^a-zA-Z\\s]", ""))

# 6. Construir pipeline de NLP + ML
tokenizer = Tokenizer(inputCol="texto_limpio", outputCol="palabras")
remover = StopWordsRemover(inputCol="palabras", outputCol="palabras_limpias")
hashingTF = HashingTF(inputCol="palabras_limpias", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")
lr = LogisticRegression(labelCol="etiqueta", featuresCol="features")

pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

# 7. Entrenar y evaluar
train, test = data.randomSplit([0.8, 0.2], seed=42)
modelo = pipeline.fit(train)
predicciones = modelo.transform(test)

# 8. Evaluación con AUC
evaluator = BinaryClassificationEvaluator(labelCol="etiqueta")
auc = evaluator.evaluate(predicciones)
from pyspark.sql import Row
display(spark.createDataFrame([Row(AUC=f"{auc:.3f}")]))

# Muestra algunas predicciones con texto original
predicciones.select("texto", "rating", "etiqueta", "prediction", "probability").show(10, truncate=False)



AUC
0.89


+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+--------+----------+------------------------------------------+
|texto                                                                                                                                                                                                                                                                                                                           