<a href="https://colab.research.google.com/github/simosaoudi/BigDataLabs/blob/main/TP_Spark_Streaming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



In [2]:
# Création du fichier source pour simulé par un fichier alimenté aléatoirement
with open("data_stream.txt", "w") as f:
    f.write("")

In [3]:
# Alimentation du fichier en continu (flux simulé)
import time
import threading
import random

messages = [
    "spark streaming dstream",
    "spark spark streaming",
    "big data spark"
]

def generate_stream():
    while True:
        with open("data_stream.txt", "a") as f:
            f.write(random.choice(messages) + "\n")
        time.sleep(2)

threading.Thread(target=generate_stream, daemon=True).start()
# Une ligne est ajoutée toutes les 2 secondes pour simuler un flux temps réel.


In [4]:
# Création du SparkContext et du StreamingContext

# SparkContext : point d’entrée Spark
from pyspark import SparkContext

#StreamingContext : gère le streaming
from pyspark.streaming import StreamingContext

sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, 5)
# 5 secondes correspond à la durée du micro-batch




In [5]:
# Lecture des données sous forme de DStream

# Spark surveille le dossier /content et lit les nouvelles données comme un DStream.
lines = ssc.textFileStream("/content")

In [6]:
# Application du WordCount en streaming

# Découpage des lignes en mots
words = lines.flatMap(lambda line: line.split(" "))

# Transformation en paires (mot, 1)
pairs = words.map(lambda word: (word, 1))

# Calcul du nombre d’occurrences par mot
counts = pairs.reduceByKey(lambda a, b: a + b)


In [7]:
# Affichage des résultats dans la console

# Les résultats sont affichés toutes les 5 secondes.
counts.pprint()

In [8]:
# Démarrage du Streaming

# Le streaming s’exécute pendant 30 secondes.
ssc.start()
ssc.awaitTerminationOrTimeout(30)
ssc.stop(stopSparkContext=False)

-------------------------------------------
Time: 2026-01-16 17:23:25
-------------------------------------------
('streaming', 104)
('dstream', 51)
('big', 52)
('spark', 209)
('data', 52)

-------------------------------------------
Time: 2026-01-16 17:23:30
-------------------------------------------

-------------------------------------------
Time: 2026-01-16 17:23:35
-------------------------------------------

-------------------------------------------
Time: 2026-01-16 17:23:40
-------------------------------------------

-------------------------------------------
Time: 2026-01-16 17:23:45
-------------------------------------------

-------------------------------------------
Time: 2026-01-16 17:23:50
-------------------------------------------



In [None]:
# Qu’est-ce qu’un micro-batch dans DStream ?
# Un micro-batch est un petit lot de données collectées pendant un intervalle de temps fixe et traité comme un batch classique.

In [None]:
# Sur quelle structure repose un DStream ?
# Un DStream repose sur une séquence de RDD.

In [None]:
# Quelle est la durée du batch utilisée ?
# ssc = StreamingContext(sc, 5)
# la durée du batch utilisée est 5 seconde