Tout d'abord, nous importons StreamingContext, qui est le principal point d'entrée de toutes les fonctionnalités de streaming. C'est lui qui va s'occuper de manager les streams de l'application.

In [1]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

Nous créons ensuite un StreamingContext local avec deux threads d'exécution, et un intervalle de batch de 30 secondes.

In [2]:
sc = SparkContext("local[2]", "WordCount")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 30)

C'est maintenant que nous allons créer notre DStream. En utilisant notre StreamingContext, nous pouvons créer un DStream qui représente les données en continu à partir d'une source TCP, on spécifie donc localhost en nom d'hôte (netcat s'exécute sur la même machine que spark: dans le conteneur) et 8887 comme port (port défini lors du lancement de Netcat).

In [3]:
lines = ssc.socketTextStream("172.31.17.230", 8887)

Le DStream lines représente le stream d'event qui sera reçu du serveur de données. Chaque enregistrement de ce DStream est un message envoyé. Ensuite, nous voulons enlever les majuscules dans nos mots puis diviser les messages en mots.

In [4]:
words = lines.flatMap(lambda line: line.lower().split(" "))

flatMap va transformer le DStream initial contenant un flux de messages en un DStream contenant un flux de mot, ce sera mieux pour faire un MapReduce par la suite.

Le DStream de mots est donc ensuite mappé en un DStream de paires (mot, 1), qui est ensuite réduit pour obtenir la fréquence des mots dans chaque batch. Enfin, wordCounts.pprint() imprimera les 10 premières paires correspondant au word count toutes les 30 secondes.

In [5]:
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()

Pour lancer le traitement après que toutes les transformations aient été configurées, nous appelons finalement:

In [None]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2022-11-28 14:35:30
-------------------------------------------
('world!', 3)
('bienvenue', 1)
('à', 2)
('datascientest', 1)
('tous', 1)
('le', 4)
('est', 1)
('pour', 1)
('streaming', 2)
('tester', 1)
...

-------------------------------------------
Time: 2022-11-28 14:36:00
-------------------------------------------
('coucou', 1)
('que', 1)
('passa', 1)
('?', 1)
('el', 1)
('assim', 1)
('vosse', 1)
('mata', 1)
('la', 1)
('famille', 1)
...

-------------------------------------------
Time: 2022-11-28 14:36:30
-------------------------------------------
("j'aime", 1)
('les', 1)
('test', 4)
('aie', 3)
('caramba', 1)
('confetis', 1)
('et', 2)
('la', 1)
('confiture', 1)
('cornebidouille', 1)

