# Proyecto Final Modulo
## Nombre: Darlyn Bravo Peña

## Conjunto de Dataset
[Red Wine Quality](hps://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv)

## 1. Conguración e Inicio de Spark

We are using 1 worker to make this work

In [2]:
# Imports
from pyspark.sql import SparkSession

In [4]:
# Session builder
spark = SparkSession.builder \
    .appName("NoteBookFinalSession") \
    .getOrCreate()

# Verifying the current session builder (only for testing purpose)
print(spark.sparkContext.getConf().getAll())

[('spark.driver.port', '43205'), ('spark.executor.id', 'driver'), ('spark.driver.memory', '1g'), ('spark.driver.host', '32d4c8e10f1a'), ('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'), ('spark.app.name', 'N

## 2. Carga y Exploración de Datos (ETL)

In [7]:
df = spark.read.csv('/data/winequality-red.csv', header=True, inferSchema=True, sep=';')
print("Dataset loaded successfully")

Dataset loaded successfully


### Exploratory Analysis

In [8]:
# Print Schema
df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [18]:
# Show first 5 lines
df.limit(5).toPandas()

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5


In [22]:
# Stadistic Summary
df.describe().show()

+-------+------------------+-------------------+-------------------+------------------+--------------------+-------------------+--------------------+--------------------+-------------------+------------------+------------------+------------------+
|summary|     fixed acidity|   volatile acidity|        citric acid|    residual sugar|           chlorides|free sulfur dioxide|total sulfur dioxide|             density|                 pH|         sulphates|           alcohol|           quality|
+-------+------------------+-------------------+-------------------+------------------+--------------------+-------------------+--------------------+--------------------+-------------------+------------------+------------------+------------------+
|  count|              1599|               1599|               1599|              1599|                1599|               1599|                1599|                1599|               1599|              1599|              1599|              1599|
|   mean

In [10]:
df.groupBy("quality").count().orderBy("quality").show()

+-------+-----+
|quality|count|
+-------+-----+
|      3|   10|
|      4|   53|
|      5|  681|
|      6|  638|
|      7|  199|
|      8|   18|
+-------+-----+



We can see a balance of data with quality **5,6,7**

In [11]:
for col_name in df.columns:
    if col_name != 'quality':
        corr = df.stat.corr(col_name, 'quality')
        print(f"Correlación entre {col_name} y quality: {corr:.4f}")

Correlación entre fixed acidity y quality: 0.1241
Correlación entre volatile acidity y quality: -0.3906
Correlación entre citric acid y quality: 0.2264
Correlación entre residual sugar y quality: 0.0137
Correlación entre chlorides y quality: -0.1289
Correlación entre free sulfur dioxide y quality: -0.0507
Correlación entre total sulfur dioxide y quality: -0.1851
Correlación entre density y quality: -0.1749
Correlación entre pH y quality: -0.0577
Correlación entre sulphates y quality: 0.2514
Correlación entre alcohol y quality: 0.4762


In [20]:
#  Null values per column
from pyspark.sql.functions import col, sum

df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density| pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|            0|               0|          0|             0|        0|                  0|                   0|      0|  0|        0|      0|      0|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+



## 3. Preparación de Datos y Feature Engineering

### Creating a new label column from quality called target

In [29]:
from pyspark.sql.functions import when

df = df.withColumn("label", when(df["quality"] > 5, 1.0).otherwise(0.0))

### Feature Selection
We are skipping the **quality and label** variable because it will used as a target.

In [36]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Chossing feature columns
feature_cols = [col for col in df.columns if col not in ('quality', 'label')]

# creating the unscaled_features vector assembler
feature_assembler = VectorAssembler(
 inputCols=feature_cols,
 outputCol="unscaled_features"
)

# normalizing features using standard scaler
scaler = StandardScaler(inputCol='unscaled_features', outputCol='features')

## Training the Model using Pipelines
We are going to use `LogisticRegression` and `DecisionTreeClassifier`

In [40]:
# Import required libs
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [33]:
# Split data for training and test
train, test = df.randomSplit([0.8, 0.2], seed=42)

In [46]:
# Preparing the pipeline
models = {
    'LogisticRegression': LogisticRegression(labelCol='label', featuresCol='features', maxIter=10),
    'DecisionTree': DecisionTreeClassifier(labelCol='label', featuresCol='features')
}

# Run each model
results = []

# Evaluador AUC
evaluator = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderROC')
confusion_matrix = {}

for name, model in models.items():
    pipeline = Pipeline(stages=[feature_assembler, scaler, model])
    pipeline_model = pipeline.fit(train)
    predictions = pipeline_model.transform(test)
    
    auc = evaluator.evaluate(predictions)
    cm = predictions.groupBy("label").pivot("prediction").count().fillna(0).orderBy("label")
    results.append({'model': name, 'AUC': auc, 'cm': cm})    


In [47]:
results

[{'model': 'LogisticRegression',
  'AUC': 0.8351724137931038,
  'cm': DataFrame[label: double, 0.0: bigint, 1.0: bigint]},
 {'model': 'DecisionTree',
  'AUC': 0.702970822281167,
  'cm': DataFrame[label: double, 0.0: bigint, 1.0: bigint]}]

## Evaluating Models

**LogisticRegression — AUC = 0.835**
- Muy buen desempeño.
- El AUC (Área Bajo la Curva ROC) de 0.835 significa que, en promedio, el modelo tiene un 83.5% de probabilidad de asignar una puntuación de predicción más alta a una muestra positiva (vino de buena calidad) que a una negativa (vino de calidad estándar).
- Esto indica una fuerte capacidad de discriminación entre vinos buenos y regulares.

**DecisionTree — AUC = 0.703**
- Desempeño moderado.
- Un AUC de 0.703 todavía es aceptable, pero indica que el modelo separa las clases con menos eficacia que la regresión logística.
- Puede ser más susceptible a sobreajuste o menos capaz de capturar patrones generalizables con los parámetros por defecto.

### Confusion Matrix

In [48]:
for result in results:
    cm = result['cm']
    print('Confusion Matrix for', result['model'])
    cm.show()

Confusion Matrix for LogisticRegression
+-----+---+---+
|label|0.0|1.0|
+-----+---+---+
|  0.0|105| 25|
|  1.0| 37|108|
+-----+---+---+

Confusion Matrix for DecisionTree
+-----+---+---+
|label|0.0|1.0|
+-----+---+---+
|  0.0| 93| 37|
|  1.0| 40|105|
+-----+---+---+



## Conlusión

Logistic Regression no solo tiene mejor AUC, sino que también comete menos errores tanto de tipo I (falsos positivos) como de tipo II (falsos negativos). Es decir:

Clasifica mejor los vinos buenos sin castigar tanto con falsos positivos.