# Proyecto Big Data: Marco Ferraro
Profesor: Luis Alexander Calvo
### Módulo de Machine Learning

En esta sección del proyecto, vamos a generar una ingesta de datos sobre nuestro servidor de postgreSQL.
Vamos a pre procesar los datos utilizando pyspark y vamos a realizar predicciones sobre dos modelos:
* Un modelo de regresión lineal con multiples dimensiones
* Un random forrest

La idea es comparar el rendimiento de estos modelos sobre varias metricas

## 1. Ingesta de Datos

Vamos a iniciar una sesión de Spark 

In [1]:
from pyspark.sql import SparkSession
import findspark


findspark.init()

In [2]:
from pyspark.sql.types import (IntegerType, FloatType,
                               StructField, StructType)

spark = SparkSession \
    .builder \
    .appName("ML Solution") \
    .config("spark.driver.extraClassPath", "postgresql-42.2.14.jar") \
    .config("spark.executor.extraClassPath", "postgresql-42.2.14.jar") \
    .getOrCreate()

ps: unrecognized option: p
BusyBox v1.30.1 (2019-10-26 11:23:07 UTC) multi-call binary.

Usage: ps [-o COL1,COL2=HEADER]

Show list of processes

	-o COL1,COL2=HEADER	Select columns for display
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/07 20:47:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark \
    .read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host.docker.internal:5433/postgres") \
    .option("user", "postgres") \
    .option("password", "testPassword") \
    .option("dbtable", "gold_table") \
    .load()
df.show()

+------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|climate_date|           AVG_T2M|       AVG_T2M_MAX|       AVG_T2M_MIN|         Commodity|       AVG_Minimum|       AVG_Maximum|       AVG_Average|
+------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|     2013-09| 18.75804301075269|22.539913978494617|15.620290322580646|            Bakula|              90.0|             100.0|              95.0|
|     2013-12| 7.968272632674298|14.078792924037462| 4.012570239334026|    Cabbage(Local)|26.291666666666668|            31.125|28.708333333333332|
|     2014-05| 20.86644120707596|27.129656607700316| 14.89005202913632|    Papaya(Nepali)|49.130434782608695|              55.0| 52.06521739130435|
|     2014-06| 23.16148387096774| 28.63832258064516| 17.95374193548387|          Brocauli|             120.0| 13

Vemos que contamos con un df de casi 6 mil records.

In [4]:
df.count()

5985

Cambiamos el nombre de varias columnas por motivos de comprension.

In [5]:
df = df.select(
    df["climate_date"],
    df["Commodity"],
    df["AVG_T2M"].alias("AVG_TEMP"),
    df["AVG_T2M_MAX"].alias("AVG_MAX_TEMP"),
    df["AVG_T2M_MIN"].alias("AVG_MIN_TEMP"),
    df["AVG_Minimum"].alias("AVG_MIN_PRICE"),
    df["AVG_Maximum"].alias("AVG_MAX_PRICE"),
    df["AVG_Average"].alias("AVG_PRICE")
)

df.limit(1).show()

+------------+---------+-----------------+------------------+------------------+-------------+-------------+---------+
|climate_date|Commodity|         AVG_TEMP|      AVG_MAX_TEMP|      AVG_MIN_TEMP|AVG_MIN_PRICE|AVG_MAX_PRICE|AVG_PRICE|
+------------+---------+-----------------+------------------+------------------+-------------+-------------+---------+
|     2013-09|   Bakula|18.75804301075269|22.539913978494617|15.620290322580646|         90.0|        100.0|     95.0|
+------------+---------+-----------------+------------------+------------------+-------------+-------------+---------+



## 2. Preprocesamiento de Datos

Tenemos una columna llamada Commodity que representa los productos de venda en el mercado. Vamos a hacer una exploración sobre estos datos.

In [6]:
from pyspark.ml.feature import FeatureHasher
from pyspark.sql.functions import countDistinct

Vemos que contamos con 128 datos únicos.

In [7]:
product_count = df.agg(countDistinct("Commodity")).collect()[0][0]
product_count

128

Por la gran cantidad de valores categóricos, vamos a hacer un mapeo hashing para representar cada valor de forma única sin tratar de alterar las predicciones. El mayor contrapeso de esta técnica es que no se puede hacer un reverse hashing, pero no hay problema ya que solo nos vamos a enfocar en la eficiencia de los modelos.

In [8]:
hasher = FeatureHasher(inputCols=["Commodity"], outputCol="hashed_products", numFeatures=product_count)


hashed_df = hasher.transform(df)
hashed_df.select("hashed_products").show(truncate=False)

+-----------------+
|hashed_products  |
+-----------------+
|(128,[118],[1.0])|
|(128,[30],[1.0]) |
|(128,[0],[1.0])  |
|(128,[15],[1.0]) |
|(128,[84],[1.0]) |
|(128,[114],[1.0])|
|(128,[26],[1.0]) |
|(128,[102],[1.0])|
|(128,[6],[1.0])  |
|(128,[88],[1.0]) |
|(128,[53],[1.0]) |
|(128,[0],[1.0])  |
|(128,[58],[1.0]) |
|(128,[26],[1.0]) |
|(128,[102],[1.0])|
|(128,[119],[1.0])|
|(128,[105],[1.0])|
|(128,[33],[1.0]) |
|(128,[16],[1.0]) |
|(128,[86],[1.0]) |
+-----------------+
only showing top 20 rows



In [9]:
hashed_df = hashed_df.drop("Commodity")
hashed_df.show()

+------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+
|climate_date|          AVG_TEMP|      AVG_MAX_TEMP|      AVG_MIN_TEMP|     AVG_MIN_PRICE|     AVG_MAX_PRICE|         AVG_PRICE|  hashed_products|
+------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+
|     2013-09| 18.75804301075269|22.539913978494617|15.620290322580646|              90.0|             100.0|              95.0|(128,[118],[1.0])|
|     2013-12| 7.968272632674298|14.078792924037462| 4.012570239334026|26.291666666666668|            31.125|28.708333333333332| (128,[30],[1.0])|
|     2014-05| 20.86644120707596|27.129656607700316| 14.89005202913632|49.130434782608695|              55.0| 52.06521739130435|  (128,[0],[1.0])|
|     2014-06| 23.16148387096774| 28.63832258064516| 17.95374193548387|             120.0| 130.8695652173913|125.43478

In [10]:
X = hashed_df.select("climate_date", "hashed_products", "AVG_TEMP", "AVG_MAX_TEMP", "AVG_MIN_TEMP")
y = hashed_df.select("AVG_PRICE")

y.show()

+------------------+
|         AVG_PRICE|
+------------------+
|              95.0|
|28.708333333333332|
| 52.06521739130435|
|125.43478260869566|
| 49.54545454545455|
|45.708333333333336|
|             26.24|
|205.32258064516128|
| 83.38709677419355|
| 76.69354838709677|
|            30.425|
| 72.66129032258064|
|36.724137931034484|
| 41.60344827586207|
|             275.0|
| 82.83333333333333|
|             260.5|
| 43.80952380952381|
| 62.84090909090909|
| 22.67391304347826|
+------------------+
only showing top 20 rows



Vamos a separar la columna de fecha a una columna de año y otra de mes.

In [11]:
from pyspark.sql.functions import year, month


X = X.withColumn("year", year("climate_date")) \
       .withColumn("month", month("climate_date"))

X = X.drop("climate_date")

X.show()

+-----------------+------------------+------------------+------------------+----+-----+
|  hashed_products|          AVG_TEMP|      AVG_MAX_TEMP|      AVG_MIN_TEMP|year|month|
+-----------------+------------------+------------------+------------------+----+-----+
|(128,[118],[1.0])| 18.75804301075269|22.539913978494617|15.620290322580646|2013|    9|
| (128,[30],[1.0])| 7.968272632674298|14.078792924037462| 4.012570239334026|2013|   12|
|  (128,[0],[1.0])| 20.86644120707596|27.129656607700316| 14.89005202913632|2014|    5|
| (128,[15],[1.0])| 23.16148387096774| 28.63832258064516| 17.95374193548387|2014|    6|
| (128,[84],[1.0])|15.086690946930283|19.896191467221644| 11.35987513007284|2014|   10|
|(128,[114],[1.0])|15.086690946930283|19.896191467221644| 11.35987513007284|2014|   10|
| (128,[26],[1.0])| 9.691474654377881| 16.45479262672811| 4.602523041474655|2015|    2|
|(128,[102],[1.0])|20.828033298647245|24.606930280957336|17.552726326742974|2015|    7|
|  (128,[6],[1.0])|20.2693236212

Una vez segregado, vamos a hacer un escalamiento de datos. Acá buscamos que para cada feature, la media sea cercana a cero y la desviación éstandar tenga un valor de 1.

In [12]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

feature_columns = ["AVG_TEMP", "AVG_MAX_TEMP", "AVG_MIN_TEMP", "year", "month"]

vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
X_assembled = vector_assembler.transform(X)

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(X_assembled)
X_scaled = scaler_model.transform(X_assembled)

max_length = len(feature_columns)

for i in range(max_length):
    col_name = f"feature_{i + 1}"
    extract_feature_udf = udf(lambda x: float(x[i]), DoubleType())
    X_scaled = X_scaled.withColumn(col_name, extract_feature_udf("scaled_features"))

X_scaled = X_scaled.drop("features", "scaled_features", "AVG_TEMP", "AVG_MAX_TEMP", "AVG_MIN_TEMP", "year", "month")

X_scaled.show()


[Stage 18:>                                                         (0 + 1) / 1]

+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  hashed_products|           feature_1|           feature_2|           feature_3|           feature_4|           feature_5|
+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|(128,[118],[1.0])|  0.6086600420501682|  0.3189700695422222|  0.7977327175764956| -1.6320227077331853|  0.6695934116039529|
| (128,[30],[1.0])| -1.4780452953662864| -1.5894818423247634| -1.3170124234967804| -1.6320227077331853|  1.5400357592255112|
|  (128,[0],[1.0])|  1.0164171689864085|  1.3542115037811033|  0.6646947159868585| -1.1145858318644635| -0.4909963852247915|
| (128,[15],[1.0])|   1.460270708823682|  1.6944993263880777|  1.2228511236826922| -1.1145858318644635|-0.20084893601760537|
| (128,[84],[1.0])|-0.10136711132448283| -0.2773359339544261|0.021551670592574686| -1.1145858318644635|  0.9597408608111391|


                                                                                

Posteriormente, vamos a separar los datasets en entrenamiento y validación.

In [13]:
X_train, X_test = X_scaled.randomSplit([0.8, 0.2], seed=123)
y_train, y_test = y.randomSplit([0.8, 0.2], seed=123)

X_train.count()

4782

In [14]:
y_train.show()

+------------------+
|         AVG_PRICE|
+------------------+
|               5.5|
| 6.947368421052632|
|               9.0|
| 9.366666666666667|
| 9.826086956521738|
|10.790322580645162|
|10.866666666666667|
|10.935483870967742|
|11.071428571428571|
|11.113636363636363|
|11.433333333333334|
|11.545454545454545|
|11.595238095238095|
|              11.6|
|11.689655172413794|
|              12.0|
| 12.14516129032258|
|             12.18|
|12.296296296296296|
|12.466666666666667|
+------------------+
only showing top 20 rows



## 3. Modelos y Análisis de Resultados

### 3.1 Modelo de Regresión Lineal

Primero, vamos a utilizar un modelo de regresión lineal para predecir el precio optimo de cada material, dependiendo de las features. Por el tipo de implementación en pyspark, tenemos que juntar los datasets bajo un mismo indice para entrenarlo

In [15]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator


X_train_lr = X_train.withColumn("index", monotonically_increasing_id())
y_train_lr = y_train.withColumn("index", monotonically_increasing_id())

X_test_lr = X_test.withColumn("index", monotonically_increasing_id())
y_test_lr = y_test.withColumn("index", monotonically_increasing_id())

combined_train_df = X_train_lr.join(y_train_lr.select("AVG_PRICE", "index"), on="index").drop("index")
combined_test_df = X_test_lr.join(y_test_lr.select("AVG_PRICE", "index"), on="index").drop("index")

feature_columns = ["hashed_products", "feature_1", "feature_2", "feature_3", "feature_4", "feature_5"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

combined_train_df = assembler.transform(combined_train_df)
combined_test_df = assembler.transform(combined_test_df)

lr = LinearRegression(featuresCol="features", labelCol="AVG_PRICE")

lr_model = lr.fit(combined_train_df)

predictions = lr_model.transform(combined_test_df)


predictions.show()

24/01/07 20:47:52 WARN Instrumentation: [89c6c972] regParam is zero, which might cause numerical instability and overfitting.
24/01/07 20:47:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/01/07 20:47:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/01/07 20:47:53 WARN Instrumentation: [89c6c972] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
24/01/07 20:47:53 ERROR LBFGS: Failure! Resetting history: breeze.optimize.FirstOrderException: Line search zoom failed
24/01/07 20:47:53 ERROR LBFGS: Failure again! Giving up and returning. Maybe the objective is just poorly behaved?


+-----------------+-------------------+-------------------+--------------------+--------------------+--------------------+------------------+--------------------+------------------+
|  hashed_products|          feature_1|          feature_2|           feature_3|           feature_4|           feature_5|         AVG_PRICE|            features|        prediction|
+-----------------+-------------------+-------------------+--------------------+--------------------+--------------------+------------------+--------------------+------------------+
|  (128,[0],[1.0])| 0.8872781222813996| 0.4903458906664576|   1.171284795840241|  0.4377247957417018|  0.3794459623967668|19.041666666666668|(133,[0,128,129,1...|18.605957759265166|
|  (128,[0],[1.0])| 0.9877840015168924| 1.2255322406241784|   0.739869938749024|  0.4377247957417018| -0.4909963852247915|19.596774193548388|(133,[0,128,129,1...|19.016145889727667|
| (128,[39],[1.0])| 1.0207196572610788| 0.9110288554319433|  1.0764924305426848| -1.632022

Posteriomente usamos un evaluador para analizar las predicciones.

In [16]:
evaluator = RegressionEvaluator(labelCol="AVG_PRICE", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

mse_evaluator = RegressionEvaluator(labelCol="AVG_PRICE", predictionCol="prediction", metricName="mse")
mse = mse_evaluator.evaluate(predictions)
print(f"Mean Squared Error (MSE) on test data: {mse}")

mae_evaluator = RegressionEvaluator(labelCol="AVG_PRICE", predictionCol="prediction", metricName="mae")
mae = mae_evaluator.evaluate(predictions)
print(f"Mean Absolute Error (MAE) on test data: {mae}")

mse_evaluator = RegressionEvaluator(labelCol="AVG_PRICE", predictionCol="prediction", metricName="mse")
mse = mse_evaluator.evaluate(predictions)
print(f"Mean Squared Error (MSE) on test data: {mse}")

r2_evaluator = RegressionEvaluator(labelCol="AVG_PRICE", predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(predictions)
print(f"R-Squared (R²) on test data: {r2}")

Root Mean Squared Error (RMSE) on test data: 8.245760434257217
Mean Squared Error (MSE) on test data: 67.99256513916177
Mean Absolute Error (MAE) on test data: 2.5362482466187077
Mean Squared Error (MSE) on test data: 67.99256513916177
R-Squared (R²) on test data: 0.9878486384510242


Podemos ver que, el modelo mantiene un RMSE bajo, de igual forma presenta un R-Squared bajo.

### 3.2 Modelo de Regresión Random Forrest

Igualmente, como segundo modelo, vamos a usar un algoritmo regresivo de Random Forrest para realizar predicciones. Vamos a usar un random forrest con 150 arboles de clasificación.

In [25]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol="features", labelCol="AVG_PRICE", numTrees=150)

rf_model = rf.fit(combined_train_df)

rf_predictions = rf_model.transform(combined_test_df)

rf_predictions.show()

24/01/07 20:49:28 WARN DAGScheduler: Broadcasting large task binary with size 1230.4 KiB
24/01/07 20:49:32 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/01/07 20:49:36 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB
24/01/07 20:49:41 WARN DAGScheduler: Broadcasting large task binary with size 7.8 MiB
24/01/07 20:49:46 WARN DAGScheduler: Broadcasting large task binary with size 1509.8 KiB
                                                                                

+-----------------+-------------------+-------------------+--------------------+--------------------+--------------------+------------------+--------------------+------------------+
|  hashed_products|          feature_1|          feature_2|           feature_3|           feature_4|           feature_5|         AVG_PRICE|            features|        prediction|
+-----------------+-------------------+-------------------+--------------------+--------------------+--------------------+------------------+--------------------+------------------+
|  (128,[0],[1.0])| 0.8872781222813996| 0.4903458906664576|   1.171284795840241|  0.4377247957417018|  0.3794459623967668|19.041666666666668|(133,[0,128,129,1...| 65.95601484112309|
|  (128,[0],[1.0])| 0.9877840015168924| 1.2255322406241784|   0.739869938749024|  0.4377247957417018| -0.4909963852247915|19.596774193548388|(133,[0,128,129,1...| 65.96999183717865|
| (128,[39],[1.0])| 1.0207196572610788| 0.9110288554319433|  1.0764924305426848| -1.632022

In [26]:
rf_evaluator = RegressionEvaluator(labelCol="AVG_PRICE", predictionCol="prediction", metricName="rmse")
rf_rmse = rf_evaluator.evaluate(rf_predictions)
print(f"Root Mean Squared Error (RMSE) on test data for Random Forest: {rf_rmse}")

rf_mse_evaluator = RegressionEvaluator(labelCol="AVG_PRICE", predictionCol="prediction", metricName="mse")
rf_mse = rf_mse_evaluator.evaluate(rf_predictions)
print(f"Mean Squared Error (MSE) on test data for Random Forest: {rf_mse}")

rf_mae_evaluator = RegressionEvaluator(labelCol="AVG_PRICE", predictionCol="prediction", metricName="mae")
rf_mae = rf_mae_evaluator.evaluate(rf_predictions)
print(f"Mean Absolute Error (MAE) on test data for Random Forest: {rf_mae}")

rf_r2_evaluator = RegressionEvaluator(labelCol="AVG_PRICE", predictionCol="prediction", metricName="r2")
rf_r2 = rf_r2_evaluator.evaluate(rf_predictions)
print(f"R-Squared (R²) on test data for Random Forest: {rf_r2}")

Root Mean Squared Error (RMSE) on test data for Random Forest: 41.262625106216795
Mean Squared Error (MSE) on test data for Random Forest: 1702.6042306561928
Mean Absolute Error (MAE) on test data for Random Forest: 34.22545413763678
R-Squared (R²) on test data for Random Forest: 0.6957173252814526


Asimismo, podemos ver que el rendimiento es más alto que el modelo de regresión lineal.

## 4. Análisis de Resultados


1. **Errores de Precisión:**
   - El modelo de regresión lineal muestra un RMSE (Root Mean Squared Error) de aproximadamente 8.25, lo que indica una buena precisión en la predicción, mientras que el Random Forest tiene un RMSE de alrededor de 41.26, lo que significa que, en promedio, sus predicciones están más alejadas de los valores reales en comparación con el modelo lineal.
   
   - El MAE (Mean Absolute Error) también es mucho más bajo en el modelo de regresión lineal (2.53) en comparación con el Random Forest (34.22), lo que sugiere que el modelo lineal está haciendo predicciones más precisas en promedio.

2. **R-Cuadrado (R²):**
   - El valor de R² para el modelo de regresión lineal es muy alto (0.988), lo que indica que el modelo explica aproximadamente el 98.8% de la variabilidad en los datos. Por otro lado, el valor de R² para Random Forest es 0.696, lo que significa que el modelo explica aproximadamente el 69.6% de la variabilidad en los datos. Un R² más alto sugiere un mejor ajuste del modelo a los datos.

### Diferencias de los modelos:

1. **Complejidad del Modelo:**
   - El modelo de regresión lineal es un modelo paramétrico que asume una relación lineal entre las variables predictoras y la variable de respuesta. Si los datos realmente siguen una relación lineal, este modelo funcionará bien, como se evidencia por los altos valores de R² y los errores bajos.
   
   - Por otro lado, el Random Forest es un modelo no paramétrico y se basa en árboles de decisión. Puede capturar relaciones no lineales entre las variables, lo que puede ser una ventaja cuando hay relaciones complejas en los datos. Sin embargo, también puede llevar a un sobreajuste cuando el modelo es demasiado complejo para el conjunto de datos.

2. **Interpretación:**
   - El modelo de regresión lineal es más interpretable, ya que podemos identificar el impacto individual de cada predictor en la variable de respuesta.
   
   - El Random Forest es menos interpretable en comparación, ya que se basa en múltiples árboles de decisión y no es fácil identificar cómo cada predictor afecta la predicción.

### Siguientes Pasos:

1. **Optimización de Hiperparámetros para Random Forest:**
   - Dado que Random Forest tiene un rendimiento inferior en comparación con la regresión lineal en este caso, puedes intentar mejorar su rendimiento ajustando sus hiperparámetros. Algunos de los hiperparámetros que puedes ajustar incluyen:
     - Número de árboles en el bosque (`n_estimators`).
     - Profundidad máxima de los árboles (`max_depth`).
     - Número mínimo de muestras requeridas para dividir un nodo (`min_samples_split`).
     - Número mínimo de muestras requeridas en cada hoja del árbol (`min_samples_leaf`).
     - Máximo número de características a considerar para dividir un nodo (`max_features`).

2. **Validación Cruzada:**
   - Realiza validación cruzada para evaluar el rendimiento del modelo Random Forest en diferentes subconjuntos de datos y asegurarte de que el modelo no esté sobreajustando o subajustando los datos.

3. **Feature Engineering:**
   - Considera realizar ingeniería de características para mejorar la calidad de los datos y potencialmente mejorar el rendimiento del modelo Random Forest.

En resumen, aunque Random Forest es un modelo más flexible y puede capturar relaciones no lineales, en este caso particular, el modelo de regresión lineal muestra un rendimiento superior en términos de precisión y ajuste a los datos. Sin embargo, aún se puede intentar mejorar el rendimiento de Random Forest ajustando sus hiperparámetros y realizando validación cruzada.