<a href="https://colab.research.google.com/github/tomasborrella/TheValley/blob/main/notebooks/mds%2B5/spark03/3_Ejemplo_entrenamiento_modelos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Ejemplo de entrenamiento de modelos

Notebook por [Tomás Borrella Martín](https://www.linkedin.com/in/tomasborrella/)
.

Usando los datos de salarios de [este dataset](https://archive.ics.uci.edu/ml/datasets/Adult), predecir si el salario es mayor o menor de 50K$ utilizando los datos censales.

### Enlaces de interés
*   [Slides de presentación](https://docs.google.com/presentation/d/176cobMzuzy_mRRe3YumHBoap98PvFNXhhgIxN1mYmH0/edit?usp=sharing)

# 1. Instalación Spark

In [1]:
# Install JAVA
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# Install Spark
!wget -q https://downloads.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop2.7.tgz
!tar xf spark-3.2.4-bin-hadoop2.7.tgz

In [3]:
# Install findspark
!pip install -q findspark

In [4]:
# Environment variables
import os 
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.4-bin-hadoop2.7"

In [5]:
# Find spark
import findspark
findspark.init()

In [None]:
# PySpark 
!pip install pyspark==3.2.4

# 2. Spark Session
Punto de entrada de la aplicación de Spark

In [7]:
# Imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [8]:
# Create Spark Session
spark = (SparkSession
         .builder
         .master("local[*]")
         .appName("Spark Dataframes")
         .getOrCreate()
)

# Ejemplo

# Datos

In [None]:
# Descargamos los datos al entorno de Colab
!wget -P /content/data 'https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data'

Nos hacemos una primera idea de los datos

In [None]:
!head /content/data/adult.data

La descripción completa del dataset está en [este enlace](https://archive.ics.uci.edu/ml/datasets/Adult).

Cargamos los datos en un DataFrame especificando el esquema:

In [11]:
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
 
schema = StructType([
  StructField("age", DoubleType(), False),
  StructField("workclass", StringType(), False),
  StructField("fnlwgt", DoubleType(), False),
  StructField("education", StringType(), False),
  StructField("education_num", DoubleType(), False),
  StructField("marital_status", StringType(), False),
  StructField("occupation", StringType(), False),
  StructField("relationship", StringType(), False),
  StructField("race", StringType(), False),
  StructField("sex", StringType(), False),
  StructField("capital_gain", DoubleType(), False),
  StructField("capital_loss", DoubleType(), False),
  StructField("hours_per_week", DoubleType(), False),
  StructField("native_country", StringType(), False),
  StructField("income", StringType(), False)
])
 
dataset = spark.read.format("csv").schema(schema).load("/content/data/adult.data")
cols = dataset.columns

In [None]:
dataset.show(5)

# Preprocesado de los datos

Creamos un Pipeline con todas las transformaciones

Para usar algoritmos como la *Logistic Regression*, primero tenemos que convertir las variables categóricas en valores numéricos.

En este notebook vamos a usar una combinación de *StringIndexer* (que asigna un valor numérico a cada categoría) y *OneHotEncoder* (que combierte cada categoría en un vector binario).

Se crean los stages de todas las variables categóricas usando un bucle:

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
 
categoricalColumns = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country"]
# variable que va a contenter las stages del Pipeline
stages = []

for categoricalCol in categoricalColumns:
    # Primero StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    
    # y después OneHotEncoder para convertir variables categóricas en SparseVectors binarios
    from pyspark.ml.feature import OneHotEncoder
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # vamos añadiendo las stages a la variable.
    # No se ejecutan ahora, se añadirán al Pipeline más adelante.
    stages += [stringIndexer, encoder]

Podemos comprobar que el bucle a través de las 8 variables categóricas ha funcionado bien mirando el contenido de la variable *stages*:

In [None]:
stages

Añadimos también un stage para convertir la variable target (*label*) a numérica usando *StringIndexer*: 

In [15]:
label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
stages += [label_stringIdx]

Y por último añadimos un stage de *VectorAssembler* para convertir todas las *features* en un único vector (así es como lo necesitan los modelos de clasificación):

In [16]:
# Transformamos todas las features en un vector con VectorAssembler
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

Ejecutamos todo el Pipeline de preparación y obtenemos un DataFrame que ya estará listo para el modelo:

In [17]:
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)

Comprobamos el DataFrame preparado:

In [None]:
preppedDataDF.show(5)

Nos quedamos solo con las columnas que nos interesan (las originales y "label" y "features" que son las 2 que necesitan los modelos):

In [None]:
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)
dataset.show(5)

Partimos el DataFrame en train y test:

In [20]:
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

22832
9729


# Logistic Regression

## Versión inicial

In [None]:
from pyspark.ml.classification import LogisticRegression
 
# Se crea un modelo inicial
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
 
# Se entrena el modelo con los datos de train
lrModel = lr.fit(trainingData)

# Predecimos sobre los datos e test, para ello usamos el método transform().
# LogisticRegression.transform() realmente solo necesita la columna 'features'.
predictions = lrModel.transform(testData)

# Visualizamos la salida del modelo (predicciones y probabilidad de cada clase) 
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
# NOTA: Se podrían haber seleccionado otras columnas adicionales.
selected.show(5)

Para evaluar el modelo podemos usar  *BinaryClassificationEvaluator*:

In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
# Creamos el evaluador
evaluator = BinaryClassificationEvaluator()

La métrica que este evaluador va usar por defecto es el AUC (*Area Under the Curve*), pero podríamos hacer que usara *areaUnderPR* de la siguiente manera:
`evaluator.setMetricName("areaUnderPR")`

In [None]:
evaluator.getMetricName()

Evaluamos las predicciones:

In [None]:
evaluator.evaluate(predictions)

## Tuning

Se va a afinar el modelo usando *ParamGridBuilder* y *CrossValidator*.

Para saber qué parámetros podemos modificar de este modelo usamos `explainParams()`:

In [None]:
print(lr.explainParams())

Si usamos tres valores para *regParam*, tres para *maxIter*, y dos para *elasticNetParam*, las combinaciones de parámetros serán 3 x 3 x 3 = 27 posibilidades para el *CrossValidator*.

**Esto va a llevar mucho tiempo en una sola máquina**

Para las pruebas podemos reducirlo a 2 x 2

In [25]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
 
# Se crea el ParamGrid que se usará en el CrossValidator
# Esta es la versión simplificada para que tarde poco
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.5])
             .build())

# Esta sería una versión más completa que tarda demasiado sin un cluster
# paramGrid = (ParamGridBuilder()
#              .addGrid(lr.regParam, [0.01, 0.5, 2.0])
#              .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
#              .addGrid(lr.maxIter, [1, 5, 10])
#              .build())

# Se crea un CrossValidator de  5-fold 
cv = CrossValidator(estimator=lr, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=evaluator, 
                    numFolds=5)
 
# Se ejecuta el CrossValidator (con los 5-folds y el ParamGrid)
cvModel = cv.fit(trainingData)

Usamos el nuevo modelo para hacer una predicción sobre los datos de test y medir su precisión:

In [None]:
# Usamos los datos de test para crear una nueva predicción
# cvModel utiliza el mejor modelo encontrado en la validación cruzada
predictions = cvModel.transform(testData)

# Y evaluamos las predicción
evaluator.evaluate(predictions)

Podemos ver los pesos de los coeficientes y el intercepto del modelo:

In [None]:
print('Model Intercept: ', cvModel.bestModel.intercept)

In [None]:
weights = cvModel.bestModel.coefficients
weights = [(float(w),) for w in weights]  # convert numpy type to float, and to tuple
weightsDF = spark.createDataFrame(weights, ["Feature Weight"])
weightsDF.show()

Y por último podemos echar un vistazo a las predicciones:

In [None]:
# Ver las predicciones del mejor modelo obtenido en la validación cruzada
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
selected.show()

# Ejercicio propuesto: Random Forest

Entrenar un *RandomForestClassifier* y comprobar si sus métricas son mejores que las del *LogisticRegression*.

1.   Primero una versión inicial del Random Forest
2.   Después intentar tuning de hiperparámetros



# Spark Stop

In [30]:
spark.stop()