# Introducción

## Operaciones estadísticas: Correlación

Podemos computar la correlación entre las columnas de nuestro dataframe.

Primero generaremos un dataframe de pruebas.

In [None]:
data = [[1.0, 0.0, 0.0, -2.0],
        [2.0, 5.0, 0.0, 3.0],
        [3.0, 7.0, 0.0, 8.0],
        [4.0, 0.0, 0.0, 1.0],
       ]

df = spark.createDataFrame(data) 
df.show()

Este dataframe no es válido para poderse utilizar con MLlib, ya que no está vectorizado.

Para vectorizarlo podemos utilizar transformadores que veremos más adelante.

Lo definiremos en modo vectorizado usando vectores sparse y dense conforme corresponda.


In [None]:
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
        (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
        (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
        (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]

df = spark.createDataFrame(data, ["features"])
df.show()

Ahora podemos lanzar el método Correlation de MLlib sin problemas, especificando la columna del dataframe donde están los vectores.

In [None]:
from pyspark.ml.stat import Correlation

r1 = Correlation.corr(df, "features").head()

print("Pearson correlation matrix:\n" + str(r1[0]))

Vamos a testear el mismo método con un dataframe algo más potente, generado aleatoriamente.

In [None]:
import random

a = [ random.random() for x in range(1, 1000)]
b = [ x * 2 for x in a ]

dataList = [ [a[i], b[i]] for i in range(0, len(a)) ]
dataML = [ (Vectors.dense([a[i], b[i]]),) for i in range(0, len(a))]

dfList = spark.createDataFrame(dataList, ["features"])
dfVectors = spark.createDataFrame(dataML, ["features"])

display(dataList)

In [None]:
r1 = Correlation.corr(dfVectors, "features").head()

print("Pearson correlation matrix:\n" + str(r1[0]))

## Regresión Lineal

Podemos encontrar varios problemas de regresión en los ejemplos que propone databricks.

Utilizaremos un problema que plantea la estimación de éxito de un servicio de alquiler de bicicletas.
Esta es la descripción oficial que nos ofrece el repositorio de datos UCI

#### Data description

**Feature columns:**
- dteday: date
- season: season (1:spring, 2:summer, 3:fall, 4:winter)
- yr: year (0:2011, 1:2012)
- mnth: month (1 to 12)
- hr: hour (0 to 23)
- holiday: whether day is holiday or not
- weekday: day of the week
- workingday: if day is neither weekend nor holiday is 1, otherwise is 0.
- weathersit:
  - 1: Clear, Few clouds, Partly cloudy, Partly cloudy
  - 2: Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist
  - 3: Light Snow, Light Rain + Thunderstorm + Scattered clouds, Light Rain + Scattered clouds
  - 4: Heavy Rain + Ice Pallets + Thunderstorm + Mist, Snow + Fog
- temp: Normalized temperature in Celsius. The values are derived via `(t-t_min)/(t_max-t_min)`, `t_min=-8`, `t_max=+39` (only in hourly scale)
- atemp: Normalized feeling temperature in Celsius. The values are derived via `(t-t_min)/(t_max-t_min)`, `t_min=-16`, `t_max=+50` (only in hourly scale)
- hum: Normalized humidity. The values are divided to 100 (max)
- windspeed: Normalized wind speed. The values are divided to 67 (max)

**Label columns:**
- casual: count of casual users
- registered: count of registered users
- cnt: count of total rental bikes including both casual and registered

**Extraneous columns:**
- instant: record index

For example, the first row is a record of hour 0 on January 1, 2011---and apparently 16 people rented bikes around midnight!",
		"subtype": "command",
		"position": 13.0,
		"command": "%md


In [None]:
df = spark.read.csv("/opt/spark-data/bike-sharing.csv", header=True)

df.cache()

df.show(10)

df.count()

### Preprocesamiento de datos

Aunque es un dataset preparado para ML y viene bastante limpio, debemos realizar algunas modificaciones antes de aprender un modelo.

Nuestro objetivo es predecir la variable `cnt`. ¿Podemos utilizar todas las variables del problema?

- Problema 1: La variable `cnt` se calcula como la suma `casual + registered`. Estas dos variables, al igual que `cnt`, nunca estarán disponibles cuando recibamos nuevos datos, así que es fundamental no incluirlas en nuestros modelos. De hacerlo estaríamos incurriendo en lo que se conoce como una *Fuga de datos* o *data leak*.

- Problema 2: La variable `dteday` es incómoda de representar para los algoritmos (que tipo de distribución tiene), y ha sido previamente separada en categorías más sencillas como el mes, la hora y el día de la semana. Es importante destacar que estas variables ya vienen en formato numérico o binario para que los algoritmos las traten correctamente, algo que nos agiliza enormemente el trabajo.

- Problema 3: El identificador `instant` no tiene ningún valor para nuestro problema y podría suponer un problema para algunos modelos incluso.

Con este pequeño análisis concluimos que es necesario eliminar estas columnas de nuestro dataset

In [None]:
dfClean = df.drop("instant").drop("dteday").drop("casual").drop("registered")

Como ya hemos comentado, las variables has sido correctamente preprocesadas en un formato compatible con nuestros algoritmos de ML. No obstante, al haberlo leído desde un fichero CSV *que no almacena el tipo* de las variables es probable que dichos tipos se hayan perdido.

Vamos a comprobarlo leyendo el esquema de los datos.

In [None]:
dfClean.printSchema()

El resultado que podemos observar es que todas las variables son de tipo string, por ello es importante que las convirtamos a tipo numérico.

In [None]:
from pyspark.sql.functions import col

dfCasted = df.select([ col(c).cast("double") for c in dfClean.columns ])

dfCasted.printSchema()

### Obtener una muestra de training y test

Antes de aprender el modelo debemos reservar algunos datos para su evaluación, asegurando así que el test que realicemos se hará sobre datos que el modelo nunca ha podido observar.

In [None]:
# Dividimos el dataset en 70% para training y 30% para testing.

dfTrain, dfTest = dfCasted.randomSplit([0.7, 0.3], seed=1234)
print("Tenemos %d filas de training y %d filas de test." % (dfTrain.count(), dfTest.count()))

### Visualización

Ahora que tenemos los datos preparados, vamos a observar el dataset para determinar la potencia discriminativa de las variables que tenemos a nuestra disposición y hacernos una idea de cómo debería funcionar nuestro algoritmo.

In [None]:
dfTrain.show()

### Entrenamiento de un modelo de regresión

Antes de poder lanzar el algoritmo de aprendizaje en sí, hemos de convertir el dataframe en un set de datos compatible como MLlib. Para ello debemos reducir nuestros datos a dos columnas (`features` y `label`).

Para ello debemos definir y utilizar algunos de los **transformadores** que pone a nuestra disposición la librería.

- `VectorAssembler`: Combina una serie de columnas en una única columna representada por vectores
- `VectorIndexer`: Identifica las columnas de un vector que deben ser tratadas como categorías o como variables contínuas. Para ello utiliza un algoritmo heurístico que explora los valores de la columna.

In [None]:
from pyspark.ml.feature import VectorAssembler

# We select the appropiate columns as features

featuresCols = dfTrain.columns
featuresCols.remove('cnt')

# This concatenates all feature columns into a single feature vector in a new column "rawFeatures".

vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")

Los transformadores y estimadores tienen una sintaxis muy similar, para que podamos conectarlos unos con otros.

- `inputCol`/`inputCols` La columna o columnas que recibe la función, debe existir en el dataframe.
- `outputCol` La columna donde la función depositará su resultado en el dataframe resultante. Es importante poner nombres representativos para no cometer errores al encadenar métodos.
- Parámetros/Hyperparámetros: En función de si la función es del tipo transformador o estimador tendremos uno o ambos conjuntos de opciones para parametrizar el proceso de aprendizaje o de transformación de datos. Este es un concepto fundamental en Machine Learning ya que condicionará lo bien que funcionen nuestros algoritmos.

Además de estos conceptos básicos, las funciones de transformación tienen otra serie de métodos muy útiles

In [None]:
# Shows its actual parametrization

vectorAssembler.explainParams()

In [None]:
# Returns the name of input and output columns, useful for chaining methods programatically

print( vectorAssembler.getInputCols() )
print( vectorAssembler.getOutputCol() )

# La función `transform` es la más importante de todas
# Transformará nuestro dataframe, añadiendo la columna requerida.

dfTrainAssembled = vectorAssembler.transform(dfTrain)
dfTrainAssembled.show()

A continuación realizaremos la indexación de las variables categóricas. Para ello crearemos un estimador del tipo `VectorIndexer`.

**¿Por qué esta función es del tipo estimator?**

`VectorIndexer` necesita procesar nuestros datos previamente a poder transformarlos, para determinar qué atributos son del tipo categórico. Para ello ha de recopilar estadísticos y almacenar esta información en un **modelo**. Éste modelo que será del tipo `VectorIndexerModel` tendrá el método transform que nos dejará procesar nuestros datos.

Para entrenar un estimador se utiliza el método `fit` que recibe un dataframe de entrada.

**Importante!!** Solo se debe llamar a fit con los datos de *entrenamiento*, despues se podrán transformar con el mismo modelo los datos de entrenamiento y test, de lo contrario estaríamos introduciendo información del test en el modelo.

In [None]:
from pyspark.ml.feature import VectorIndexer

# This identifies categorical features and indexes them.

vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)

vectorIndexerModel = vectorIndexer.fit(dfTrainAssembled)

dfTrainIndexed = vectorIndexerModel.transform(dfTrainAssembled)

dfTrainIndexed.show()

Ahora que hemos transformado nuestros datos al formato correcto, vamos a proseguir con el entrenamiento del modelo.

Comenzaremos con uno de los algoritmos más sencillos, la **regresión lineal**.

Este algoritmo está disponible bajo el paquete `LinearRegression`. Como ya podéis imaginar, esta clase es un **estimador** que debemos **entrenar** para poder obtener nuestro modelo. Para ello debemos proporcionarle los datos de entrenamiento e indicar las columnas que queremos utilizar como ´features´y como ´label´.

Además de especificar los datos de entrada también podremos configurar el algoritmo de aprendizaje ajustando los distintos hyperparámetros, lo que tendrá un impacto importante en el resultado que obtengamos. Para el caso de la regresión tenemos un gran número de parámetros como por ejemplo:

- `maxIter` Número de iteraciones a realizar
- `regParam` Parámetro que indica el tipo de regularización (ninguna, L1, L2...)

In [None]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol="cnt", featuresCol="features", maxIter=20, regParam=0.0)

lr.explainParam('maxIter')

# Una vez creado el algoritmo de aprendizaje podemos entrenar un modelo pasándole los datos de entrada.

lrModel = lr.fit(dfTrainIndexed)

# Ya hemos aprendido un modelo de Machine Learning con el que podemos realizar predicciones.
# En muchos casos es interesante echarle un vistazo a los parámetros del modelo. 

# En el caso de una regresión lineal los parámetros a consultar serían los coeficientes 
# y el término independiente (intercept).

print("Coefficients: %s" % (str(lrModel.coefficients.toArray())))
print("Intercept: %f" % (lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Podemos utilizar nuestro modelo para obtener predicciones sobre nuestras instancias en una nueva columna. De este modo podemos analizar el resultado obtenido mediante métricas de evaluación y visualización.

In [None]:
dfTrainPredicctions = lrModel.transform(dfTrainIndexed)

dfTrainPredicctions.show()

Aunque la visualización puede darnos una buena muestra de cómo ha funcionado nuestro algoritmo, en la mayoría de ocasiones necesitamos poder obtener una métrica con interpretación estadística, que nos permita comparar cuantitativamente el resultado. 

En el caso de la regresión lineal lo más habitual es medir el **error cuadrático medio** o **RMSE**. Para ello spark nos proporciona una función que lo computa de manera escalable.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="cnt", predictionCol="prediction")

evaluator.evaluate(dfTrainPredicctions)

Este valor por si solo no tiene mucha importancia, necesitamos otros datos para poder compararlo. De momento, aplicaremos el resultado también a los datos de test. Para ello debemos transformar los datos usando todo el proceso anterior.

**Muy importante**: Notad que solo estoy aplicando los métodos de transformación y en ningún momento volviendo a entrenar ningún modelo.

In [None]:
dfTestAssembled = vectorAssembler.transform(dfTest)

dfTestIndexed = vectorIndexerModel.transform(dfTestAssembled)

dfTestPredictions = lrModel.transform(dfTestIndexed)

evaluator.evaluate(dfTestPredictions)

El resultado es ligeramente peor, no obstante parece que el modelo ha **generalizado** correctamente y no se ha **sobreajustado** a los datos de entrenamiento. Para comprender un poco mejor este error en su contexto podemos contrastar el error con la distribución de la variable objetivo.

In [None]:
dfTrain.select(col("cnt")).summary().show()

## Abstracción del proceso mediante pipelines

¿Cuál sería el siguiente paso para tratar de mejorar este algoritmo?

Tenemos dos opciones:

- Reentrenar el algoritmo con otros hyperparametros
- Probar con otro método

En ambos casos acabamos de ver que el proceso está lejos de ser reproducible y será complicado llevarlo a producción.

Por ello Spark MLlib pone a nuestra disposición las pipelines (muy inspiradas en sklearn), para automatizar y exportar todo el proceso de entrenamiento y evaluación de modelos.

Básicamente una pipeline es una secuencia de transformadores y estimadores que abstrae todo el proceso como un único estimador, sobre el que únicamente hay que realizar las operaciones de `fit`y `transform` una sola vez.

In [None]:
from pyspark.ml import Pipeline

pipelineLr = Pipeline(stages=[vectorAssembler, vectorIndexer, lr])

# Usar el dataset de entrenamiento original

pipelineModel = pipelineLr.fit(dfTrain)

dfTestPredictions = pipelineModel.transform(dfTest) 

dfTestPredictions.show()

Una vez creada la pipeline es muy sencillo reusarla para entrenar y evaluar otros modelos.

A continuación vamos a resolver este mismo problema con otro modelo distinto, mucho más potente, como es la regresión mediante `GBT` Gradient boosted trees.

In [None]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(labelCol="cnt", maxDepth=3, maxIter=20)

pipelineGbt = Pipeline(stages=[vectorAssembler, vectorIndexer, gbt])

pipelineGbtModel = pipelineGbt.fit(dfTrain)

dfTestPredictions = pipelineGbtModel.transform(dfTest) 

dfTestPredictions.show()

evaluator.evaluate(dfTestPredictions)

Podemos ver claramente la superioridad de este nuevo modelo que hemos entrenado, aunque por otra parte el tiempo de ejecución ha sido más elevado.

Muchas veces hemos de sacrificar algo de potencia a cambio de velocidad. Para ello hay que estar atentos al problema que estemos resolviendo.

## Evaluación de modelos y ajuste de hyperparámetros

En el ejercicio anterior hemos contrastado el resultado de dos algoritmos de regresión diferentes y hemos obtenido dos resultados donde claramente uno era superior al anterior. 

Generalmente se suele evaluar un número de modelos diferente en base a distintas configuraciones, lo ideal sería poder realizar este proceso de manera sistemática y estadísticamente correcta.

En spark MLlib tenemos a nuestra disposición uno de los métodos más potentes para poder realizar este proceso, el algoritmo de **GridSearch** ejecutado sobre una Validación Cruzada (**Cross Validation**)

Esta funcionalidad está disponible en el paquete `pyspark.ml.tuning`

Lo primero que tenemos que hacer es crear una grid de parámetros, es decir, una especificación de distintas combinaciones de hyperparámetros con los que evaluar nuestro modelo. Podemos incluir parámetros tanto para el algoritmo de regresión como para los estimadores que se entrenan para preprocesado.

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(vectorIndexer.maxCategories, [2, 4, 8]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

Ahora configuraremos la validación cruzada, para lo que necesitamos especificar un estimador (que será nuestra pipeline), un evaluador (RMSE en nuestros caso) y opcionalmente un grid de parámetros. La validación se realizará conforme a un número de folds, que será el número de iteraciones a realizar.

In [None]:
crossvalLr = CrossValidator(estimator=pipelineLr,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(metricName="rmse", labelCol="cnt", predictionCol="prediction") ,
    numFolds=5, 
    seed=1234)

Una vez definido el proceso podemos entrenarlo con los datos de entrenamiento del mismo modo que hacíamos en el ejemplo anterior.

In [None]:
crossvalModelLr = crossvalLr.fit(dfTrain)

print( crossvalModelLr.avgMetrics )

Durante el proceso de entrenamiento se evalúan las distintas folds y se generan modelos intermedios. Podemos comprobar cuál es el resultado de este proceso consultando las métricas así como la media.

In [None]:
list(zip(crossvalModelLr.avgMetrics, paramGrid))

Además de consultar todas las salidas generadas, el proceso de selección almacena la mejor configuración de todos los modelos. De hecho puede ser utilizado directamente como  transformador como si fuese dicho modelo.

El modelo almacenado corresponde con la mejor configuración, **reentrenada para todos los datos de training**.

In [None]:
bestLrPredictions = crossvalModelLr.transform(dfTest)

bestLrPredictions.show()

evaluator.evaluate(bestLrPredictions)

Si queremos disponer directamente del modelo en sí podemos recuperarlo, consultar sus parámetros, almacenarlo o utilizarlo más adelante. Es importante recordar que este estimador es el pipeline entero!

In [None]:
bestLrModel = crossvalModelLr.bestModel

bestPredictionDF = bestLrModel.transform(dfTest)

bestPredictionDF.show()

### Cómo seleccionar el mejor modelo

Ahora que tenemos preparado todo el proceso de aprendizaje vamos a contrastar de nuevo la mejor regresión contra el mejor ensemble de árboles. Para ello compararemos contra el conjunto de test el mejor modelo de ambos procesos.

In [None]:
paramGridGbt = ParamGridBuilder() \
    .addGrid(vectorIndexer.maxCategories, [2, 4]) \
    .addGrid(gbt.maxDepth, [3, 5]) \
    .build()

crossvalGBT = CrossValidator(estimator=pipelineGbt,
                          estimatorParamMaps=paramGridGbt,
                          evaluator=RegressionEvaluator(metricName="rmse", labelCol="cnt", predictionCol="prediction") ,
                          numFolds=5, 
                          seed=1234)

crossvalGBTModel = crossvalGBT.fit(dfTrain)


In [None]:
bestGBTPredictions = crossvalGBTModel.transform(dfTest)

evaluator.evaluate(bestGBTPredictions)