<h1>Práctica Final de Procesamiento en tiempo real - Opción 2</h1>
<p/>
<table border>
<tr><td>Alumno</td><td>email</td><td>Teléfono</td></ur>
<tr><td>José María Álvarez</td><td>josemaria.alvarezfernandez@elcorteingles.es</td><td>+34 682 780 953</td></ur>
<tr><td>Adolfo González</td><td>adolfo.gonzalez@elcorteingles.es</td><td>+34 609 964 414</td></ur>
<tr><td>César Colado</td><td>cesar.colado@elcorteingles.es</td><td>+34 661 415 555</td></ur>
</table>

Dado que en el ejercicio anterior de Spark usamos Spark Structured Streaming en vez de Spark Streaming simple, hemos intentado adaptar lo que se nos pedía para calcular el modelo en streaming y después calcular las predicciones también en streaming. Para ello se han creado dos scripts diferentes, uno que coge el dataset de vinos tintos (que en el anterior ejercicio se utilizó para el entrenamiento) y va sacando archivos csv con 5 registros cada segundo, y otro que coge el dataset de vinos blancos (que se utilizó para la predicción en el anterior) que reutiliza la lógica ya hecha, es decir, crear ficheros con un registro cada segundo.
El algoritmo aquí expuesto se basa pues es la carga de los ficheros de entrenamiento en streaming en una tabla sql que se llamará "training_set". Dicha tabla después se utilizará por Kmeans para cargar el modelo, que se utilizará posteriormente para las predicciones de los vinos blancos. Adicionalmente, y para mostrar las posibilidades, se ha decidido tener en cuenta solo los resultados de los últimos 10 minutos (con la instrucción withWatermark que se explica <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html">aqui</a>.

Los programas en python (*02-python_streaming_training.py* y *03-python_streaming_data.py*) cogen los datos de Internet de los vinos (http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv y http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv respectivamente), y lo van introduciendo en ficheros en un directorio (training y data respectivamente). En el caso del training, mete 5 filas cada segundo en un fichero diferente, y en el caso del entrenamiento, mete 1 fila cada segundo en un fichero diferente.

Primeramente, generamos el modelo a través de la tabla en streaming *traning_set*, con un watermark de 10 minutos para la obtención de los datos de entrenamiento que generarán el modelo.

In [1]:
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
import time
schema = StructType([
    StructField("fixed acidity", DoubleType()),
    StructField("volatile acidity", DoubleType()), 
    StructField("citric acid", DoubleType()), 
    StructField("residual sugar", DoubleType()), 
    StructField("chlorides", DoubleType()), 
    StructField("free sulfur dioxide", DoubleType()), 
    StructField("total sulfur dioxide", DoubleType()), 
    StructField("density", DoubleType()), 
    StructField("pH", DoubleType()), 
    StructField("sulphates", DoubleType()), 
    StructField("alcohol", DoubleType()), 
    StructField("quality", IntegerType()),
    StructField("id", StringType())])
wineTraining = spark.readStream.option("sep", ";").option("header", "false") \
    .schema(schema) \
    .format("csv").load("./training")
vecAssembler = VectorAssembler(inputCols=["fixed acidity", "volatile acidity", "citric acid", "residual sugar", "chlorides", "free sulfur dioxide", "total sulfur dioxide", "density", "pH", "sulphates", "alcohol"], outputCol="features")
wineTraining_kmeans = vecAssembler.transform(wineTraining).select('id', 'features')
wineTraining_kmeans.writeStream.queryName("training_set").outputMode("append").format("memory").start()
training_set = spark.sql("select * from training_set").withWatermark("timestamp", "10 minutes").count()
#Para evitar que todavia este vacio cuando ejecutamos la parte de streaming de datos y no haya errores
while (training_set <= 0):
    time.sleep(1.0)
    training_set = spark.sql("select * from training_set").withWatermark("timestamp", "10 minutes").count()

Una vez nos aseguramos que tenemos datos para predecir, generamos el modelo, con los 11 centroides que vimos que eran óptimos en la práctica anterior.

In [2]:
kmeans_k = 11
kmeans = KMeans().setK(kmeans_k).setSeed(1).setFeaturesCol("features")
model = kmeans.fit(spark.sql("select * from training_set").withWatermark("timestamp", "10 minutes"))

Se ha hecho con una tabla debido a que el output de consola lo saca por la consola estandar de la sesión de pyspark, y no la consola del notebook, con lo cual los resultados que se obtenian no se veían aquí. Se genera el esquema a importar, y se cargan todos los ficheros csv del directorio, a medida que el script los va generando, y se van haciendo predicciones sobre los datos en streaming, añadiéndose a la tabla a medida que se generan (el output mode es append).

In [3]:
wineDF = spark.readStream.option("sep", ";").option("header", "false") \
    .schema(schema) \
    .format("csv").load("./data")
vecAssembler = VectorAssembler(inputCols=["fixed acidity", "volatile acidity", "citric acid", "residual sugar", "chlorides", "free sulfur dioxide", "total sulfur dioxide", "density", "pH", "sulphates", "alcohol"], outputCol="features")
wineDF_kmeans = vecAssembler.transform(wineDF).select('id', 'features')
predictions = model.transform(wineDF_kmeans).select('id', 'prediction')
query = predictions \
    .writeStream \
    .queryName("predictions_ml") \
    .outputMode("append") \
    .format("memory") \
    .start()

Después, para poder ver los resultados, hacemos durante 20 segundos una query a la tabla para ver como va creciendo con los ficheros procesados.

In [5]:
import time
i = 0
while (i < 20):
    time.sleep(1.0)
    spark.sql("select * from predictions_ml").show()
    i+=1

+------+----------+
|    id|prediction|
+------+----------+
|wine15|         1|
|wine16|        10|
|wine14|         1|
|wine18|         2|
|wine10|        10|
|wine11|         9|
|wine12|        10|
|wine17|         6|
| wine4|         1|
| wine5|         1|
|wine13|         2|
| wine3|         6|
| wine6|         6|
| wine7|         1|
| wine2|        10|
| wine9|        10|
| wine1|         1|
| wine8|         1|
|wine19|         1|
|wine20|         1|
+------+----------+
only showing top 20 rows

+------+----------+
|    id|prediction|
+------+----------+
|wine15|         1|
|wine16|        10|
|wine14|         1|
|wine18|         2|
|wine10|        10|
|wine11|         9|
|wine12|        10|
|wine17|         6|
| wine4|         1|
| wine5|         1|
|wine13|         2|
| wine3|         6|
| wine6|         6|
| wine7|         1|
| wine2|        10|
| wine9|        10|
| wine1|         1|
| wine8|         1|
|wine19|         1|
|wine20|         1|
+------+----------+
only showing t

+------+----------+
|    id|prediction|
+------+----------+
|wine15|         1|
|wine16|        10|
|wine14|         1|
|wine18|         2|
|wine10|        10|
|wine11|         9|
|wine12|        10|
|wine17|         6|
| wine4|         1|
| wine5|         1|
|wine13|         2|
| wine3|         6|
| wine6|         6|
| wine7|         1|
| wine2|        10|
| wine9|        10|
| wine1|         1|
| wine8|         1|
|wine19|         1|
|wine20|         1|
+------+----------+
only showing top 20 rows

+------+----------+
|    id|prediction|
+------+----------+
|wine15|         1|
|wine16|        10|
|wine14|         1|
|wine18|         2|
|wine10|        10|
|wine11|         9|
|wine12|        10|
|wine17|         6|
| wine4|         1|
| wine5|         1|
|wine13|         2|
| wine3|         6|
| wine6|         6|
| wine7|         1|
| wine2|        10|
| wine9|        10|
| wine1|         1|
| wine8|         1|
|wine19|         1|
|wine20|         1|
+------+----------+
only showing t