# Proyecto Final
### Machine Learning con datos en PostgreSQL
Luis Morera / Erick Castillo

Primero se inicia sesión en pyspark

In [27]:
import findspark
findspark.init('/Users/Luis/Desktop/spark/spark-2.4.4-bin-hadoop2.7')

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Basic JDBC pipeline") \
    .config("spark.jars", "./postgresql-42.2.9.jar") \
    .getOrCreate()


Se importan las librerias necesarias para ejecutar este notebook

In [60]:
# Se importan las librerias para normalizar
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import Normalizer
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import isnan, when, count, col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Se lee el set de datos como un dataframe de spark para posteriormente agregarlo a postgreSQL

In [29]:
df=spark.read.option("delimiter", ";").csv('winequality-white.csv',inferSchema=True,header=True)

Se escriben los datos dentro de postgresql

In [98]:
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/FinalProject") \
    .option("user", "postgres") \
    .option("password", "password") \
    .option("dbtable", "WineQuality") \
    .mode('append') \
    .save()

Una vez que se tienen los datos dentro de PostgreSQL, se hace lectura de los mismos y se almacenan dentro de un dataframe de spark

In [30]:
# Reading single DataFrame in Spark by retrieving all rows from a DB table.
data = spark \
    .read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/FinalProject") \
    .option("user", "postgres") \
    .option("password", "password") \
    .option("dbtable", "winequality") \
    .load() 

Se puede observar que si se intenta visualizar los datos como un dataframe de spark, se dificulta la lectura de los datos, por lo que se procede a transformar los datos a un dataframe de Pandas

In [31]:
data.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.0|            0.27|       0.36|          20.7|    0.045|               45.0|               170.0|  1.001| 3.0|     0.45|    8.8|      6|
|          6.3|             0.3|       0.34|           1.6|    0.049|               14.0|               132.0|  0.994| 3.3|     0.49|    9.5|      6|
|          8.1|            0.28|        0.4|           6.9|     0.05|               30.0|                97.0| 0.9951|3.26|     0.44|   10.1|      6|
|          7.2|            0.23|       0.32|           8.5|    0.058|               47.0|           

In [32]:
data = data.withColumnRenamed("residual sugar", "residual_sugar").withColumnRenamed("free sulfur dioxide", "free_sulfur_dioxide").withColumnRenamed("total sulfur dioxide", "total_sulfur_dioxide").withColumnRenamed("fixed acidity", "fixed_acidity").withColumnRenamed("citric acid", "citric_acid").withColumnRenamed("volatile acidity", "volatile_acidity")
data.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.0|            0.27|       0.36|          20.7|    0.045|               45.0|               170.0|  1.001| 3.0|     0.45|    8.8|      6|
|          6.3|             0.3|       0.34|           1.6|    0.049|               14.0|               132.0|  0.994| 3.3|     0.49|    9.5|      6|
|          8.1|            0.28|        0.4|           6.9|     0.05|               30.0|                97.0| 0.9951|3.26|     0.44|   10.1|      6|
|          7.2|            0.23|       0.32|           8.5|    0.058|               47.0|           

Se puede observar que los datos son desplegados de una forma más ordenada y clara para empezar a analizar los datos

In [33]:
data.toPandas().head(10)

Unnamed: 0,fixed_acidity,volatile_acidity,citric_acid,residual_sugar,chlorides,free_sulfur_dioxide,total_sulfur_dioxide,density,pH,sulphates,alcohol,quality
0,7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8,6
1,6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5,6
2,8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1,6
3,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6
4,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6
5,8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1,6
6,6.2,0.32,0.16,7.0,0.045,30.0,136.0,0.9949,3.18,0.47,9.6,6
7,7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8,6
8,6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5,6
9,8.1,0.22,0.43,1.5,0.044,28.0,129.0,0.9938,3.22,0.45,11.0,6


Se empieza por ver cuantas instancias se tienen en el set de datos

In [34]:
data.count()

24490

Probando si hay datos faltantes en el data set y desplegando la informacion como un dataframe de pandas

In [35]:
data.select([count(when(isnan(c), c)).alias(c) for c in data.columns]).toPandas()

Unnamed: 0,fixed_acidity,volatile_acidity,citric_acid,residual_sugar,chlorides,free_sulfur_dioxide,total_sulfur_dioxide,density,pH,sulphates,alcohol,quality
0,0,0,0,0,0,0,0,0,0,0,0,0


Agrupando los datos por calidad del vino se puede observar que la mayoría de los datos se encuentran agrupados en los valores "centrales", es decir, 5, 6 y 7.

In [36]:
quality = data.groupBy('quality').count().show()

+-------+-----+
|quality|count|
+-------+-----+
|      6|10990|
|      3|  100|
|      5| 7285|
|      9|   25|
|      4|  815|
|      8|  875|
|      7| 4400|
+-------+-----+



De los datos, las columnas 'residual_sugar', 'free_sulfur_dioxide' y 'total_sulfur_dioxide' son de mayor magnitud comparado a los otros atributos. En este proyecto no se van a normalizar los datos dado que se va a utilizar el algoritmo Random Forest para predecir el resultado y este toma cada atributo de manera independiente, es decir, que al no haber comparación entre atributos no hace falta tenerlos en la misma escala.

Ahora se separan las columnas en features y label

In [37]:
data.columns

['fixed_acidity',
 'volatile_acidity',
 'citric_acid',
 'residual_sugar',
 'chlorides',
 'free_sulfur_dioxide',
 'total_sulfur_dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [38]:
df_assembler = VectorAssembler(inputCols=['fixed_acidity',
 'volatile_acidity',
 'citric_acid',
 'residual_sugar',
 'chlorides',
 'free_sulfur_dioxide',
 'total_sulfur_dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol'], outputCol="features")
df = df_assembler.transform(data)

In [39]:
df.printSchema()

root
 |-- fixed_acidity: double (nullable = true)
 |-- volatile_acidity: double (nullable = true)
 |-- citric_acid: double (nullable = true)
 |-- residual_sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free_sulfur_dioxide: double (nullable = true)
 |-- total_sulfur_dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)
 |-- features: vector (nullable = true)



In [45]:
data = df.withColumnRenamed("quality", "label")

In [47]:
data.select(['features','label']).show(20,False)

+----------------------------------------------------------+-----+
|features                                                  |label|
+----------------------------------------------------------+-----+
|[7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8]  |6    |
|[6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5]    |6    |
|[8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1]   |6    |
|[7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9]  |6    |
|[7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9]  |6    |
|[8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1]   |6    |
|[6.2,0.32,0.16,7.0,0.045,30.0,136.0,0.9949,3.18,0.47,9.6] |6    |
|[7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8]  |6    |
|[6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5]    |6    |
|[8.1,0.22,0.43,1.5,0.044,28.0,129.0,0.9938,3.22,0.45,11.0]|6    |
|[8.1,0.27,0.41,1.45,0.033,11.0,63.0,0.9908,2.99,0.56,12.0]|5    |
|[8.6,0.23,0.4,4.2,0.035,17.0,109.0,0.9947,3.14,0.53,9.7]  |5 

Ahora se procede a contruir el modelo

In [48]:
model_df = data.select(['features','label'])

Se divide el set de datos en entrenamiento y prueba, con porcentajes 80-20 respectivamente

In [49]:
training, test = model_df.randomSplit([0.8,0.2])

El set de entrenamiento y de prueba se encuentran en buenas proporciones en comparación al set de datos original por lo tanto no se va a modificar

In [50]:
training.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    6| 8777|
|    3|   84|
|    5| 5838|
|    9|   23|
|    4|  652|
|    8|  709|
|    7| 3556|
+-----+-----+



In [51]:
test.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    6| 2213|
|    3|   16|
|    5| 1447|
|    9|    2|
|    4|  163|
|    8|  166|
|    7|  844|
+-----+-----+



Se procede a entrenar el modelo

In [52]:
rf = RandomForestClassifier()
rf_model = rf.fit(training)

In [53]:
predictions = rf_model.transform(test)

Se prueba con distintos valores para obtener el mejor modelo con un evaluador multiclase

In [62]:
evaluator = MulticlassClassificationEvaluator()

rf = RandomForestClassifier()
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [5,15])
             .addGrid(rf.maxBins, [10,25])
             .addGrid(rf.numTrees, [10,30])
             .build())
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cv_model = cv.fit(training)

Se obtiene el mejor modelo

In [63]:
best_model = cv_model.bestModel

Se obtienen las predicciones

In [76]:
train_output_df = best_model.transform(training)
test_output_df = best_model.transform(test)

Mostrando algunas de las predicciones sobre el set de testing hechas por el modelo

In [77]:
test_output_df.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[3.8,0.31,0.02,11...|    6|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       6.0|
|[3.9,0.225,0.4,4....|    8|[0.0,0.0,0.0,1.07...|[0.0,0.0,0.0,0.03...|       8.0|
|[3.9,0.225,0.4,4....|    8|[0.0,0.0,0.0,1.07...|[0.0,0.0,0.0,0.03...|       8.0|
|[3.9,0.225,0.4,4....|    8|[0.0,0.0,0.0,1.07...|[0.0,0.0,0.0,0.03...|       8.0|
|[4.2,0.17,0.36,1....|    7|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       7.0|
|[4.2,0.215,0.23,5...|    3|[0.0,0.0,0.0,28.6...|[0.0,0.0,0.0,0.95...|       3.0|
|[4.4,0.32,0.39,4....|    8|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       8.0|
|[4.7,0.145,0.29,1...|    6|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       6.0|
|[4.7,0.335,0.14,1...|    5|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
|[4.7,0.335,0.14

De las pruebas hechas, se obtuvieron excelentes resultados, mostrando que el algoritmo es muy preciso a la hora de predecir el tipo de calidad del vino

In [79]:
train_predictionAndLabels = train_output_df.select("prediction", "label")
test_predictionAndLabels = test_output_df.select("prediction", "label")

metrics = ['weightedPrecision', 'weightedRecall', 'accuracy']

for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(metricName=metric)
    print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_predictionAndLabels)))
    print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_predictionAndLabels)))

Train weightedPrecision = 0.9988299485632541
Test weightedPrecision = 0.9975306620435604
Train weightedRecall = 0.9988288609399665
Test weightedRecall = 0.9975262832405689
Train accuracy = 0.9988288609399664
Test accuracy = 0.9975262832405689
