Código completo

In [6]:
# === Notebook: Modelado con Spark ML (Regresión Logística) - Wine Quality (UCI) ===
# Autor: Ricardo Herrera Lara
# Fecha: noviembre 2025

# 1) Install and configure Spark in Colab.
# The "!" lines are IPython/Colab shell escapes, not Python, so this cell only
# runs inside a notebook. Spark requires a JVM, hence the headless OpenJDK 11.
!apt-get update -qq
!apt-get install -qq openjdk-11-jdk-headless  # Java necesario para Spark
# PySpark is pinned to 3.5.1. findspark is installed but is not imported
# anywhere in this cell.
!pip install -q pyspark==3.5.1 findspark

import os
# Point PySpark at the JDK installed above. This is the x86-64 (amd64) path;
# NOTE(review): confirm it matches the runtime's architecture.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

# 2) Create the Spark session in local mode using every available core.
#    getOrCreate() reuses an already-active session (e.g. when the cell is
#    re-run), so the app name printed is that of the active session.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("WineQuality_LogisticRegression")
    .getOrCreate()
)

print("Spark session creada:", spark.sparkContext.appName)

# 3) Descargar dataset (UCI - Wine Quality)
# Usaremos el vino tinto (winequality-red.csv). Puedes cambiar a white si quieres.
!wget -q -O winequality-red.csv "https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
print("Dataset descargado")

# 4) Load the data into a Spark DataFrame. The file is semicolon-separated,
#    has a header row, and column types are inferred from the values.
df = spark.read.options(header=True, sep=";", inferSchema=True).csv("winequality-red.csv")

print("Esquema original:")
df.printSchema()

print("Primeras filas:")
df.show(n=5)

# 5) Quick exploration and cleaning.
from pyspark.sql.functions import col, isnan, when, count
from pyspark.sql.types import NumericType

# Count missing values per column. In floating-point columns NaN counts as
# missing too: VectorAssembler(handleInvalid="skip") later drops rows holding
# either null or NaN, so both affect the training data. isnan is applied only
# to double/float columns, the only types that can hold NaN.
missing_exprs = []
for c, t in df.dtypes:
    is_missing = col(c).isNull()
    if t in ("double", "float"):
        is_missing = is_missing | isnan(col(c))
    missing_exprs.append(count(when(is_missing, c)).alias(c))
null_stats = df.select(missing_exprs)
print("Conteo de nulos por columna:")
null_stats.show()

# Summary statistics. A notebook cell only renders its LAST expression, so a
# bare expression here would be computed and silently discarded: print it.
print("Descripción estadística:")
print(df.describe().toPandas().set_index('summary').T)

# If missing values were found they could be imputed here (the UCI file
# normally has none).
# Detect numeric columns from the schema itself, which covers every Spark
# numeric type (int, bigint, smallint, tinyint, float, double, decimal), and
# warn about anything else: a string column would make VectorAssembler fail.
numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]
print("Columnas numéricas detectadas:", numeric_cols)
non_numeric_cols = [c for c in df.columns if c not in numeric_cols]
if non_numeric_cols:
    print("ADVERTENCIA: columnas no numéricas:", non_numeric_cols)

# 6) Binary label for logistic regression: a wine is "good" (label 1) when
#    quality >= 7; every other row, including a null quality, gets label 0.
from pyspark.sql.functions import when

df2 = df.withColumn(
    "label",
    when(col("quality") >= 7, 1).otherwise(0),
)

print("Distribución de la etiqueta (label):")
df2.groupBy("label").agg(count("*").alias("count")).show()

# 7) Feature selection and pipeline: VectorAssembler -> StandardScaler -> LogisticRegression.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler, VectorAssembler

# Every column except the raw target ("quality") and the derived "label" is a feature.
feature_cols = [c for c in df.columns if c not in ("quality", "label")]
print("Features usadas:", feature_cols)

# Pack the feature columns into a single vector; rows with invalid
# (null/NaN) feature values are skipped rather than raising.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="rawFeatures",
    handleInvalid="skip",
)
# Standardize every feature to zero mean and unit standard deviation.
scaler = StandardScaler(
    inputCol="rawFeatures",
    outputCol="features",
    withMean=True,
    withStd=True,
)
# Pure L2 penalty (elasticNetParam=0.0) with strength 0.01.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.0,
)

pipeline = Pipeline(stages=[assembler, scaler, lr])

# 8) Train/test split: 80% / 20% with a fixed seed.
train_df, test_df = df2.randomSplit([0.8, 0.2], seed=42)
# Both splits feed several later actions (these counts, fit, transform, and the
# report's counts). Cache them so each action does not re-scan the CSV and
# recompute the split from scratch.
train_df = train_df.cache()
test_df = test_df.cache()
print("Tamaño train:", train_df.count(), "Tamaño test:", test_df.count())

# 9) Fit the whole pipeline (assembler -> scaler -> logistic regression) on
#    the training split.
model = pipeline.fit(dataset=train_df)
print("Modelo entrenado")

# 10) Score the held-out test split and preview a few predictions.
predictions = model.transform(dataset=test_df)
preview_cols = ["features", "label", "rawPrediction", "probability", "prediction"]
predictions.select(preview_cols).show(n=5)

# 11) Evaluation: AUC, accuracy, and precision / recall / F1 of the positive
#     class ("good" wine, label 1).
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Area under the ROC curve, computed from the model's rawPrediction scores.
bce = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = bce.evaluate(predictions)

# The *ByLabel metrics report the single class selected by metricLabel, whose
# default is 0.0 (the "not good" class). Set it to 1.0 so precision, recall and
# F1 all describe the positive class. F1 is computed per label as well, so it
# is the harmonic mean of the precision and recall printed below rather than a
# class-weighted average dominated by the majority class.
mce_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
mce_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="fMeasureByLabel", metricLabel=1.0)
mce_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="precisionByLabel", metricLabel=1.0)
mce_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="recallByLabel", metricLabel=1.0)

accuracy = mce_acc.evaluate(predictions)
f1 = mce_f1.evaluate(predictions)
precision = mce_precision.evaluate(predictions)
recall = mce_recall.evaluate(predictions)

print("Evaluación del modelo sobre test set:")
print(f" - AUC (ROC): {auc:.4f}")
print(f" - Accuracy:  {accuracy:.4f}")
print(f" - F1-score:  {f1:.4f}")
print(f" - Precision: {precision:.4f}")
print(f" - Recall:    {recall:.4f}")

# 12) Confusion matrix as (label, prediction, count) rows, sorted by label and
#     then by prediction.
confusion = (
    predictions
    .groupBy("label", "prediction")
    .count()
    .sort("label", "prediction")
)
print("Matriz de confusión (label, prediction, count):")
confusion.show()

# 13) Simple interpretation: the logistic-regression coefficients.
# The last pipeline stage is the fitted logistic-regression model.
lr_model = model.stages[-1]
intercept = lr_model.intercept
coefficients = lr_model.coefficients.toArray().tolist()

# Coefficients follow the VectorAssembler input-column order. StandardScaler
# runs before the regression, so each coefficient is the change in log-odds
# per one standard deviation of its feature (comparable across features).
assembled_features = assembler.getInputCols()
coef_map = list(zip(assembled_features, coefficients))

print("Intercept:", intercept)
print("Coeficientes (feature, coef):")
for name, weight in coef_map:
    print(f"{name:25s}: {weight:.6f}")

# 14) (Optional) Hyperparameter search with TrainValidationSplit.
# Disabled by default because it fits the whole pipeline once per grid point
# (3 x 3 = 9 here); set RUN_TUNING = True to run it. Note that with
# regParam=0.0 the elasticNetParam value has no effect, so three of the nine
# candidates are equivalent.
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

RUN_TUNING = False

paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.0, 0.01, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# Each candidate is trained on 80% of train_df and scored by AUC (bce) on the
# other 20%; test_df is only used afterwards to evaluate the winner.
tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=bce,
    trainRatio=0.8,
    seed=42,
)

if RUN_TUNING:
    tvs_model = tvs.fit(train_df)
    best_lr = tvs_model.bestModel.stages[-1]
    print("Mejores hiperparámetros: regParam =", best_lr.getRegParam(),
          "| elasticNetParam =", best_lr.getElasticNetParam())
    best_preds = tvs_model.transform(test_df)
    print("AUC mejor modelo:", bce.evaluate(best_preds))

# 15) Persist the fitted pipeline under Colab's /content directory,
#     replacing any model previously saved at the same path.
model_path = "/content/wine_lr_spark_model"
writer = model.write().overwrite()
writer.save(model_path)
print("Modelo guardado en:", model_path)

# 16) Write the evaluation results to a text file for the written report.
# The path is defined once so the file written and the path printed cannot drift.
report_path = "/content/wine_model_report.txt"
report = f"""
Resultados del modelo (Logistic Regression) - Wine Quality (binaria quality>=7):
 - AUC (ROC): {auc:.4f}
 - Accuracy:  {accuracy:.4f}
 - F1-score:  {f1:.4f}
 - Precision: {precision:.4f}
 - Recall:    {recall:.4f}
 - Train size: {train_df.count()}
 - Test size:  {test_df.count()}
 - Umbral para 'buena' calidad: >=7
"""
# Explicit UTF-8: open()'s default encoding depends on the runtime locale, and
# this Spanish-language report may contain accented characters.
with open(report_path, "w", encoding="utf-8") as f:
    f.write(report)
print(f"Reporte guardado en {report_path}")


# 17) Detener Spark
#spark.stop()


W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Spark session creada: WineQuality_LogisticRegression
Dataset descargado
Esquema original:
root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)

Primeras filas:
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidit