BDLE 2023

date du document  :  10/11/2023

# TP7 2023 Structured Streaming  (ETUDIANT)





## Préparation du TP

Installer pyspark et findspark (durée 1'):


In [1]:
!pip install -q pyspark
!pip install -q findspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


Démarrer la session spark

In [2]:
import os
import glob

pyspark_dir = glob.glob('/usr/local/lib/python*/dist-packages/pyspark')[0]
print("pyspark directory is", pyspark_dir)
os.environ["SPARK_HOME"] = pyspark_dir
os.environ["JAVA_HOME"] = "/usr"

pyspark directory is /usr/local/lib/python3.10/dist-packages/pyspark


In [3]:
# Principaux import
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkConf

# pour les dataframe et udf
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *

# pour le chronomètre
import time

# initialise les variables d'environnement pour spark
findspark.init()

# Démarrage session spark
# --------------------------
def demarrer_spark():
  local = "local[*]"
  appName = "TP"
  configLocale = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "4G").\
  set("spark.driver.memory","4G").\
  set("spark.sql.catalogImplementation","in-memory")

  spark = SparkSession.builder.config(conf = configLocale).getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("ERROR")

  spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")

  # On ajuste l'environnement d'exécution des requêtes à la taille du cluster (4 coeurs)
  spark.conf.set("spark.sql.shuffle.partitions","4")
  print("session démarrée, son id est ", sc.applicationId)
  return spark
spark = demarrer_spark()

# pour le TP streaming : 1 seule partition pour avoir un seul fichier de sortie par exécution
spark.conf.set("spark.sql.shuffle.partitions", "1")

session démarrée, son id est  local-1701388356232


In [4]:
# on utilise 8 partitions au lieu de 200 par défaut
spark.conf.set("spark.sql.shuffle.partitions", "8")
print("Nombre de partitions utilisées : ", spark.conf.get("spark.sql.shuffle.partitions"))

Nombre de partitions utilisées :  8


In [None]:
# Optionnel :
# pour l'accès à spark UI : voir https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/
# !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
# !unzip ngrok-stable-linux-amd64.zip
# get_ipython().system_raw('./ngrok http 4050 &')
# !curl -s http://localhost:4040/api/tunnels

Redéfinir la fonction **display** pour afficher le resutltat des requêtes dans un tableau

In [5]:
import pandas as pd
from google.colab import data_table

# alternatives to Databricks display function.

def display(df, n=100):
  return data_table.DataTable(df.limit(n).toPandas(), include_index=False, num_rows_per_page=10)

def display2(df, n=20):
  pd.set_option('max_columns', None)
  pd.set_option('max_colwidth', None)
  return df.limit(n).toPandas()


Définir le tag **%%sql** pour pouvoir écrire plus simplement des requêtes en SQL dans une cellule

In [6]:
from IPython.core.magic import (register_line_magic, register_cell_magic, register_line_cell_magic)

def removeComments(query):
  result = ""
  for line in query.split('\n'):
    if not(line.strip().startswith("--")):
      result += line + "\n"
  return result

@register_line_cell_magic
def sql(line, cell=None):
    "To run a sql query. Use:  %%sql"
    val = cell if cell is not None else line
    tabRequetes = removeComments(val).split(";")
    derniere = None
    est_requete = False
    for r in tabRequetes:
        r = r.strip()
        if len(r) > 2:
          derniere = spark.sql(r)
          est_requete = ( r.lower().startswith('select')or r.lower().startswith('with'))
    if(est_requete):
      return display(derniere)
    else:
      return print('ok')

In [None]:
# facultatif (à ne pas utiliser)
# %load_ext google.colab.data_table
# %unload_ext google.colab.data_table

## Accès aux données

### URL pour l'accès aux datasets

In [7]:
# URL du dossier PUBLIC_DATASET contenant des fichiers de données pour les TP
# ---------------------------------------------------------------------------
# en cas de problème avec le téléchargement des datasets, aller directement sur l'URL ci-dessous
PUBLIC_DATASET_URL = "https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4"
PUBLIC_DATASET=PUBLIC_DATASET_URL + "/download?path="

print("URL du dossier contenant les datasets ", PUBLIC_DATASET_URL)

URL du dossier contenant les datasets  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4


## Exercice 1 : group by window

On commence par étudier le cas de données statiques.

On considère la table suivante : S1(time, v)

In [8]:
data1 = """
8:06 A
8:10 B
8:12 B
8:12 B
8:12 A
8:20 C
8:20 B
8:20 B
"""
date_prefix = "2023-11-10T"

tab = [line.split(" ") for line in data1.split('\n') if len(line)>0]
tab = [(date_prefix+n, v) for (n, v) in tab]
S1 = spark.createDataFrame(tab, ["time", "v"]).selectExpr("timestamp(time) as time", "v")
S1.printSchema()
display(S1.selectExpr("date_format(time, 'HH:mm') as time", "v"))

root
 |-- time: timestamp (nullable = true)
 |-- v: string (nullable = true)



Unnamed: 0,time,v
0,08:06,A
1,08:10,B
2,08:12,B
3,08:12,B
4,08:12,A
5,08:20,C
6,08:20,B
7,08:20,B


On considère la requête S2:

In [10]:
S2 = (S1.groupBy( window(col("time"), "10 minute", "5 minute").alias("w"), "v")
      .agg(count("*").alias("nb"))
      .selectExpr("v","w.start as start", "w.end as end", "nb")
      .selectExpr("date_format(start, 'HH:mm') as start","date_format(end, 'HH:mm') as end","v", "nb").orderBy("start", "v")
)

display(S2)

Unnamed: 0,start,end,v,nb
0,08:00,08:10,A,1
1,08:05,08:15,A,2
2,08:05,08:15,B,3
3,08:10,08:20,A,1
4,08:10,08:20,B,3
5,08:15,08:25,B,2
6,08:15,08:25,C,1
7,08:20,08:30,B,2
8,08:20,08:30,C,1


## Mon brouillon de découverte et de test

In [15]:
S1.createOrReplaceTempView("S1")
S1.show()

+-------------------+---+
|               time|  v|
+-------------------+---+
|2023-11-10 08:06:00|  A|
|2023-11-10 08:10:00|  B|
|2023-11-10 08:12:00|  B|
|2023-11-10 08:12:00|  B|
|2023-11-10 08:12:00|  A|
|2023-11-10 08:20:00|  C|
|2023-11-10 08:20:00|  B|
|2023-11-10 08:20:00|  B|
+-------------------+---+



In [37]:
%%sql

-- Creation de la fenetre sur 10min avec un decalage de 5min pour chaque valeur de v
create or replace temp view Window_10m_5m as
SELECT
  v,
  WINDOW(time, '10 minutes', '5 minutes') AS w,
  COUNT(*) AS nb,
  collect_list(v) AS liste_v
FROM
  S1
GROUP BY
  w,
  v
;

-- Affichage du nombre d'element de la fenetre et de son debut par valeur v
-- ordonne par start et v
SELECT
  date_format(w.start, "HH:mm") AS start,
  date_format(w.end, "HH:mm") AS end,
  v,
  nb,
  liste_v
FROM
  Window_10m_5m
ORDER BY
  start,
  v

Unnamed: 0,start,end,v,nb,liste_v
0,08:00,08:10,A,1,[A]
1,08:05,08:15,A,2,"[A, A]"
2,08:05,08:15,B,3,"[B, B, B]"
3,08:10,08:20,A,1,[A]
4,08:10,08:20,B,3,"[B, B, B]"
5,08:15,08:25,B,2,"[B, B]"
6,08:15,08:25,C,1,[C]
7,08:20,08:30,B,2,"[B, B]"
8,08:20,08:30,C,1,[C]


Question 1)
Proposer une solution pour calculer incrémentalement le résultat de la fenètre courante en réutilisant certains résultats des fenetres précédentes.
Pour cela définir par exemple une fonction traiterS2(donnees)

On fait des fenetres de 5min avec un decalage de 5min alors pas de chevauchement. Ainsi le resultat d'une fenetre de 10min avec un decalage de 5min correspond justement à la somme ainsi calculée de la current row et 1 preceding(la fenetre precedante, 5min plus les 5 prochaines minutes en font 10min pile).

In [112]:
from datetime import datetime, timedelta


def traiterS2(l):

  l = sorted(l, key=lambda x: x[0])
  result = []

  # on commence à i1=0 (premier elt de la liste) avec (start2, end2, nb2) le deuxime de la liste
  for i1, (start2, end2, nb2) in enumerate(l[1:]):
    (start1, end1, nb1) = l[i1]

    # si les deux fenetres sont successives
    if end1 == start2:
      nb = nb1 + nb2
      end = end2
    else:
      nb = nb1
      start1_time = datetime.strptime(start1, "%H:%M")
      new_time = start1_time + timedelta(minutes=10) # Ajouter 10 minutes du debut
      end = new_time.strftime("%H:%M") # Formatter la nouvelle heure au format HH:mm
    r = start1, end, nb
    result.append(r)

  print("Aqui")
  # dernier
  (start1, end1, nb) = l[-1]
  start1_time = datetime.strptime(start1, "%H:%M")
  new_time = start1_time + timedelta(minutes=10) # Ajouter 10 minutes du debut
  end = new_time.strftime("%H:%M") # Formatter la nouvelle heure au format HH:mm
  r = start1, end, nb
  result.append(r)

  return result
spark.udf.register("traiterS2", traiterS2, ArrayType(StructType([StructField("start", StringType()), StructField("end", StringType()), StructField("nb_10", IntegerType())] )))

<function __main__.traiterS2(l)>

In [120]:
%%sql

-- Creation de la fenetre sur 5min avec un decalage de 5min pour chaque valeur de v
create or replace temp view Window_5m_5m as
SELECT
  v,
  WINDOW(time, '5 minutes', '5 minutes',"0 minutes") AS w,
  date_format(w.start, "HH:mm") AS start,
  date_format(w.end, "HH:mm") AS end,
  COUNT(*) AS nb_5,
  collect_list(date_format(time, "HH:mm")) AS liste_time,
  collect_list(v) AS liste_v
FROM
  S1
GROUP BY
  w,
  v
;

-- Pour chaque valeur de v, on collect la liste de ses feentres sur 5min
create or replace temp view Window_10m_5m as
SELECT
  v,
  collect_list((start, end, nb_5)) as liste_v_5,
  traiterS2(collect_list((start, end, nb_5))) AS liste_v
FROM
  Window_5m_5m
GROUP BY v
ORDER BY
  v
;

SELECT
  explode(liste_v) as start_end_nb,
  start_end_nb.start,
  start_end_nb.end,
  v,
  start_end_nb.nb_10
FROM
  Window_10m_5m
ORDER BY
  start,
  v

Unnamed: 0,start_end_nb,start,end,v,nb_10
0,"(08:05, 08:15, 2)",08:05,08:15,A,2
1,"(08:10, 08:20, 1)",08:10,08:20,A,1
2,"(08:10, 08:20, 3)",08:10,08:20,B,3
3,"(08:20, 08:30, 2)",08:20,08:30,B,2
4,"(08:20, 08:30, 1)",08:20,08:30,C,1


Remarque: Il n'y a pas les dates à 08:00 et 08:15

## Exercice 2 : Streaming

Proposer une implémentation pour l'exercice Streaming (celui de l'ER1 2022) vu en cours.

## Exercice 3 : Mise en pratique Spark Streaming


In [121]:
import os

dir_name = "/content/input"
os.makedirs(dir_name, exist_ok=True)

checkpoint = "/content/checkpoint"
os.makedirs(checkpoint, exist_ok=True)

streaming_query = None

Clean the input source

In [122]:
# Vider l'input
!rm -f /content/input/*

np.random.seed(1)

NEXT_FILE_ID=1

def add_input_data(dir_name):
  global NEXT_FILE_ID
  nb_lines = 5
  with open(dir_name + f"/file_{NEXT_FILE_ID}.csv", "w") as f:
    content = [ f"{i} {a}\n" for i,a in enumerate(np.random.randint(1, 20, nb_lines))]
    for l in content:
      print(l)
    f.writelines(''.join(content))
    print("added file:", NEXT_FILE_ID)
    NEXT_FILE_ID += 1

print("function defined")

function defined


Define the input stream

In [123]:
schema = "ID int, note int"
input_stream = spark.readStream.schema(schema).option("delimiter", " ").option("maxFilesPerTrigger", 1).csv(dir_name)

Define a **query** on the input stream

In [124]:
query = input_stream.groupBy("ID").agg(avg("note").alias("moyenne")).orderBy("ID")
# print("Streaming is: ", query.isStreaming)

Define the **output** stream and **execute the streaming query**

In [None]:
if(streaming_query is not None and streaming_query.isActive):
  print("Stopping current streaming query before defining a new one.")
  streaming_query.stop()

streaming_query = query.writeStream.format("memory").queryName("Result").outputMode("complete").start()

print("Active stream:", streaming_query.isActive)

Stopping current streaming query before defining a new one.
Active stream: True


Produire des données d'entrée

In [None]:
add_input_data(dir_name)

0 6

1 12

2 13

3 9

4 10

added file: 1


Afficher le résultat de la requete de streaming

In [None]:
%%sql
select * from Result

Unnamed: 0,ID,moyenne
0,0,6.0
1,1,12.0
2,2,13.0
3,3,9.0
4,4,10.0


In [None]:
add_input_data(dir_name)

0 2

1 13

2 8

3 14

4 7

added file: 3


In [None]:
%%sql
select * from Result

Unnamed: 0,ID,moyenne
0,0,6.666667
1,1,10.333333
2,2,12.333333
3,3,8.0
4,4,11.333333


In [None]:
# streaming_query.lastProgress

### Question 1 : Watermark
Adapter l'exemple ci dessus et proposer une requete avec un regroupement par fenetre et une watermark

### Question 2: Jointure
Proposer une requete avec jointure entre un flux et une table

Divers exemples

In [None]:
!rm -rf /content/checkpoint
os.makedirs("/content/checkpoint", exist_ok=True)

# Effacer la sortie
!rm -rf /content/out
output_name = "/content/out"
os.makedirs(output_name, exist_ok=True)

# test
# streaming_query = input_stream.select("*").writeStream.format("memory").queryName("Result").outputMode("append").start()
# streaming_query = input_stream.select("*").writeStream.format("csv").option("checkpointLocation", checkpoint).outputMode("append").start(output_name)
