# Travaillant avec Spark Structured Streaming

Tout comme Spark a migré de l'API "bas niveau" des RDD vers l'API déclarative DataFrame, le support au streaming a aussi deux bibliothèques différentes :
* Spark Streaming - ancienne bibliothèque, axée sur les RDD
* Spark Structured Streaming - nouvelle version, intégrant les DataFrames via spark.sql

La différence va au delà des RDD et des DataFrames. Par exemple, `Structured Streaming` adopte un traitement basé sur le **event time** plutôt que par ordre d'arrivée (**processing time**). Cela permet de traiter les événéments retardés par la transmission, mais demande plus d'organisation (suivi de l'état des événéments, fenêtres de timeout).

Autre différence, l'ancienne version ne suportait que le processement par **micro batches**, alors que Structured Streaming permet aussi le **continuous processing**. Cette dernière option permet une plus pétite latence de traitement car on n'attend plus qu'un certain nombre d'entrées soit arrivée.

Dans les paragraphes suivants on va regarder quelques exemples d'utilisation de Structured Streaming. Nous allons utiliser des données issues d'un suivi des activités d'une personne (https://archive.ics.uci.edu/ml/datasets/Heterogeneity+Activity+Recognition)

## Démarrage d'une session Spark

In [None]:
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [None]:
spark = SparkSession.builder.appName("readfromjson").master("local[2]").getOrCreate()

Dans cet exercice nous allons simuler un flux de données en lisant plusieurs fichiers au format json.
Comme les DataFrames ont besoin du schéma des données, l'étape suivante prend un peu de temps pour analyser la structure de tous les fichiers.

In [None]:
!wget http://urca.lsteffenel.fr/activity-data.zip
!unzip activity-data.zip

In [None]:
static = spark.read.json("activity-data/")
dataSchema = static.schema

In [None]:
static.printSchema()

Nous allons maintenant définir le flux streaming. On renseigne le schéma et la source (`.json()`), puis on limite la lecture à un fichier à la fois pour avoir plus de temps de traitement.

In [None]:
streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
  .json("activity-data")

Ensuite, nous définissions une transformation qui récupère les données, les regroupe par la colonne `gt` et met à jour le nombre d'éléments.  

In [None]:
activityCounts = streaming.groupBy("gt").count()

Pour le moment ces actions n'ont pas été démarrées. La ligne suivante démarre le traitement du stream.

Les données sont stockées en mémoire dans un tableau appelé **activity_counts**.

Le mode de traitement est **complet**, ce que veut dire que l'ensemble du tableau est mis à jour à chaque événément.

In [None]:
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
  .format("memory").outputMode("complete")\
  .start()

Pour finir, nous allons essayer de visualiser ces mises à jour.

La boucle suivante utilise la syntaxe `spark.sql`pour faire une requête sur le tableau *activity_counts*. En faisant des pauses d'une seconde entre chaque mise à jour, on peut visualiser l'évolution du remplissage du tableau.

In [None]:
from time import sleep
for x in range(20):
    spark.sql("SELECT * FROM activity_counts").show()
    sleep(1)

Le streaming est toujours actif, comme le prouve la ligne suivante.

Dans certains cas (lors d'une longue exécution), le *driver* peut être arrêté lorsque le flux de données est interrompu pendant trop de temps. Afin d'éviter ça, on peut demander explicitement au driver d'attendre la fin (un Ctrl-C, par exemple) avec la méthode `.awaitTermination()`. Dans notre cas, on va se contenter de faire un `.stop()`.

In [None]:
spark.streams.active

In [None]:
activityQuery.stop()

## Travaillant avec les timestamps

Le traitement des données au fur et à mesure de leur arrivée peut avoir des mauvaises conséquences si des messages sont perdus, retardés ou arrivent en doublon. Pour cela, Structured Streaming permet de manipuler les événéments et de contrôler la fenêtre d'utilité de ces données.

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", 5)
static = spark.read.json("activity-data")
streaming = spark\
  .readStream\
  .schema(static.schema)\
  .option("maxFilesPerTrigger", 10)\
  .json("activity-data")


In [None]:
streaming.printSchema()

La colonne `Creation_Time` est représentée en tant que des nanosecondes (*unixtime*). Nous allons d'abord les transformer en quelque chose plus facile à travailler.

In [None]:
withEventTime = streaming.selectExpr(
  "*",
  "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time")

### Traitement par blocs

La façon la plus simple de traiter les événéments entrants est de définir une fenêtre de traitement (un peu comme un batch). Dans le code suivant, les événéments sont regroupés par tranches de 10 minutes, sans superposition.

In [None]:
from pyspark.sql.functions import window, col
event_bloc = withEventTime.groupBy(window(col("event_time"), "10 minutes")).count()\
  .writeStream\
  .queryName("pyevents_per_window")\
  .format("memory")\
  .outputMode("complete")\
  .start()

Attendez quelques instants. À la fin du traitement, on peut visualiser les données regroupées par blocs de 10 minutes :

In [None]:
spark.sql("SELECT * FROM pyevents_per_window").show()

In [None]:
event_bloc.stop()

### Fenêtre glissante

Une autre façon de travailler est d'utiliser une fenêtre glissante pour mettre à jour le tableau. Dans le cas suivant, on déclenche une mise à jour à chaque 5 minutes, en utilisant les données reçues dans les 10 dernières minutes.

In [None]:
from pyspark.sql.functions import window, col
event_window = withEventTime.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))\
  .count()\
  .writeStream\
  .queryName("pyevents_per_window")\
  .format("memory")\
  .outputMode("complete")\
  .start()

In [None]:
spark.sql("SELECT * FROM pyevents_per_window").show()

Répétez la ligne précédente quelques fois afin de voir l'avancée du traitement.

In [None]:
event_window.stop()

### Watermark

Dans certains cas, les données peuvent arriver trop tard. Afin de limiter l'impact sur le traitement, on peut limiter le temps maximum d'attente (*watermarking*).

L'exemple suivant donne un aperçu de cette procédure.

In [None]:
from pyspark.sql.functions import window, col
event_watermark = withEventTime\
  .withWatermark("event_time", "30 minutes")\
  .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))\
  .count()\
  .writeStream\
  .queryName("pyevents_per_window")\
  .format("memory")\
  .outputMode("complete")\
  .start()

In [None]:
spark.sql("SELECT * FROM pyevents_per_window").show()

Répétez la ligne précédente quelques fois afin de voir l'avancée du traitement.

In [None]:
event_watermark.stop()