## PROJET HADOOP - MS-SIO-2019 - SNCF - API TRANSILIEN

#### SPARK STRUCTURED STREAMING (KAFKA CONSUMER)

P. Hamy,  N. Leclercq, L. Poncet - MS-SIO-2019

In [None]:
import os
import json
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [None]:
import logging
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.ERROR, datefmt='%H:%M:%S')

Changement du logging level afin d'éliminer le bruit généré dans la console par un [_warning_](https://stackoverflow.com/questions/39351690/got-interruptedexception-while-executing-word-count-mapreduce-job) récurrent

In [None]:
log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)

### QUESTION 1.1 : calcul et publication du temps d'attente moyen par station

#### Création de la session Spark associé au flux Kafka

In [None]:
kafka_session = SparkSession.builder.appName("MS-SIO-HADOOP-PROJECT-KAFKA-STREAM").getOrCreate()

Limitation du nombre de taches lancées par spark (conseil de configutation glané sur internet pour les configurations matérielles les plus modestes).

In [None]:
kafka_session.conf.set('spark.sql.shuffle.partitions', 4)

#### Création du flux Kafka
On utilise ici un [structured spark stream](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) associé à une source Kafka. 

Il s'agit de spécifier la source via l'adresse du serveur Kafka et le nom du topic auquel on souhaite s'abonner. 

In [None]:
kafka_broker = "sandbox-hdp.hortonworks.com:6667"
kafka_topic = "transilien-02"

In [None]:
kafka_stream = kafka_session \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

#### Schéma de désérialisation des messages  
Les messages injectés dans le flux Kafka sont sérialisés et encodés en binaire dans le champ _value_ du dataframe (format générique des dataframe issus d'un stream Kafka).
```
kafka_stream.printSchema()
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 ```
Il est donc nécessaire despécifier le schéma de désérialisation qui sera passé à la fonction **from_json**.

In [None]:
schema = StructType(
    [
        StructField("station", IntegerType(), True),
        StructField("train", StringType(), True),
        StructField("timestamp", TimestampType(), True),
        StructField("mode", StringType(), True)
    ]
)

A travers, la variable **json_options**, on précise également le format du champ _timestamp_ afin que les valeurs temporelles soient correctement interprétées.

In [None]:
json_options = {"timestampFormat": "yyyy-MM-dd'T'HH:mm:ss.sss'Z'"}

#### Séquence de calcul du temps d'attente moyen par station : étape-01 
Désérialisation/reformatage des messages.

A l'issue de opération le dataframe a le schéma suivant:
```
df.printSchema()
root
 |-- station: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- train: string (nullable = true)
```
#### Séquence de calcul du temps d'attente moyen par station : étape-02
Spécification de la watermark du stream spark. Nous n'acceptons pas les messages ayant plus d'une minute de retard. Il s'agit d'un choix arbitraire qui n'a que peu d'intérêt dans notre projet. Il est toutefois nécessaire de spécifier cette valeur car l'implémentation sous-jacente doit borner l'accumulation des données du stream. [La documentation de Spark explique clairement le concept de _Stateful Stream Processing in Structured Streaming_](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html).

#### Séquence de calcul du temps d'attente moyen par station : étape-03
Un train apparaitra dans les réponses aux requêtes de l'API SNCF tant que son heure de départ n'appartient pas au passé. On supprime donc les doublons associés aux couples (train, heure de départ). Inutile d'ajouter la station à la contrainte d'exclusion car l'idenfiant d'un train est unique.

#### Séquence de calcul du temps d'attente moyen par station : étape-04
Définition de la fênêtre (temporelle) dans laquelle les calculs sont aggrégés. Il s'agit ici de calculer le temps d'attente moyen par station **sur la dernière heure**. Notre fenêtre a donc une largeur temporelle de 60 minutes (_window length_). On choisit de suivre cette moyenne par pas de 2 minutes (_sliding interval_). Dans la mesure où le calcul est demandé par station, la fonction _groupBy_ s'applique au champ _station_ du dataframe.

#### Séquence de calcul du temps d'attente moyen par station : étape-05
Pour chaque station, on définit le temps moyen d'attente sur une période de P minutes comme le rapport de P vs le nombre de trains au départ de cette station sur la période P. Ici P = 60 minutes. 

On crée une _aggrégation_ qui contiendra, pour chaque station et pour chaque fenêtre d'une heure :
- une colonne _nt_ qui indiquant le nombre de trains sur la période
- une colonne _awt_ donnant le temps d'attente moyen recherché. 

La colonne _nt_ est injectée à titre indicatif (visualisation dans la console, validation du calcul).

#### Séquence de calcul du temps d'attente moyen par station : étape-06
Selection de la fenêtre temporelle associée à la dernière heure. 

Il s'agit de selectionner, parmis les N fenêtres temporelles produites par Spark, celle qui correspond à la dernière heure écoulée. On utilise ici la fonction **current_timestamp** de Spark afin de rendre la sélection dynamique (i.e. glissante). Le calcul est effectué est dans l'unité de **unix_timestamp** (la seconde) - beaucoup plus facile à manipulée dans ce contexte. 

Dans l'idée de pourvoir visualiser (si besoin) les valeurs mises en jeu dans le calcul, on choisit de créér les colonnes associées:
- oha = one hour ago = now - 62 minutes = now - (window length + sliding interval) 
- now = now - 1 minutes = now - (sliding interval / 2.) => **valeur ajustée pour n'obtenir qu'une seule fenêtre**
- wstart = window.start = borne inférieure de la fenêtre temporelle 
- wend = window.wend = borne supérieure de la fenêtre temporelle

**La clause _where_ permet de selectionner la fenêtre associée à la dernière heure**.  

#### Séquence de calcul du temps d'attente moyen par station : concaténation des étapes 01 à 06
On s'offre simplement la possibilité d'exécuter la séquence en un seul appel.

In [None]:
last_hour_stream = kafka_stream \
    .select(from_json(col("value").cast("string"), schema, json_options).alias("departure")) \
    .select("departure.*") \
    .select("station", "train", "timestamp") \
    .withWatermark("timestamp", "1 minutes") \
    .dropDuplicates(["train", "timestamp"]) \
    .groupBy("station", window("timestamp", "60 minutes", "2 minutes")) \
    .agg(count("train").alias("nt"), format_number((60. / count("train")), 2).cast("double").alias("awt")) \
    .withColumn("oha", unix_timestamp(current_timestamp()) - 3720) \
    .withColumn("now", unix_timestamp(current_timestamp()) - 60) \
    .withColumn("wstart", unix_timestamp("window.start")) \
    .withColumn("wend", unix_timestamp("window.end")) \
    .where((col("oha") <= col("wstart")) & (col("wend") <= col("now"))) \

**last_hour_stream** constitue 'l'état' de référence de notre stream Spark. C'est à partir de cet état que l'on produit les résultats, métriques, indicateurs, ... demandés

### QUESTION 1.2 à 1.6
- Calcul du temps moyen d’attente sur la ligne station par station sur la dernière heure
- Calcul du temps moyen d’attente globale sur la ligne sur la dernière heure
- Trier les stations par temps d’attente moyen sur la dernière heure
- Trouver la station avec le temps d’attente le plus élevée sur la dernière heure
- Trouver la station avec le temps d’attente le moins élevée sur la dernière heure
- Construire un tableau de bord dans Tableau Software sur la base de ces indicateurs

Les calculs demandés nécessiteraient une seconde opération aggrégation sur le stream _df0_. Or, en l'état actuel de Spark (2.4), [il n'est pas possible d'enchainer plusieurs opération d'aggrégation sur un même stream](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations). Il nous faut donc trouver une solution de contournement.

L'idée retenue est d'effectuer les calculs sur chaque batch de _df0_. Cette approche focntionne ici car chaque batch contient l'intégralité des données sur lesquelles les calculs doivent être réalisés pour produire les résultats attendus - i.e. les données d'attente moyenne par station. Ces résulats sont enregistrés sous _Hive_ dans des tables spécifiques. Ils sont ainsi rendus accéssibles depuis Tableau pour l'élaboration du tableau de bord. 

Les calculs (Q1.2 à Q.1.5) sont regroupés dans un callback du type _foreachBatch_ dont les appels sont déclenchés par une _StreamingQuery_.

La classe _TransilienIndicators_ implémente les fonctionnalités demandées.

In [None]:
class TransilienIndicators():
    
    def __init__(self, 
                 streaming_dataframe, 
                 awt_table="awt_table", 
                 drop_existing_tables=True, 
                 auto_start=True, 
                 verbose=False):
        # setup logger
        self.logger = logging.getLogger("TransilienIndicators")
        self.logger.setLevel(logging.DEBUG if verbose else logging.ERROR)
        self.logger.debug("TransilienIndicators: initializing...")
        # streaming dataframe 
        self.streaming_dataframe = streaming_dataframe
        # name of the  temp. (i.e. in memory) table 
        self.awt_table = awt_table
        # hive oriented spark session (configured to save the results at the right place in Hive)
        self.logger.debug(f"TransilienIndicators: creating SparkSession")
        self.session = SparkSession \
            .builder \
            .master("yarn") \
            .appName("MS-SIO-HADOOP-PROJECT-INDICATORS") \
            .config("spark.sql.warehouse.dir", "hdfs://sandbox-hdp.hortonworks.com:8020/api-transilien") \
            .config("hive.metastore.uris", "thrift://sandbox-hdp.hortonworks.com:9083") \
            .enableHiveSupport() \
            .getOrCreate() 
        self.logger.debug(f"`-> done!")
        # drop existing tables (optional)
        if drop_existing_tables:
            self.__drop_tables()
        # the 'save as table' streaming query - acts as a trigger for computeAndSaveAsTables
        self.sat_query = None
        # the 'console' streaming query - print batches in the console  
        self.csl_query = None
        # processing time (i.e. streaming queries trigger period)
        self.processing_time = "15 seconds" #"1 minutes"
        # start the streaming query?
        if auto_start:
            self.start()
        self.logger.debug(f"initialization done!")
            
    def start(self):
        # stop the streaming query if already running
        self.stop()
        # create then start the 'save' streaming query on the specified streaming dataframe
        # also make 'self.computeAndSaveAsTables' member function the associated 'foreachBatch' callback
        self.logger.debug(f"TransilienIndicators: starting 'SaveAsTables' streaming query")
        self.sat_query = self.streaming_dataframe \
            .writeStream \
            .trigger(processingTime=self.processing_time) \
            .foreachBatch(self.computeAndSaveAsTables) \
            .outputMode("complete") \
            .start() 
        self.logger.debug(f"`-> done!")
        # create then start the 'console' streaming query on the specified streaming dataframe
        self.logger.debug(f"TransilienIndicators: starting 'Console' streaming query")
        self.csl_query = self.streaming_dataframe \
            .select("station", "window", "nt", "awt") \
            .orderBy("awt") \
            .writeStream \
            .trigger(processingTime=self.processing_time) \
            .outputMode("complete") \
            .format("console") \
            .option("truncate", False) \
            .start() 
        self.logger.debug(f"`-> done!")
        self.logger.debug(f"TransilienIndicators: streaming queries are running")
        
    def stop(self):
        # stop the streaming query (best effort impl.)
        if self.sat_query is  not None:
            try:
                self.logger.debug(f"TransilienIndicators: stopping 'SaveAsTables' streaming query")
                self.sat_query.stop()
            except Exception as e:
                pass
            finally:
                self.sat_query = None
                self.logger.debug(f"`-> done!")
        if self.csl_query is  not None:
            try:
                self.logger.debug(f"TransilienIndicators: stopping 'Console' streaming query")
                self.csl_query.stop()
            except Exception as e:
                pass
            finally:
                self.csl_query = None
                self.logger.debug(f"`-> done!")
            
    def cleanup(self):
        # cleanup the underlying session 
        # TODO: not sure this is the right way to do the job
        self.logger.debug(f"TransilienIndicators: shutting down SparkSession")
        self.session.stop()
        self.logger.debug(f"`-> done!")
        
    def turnVerboseOn(self):
        # turn verbose on
        self.logger.setLevel(logging.DEBUG)
        
    def turnVerboseOff(self):
         # turn verbose off
        self.logger.setLevel(logging.ERROR)
     
    def __drop_tables(self):
        # drop existing tables (best effort impl.)
        for table in ["ordered_awt", "global_awt", "min_awt", "max_awt"]:
            try:
                self.logger.debug(f"TransilienIndicators: dropping table {table}")
                self.session.sql(f"drop table transilien.{table}")
                self.logger.debug(f"`-> done!")
            except Exception as e:
                pass
            
    def computeAndSaveAsTables(self, batch, batch_number):
        # the big trick... heart of the functionnality implemented by the Indicators class
        try:
            batch.persist()
            # be sure we have some data to handle (incoming dataframe not empty)
            # this will avoid creating empty tables on Hive side 
            if batch.rdd.isEmpty():
                self.logger.debug(f"TransilienIndicators: got empty batch #{batch_number} - aborting...")
                return
            # compute ordered average waiting time in minutes (on the last hour period)
            df1 = batch.orderBy(asc("awt")).select(col("station"),col("awt"))
            # compute global average waiting time in minutes (on the last hour period) 
            df2 = batch.agg(count("station").alias("number_of_stations"), avg("awt").alias("global_awt"))
            # compute min average waiting time in minutes (on the last hour period)
            df3 = batch.orderBy(asc("awt")).limit(1).select(col("station"),col("awt").alias("min_awt"))
            # compute max average waiting time in minutes (on the last hour period)
            df4 = batch.orderBy(desc("awt")).limit(1).select(col("station"),col("awt").alias("max_awt"))
            # verbose
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                df1.show(3, False)
                df2.show(3, False)
                df3.show(3, False)
                df4.show(3, False)
            # save results as tables (this is slow!!!)
            if not df1.rdd.isEmpty(): # avoid creating empty tables on Hive side
                self.logger.debug(f"saving transilien.ordered_awt for batch #{batch_number}")
                df1.write.mode("overwrite").saveAsTable("transilien.ordered_awt")
            if not df2.rdd.isEmpty(): # avoid creating empty tables on Hive side 
                self.logger.debug(f"saving transilien.global_awt for batch #{batch_number}")
                df2.write.mode("overwrite").saveAsTable("transilien.global_awt")
            if not df3.rdd.isEmpty(): # avoid creating empty tables on Hive side
                self.logger.debug(f"saving transilien.min_awt for batch #{batch_number}")
                df3.write.mode("overwrite").saveAsTable("transilien.min_awt")
            if not df4.rdd.isEmpty(): # avoid creating empty tables on Hive side
                self.logger.debug(f"saving transilien.max_awt for batch #{batch_number}")
                df4.write.mode("overwrite").saveAsTable("transilien.max_awt")
            self.logger.debug(f"computeAndSaveAsTables successfully executed for batch #{batch_number}")
        except Exception as e:
            self.logger.error(f"TransilienIndicators: failed to update hive table with batch #{batch_number}")
            self.logger.error(e)
        finally:
            batch.unpersist()

In [None]:
ti = TransilienIndicators(last_hour_stream, drop_existing_tables=True, verbose=True, auto_start=False)

In [None]:
ti.start()

In [None]:
ti.stop()

In [None]:
ti.turnVerboseOff()

In [None]:
ti.turnVerboseOn()

## Divers 
Les cellules suivantes permettent de détruire proprement les sessions Spark

Note : dans un contexte de production, le code qui précède serait injecté dans un script Python lancé via _spark-submit_. Dans un tel cas, serait nécessaire placer un appel à _awaitTermination_ sur chaque streaming query.

In [None]:
kafka_session.streams.active

In [None]:
kafka_session.stop()

In [None]:
ti.cleanup()