In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = pyspark.SparkConf().setAppName('appName').setMaster('local[4]').set('spark.jars.packages', 'graphframes:graphframes:0.8.0-spark2.4-s_2.11')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

# Utilización del servicio de alquiler de bicicletas en Toronto en el año 2018

### Disponible en Kaggle en:
https://www.kaggle.com/jackywang529/toronto-bikeshare-data


El propósito de este análisis es utilizar los conjuntos de datos trimestrales del año 2018 de la empresa de alquiler de bicicletas en Toronto. Se trata de *cuatro* conjuntos de datos separados, que incluyen entre 178.559 y 822.536 observaciones, siempre con nueve variables. Cada fila representa un viaje realizado.

### Variables y significado

Las variables utilizadas para describir cada viaje son:

* trip_id – identificador global del viaje
* trip_duration_seconds – duración del viaje en segundos
* from_station_id – identificador numérico de la estación de origen
* trip_start_time – instante (timestamp) en el que se inició el viaje
* from_station_name – nombre de la intersección más cercana a la estación origen
* trip_stop_time – instante (timestamp) en el que finalizó el viaje
* to_station_id – identificador numérico de la estación de destino
* to_station_name – nombre de la intersección más cercana a la estación de destino
* user_type – tipo de usuario (indicador binario): miembro registrado con cuota anual / usuario ocasional no registrado

**Nombre completo del alumno:**  

**INSTRUCCIONES**: en cada celda debes responder a la pregunta formulada, asegurándote de que el resultado queda guardado en la(s) variable(s) que por defecto vienen inicializadas a `None`. No se necesita usar variables intermedias, pero puedes hacerlo siempre que el resultado final del cálculo quede guardado exactamente en la variable que venía inicializada a None (debes reemplazar None por la secuencia de transformaciones necesarias, pero nunca cambiar el nombre de esa variable). **No olvides borrar la línea *raise NotImplementedError()* de cada celda cuando hayas completado la solución de esa celda y quieras probarla**.

Después de cada celda evaluable verás una celda con código. Ejecútala (no modifiques su código) y te dirá si tu solución es correcta o no. En caso de ser correcta, se ejecutará correctamente y no mostrará nada, pero si no lo es mostrará un error. Además de esas pruebas, se realizarán algunas más (ocultas) a la hora de puntuar el ejercicio, pero evaluar dicha celda es un indicador bastante fiable acerca de si realmente has implementado la solución correcta o no. Asegúrate de que, al menos, todas las celdas indican que el código es correcto antes de enviar el notebook terminado.

### Sobre los cuatro datasets anteriores (Bike Share Toronto Ridership_Q1 2018.csv hasta Q4) se pide:

**(1 punto)** Ejercicio 1

* Leer por separado cada uno de ellos (sin cachear), tratando de que Spark infiera el tipo de dato de cada columna, y **unirlos en un solo DF** que tampoco debe ser cacheada todavía, ya que en el siguiente paso aún realizaremos otro pre-procesamiento.
* Los cuatro contienen las mismas columnas por lo que no habrá problemas para utilizar la operación `union` encadenada tres veces para crear el DF final.

In [None]:
# LÍNEA EVALUABLE, NO RENOMBRAR LAS VARIABLES
tripsQ1 = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .option("quote", "\"")\
                 .option("escape", "\"")\
                 .option("mode", "DROPMALFORMED")\
                 .csv("ModeloB Major_Contract_Awards.csv").cache()
tripsQ2 = None # Segundo CSV
tripsQ3 = None # Tercer CSV
tripsQ4 = None # Cuarto CSV
tripsTorontoRawDF = None # Unión de todos
# YOUR CODE HERE
raise NotImplementedError

In [None]:
from pyspark.sql.types import DoubleType
assert(tripsTorontoRawDF.count() == 1922955)

**(1 punto)** Ejercicio 2

* Las columnas `trip_start_time` y `trip_stop_time` son en realidad instantes de tiempo que Spark debería procesar como timestamp. Reemplaza **ambas columnas** por su versión convertida a timestamp, utilizando `withColumn` y donde el nuevo valor de la columna viene dado por el siguiente código:
        F.from_unixtime(F.unix_timestamp('nombreColumna', 'MM/dd/yyyy HH:mm')).cast("timestamp"))
El DF resultante debe ser almacenado en la variable `tripsTorontoDF`.


In [None]:
# No olvides los imports que necesites...
# LÍNEAS EVALUABLES, NO RENOMBRAR LAS VARIABLES
tripsTorontoDF = None
# YOUR CODE HERE
raise NotImplementedError

In [None]:
typesDict = dict(tripsTorontoDF.dtypes)
assert(typesDict["trip_start_time"] == "timestamp") 
assert(typesDict["trip_stop_time"] == "timestamp") 

**(1 punto)** Ejercicio 3

Partiendo de `tripsTorontoDF`, realizar las siguientes transformaciones encadenadas en este orden para crear un nuevo DF:
* Primero, debemos quedarnos solamente con las filas donde `trip_start_time` no sea null.
* Sobre el DF resultado de lo anterior, añadir una columna adicional **Mes** y con el mes representado en **trip_start_time**. Dicha columna será de tipo entero y se puede obtener usando `withColumn` con la función `F.month("colName")`, que recibe un nombre de columna y devuelve un objeto columna de enteros que van de 1 a 12. 
* Encadenar esta transformación con otra en la que la columna **Mes** sea reemplazada por su traducción a  cadena de caracteres de 3 letras, siendo la correspondencia 1: Ene, 2: Feb, 3: Mar, 4: Abr, 5: May, 6: Jun, 7: Jul, 8: Ago, 9: Sep, 10: Oct, 11: Nov, 12: Dic.
* Finalmente, añadir una nueva columna **Hora** que contenga la hora de inicio del viaje, aplicando `withColumn` con la función `F.hour("colName")` que recibe un nombre de columna y recibe un objeto columna de enteros de 0 a 23.
* El DF resultante de todas estas transformaciones debe guardarse en la variable `tripsTorontoTimesDF`, que por tanto tendrá 2 columnas más que el DF original `tripsTorontoDF`, y que debe quedar **cacheado**.

In [None]:
# LÍNEA EVALUABLE, NO RENOMBRAR VARIABLES
# imports......
tripsTorontoTimesDF = None
# YOUR CODE HERE
raise NotImplementedError

In [None]:
tripsPerMonth = tripsTorontoTimesDF.groupBy("Mes").count().sort("Mes").collect()
assert(tripsPerMonth[0]["count"] == 94783)
assert(tripsPerMonth[1]["count"] == 281219)
assert(tripsPerMonth[2]["count"] == 83324)
assert(tripsPerMonth[3]["count"] == 43859)
assert(tripsPerMonth[4]["count"] == 49731)
assert(tripsPerMonth[5]["count"] == 286316)
assert(tripsPerMonth[6]["count"] == 250837)
assert((tripsPerMonth[7]["count"] == 84959) | (tripsPerMonth[7]["count"] == 84969))
assert(tripsPerMonth[8]["count"] == 212750)
assert(tripsPerMonth[9]["count"] == 104287)
assert(tripsPerMonth[10]["count"] == 175879)
assert(tripsPerMonth[11]["count"] == 255001)

**(1 punto)** Ejercicio 4

* Partiendo de `tripsTorontoTimesDF`, crear un nuevo DataFrame con **tantas filas como horas tiene el día, y tantas columnas como meses del año** de manera que cada celda indique el **número de viajes** que comenzaron a esa hora en ese mes del año. Guardar el resultado en la variable `tripsPerMonthAndHourDF`, cuyas filas deben quedar ordenadas en base a la hora (de 0 a 23), y cuyas columnas deben estar también ordenadas desde `"Ene"` a `"Dic"`, con `"Hora"` como primera columna.

In [None]:
# LÍNEA EVALUABLE, NO RENOMBRAR VARIABLES
tripsPerMonthAndHourDF = None
# YOUR CODE HERE
raise NotImplementedError

In [None]:
assert(len(tripsPerMonthAndHourDF.columns) == 13)
assert(tripsPerMonthAndHourDF.columns[0] == "Hora")
assert(tripsPerMonthAndHourDF.columns[12] == "Dic")
assert(tripsPerMonthAndHourDF.count() == 24)
todasHoras = tripsPerMonthAndHourDF.collect()
assert((todasHoras[0]["Hora"] == 0) & (todasHoras[0]["Dic"]==782))
assert((todasHoras[23]["Hora"] == 23) & (todasHoras[23]["Dic"]==1208))

**(3 puntos)** Ejercicio 5. 

Partiendo de `tripsTorontoTimesDF` definido anteriormente, añadir las siguientes columnas:

* Primero, tres columnas adicionales llamadas `dur_media`, `dur_min`, `dur_max` que contengan, respectivamente, **la duración media, mínima y máxima de los viajes que parten de esa misma estación de origen (from_station_id) a esa misma hora y en ese mismo mes del año**. Es decir, queremos una columna extra para que podamos tener, junto a cada viaje, información agregada de los viajes similares, entendidos como aquellos que salieron a la misma hora de la misma estación. **No se debe utilizar JOIN sino solo funciones de ventana**.
* A continuación, otra columna adicional `diff_dur_porc` que contenga la diferencia, medida en porcentaje, entre la duración del viaje y la duración media de los viajes similares calculada en el apartado anterior. Dicha diferencia debe calcularse como la resta de la duración del viaje menos la duración media, dividida entre la duración media y multiplicada por 100. El resultado debe obtenerse aplicando operaciones aritméticas con columnas existentes, **sin utilizar `when`**.
* El DF resultante con las 4 columnas nuevas que hemos añadido debe almacenarse en la variable `tripsTorontoExtraInfoDF`.

In [None]:
# LÍNEA EVALUABLE, NO RENOMBRAR VARIABLES
# imports necesarios..........
windowHoraMesEstacion = None
tripsTorontoExtraInfoDF = None
# YOUR CODE HERE
raise NotImplementedError

In [None]:
r = tripsTorontoExtraInfoDF.where("trip_id = '2970611'").head()
assert(r.dur_media - 783.366666667 < 0.001)
assert(r.diff_dur_porc - 44.24918088591975 < 0.001)
assert(r.dur_min == 167)
assert(r.dur_max == 2333)

**(3 puntos)** Ejercicio 6

* Partiendo de `tripsTorontoTimesDF`, crear un **grafo** llamado `bikeGraph` utilizando como identificador de los vértices los identificadores de las estaciones. Construye primero un DF con todos los identificadores de las estaciones, simplemente seleccionando **from_station_id**, renombrando adecuadamente el nombre de columna. Puedes almacenar este DF en la variable `verticesDF`. También tendrás que renombrar las columnas **from_station_id** y **to_station_id** en el DF de aristas, para el que además deberás seleccionar solo dichas columnas y quitar las filas repetidas ya que solo necesitamos considerar una vez cada ruta (cada pareja de estación inicial y final). Puedes almacenar el resultado del renombramiento y la eliminación de repetidos en la variable `edgesDF`.
* Una vez creado, aplica el algoritmo `pageRank` pasando como único parámetro `maxIter = 5`. El algoritmo puede llegar a emplear más de 10 minutos. 
* Almacena el grafo devuelto por dicha función en la variable `pageRankGraph`, recupera el DF de sus vértices, ordénalo descendentemente en base a la columna `pagerank` y almacena el resultado en la variable `sortedPageRankGraphVerticesDF`
* Obtén el identificador de la estación más relevante (con mayor valor de la métrica pageRank, que ocupará la primera fila tras la ordenación), y almacena dicho identificador en la variable `id_mas_relevante`.
* Crea un nuevo DF de una sola fila y tres columnas llamadas `dur_media`, `dur_min` y `dur_max` con la duración **media, mínima y máxima** de los viajes de `tripsTorontoTimesDF` que **empiezan** en dicha estación (sin tener en cuenta distinción de horas o meses). **No debe usarse la función `withColumn` sino crear las columnas al vuelo con `select`**. Debe quedar almacenado en la variable `durEstMasRelevantesDF`

In [None]:
# LÍNEA EVALUABLE, NO RENOMBRAR VARIABLES
# imports necesarios..........
# Descomentar la siguiente línea antes de lanzar pageRank:
# spark.sparkContext.setCheckpointDir("/tmp")
verticesDF = None
edgesDF = None
bikeGraph = None
pageRankGraph = None
sortedPageRankGraphVerticesDF = None
id_mas_relevante = None
durEstMasRelevantesDF = None
# YOUR CODE HERE
raise NotImplementedError

In [None]:
assert(sortedPageRankGraphVerticesDF.head()["pagerank"] - 1.4427 < 0.01)
assert(durEstMasRelevantesDF.count() == 1)
assert(len(durEstMasRelevantesDF.columns) == 3)
rEstMasRelevantes = durEstMasRelevantesDF.head()
assert(rEstMasRelevantes.dur_min == 61)
assert(id_mas_relevante == 7060)
assert(rEstMasRelevantes.dur_media - 747.6957692082626 < 0.001)
assert(rEstMasRelevantes.dur_max == 35130)