
## Traitement de flux structuré avec l'API DataFrame Python
Apache Spark inclut une API de traitement de flux de haut niveau : Structured Streaming.
<br>Dans ce TP, nous jetons un coup d'œil rapide l'utilisation de l'API DataFrame pour construire des applications Structured Streaming.
<br>Dans un premier temps, nous réviserons les éléments vus précédemment pour explorer rapidement les données.
<br>Dans un second temps, nous calculerons des métriques en temps réel (dans notre exemple, ce seront des "running counts" et "windowed counts" sur un flux d'actions horodatées).


Uploader les 50 fichiers du dossier events dans le dossier /FileStore/tables/events. Vérifier qu'ils sont bien présents :

In [0]:
%fs ls /FileStore/tables/events

path,name,size,modificationTime
dbfs:/FileStore/tables/events/file_0.json,file_0.json,74529,1728393377000
dbfs:/FileStore/tables/events/file_1.json,file_1.json,74960,1728393377000
dbfs:/FileStore/tables/events/file_10.json,file_10.json,75024,1728393378000
dbfs:/FileStore/tables/events/file_11.json,file_11.json,74998,1728393378000
dbfs:/FileStore/tables/events/file_12.json,file_12.json,74986,1728393378000
dbfs:/FileStore/tables/events/file_13.json,file_13.json,75005,1728393379000
dbfs:/FileStore/tables/events/file_14.json,file_14.json,75002,1728393379000
dbfs:/FileStore/tables/events/file_15.json,file_15.json,75006,1728393379000
dbfs:/FileStore/tables/events/file_16.json,file_16.json,74977,1728393379000
dbfs:/FileStore/tables/events/file_17.json,file_17.json,75007,1728393379000


Exécuter la commande suivante pour voir rapidement ce que les fichiers contiennent à partir de l'exemple du fichier "file_0.json".

In [0]:
%fs head /FileStore/tables/events/file_0.json

Chaque ligne du fichier contient un enregistrement json avec deux champs : `time` and `action`. Nous allons analyser ces fichiers de manière interactive.

## Traitement par lots/interactif
La première étape pour tenter de traiter les données est de les interroger de manière interactive.
Nous allons définir un DataFrame statique sur les fichiers.

In [0]:
input_path = "/FileStore/tables/events"

Pour changer (et en pratique accélérer le processus), définir le schéma (au lieu de demander à Spark de l'inférer).

In [0]:
from pyspark.sql.types import TimestampType, StringType, StructField, StructType

schema = StructType([
  StructField("time", TimestampType(), False),
  StructField("action", StringType(), False),
])

static_in_df = spark.read.schema(schema).json(input_path)
display(static_in_df)

time,action
2016-07-28T04:19:28Z,Close
2016-07-28T04:19:28Z,Close
2016-07-28T04:19:29Z,Open
2016-07-28T04:19:31Z,Close
2016-07-28T04:19:31Z,Open
2016-07-28T04:19:31Z,Open
2016-07-28T04:19:32Z,Close
2016-07-28T04:19:33Z,Close
2016-07-28T04:19:35Z,Close
2016-07-28T04:19:36Z,Open


Calculer le nombre d'actions "open" et "close" dans des fenêtres d'une heure.
<br>Indication : utiliser un GROUP BY sur la colonne `action` et des fenêtres (windows) sur la colonne `time`.
<br>Réaliser cette tâche en DSL.
<br>Mettre en cache (en mémoire) le DataFrame obtenu.

In [0]:
from pyspark.sql.functions import window

static_counts_df = static_in_df.groupBy(
  static_in_df["action"],
  window(static_in_df["time"], "1 hour"),
).count()

static_counts_df.cache()

display(static_counts_df)

action,window,count
Close,"List(2016-07-26T13:00:00Z, 2016-07-26T14:00:00Z)",1028
Open,"List(2016-07-26T18:00:00Z, 2016-07-26T19:00:00Z)",1004
Close,"List(2016-07-27T02:00:00Z, 2016-07-27T03:00:00Z)",971
Open,"List(2016-07-27T04:00:00Z, 2016-07-27T05:00:00Z)",995
Open,"List(2016-07-27T05:00:00Z, 2016-07-27T06:00:00Z)",986
Open,"List(2016-07-26T05:00:00Z, 2016-07-26T06:00:00Z)",1000
Open,"List(2016-07-26T11:00:00Z, 2016-07-26T12:00:00Z)",991
Close,"List(2016-07-26T06:00:00Z, 2016-07-26T07:00:00Z)",1011
Close,"List(2016-07-27T05:00:00Z, 2016-07-27T06:00:00Z)",987
Open,"List(2016-07-26T10:00:00Z, 2016-07-26T11:00:00Z)",1007


<br>Déterminer le nombre total d'actions "open" et d'actions "close" (sur l'ensemble de la période).
<br>Réaliser cette tâche en SQL.

In [0]:
static_counts_df.createOrReplaceTempView("static_counts")

total_nb_df = spark.sql(
  """
  SELECT action, sum(count) as total_nb
  FROM static_counts_view
  GROUP BY action
  """
)

total_nb_df.show()

+------+--------+
|action|total_nb|
+------+--------+
| Close|   50000|
|  Open|   50000|
+------+--------+



Dans un notebook Databricks, il est possible d'exécuter une commande sql en ligne de commande avec la commande %sql, et d'obtenir un visuel automatique.
<br> L'essayer sur l'instruction précédente.

In [0]:
# Votre code ici

L'utilisation des windows se fait de la manière suivante en SQL. Exécuter la commande suivante pour obtenir un visuel des actions dans le temps.

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from static_counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993


Vous devriez remarquer que les actions de fermeture suivent les actions d'ouverture correspondantes (il y a plus d'"open" au début et plus de "close" à la fin).

### Traitement de flux (stream processing)
Maintenant que nous avons analysé les données de manière interactive, nous allons créer une requête de traitement de flux qui se met à jour en continu, à mesure que les données arrivent.
<br>Étant donné que nous avons simplement un ensemble statique de fichiers, nous allons émuler un flux à partir d'eux en lisant un fichier à la fois, dans l'ordre chronologique de leur création.
<br> A noter que le code que nous allons écrire est à peu près le même que précédemment. Merci l'API de Spark !

Adapter le code créé précédemment pour calculer le nombre d'actions "open" et "close", en utilisant readStream à la place de read. Utiliser l'option maxFilesPerTrigger afin de spécifier qu'on ne prend qu'un fichier à la fois (les options doivent être placées avant le chargement effectif à partir de la méthode json).

In [0]:
streaming_in_df = spark.readStream.option("maxFilesPerTrigger", 1).json(input_path, schema=schema)

In [0]:
streaming_counts_df = streaming_in_df.groupBy(
  streaming_in_df["action"],
  window(streaming_in_df["time"], "1 hour"),
).count()

En fouillant (avec le nez fin) la documentation des DataFrames (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html), vérifier si le DataFrame que nous venons de créer est bien un DataFrame de streaming.

In [0]:
streaming_counts_df.isStreaming

True

<br>Vous pouvez démarrer le calcul en continu en définissant la destination (le "sink") et en le démarrant.
<br>Dans notre cas, nous voulons interroger de manière interactive les comptages (mêmes requêtes que ci-dessus), nous allons donc définir l'ensemble complet des comptages d'une heure dans une table en mémoire (pour cela il faudra utiliser format avec l'option "memory").
<br>Utiliser la fonction writeStream pour démarrer la requête.
<br>Donner un nom à la table in-memory, afin de pouvoir y faire référence dans la suite. Pour cela, utiliser queryName.
<br>Choisir un outputMode "complete".

In [0]:
query = streaming_counts_df.writeStream.format("memory").queryName("counts").outputMode("complete").start()

`query` est une référence à la requête de streaming qui s'exécute en arrière-plan. Cette requête récupère continuellement les fichiers et met à jour les comptages fenêtrés. A noter que nous ne faisons que simuler une arrivée continue de fichiers, grâce à l'option maxFilesPerTrigger. Nous n'avons pas précisé d'intervalle de temps entre chaque trigger (il est possible de le paramétrer).

Remarquer le statut de la requête dans la cellule ci-dessus (la barre de progression montre que la requête est active).
<br> Développer `> counts` : on y trouve le nombre de fichiers traités.

Attendre un peu (physiquement ou en utilisant la fonction sleep de la bibliothèque time) puis lancer la requête sql permettant de visualiser les comptages dans le temps sur le DataFrame de streaming. Ne pas oublier que nous avons donné un nom particulier à la table in-memory.

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

action,time,count
Close,Jul-27 10:00,690
Open,Jul-27 10:00,663
Close,Jul-27 11:00,1001
Open,Jul-27 11:00,1006
Close,Jul-27 12:00,1006
Open,Jul-27 12:00,998
Close,Jul-27 13:00,1035
Open,Jul-27 13:00,994
Close,Jul-27 14:00,986
Open,Jul-27 14:00,1008


Continuer à exécuter la requête de manière itérative.

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993


Visualiser le nombre total d'actions "open" et "close".

In [0]:
%sql select action, sum(count) as total_nb from counts group by action

action,total_nb
Open,22989
Close,23011


Si vous exécutez la requête ci-dessus de manière répétée, vous constaterez que le nombre d'actions "open" est bien en permanence plus élevé que le nombre d'actions "close", comme anticipable dans un flux de données où une fermeture apparaît toujours après l'ouverture qui lui correspondant. Cela montre que Structured Streaming garantit l'"intégrité du préfixe" (prefix integrity).

Arrêter la requête en exécutant query.stop(). Vérifier le statut de la cellule correspondante à la requête.
<br>Ici bien entendu, nous n'avons que quelques fichiers, et il est possible qu'au moment où vous arrivez à cette cellule, ils soient tous consommés. Dans ce cas, il n'y a plus de mise à jour des comptages. Mais il faut quand même arrêter notre requête proprement !

In [0]:
query.stop()