# Actividad 1: HDFS, Spark SQL y MLlib

## Recuerda borrar siempre las líneas que dicen `raise NotImplementedError`

Lee con detenimiento cada ejercicio. Las variables utilizadas para almacenar las soluciones, al igual que las nuevas columnas creadas, deben llamarse **exactamente** como indica el ejercicio, o de lo contrario los tests fallarán y el ejercicio no puntuará. Debe reemplazarse el valor `None` al que están inicializadas por el código necesario para resolver el ejercicio.

## Leemos el fichero flights.csv que hemos subido a HDFS

Indicamos que contiene encabezados (nombres de columnas) y que intente inferir el esquema, aunque después comprobaremos si lo
ha inferido correctamente o no. La ruta del archivo en HDFS debería ser /<nombre_alumno>/flights.csv

In [0]:
ruta_hdfs = '/Antonio_Lopez/flights.csv' # Reemplaza esto por la ruta correcta del fichero flights.csv en HDFS
flightsDF = None

# Descomentar estas líneas
flightsDF = spark.read\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .csv(ruta_hdfs)

# YOUR CODE HERE
flightsDF

Imprimimos el esquema para comprobar qué tipo de dato ha inferido en cada columna

In [0]:
flightsDF.printSchema()

Mostramos el número de filas que tiene el DataFrame para hacernos una idea de su tamaño:

In [0]:
flightsDF.count()

Vemos que tenemos 162049 filas. Si imprimimos por pantalla las 5 primeras filas, veremos qué tipos parecen tener y en qué columnas no coincide el tipo que podríamos esperar con el tipo que ha inferido Spark.

In [0]:
flightsDF.show(5)

La causa del problema es que en muchas columnas existe un valor faltante llamado "NA". Spark no reconoce ese valor como
*no disponible* ni nada similar, sino que lo considera como un string de texto normal, y por tanto, asigna a toda la columna
el tipo de dato string (cadena de caracteres). Concretamente, las siguientes columnas deberían ser de tipo entero pero Spark
las muestra como string:
<ul>
 <li>dep_time: string (nullable = true)
 <li>dep_delay: string (nullable = true)
 <li>arr_time: string (nullable = true)
 <li>arr_delay: string (nullable = true)
 <li>air_time: string (nullable = true)
 <li>hour: string (nullable = true)
 <li>minute: string (nullable = true)    
</ul>

Vamos a averiguar cuántas filas tienen el valor "NA" (como string) en la columna dep_time:

In [0]:
from pyspark.sql import functions as F
cuantos_NA = flightsDF\
                .where(F.col("dep_time") == "NA")\
                .count()
cuantos_NA

Por tanto, hay 857 filas que no tienen un dato válido en esa columna. Hay distintas maneras de trabajar con los valores faltantes, como por ejemplo imputarlos (reemplazarlos por un valor generado por nosotros según cierta lógica, por ejemplo la media de esa columna, etc). Lo más sencillo es quitar toda la fila, aunque esto depende de si nos lo podemos permitir en base
a la cantidad de datos que tenemos. En nuestro caso, como tenemos un número considerable de filas, vamos a quitar todas las filas donde hay un NA en cualquiera de las columnas.

In [0]:
columnas_limpiar = ["dep_time", "dep_delay", "arr_time", "arr_delay", "air_time", "hour", "minute"]

flightsLimpiado = flightsDF
for nombreColumna in columnas_limpiar:  # para cada columna, nos quedamos con las filas que no tienen NA en esa columna
    flightsLimpiado = flightsLimpiado.where(F.col(nombreColumna) != "NA")

flightsLimpiado.cache()

Si ahora mostramos el número de filas que tiene el DataFrame `flightsLimpiado` tras eliminar todas esas filas, vemos que ha disminuido ligeramente
pero sigue siendo un número considerable como para realizar analítica y sacar conclusiones sobre estos datos

In [0]:
flightsLimpiado.count()

Una vez que hemos eliminado los NA, vamos a convertir a tipo entero cada una de esas columnas que eran de tipo string. 
Ahora no debe haber problema ya que todas las cadenas de texto contienen dentro un número que puede ser convertido de texto a número. Vamos también a convertir la columna `arr_delay` de tipo entero a número real, necesario para los pasos posteriores donde ajustaremos un modelo predictivo.

In [0]:
from pyspark.sql.types import IntegerType, DoubleType

flightsConvertido = flightsLimpiado

for c in columnas_limpiar:
    # método que crea una columna o reemplaza una existente
    flightsConvertido = flightsConvertido.withColumn(c, F.col(c).cast(IntegerType())) 

flightsConvertido = flightsConvertido.withColumn("arr_delay", F.col("arr_delay").cast(DoubleType()))
flightsConvertido.cache()

In [0]:
flightsConvertido.printSchema()

Vamos a volver a mostrar las 5 primeras filas del DataFrame limpio. Aparentemente son iguales a las que ya teníamos, pero ahora
Spark sí está tratando como enteros las columnas que deberían serlo, y si queremos podemos hacer operaciones aritméticas
con ellas.

In [0]:
flightsConvertido.show(5)


### Ejercicio 1

Partiendo del DataFrame `flightsConvertido` que ya tiene los tipos correctos en las columnas, se pide: 

* Crear un nuevo DataFrame llamado `aeropuertosOrigenDF` que tenga una columna `origin` y que tenga tantas filas como aeropuertos distintos de *origen* existan. ¿Cuántas filas tiene? Almacenar dicho recuento en la variable entera `n_origen`.
* Crear un nuevo DataFrame llamado `rutasDistintasDF` que tenga dos columnas `origin`, `dest` y que tenga tantas filas como rutas diferentes existan (es decir, como combinaciones distintas haya entre un origen y un destino). Una vez creado, contar cuántas combinaciones hay, almacenando dicho recuento en la variable entera `n_rutas`.

In [0]:
# Reemplaza None por el código necesario para calcular sus valores correctos
aeropuertosOrigenDF = flightsConvertido.select("origin").distinct()
n_origen = aeropuertosOrigenDF.count()
rutasDistintasDF = flightsConvertido.select("origin", "dest").distinct()
n_rutas = rutasDistintasDF.count()

# YOUR CODE HERE

In [0]:
assert(n_origen == 2)
assert(n_rutas == 115)
assert(aeropuertosOrigenDF.count() == n_origen)
assert(rutasDistintasDF.count() == n_rutas)

### Ejercicio 2

* Partiendo de nuevo de `flightsConvertido`, se pide calcular, *sólo para los vuelos que llegan con* ***retraso positivo***, el retraso medio a la llegada de dichos vuelos, para cada aeropuerto de destino. La nueva columna con el retraso medio a la llegada debe llamarse `retraso_medio`. El DF resultante debe estar **ordenado de mayor a menor retraso medio**. El código que calcule esto debería ir encapsulado en una función de Python llamada `retrasoMedio` que reciba como argumento un DataFrame y devuelva como resultado el DataFrame con el cálculo descrito anteriormente.

* Una vez hecha la función, invocarla pasándole como argumento `flightsConvertido` y almacenar el resultado devuelto en la variable `retrasoMedioDF`.

In [0]:
def retrasoMedio(df):  # El argumento que recibe la función es un DataFrame de Spark
    import pyspark.sql.functions#Introducido aquí para evitar errores en las correciones
    from pyspark.sql.functions import desc
    # Escribe el código de tu función
    data = df.filter(df.arr_delay > 0).groupBy(df.dest).agg({"arr_delay": "avg"}).withColumnRenamed("AVG(arr_delay)", "retraso_medio").sort(desc("avg(arr_delay)")) #.orderBy("avg(arr_delay)").desc()

    return data  # Reemplaza None por la variable que quieras devolver

retrasoMedioDF = retrasoMedio(flightsConvertido)  # Reemplazar None por el código necesario

# YOUR CODE HERE



In [0]:
lista = retrasoMedio(flightsConvertido).take(3)
assert((lista[0].retraso_medio == 64.75) & (lista[0].dest == "BOI"))
assert((lista[1].retraso_medio == 46.8) & (lista[1].dest == "HDN"))
assert((round(lista[2].retraso_medio, 2) == 41.19) & (lista[2].dest == "SFO"))

Ahora invocamos a nuestra función `retrasoMedio` pasándole como argumento `flightsConvertido`. ¿Cuáles son los tres aeropuertos con mayor retraso medio? ¿Cuáles son sus retrasos medios en minutos?

In [0]:
# ESCRIBE AQUÍ TU CÓDIGO PARA MOSTRAR EL CONTENIDO DE retrasoMedioDF
retrasoMedioDF.show(3)

### Ejercicio 3

Ajustar un modelo de DecisionTree de Spark para predecir si un vuelo vendrá o no con retraso (problema de clasificación binaria), utilizando como variables predictoras el mes, el día del mes, la hora de partida `dep_time`, la hora de llegada `arr_time`, el tipo de avión (`carrier`), la distancia y el tiempo que permanece en el aire. Para ello, sigue los siguientes pasos.

Notemos que en estos datos hay variables numéricas y variables categóricas que ahora mismo están tipadas como numéricas, como por ejemplo el mes del año (`month`), que es en realidad categórica. Debemos indicar a Spark cuáles son categóricas e indexarlas. Para ello se pide: 

* Crear un `StringIndexer` llamado `indexerMonth` y otro llamado `indexerCarrier` sobre las variables categóricas `month` y `carrier` (tipo de avión). El nombre de las columnas indexadas que se crearán debe ser, respectivamente, `monthIndexed` y `carrierIndexed`.

In [0]:
# Incluye aquí los imports que necesites
from pyspark.ml.feature import StringIndexer

indexerMonth = StringIndexer(inputCol="month", outputCol="monthIndexed")
indexerCarrier = StringIndexer(inputCol="carrier", outputCol="carrierIndexed")

# YOUR CODE HERE

In [0]:
assert(isinstance(indexerMonth, StringIndexer))
assert(isinstance(indexerCarrier, StringIndexer))
assert(indexerMonth.getInputCol() == "month")
assert(indexerMonth.getOutputCol() == "monthIndexed")
assert(indexerCarrier.getInputCol() == "carrier")
assert(indexerCarrier.getOutputCol() == "carrierIndexed")

Recordemos también que Spark requiere que todas las variables estén en una única columna de tipo vector, por lo que después de indexar estas dos variables, tendremos que fusionar en una columna de tipo vector todas ellas, utilizando un `VectorAssembler`. Se pide:

* Crear en una variable llamada `vectorAssembler` un `VectorAssembler` que reciba como entrada una lista de todas las variables de entrada (y que no debe incluir `arr_delay`) que serán las que formarán parte del modelo. Crear primero esta lista de variables (lista de strings) en la variable `columnas_ensamblar` y pasar dicha variable como argumento al crear el `VectorAssembler`. Como es lógico, en el caso de las columnas `month` y `carrier`, no usaremos las variables originales sino las indexadas en el apartado anterior. La columna de tipo vector creada con las características ensambladas debe llamarse `features`.

In [0]:
# Incluye aquí los imports que necesites
from pyspark.ml.feature import VectorAssembler

#columnas_ensamblar = ['year','month','day','dep_time','dep_delay','arr_time','arr_delay','carrier','tailnum','flight','origin','dest','air_time','distance','hour','minute']
#columnas_ensamblar = [indexerMonth,'day','dep_time','arr_time',indexerCarrier,'air_time','distance']
columnas_ensamblar = [indexerMonth.getOutputCol(),'day','dep_time','arr_time',indexerCarrier.getOutputCol(),'air_time','distance']
vectorAssembler = VectorAssembler(inputCols=columnas_ensamblar, outputCol="features")


# YOUR CODE HERE


In [0]:
assert(isinstance(vectorAssembler, VectorAssembler))
assert(vectorAssembler.getOutputCol() == "features")
input_cols = vectorAssembler.getInputCols()
assert(len(input_cols) == 7)
assert("arr_delay" not in input_cols)

Finalmente, vemos que la columna `arr_delay` es continua, y no binaria como requiere un problema de clasificación con dos clases. Vamos a convertirla en binaria. Para ello se pide:

* Utilizar un binarizador de Spark, fijando a 15 el umbral, y guardarlo en la variable `delayBinarizer`. Consideramos retrasado un vuelo que ha llegado con más de 15 minutos de retraso, y no retrasado en caso contrario. La nueva columna creada con la variable binaria debe llamarse `arr_delay_binary` y debe ser interpretada como la columna target para nuestro algoritmo. Por ese motivo, esta columna **no** se incluyó en el apartado anterior entre las columnas que se ensamblan para formar las features.

In [0]:
# Incluye aquí los imports que necesites y que no hayas incluido ya en alguna celda anterior
from pyspark.ml.feature import Binarizer

delayBinarizer = Binarizer(threshold=15.0, inputCol="arr_delay", outputCol="arr_delay_binary")

# YOUR CODE HERE


In [0]:
assert(isinstance(delayBinarizer, Binarizer))
assert(delayBinarizer.getThreshold() == 15)
assert(delayBinarizer.getInputCol() == "arr_delay")
assert(delayBinarizer.getOutputCol() == "arr_delay_binary")

Por último, crearemos el modelo de clasificación.

* Crear en una variable `decisionTree` un árbol de clasificación de Spark (`DecisionTreeClassifier` del paquete `pyspark.ml.classification`)
* Indicar como columna de entrada la nueva columna creada por el `VectorAssembler` creado en un apartado anterior.
* Indicar como columna objetivo (target) la nueva columna creada por el `Binarizer` del apartado anterior.

In [0]:
# Incluye aquí los imports que necesites y que no hayas incluido ya en alguna celda anterior
from pyspark.ml.classification import DecisionTreeClassifier

decisionTree = DecisionTreeClassifier(featuresCol=vectorAssembler.getOutputCol(), labelCol=delayBinarizer.getOutputCol())



In [0]:
assert(isinstance(decisionTree, DecisionTreeClassifier))
assert(decisionTree.getFeaturesCol() == "features")
assert(decisionTree.getLabelCol() == "arr_delay_binary")

Ahora vamos a encapsular todas las fases en un sólo pipeline y procederemos a entrenarlo. Se pide:

* Crear en una variable llamada `pipeline` un objeto `Pipeline` de Spark con las etapas anteriores en el orden adecuado para poder entrenar un modelo. 

* Entrenarlo invocando sobre ella al método `fit` y guardar el pipeline entrenado devuelto por dicho método en una variable llamada `pipelineModel`. 

* Aplicar el pipeline entrenado para transformar (predecir) el DataFrame `flightsConvertido`, guardando las predicciones devueltas en la variable `flightsPredictions` que será un DataFrame. Nótese que estamos prediciendo los propios datos de entrenamiento y que, por simplicidad, no habíamos hecho (aunque habría sido lo correcto) ninguna división de nuestros datos originales en subconjuntos distintos de entrenamiento y test antes de entrenar.

In [0]:
# Incluye aquí los imports que necesites y que no hayas incluido ya en alguna celda anterior
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = [indexerMonth, indexerCarrier, vectorAssembler, delayBinarizer, decisionTree])
pipelineModel = pipeline.fit(flightsConvertido)
flightsPredictions = pipelineModel.transform(flightsConvertido)

# YOUR CODE HERE


In [0]:
from pyspark.ml import PipelineModel
assert(isinstance(pipeline, Pipeline))
assert(len(pipeline.getStages()) == 5)
assert(isinstance(pipelineModel, PipelineModel))
assert("probability" in flightsPredictions.columns)
assert("prediction" in flightsPredictions.columns)
assert("rawPrediction" in flightsPredictions.columns)

Vamos a mostrar la matriz de confusión (este apartado no es evaluable). Agrupamos por la variable que tiene la clase verdadera y la que tiene la clase predicha, para ver en cuántos casos coinciden y en cuántos difieren.

In [0]:
flightsPredictions.groupBy("arr_delay_binary", "prediction").count().show()