# Initialisation de Spark

In [1]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("Notebook").setMaster("local")
sc = SparkContext(conf=conf)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/04 04:47:08 WARN Utils: Your hostname, papamor-ROG-Zephyrus-G14-GA401IV-GA401IV, resolves to a loopback address: 127.0.1.1; using 172.16.6.49 instead (on interface wlp2s0)
25/11/04 04:47:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/04 04:47:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Une fonction pour simplifier l'accès aux données

In [2]:
def extract_field(s: str, field_number: int, sep: str = ';') -> str:
    fields = s.split(sep)
    return fields[field_number] if (field_number < len(fields)) else ""

In [3]:
print(extract_field("2;CASSIOPEE;2009;33;3", 0))
print(extract_field("2;CASSIOPEE;2009;33;3", 1))
print(extract_field("2;CASSIOPEE;2009;33;3", 2))
print(extract_field("2;CASSIOPEE;2009;33;3", 3))
print(extract_field("2;CASSIOPEE;2009;33;3", 4))
print(extract_field("2;CASSIOPEE;2009;33;3", 5))

2
CASSIOPEE
2009
33
3



# Charger les données
1. Créer le RDD `lignes` à partir du répertoire `prenoms_sample.txt`

In [4]:
lignes = sc.textFile("prenoms_sample.txt")
print(lignes.take(10))

[Stage 0:>                                                          (0 + 1) / 1]

['2;MÉLISSA;2011;71;3', '2;MÉLODIE;1996;93;3', '2;MERIEM;1999;13;4', '2;MERYAM;2014;69;3', '2;MIA;2007;94;6', '2;MIA;2017;92;60', '2;MIA;2019;14;23', '2;MICHÈLE;1951;64;48', '2;MICHÈLE;1954;03;35', '2;MICHÈLE;1967;37;3']


                                                                                

In [5]:
lignes.take(20)
lignes.count()
lignes.first()
lignes.map(lambda ligne: extract_field(ligne, 1)).take(10)

['MÉLISSA',
 'MÉLODIE',
 'MERIEM',
 'MERYAM',
 'MIA',
 'MIA',
 'MIA',
 'MICHÈLE',
 'MICHÈLE',
 'MICHÈLE']

# Transformer les lignes en prénoms
1. En appliquant la méthode `map`, créer le RDD `prenoms` à partir de `lignes`

In [6]:
prenoms = lignes.map(lambda l: (
    extract_field(l, 0)[0],
    extract_field(l, 1),
    int(extract_field(l, 2)),
    int(extract_field(l, 3)),
    int(extract_field(l, 4))
))
for n in prenoms.take(10):
    print(n)

('2', 'MÉLISSA', 2011, 71, 3)
('2', 'MÉLODIE', 1996, 93, 3)
('2', 'MERIEM', 1999, 13, 4)
('2', 'MERYAM', 2014, 69, 3)
('2', 'MIA', 2007, 94, 6)
('2', 'MIA', 2017, 92, 60)
('2', 'MIA', 2019, 14, 23)
('2', 'MICHÈLE', 1951, 64, 48)
('2', 'MICHÈLE', 1954, 3, 35)
('2', 'MICHÈLE', 1967, 37, 3)


# Interroger les données
La documentation des méthodes d'un RDD est disponible ([RDD](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html)).

1. Rappeler ce que sont les *transformations* et les *actions*
1. Donner, pour chaque prénom, son nombre d'occurences (`map` et `reduceByKey`)

In [9]:
#TODO
counts_par_prenom = prenoms.map(lambda p: (p[1], p[4])) \
                           .reduceByKey(lambda a, b: a + b)
for n in counts_par_prenom.take(10):
    print(n)

('MÉLODIE', 3)
('MICHELINE', 196)
('MICHELLE', 31)
('MIREILLE', 302)
('MIREN', 5)
('MURIEL', 73)
('MYLÈNE', 9)
('NADEGE', 31)
('NAIMA', 5)
('NAJWA', 3)


In [10]:
# afficher les 20 prénoms les plus fréquents (tri descendant par effectif)
top20 = counts_par_prenom.map(lambda kv: (kv[1], kv[0])) \
                         .sortByKey(ascending=False) \
                         .map(lambda kv: (kv[1], kv[0])) \
                         .take(20)
for n in top20:
    print(n)

('JEAN', 2086)
('_PRENOMS_RARES', 1528)
('MICHEL', 1426)
('MARIE', 1343)
('PIERRE', 1003)
('STÉPHANIE', 878)
('PHILIPPE', 804)
('STÉPHANE', 792)
('MADELEINE', 776)
('PAUL', 743)
('LOUIS', 720)
('HENRI', 673)
('DENISE', 583)
('JACQUELINE', 569)
('ANNICK', 540)
('SANDRINE', 531)
('ROGER', 506)
('MONIQUE', 499)
('THIERRY', 488)
('ERIC', 483)


1. Donner le nombre total de naissances avec un prénom féminin (`filter`, `map`, `reduce` ou `sum`)

In [11]:
#TODO
total_feminines = prenoms.filter(lambda p: p[0] == '2') \
                         .map(lambda p: p[4]) \
                         .sum()
print("Total naissances (femmes) :", total_feminines)

Total naissances (femmes) : 35716


1. Donner l'effectif maximal et minimal par prénom (`map`, `aggregateByKey`)

In [12]:
#TODO
name_counts = prenoms.map(lambda p: (p[1], p[4]))

zero = (float("inf"), 0)  # (min, max)
def seq_op(acc, v):
    return (min(acc[0], v), max(acc[1], v))
def comb_op(a, b):
    return (min(a[0], b[0]), max(a[1], b[1]))

min_max_par_prenom = name_counts.aggregateByKey(zero, seq_op, comb_op)

# afficher un extrait
for name, (mn, mx) in min_max_par_prenom.take(20):
    print(name, "min:", mn, "max:", mx)

MÉLODIE min: 3 max: 3
MICHELINE min: 3 max: 105
MICHELLE min: 5 max: 17
MIREILLE min: 4 max: 232
MIREN min: 5 max: 5
MURIEL min: 3 max: 24
MYLÈNE min: 3 max: 6
NADEGE min: 9 max: 22
NAIMA min: 5 max: 5
NAJWA min: 3 max: 3
NAWELLE min: 3 max: 3
NEILA min: 11 max: 11
NELL min: 5 max: 5
NICOLE min: 4 max: 128
NINA min: 10 max: 31
NOÉLINE min: 7 max: 7
NOÉMIE min: 4 max: 22
NORIA min: 4 max: 6
NOUR min: 6 max: 6
ODETTE min: 22 max: 49


In [13]:
name_counts = prenoms.map(lambda p: (p[1], p[4]))
name_counts.groupByKey() \
           .mapValues(lambda counts: (min(counts), max(counts))) \
           .take(20)  # afficher un extrait

[('MÉLODIE', (3, 3)),
 ('MICHELINE', (3, 105)),
 ('MICHELLE', (5, 17)),
 ('MIREILLE', (4, 232)),
 ('MIREN', (5, 5)),
 ('MURIEL', (3, 24)),
 ('MYLÈNE', (3, 6)),
 ('NADEGE', (9, 22)),
 ('NAIMA', (5, 5)),
 ('NAJWA', (3, 3)),
 ('NAWELLE', (3, 3)),
 ('NEILA', (11, 11)),
 ('NELL', (5, 5)),
 ('NICOLE', (4, 128)),
 ('NINA', (10, 31)),
 ('NOÉLINE', (7, 7)),
 ('NOÉMIE', (4, 22)),
 ('NORIA', (4, 6)),
 ('NOUR', (6, 6)),
 ('ODETTE', (22, 49))]

1. Sur le modèle des prénoms, charger les données des départements
1. Donner, pour chaque nom de département, le prénom le plus fréquent depuis l'année 2000

In [18]:
#TODO

# ...existing code...
# Charger les départements et calculer, pour chaque département, le prénom le plus fréquent depuis 2000

from operator import add

# charger et parser dpts.txt en (code_dept:int_or_str, nom_dept)
dpts = sc.textFile("dpts.txt") \
         .map(lambda l: l.strip()) \
         .filter(lambda l: l != "")

def parse_dpt(line):
    for sep in [';', ',']:
        if sep in line:
            parts = [p.strip() for p in line.split(sep)]
            break
    else:
        parts = [p.strip() for p in line.split()]
    code = parts[0] if len(parts) > 0 else ""
    name = parts[4] if len(parts) > 1 else ""
    # essayer de convertir le code en int pour matcher prenoms (qui utilise int pour le dept)
    try:
        code_int = int(code)
        return (code_int, name)
    except:
        return (code, name)

dpt_pairs = dpts.map(parse_dpt)  # (dept_code, dept_name)

# agréger les effectifs par (dept, prenom) pour les années >= 2000
counts_by_dept_name = prenoms \
    .filter(lambda p: p[2] >= 2000) \
    .map(lambda p: ((p[3], p[1]), p[4])) \
    .reduceByKey(add)  # ((dept_code, prenom), total)

# passer à (dept_code, (prenom, total))
dept_name_totals = counts_by_dept_name.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))

# pour chaque département, garder le (prenom, total) maximal
top_name_by_dept = dept_name_totals.reduceByKey(lambda a, b: a if a[1] >= b[1] else b)
# top_name_by_dept : (dept_code, (prenom, total))

# joindre avec les noms des départements et afficher
result = top_name_by_dept.join(dpt_pairs) \
         .map(lambda kv: (kv[1][1], (kv[1][0][0], kv[1][0][1]))) \
         .collect()

for dept_name, (prenom, total) in sorted(result):
    print(dept_name, ":", prenom, "-", total)
# ...existing code...

AIN : LORENZO - 6
AISNE : CHARLIE - 6
ALLIER : TONY - 5
ALPES DE HAUTE PROVENCE : JULIE - 3
ALPES MARITIMES : MATTÉO - 80
ARDECHE : SACHA - 17
ARDENNES : CHLOÉ - 19
ARIEGE : OCÉANE - 10
AUBE : TIM - 6
AUDE : GABRIEL - 26
AVEYRON : BASTIEN - 8
BAS RHIN : NATHAN - 99
BOUCHES DU RHONE : MATHIS - 81
CALVADOS : LINA - 30
CANTAL : LÉA - 15
CHARENTE : JULIE - 7
CHARENTE MARITIME : LUCAS - 51
CHER : ADAM - 10
CORREZE : ANGÈLE - 3
COTE D OR : MANON - 42
COTES D ARMOR : NOAH - 44
CREUSE : LOUIS - 4
DEUX SEVRES : CHLOÉ - 44
DORDOGNE : LÉA - 23
DOUBS : EVA - 26
DROME : FLORA - 8
ESSONNE : LAURA - 96
EURE : LOUNA - 21
EURE ET LOIR : KHADIJA - 8
FINISTERE : JULIETTE - 23
GARD : THÉO - 74
GERS : _PRENOMS_RARES - 22
GIRONDE : _PRENOMS_RARES - 402
GUADELOUPE : SAMUEL - 24
GUYANE : LAURA - 8
HAUT RHIN : SACHA - 29
HAUTE GARONNE : MATTÉO - 50
HAUTE LOIRE : SAMUEL - 4
HAUTE SAONE : ARTHUR - 8
HAUTE SAVOIE : EMMA - 104
HAUTE VIENNE : ILAN - 4
HAUTES ALPES : ENZO - 18
HAUTS DE SEINE : THOMAS - 163
HERAULT :

In [19]:
dpt_pairs.take(10)

[('dep', 'ncc'),
 (1, 'AIN'),
 (2, 'AISNE'),
 (3, 'ALLIER'),
 (4, 'ALPES DE HAUTE PROVENCE'),
 (5, 'HAUTES ALPES'),
 (6, 'ALPES MARITIMES'),
 (7, 'ARDECHE'),
 (8, 'ARDENNES'),
 (9, 'ARIEGE')]

In [50]:
#TODO

[(68, ('ANAS', 4)), (22, ('DAVID', 10)), (91, ('MALIK', 4)), (61, ('ENOLA', 4)), (36, ('LÉA', 29)), (30, ('LÉNA', 22)), (94, ('FRANCK', 7)), (6, ('ISABELLA', 3)), (52, ('MATHEO', 8)), (29, ('NOLAN', 37))]
[(1, ('LINA', 8)), (71, ('ETHAN', 29)), (43, ('JULIA', 7)), (75, ('VALENTIN', 121)), (73, ('BAPTISTE', 18)), (37, ('LOUISE', 56)), (84, ('JULIE', 47)), (19, ('DAMIEN', 8)), (6, ('ZOÉ', 26)), (44, ('LILOU', 97))]
[(38, 'ISERE'), (46, 'LOT'), (94, 'VAL DE MARNE'), (18, 'CHER'), (52, 'HAUTE MARNE'), (14, 'CALVADOS'), (40, 'LANDES'), (3, 'ALLIER'), (37, 'INDRE ET LOIRE'), (85, 'VENDEE')]
('AIN', ('LINA', 8))
('AISNE', ('MATHIS', 55))
('ALLIER', ('SAMY', 3))
('ALPES DE HAUTE PROVENCE', ('SARAH', 6))
('ALPES MARITIMES', ('ZOÉ', 26))
('ARDECHE', ('ELOÏSE', 5))
('ARDENNES', ('JULIE', 33))
('ARIEGE', ('KENZO', 3))
('AUBE', ('MAELYS', 11))
('AUDE', ('SAMUEL', 3))
('AVEYRON', ('JADE', 19))
('BAS RHIN', ('CLARA', 136))
('BOUCHES DU RHONE', ('MATTÉO', 145))
('CALVADOS', ('ENZO', 83))
('CHARENTE', 