In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col
import pandas as pd
import matplotlib.pyplot as plt
import os, socket, traceback

# Detect cluster mode via env set in docker-compose
spark_master = os.environ.get('SPARK_MASTER')

# Resolve paths: in containers, repo is mounted at /data (RO), so CSVs live under /data/data
cwd = os.getcwd()
project_root = os.path.abspath(os.path.join(cwd, '..'))
local_data = os.path.join(project_root, 'data')
cluster_data = '/data/data'
cluster_shared = '/shared'

if spark_master:
    data_dir = cluster_data
    output_dir = cluster_shared
else:
    data_dir = local_data
    output_dir = os.path.join(project_root, 'output')

sample_csv = os.path.join(data_dir, 'sample.csv')
print('Mode:', 'cluster' if spark_master else 'local')
print('Reading sample from:', sample_csv)
print('Writing outputs to:', output_dir)

# Stop previous SparkSession if any
try:
    spark.stop()
except Exception:
    pass

# Prefer explicit env; in docker-compose, workers can resolve 'jupyter'
driver_host = os.environ.get('SPARK_DRIVER_HOST') or ('jupyter' if spark_master else socket.gethostname())

# Build SparkSession with safe defaults (UI disabled to avoid MetricsSystem issues)
builder = (
    SparkSession.builder
    .appName('notebook')
    .config('spark.ui.enabled', 'true')
)

if spark_master:
    print('Using cluster mode with master:', spark_master)
    print('Setting spark.driver.host ->', driver_host)
    builder = (
        builder
        .master(spark_master)
        .config('spark.driver.host', driver_host)
        .config('spark.driver.bindAddress', '0.0.0.0')
    )
else:
    print('No SPARK_MASTER -> using local[*]')
    builder = builder.master('local[*]')

try:
    spark = builder.getOrCreate()
except Exception as e:
    print('SparkSession.getOrCreate() failed:', e)
    traceback.print_exc()
    print('Retrying with event log disabled')
    builder = builder.config('spark.eventLog.enabled', 'false')
    spark = builder.getOrCreate()

sc = spark.sparkContext
print('Spark master:', sc.master)
print('Spark version:', spark.version)
print('spark.driver.host:', spark.conf.get('spark.driver.host'))


Mode: cluster
Reading sample from: /data/data/sample.csv
Writing outputs to: /shared
Using cluster mode with master: spark://spark-master:7077
Setting spark.driver.host -> jupyter


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/16 11:34:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark master: spark://spark-master:7077
Spark version: 4.0.0
spark.driver.host: jupyter


Look at you SPARK UI !!
Maybe -> localhost:4040

In [3]:
rdd=sc.parallelize(range(16),4)
print(rdd.getNumPartitions())
partitioned_data=rdd.glom().collect()
print(partitioned_data)
for i, part in enumerate(partitioned_data):
    print(f"Partition {i}: {part}")

4


[Stage 0:>                                                          (0 + 4) / 4]

[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
Partition 0: [0, 1, 2, 3]
Partition 1: [4, 5, 6, 7]
Partition 2: [8, 9, 10, 11]
Partition 3: [12, 13, 14, 15]


                                                                                

In [4]:
rdd2_filtered = rdd.filter(lambda x: x % 2 == 0)
print('Filtered RDD (even numbers):', rdd2_filtered.collect())
print('Original RDD still intact:', rdd.collect())
print('lineage (to debug):', rdd2_filtered.toDebugString().decode("utf-8"))

Filtered RDD (even numbers): [0, 2, 4, 6, 8, 10, 12, 14]
Original RDD still intact: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
lineage (to debug): (4) PythonRDD[2] at collect at /tmp/ipykernel_222/1110730522.py:2 []
 |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:297 []


In [5]:
rdd3_mapped = rdd2_filtered.map(lambda x: (x, x * 2)).setName('pairs_even')
print('Mapped RDD sample (x, x*2):', rdd3_mapped.take(10))
# Lineage du RDD mappé
dbg = rdd3_mapped.toDebugString()
print('lineage (to debug):', dbg.decode('utf-8') if hasattr(dbg, 'decode') else dbg)

Mapped RDD sample (x, x*2): [(0, 0), (2, 4), (4, 8), (6, 12), (8, 16), (10, 20), (12, 24), (14, 28)]
lineage (to debug): (4) pairs_even PythonRDD[4] at RDD at PythonRDD.scala:56 []
 |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:297 []


In [6]:
## is rdd3_mapped recomputed ??
print(rdd3_mapped.take(5))
print(rdd3_mapped.count())



[(0, 0), (2, 4), (4, 8), (6, 12), (8, 16)]
8


https://www.gutenberg.org/cache/epub/2701/pg2701.txt

In [29]:
# Téléchargement de Moby Dick (Project Gutenberg) vers data_dir
# -L pour suivre les redirections, -o pour choisir le chemin de sortie
!mkdir -p {data_dir} && curl -L -o {data_dir}/mobydick.txt https://www.gutenberg.org/cache/epub/2701/pg2701.txt

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1246k  100 1246k    0     0  1402k      0 --:--:-- --:--:-- --:--:-- 1402k
100 1246k  100 1246k    0     0  1402k      0 --:--:-- --:--:-- --:--:-- 1402k


In [8]:
import re
moby_path = os.path.join(data_dir, 'mobydick.txt')

# RDD et tokenization (pattern compilé hors des lambdas)
pattern = re.compile(r"[a-z']+")

def tokenize(line):
    return pattern.findall(line.lower())

text_rdd = sc.textFile(moby_path)
words = text_rdd.flatMap(tokenize).filter(lambda w: len(w) > 0)
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)

# Top 50 mots (par fréquence)
top50 = counts.takeOrdered(50, key=lambda x: -x[1])

# Affichage succinct (top 20)
print("Top 20 mots dans Moby Dick:")
for w, c in top50[:20]:
    print(f"{w}: {c}")

# Sauvegarde CSV dans output_dir (création si besoin)
os.makedirs(output_dir, exist_ok=True)
out_csv = os.path.join(output_dir, 'mobydick_wordcount.csv')

# Utilisation de pandas (importé dans le notebook) pour écrire un CSV lisible
wc_df = pd.DataFrame(top50, columns=['word', 'count'])
wc_df.to_csv(out_csv, index=False)
print("Résultats sauvegardés dans:", out_csv)



Top 20 mots dans Moby Dick:
the: 14727
of: 6746
and: 6514
a: 4805
to: 4709
in: 4244
that: 3100
it: 2537
his: 2532
i: 2127
he: 1900
s: 1827
but: 1822
with: 1770
as: 1753
is: 1748
was: 1647
for: 1644
all: 1544
this: 1441
Résultats sauvegardés dans: /shared/mobydick_wordcount.csv


                                                                                

How many Shuffle Read and Shuffle Write ??
Look in Spark UI, is it normal ??

### Pourquoi on ne voit pas `filter` dans la lineage ?

Avec l’API RDD en PySpark, les transformations Python (filter, map, flatMap, …) sont "enveloppées" dans des nœuds génériques `PythonRDD` côté JVM. La lineage montre donc des `PythonRDD` et leurs parents (ex: `ParallelCollectionRDD`) mais ne liste pas les noms de fonctions Python comme `filter`. 

Pour un plan plus explicite des opérateurs, utilisez l’API DataFrame (ex: `df.filter(...).explain('formatted')`).

In [13]:
# Démo DataFrame pour voir explicitement un Filter
from pyspark.sql import Row

df = spark.createDataFrame([Row(value=v) for v in range(16)])
filtered_df = df.filter((col('value') % 2) == 0)
print('Plan DataFrame avec Filter:')
filtered_df.explain('formatted')
print('\nAperçu:')
filtered_df.show(10, truncate=False, vertical=True)

Plan DataFrame avec Filter:
== Physical Plan ==
* Filter (2)
+- * Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [1]: [value#0L]
Arguments: [value#0L], MapPartitionsRDD[18] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter [codegen id : 1]
Input [1]: [value#0L]
Condition : (isnotnull(value#0L) AND ((value#0L % 2) = 0))



Aperçu:
== Physical Plan ==
* Filter (2)
+- * Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [1]: [value#0L]
Arguments: [value#0L], MapPartitionsRDD[18] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter [codegen id : 1]
Input [1]: [value#0L]
Condition : (isnotnull(value#0L) AND ((value#0L % 2) = 0))



Aperçu:
-RECORD 0----
 value | 0   
-RECORD 1----
 value | 2   
-RECORD 2----
 value | 4   
-RECORD 3----
 value | 6   
-RECORD 4----
 value | 8   
-RECORD 5----
 value | 10  
-RECORD 6----
 value | 12  
-RECOR

### Comment observer la recomputation

Même sans lineage détaillée, tu peux voir la recomputation:
- Jobs UI: chaque action (take, count, collect) déclenche un nouveau job. Tu verras plusieurs jobs successifs.
- Accumulateurs: instrumenter le filter/map pour incrémenter des compteurs, et comparer entre deux actions.
- Job groups: nommer les jobs pour les repérer facilement.
- Après persist/cache: refaire une action ne réincrémente pas les accumulateurs (pas de recomputation).


In [None]:
# Démo: observer la recomputation avec accumulateurs
passed_filter = sc.longAccumulator('passed_filter')
passed_map = sc.longAccumulator('passed_map')

def filt(x):
    if x % 2 == 0:
        passed_filter.add(1)
        return True
    return False

def mapper(x):
    passed_map.add(1)
    return (x, x*2)

base = sc.parallelize(range(1000), 4)
pipe = base.filter(filt).map(mapper)

sc.setJobGroup('recompute-1', 'Action take(5)')
print('take(5):', pipe.take(5))
print('accumulators after take -> filter:', passed_filter.value, 'map:', passed_map.value)

sc.setJobGroup('recompute-2', 'Action count()')
print('count():', pipe.count())
print('accumulators after count -> filter:', passed_filter.value, 'map:', passed_map.value)

# Maintenant on persiste pour éviter la recomputation
pipe_p = pipe.persist()

sc.setJobGroup('recompute-3', 'Action count() on cached')
print('count cached:', pipe_p.count())
print('accumulators after cached count -> filter:', passed_filter.value, 'map:', passed_map.value)

pipe_p.unpersist()
sc.clearJobGroup()