# Initialisation de Spark

In [1]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("Notebook").setMaster("local")
sc = SparkContext(conf=conf)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/27 18:08:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/27 18:08:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Une fonction pour simplifier l'accès aux données

In [2]:
def extract_field(s: str, field_number: int, sep: str = ';') -> str:
    fields = s.split(sep)
    return fields[field_number] if (field_number < len(fields)) else ""

In [3]:
print(extract_field("2;CASSIOPEE;2009;33;3", 0))
print(extract_field("2;CASSIOPEE;2009;33;3", 1))
print(extract_field("2;CASSIOPEE;2009;33;3", 2))
print(extract_field("2;CASSIOPEE;2009;33;3", 3))
print(extract_field("2;CASSIOPEE;2009;33;3", 4))
print(extract_field("2;CASSIOPEE;2009;33;3", 5))

2
CASSIOPEE
2009
33
3



# Charger les données
1. Créer le RDD `lignes` à partir du répertoire `prenoms_sample.txt`

In [4]:
lignes = sc.textFile("../data/prenoms_sample.txt")
print(lignes.take(10))

['2;MARIE-JOSÉ;1957;04;6', '2;MARIE-JOSÉ;1968;971;6', '2;MARIE-JOSÉ;1973;94;3', '2;MARIE-JOSEPHE;1955;75;10', '2;MARIE-LAURE;1959;49;4', '2;MARIE-LAURE;1965;87;14', '2;MARIE-LAURE;1982;50;14', '2;MARIE-LINE;1955;59;47', '2;MARIE-LINE;1955;77;6', '2;MARIE-LINE;1955;79;4']


# Transformer les lignes en prénoms
1. En appliquant la méthode `map`, créer le RDD `prenoms` à partir de `lignes`

In [5]:
prenoms = lignes.map(lambda l: (
    int(extract_field(l, 0)),
    extract_field(l, 1),
    int(extract_field(l, 2)),
    extract_field(l, 3),
    int(extract_field(l, 4))
))
for n in prenoms.take(10):
    print(n)

(2, 'MARIE-JOSÉ', 1957, '04', 6)
(2, 'MARIE-JOSÉ', 1968, '971', 6)
(2, 'MARIE-JOSÉ', 1973, '94', 3)
(2, 'MARIE-JOSEPHE', 1955, '75', 10)
(2, 'MARIE-LAURE', 1959, '49', 4)
(2, 'MARIE-LAURE', 1965, '87', 14)
(2, 'MARIE-LAURE', 1982, '50', 14)
(2, 'MARIE-LINE', 1955, '59', 47)
(2, 'MARIE-LINE', 1955, '77', 6)
(2, 'MARIE-LINE', 1955, '79', 4)


# Interroger les données
La documentation des méthodes d'un RDD est disponible ([RDD](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html)).

1. Rappeler ce que sont les *transformations* et les *actions*
1. Donner, pour chaque prénom, son nombre d'occurences (`map` et `reduceByKey`)

Transformation : transforme le RDD courant et retourne un nouveau RDD.  
Action : exécute le calcul d'une valeur sur un RDD.

In [6]:
from operator import add
prenoms.map(lambda t: (t[1], 1)).reduceByKey(add).collect()

[('MARIE-JOSÉ', 6),
 ('MARIE-PASCALE', 1),
 ('MARINETTE', 3),
 ('MARTHE', 1),
 ('MARYLINE', 3),
 ('MARYSE', 4),
 ('MAUD', 4),
 ('MAURICETTE', 2),
 ('MÉLANIE', 3),
 ('MÉLODIE', 1),
 ('MIA', 1),
 ('MICHELINE', 3),
 ('MINA', 2),
 ('MYLA', 2),
 ('MYRIAM', 5),
 ('NADEGE', 1),
 ('NADIA', 4),
 ('NADINE', 4),
 ('NAIMA', 1),
 ('NAOMY', 1),
 ('NASSIMA', 1),
 ('NATACHA', 4),
 ('NEJMA', 1),
 ('NELLY', 5),
 ('NESRINE', 3),
 ('NICOLLE', 1),
 ('NOELIE', 1),
 ('NOËLLE', 2),
 ('NORA', 6),
 ('NORAH', 3),
 ('NOURIA', 1),
 ('ODILE', 8),
 ('OLGA', 3),
 ('ORLANE', 1),
 ('ORNELLA', 1),
 ('PAOLA', 1),
 ('PASCALE', 2),
 ('PAULA', 1),
 ('PAULETTE', 7),
 ('PRISCILLIA', 1),
 ('RACHIDA', 1),
 ('REINE', 1),
 ('RENELIA', 1),
 ('ROMANE', 1),
 ('ROSA', 1),
 ('ROSE-MAY', 1),
 ('ROSELINE', 5),
 ('SALIHA', 1),
 ('SAMIA', 1),
 ('SANDRA', 7),
 ('SARA', 5),
 ('SELENA', 1),
 ('SELMA', 1),
 ('SÉVERINE', 4),
 ('SHAIMA', 1),
 ('SHARON', 3),
 ('SHERINE', 1),
 ('SHERLEY', 1),
 ('SIDONIE', 2),
 ('SILOÉ', 1),
 ('SILVIA', 1),
 ('SOH

1. Donner le nombre total de naissances avec un prénom féminin (`filter`, `map`, `reduce` ou `sum`)

In [7]:
prenoms.filter(lambda t : t[0] == 2).map(lambda t: t[4]).sum()

41593

1. Donner l'effectif maximal et minimal par prénom (`map`, `aggregateByKey`)

In [8]:
prenoms.map(lambda t: (t[1], t[4])).aggregateByKey(
    (float('inf'), float('-inf')),
    lambda acc, v: (min(acc[0], v), max(acc[1], v)),
    lambda acc1, acc2: (min(acc1[0], acc2[0]), max(acc1[1], acc2[1]))
).collect()

[('MARIE-JOSÉ', (3, 65)),
 ('MARIE-PASCALE', (4, 4)),
 ('MARINETTE', (5, 12)),
 ('MARTHE', (12, 12)),
 ('MARYLINE', (3, 8)),
 ('MARYSE', (5, 41)),
 ('MAUD', (3, 12)),
 ('MAURICETTE', (3, 9)),
 ('MÉLANIE', (4, 18)),
 ('MÉLODIE', (3, 3)),
 ('MIA', (20, 20)),
 ('MICHELINE', (28, 62)),
 ('MINA', (4, 4)),
 ('MYLA', (12, 13)),
 ('MYRIAM', (3, 18)),
 ('NADEGE', (51, 51)),
 ('NADIA', (3, 30)),
 ('NADINE', (9, 59)),
 ('NAIMA', (5, 5)),
 ('NAOMY', (3, 3)),
 ('NASSIMA', (6, 6)),
 ('NATACHA', (6, 62)),
 ('NEJMA', (3, 3)),
 ('NELLY', (7, 63)),
 ('NESRINE', (3, 6)),
 ('NICOLLE', (9, 9)),
 ('NOELIE', (3, 3)),
 ('NOËLLE', (6, 14)),
 ('NORA', (4, 13)),
 ('NORAH', (3, 11)),
 ('NOURIA', (7, 7)),
 ('ODILE', (5, 28)),
 ('OLGA', (8, 10)),
 ('ORLANE', (4, 4)),
 ('ORNELLA', (3, 3)),
 ('PAOLA', (3, 3)),
 ('PASCALE', (60, 101)),
 ('PAULA', (3, 3)),
 ('PAULETTE', (12, 85)),
 ('PRISCILLIA', (3, 3)),
 ('RACHIDA', (6, 6)),
 ('REINE', (6, 6)),
 ('RENELIA', (3, 3)),
 ('ROMANE', (3, 3)),
 ('ROSA', (5, 5)),
 ('ROSE-MAY

1. Sur le modèle des prénoms, charger les données des départements
1. Donner, pour chaque nom de département, le prénom le plus fréquent depuis l'année 2000

In [9]:
lignes = sc.textFile("../data/dpts.txt")
header = lignes.first()
lignes_sans_entete = lignes.filter(lambda l: l != header)

dpts = lignes_sans_entete.map(lambda l: (
    extract_field(l, 0, ","),
    int(extract_field(l, 1, ",")),
    extract_field(l, 2, ","),
    int(extract_field(l, 3, ",")),
    extract_field(l, 4, ","),
    extract_field(l, 5, ","),
    extract_field(l, 6, ",")
))

for n in dpts.take(10):
    print(n)

('01', 84, '01053', 5, 'AIN', 'Ain', 'Ain')
('02', 32, '02408', 5, 'AISNE', 'Aisne', 'Aisne')
('03', 84, '03190', 5, 'ALLIER', 'Allier', 'Allier')
('04', 93, '04070', 4, 'ALPES DE HAUTE PROVENCE', 'Alpes-de-Haute-Provence', 'Alpes-de-Haute-Provence')
('05', 93, '05061', 4, 'HAUTES ALPES', 'Hautes-Alpes', 'Hautes-Alpes')
('06', 93, '06088', 4, 'ALPES MARITIMES', 'Alpes-Maritimes', 'Alpes-Maritimes')
('07', 84, '07186', 5, 'ARDECHE', 'Ardèche', 'Ardèche')
('08', 44, '08105', 4, 'ARDENNES', 'Ardennes', 'Ardennes')
('09', 76, '09122', 5, 'ARIEGE', 'Ariège', 'Ariège')
('10', 44, '10387', 5, 'AUBE', 'Aube', 'Aube')


In [10]:
prenoms.filter(lambda t: int(t[2]) >= 2000) \
        .map(lambda t: ((t[3], t[1]), t[4])).reduceByKey(add) \
        .map(lambda t: (t[0][0], (t[0][1], t[1]))) \
        .reduceByKey(lambda a, b: a if a[1] >= b[1] else b) \
        .map(lambda t: (t[0], t[1][0])) \
        .join(dpts.map(lambda t: (t[0], t[6]))) \
        .map(lambda t: (t[1][1], t[1][0])) \
        .collect()

[("Côte-d'Or", 'ZOÉ'),
 ('Ain', 'ENZO'),
 ('La Réunion', '_PRENOMS_RARES'),
 ('Guadeloupe', 'ALYSSA'),
 ('Drôme', 'BAPTISTE'),
 ('Moselle', 'HUGO'),
 ('Yvelines', 'PAULINE'),
 ('Charente-Maritime', 'NOÉ'),
 ('Isère', 'ANAÏS'),
 ('Vaucluse', 'MANON'),
 ('Aube', '_PRENOMS_RARES'),
 ('Pyrénées-Atlantiques', 'BAPTISTE'),
 ("Côtes-d'Armor", 'GABIN'),
 ('Savoie', 'AMÉLIE'),
 ('Haute-Loire', 'LOAN'),
 ('Loire-Atlantique', 'HUGO'),
 ('Charente', 'CLÉMENT'),
 ('Marne', 'MAXIME'),
 ('Loiret', 'ELISA'),
 ('Aisne', 'LÉO'),
 ('Seine-et-Marne', 'LOUISE'),
 ('Finistère', 'AUDREY'),
 ('Gers', 'MATHIEU'),
 ('Ariège', 'CLARA'),
 ('Rhône', 'NATHAN'),
 ('Bas-Rhin', 'LAURIE'),
 ('Oise', 'ELOISE'),
 ('Morbihan', '_PRENOMS_RARES'),
 ('Loire', 'RAYAN'),
 ('Haute-Marne', 'CAMILLE'),
 ('Tarn-et-Garonne', 'CHIARA'),
 ('Tarn', 'DYLAN'),
 ('Hautes-Pyrénées', 'ZOÉ'),
 ('Vosges', 'HUGO'),
 ('Maine-et-Loire', 'QUENTIN'),
 ('Aveyron', 'MARGAUX'),
 ('Haute-Savoie', 'QUENTIN'),
 ('Vendée', 'LOLA'),
 ('Alpes-Maritimes', 