# Une fonction pour simplifier l'accès aux données

In [19]:
def extractField(s: String, fieldNumber: Int): String = {
    val fields = s.split(';')
    if (fieldNumber >= fields.length) "" else fields(fieldNumber)
}

extractField: (s: String, fieldNumber: Int)String


In [18]:
def extractFieldS(s: String, fieldNumber: Int): String = {
    val fields = s.split(',')
    if (fieldNumber >= fields.length) "" else fields(fieldNumber)
}

extractFieldS: (s: String, fieldNumber: Int)String


In [2]:
println(extractField("2;CASSIOPEE;2009;33;3", 0))
println(extractField("2;CASSIOPEE;2009;33;3", 1))
println(extractField("2;CASSIOPEE;2009;33;3", 2))
println(extractField("2;CASSIOPEE;2009;33;3", 3))
println(extractField("2;CASSIOPEE;2009;33;3", 4))
println(extractField("2;CASSIOPEE;2009;33;3", 5))

2
CASSIOPEE
2009
33
3



# Charger les données
1. Créer le RDD `lignes` à partir du répertoire `prenoms_sample.txt`

In [3]:
val lignes = sc.textFile("prenoms_sample.txt")
lignes.take(10).foreach(println)

2;MELISSANDRE;2006;54;3
2;MELLINA;2016;59;11
2;MELODY;1989;77;6
2;MÉLYNE;2014;68;5
2;MERCEDES;1966;75;10
2;MEREDITH;2002;974;7
2;MERIEM;1991;25;3
2;MERVE;1999;25;3
2;MERYL;2000;91;3
2;MICHÈLE;1942;59;204


lignes: org.apache.spark.rdd.RDD[String] = prenoms_sample.txt MapPartitionsRDD[1] at textFile at <console>:25


# Transformer les lignes en prénoms
1. En appliquant la méthode `map`, créer le RDD `prenoms` à partir de `lignes`

In [4]:
val prenoms = lignes.map(l => (
    extractField(l, 0).charAt(0),
    extractField(l, 1),
    extractField(l, 2).toInt,
    extractField(l, 3).toInt,
    extractField(l, 4).toDouble.toInt
))
prenoms.take(10).foreach(println)

(2,MELISSANDRE,2006,54,3)
(2,MELLINA,2016,59,11)
(2,MELODY,1989,77,6)
(2,MÉLYNE,2014,68,5)
(2,MERCEDES,1966,75,10)
(2,MEREDITH,2002,974,7)
(2,MERIEM,1991,25,3)
(2,MERVE,1999,25,3)
(2,MERYL,2000,91,3)
(2,MICHÈLE,1942,59,204)


prenoms: org.apache.spark.rdd.RDD[(Char, String, Int, Int, Int)] = MapPartitionsRDD[2] at map at <console>:28


# Interroger les données
La documentation des méthodes d'un RDD est disponible ([RDD](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), [PairRDDFunctions](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions)).

1. Rappeler ce que sont les *transformations* et les *actions*
1. Donner, pour chaque prénom, son nombre d'occurences (`map` et `reduceByKey`)

In [5]:
//une transformation retourne un nouvel RDD par transformation du RDD courant
// une action déclenche le calcul d’une valeur sur un RDD
val prenom = sc.textFile("prenoms.txt")
val prenomMap = prenom.map(l=> (extractField(l, 1),1))
val prenomReduce = prenomMap.reduceByKey((l,x)=> (l+x))
prenomReduce.take(20).foreach(println)


(JOVANE,1)
(SALAHDINE,3)
(JENNYFER,321)
(KONA,2)
(YUSSRA,1)
(NATIVA,1)
(MOHAMED-SAÏD,1)
(CAGLAR,2)
(HAFIZA,2)
(PAUL,10622)
(MAYEDINE,1)
(HORTENSE,1534)
(INAH,1)
(AUXENCE,80)
(AXÈLE,1)
(IVELISE,1)
(MAÏLYNN,1)
(FALLON,9)
(MARLYSE,105)
(HÉLIANE,1)


prenom: org.apache.spark.rdd.RDD[String] = prenoms.txt MapPartitionsRDD[4] at textFile at <console>:29
prenomMap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:30
prenomReduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:31


1. Donner le nombre total de naissances avec un prénom féminin (`filter`, `map`, `reduce` ou `sum`)

In [7]:
//TODO
prenoms.filter(l => l._1=='2').map(l=>l._5).reduce((x,y) => x+y)

res5: Int = 36604


1. Donner l'effectif maximal et minimal par prénom (`map`, `aggregateByKey`)

In [44]:
//TODO
//val prenomEffect = prenoms.map(t=>(t._2, t._5)).aggregateByKey((Int.MaxValue,Int.MinValue))(
//    {case((m,n),c)=>(n min c, n max c)}, {case((m,n),(m1,n1))=>(m min m1, n max n1)})
                                 
//prenomEffect.take(10).foreach(println)
val result = lignes.map(l => (extractField(l, 1),extractField(l, 4).toDouble.toInt))
            .aggregateByKey((Int.MaxValue,Int.MinValue))(
{case ((k,u),v) => (k min v, u max v)},{case ((k,u),(k1, u1)) => (k min k1, u max u1)},                
            )
result.take(10).foreach(println)

(ELÉA,(4,4))
(NAÏL,(5,5))
(BRICE,(6,28))
(MAROUANE,(4,4))
(ROSALIE,(9,9))
(ASSIA,(3,3))
(FAUVE,(4,4))
(JOËLLE,(3,12))
(JOSUE,(6,6))
(NICOLLE,(7,7))


result: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[56] at aggregateByKey at <console>:36


1. Sur le modèle des prénoms, charger les données des départements
1. Donner, pour chaque nom de département, le prénom le plus fréquent depuis l'année 2000

In [33]:
//TODO
val nom = lignes.map(l => (extractField(l, 3),extractField(l, 1),extractField(l, 2).toInt))
val depart = sc.textFile("dpts.txt")
val departement = depart.map(t => (extractFieldS(t, 0),extractFieldS(t, 6)))
val join = departement.join(nom, on="dpt")
//departement.take(10).foreach(println)

<console>: 39: error: overloaded method value join with alternatives:

In [49]:
//TODO
//val nom = prenoms.filter(l=>l._3>=2000).map(l=>(l._4,(l._2,l._5)))
//val dp = dpts.map(l=>(l._2,l._5))
//val join = nom.join(nom)
//val nbre = join.map(l=>(l._2._1,(l._2._2._1,l._2._2._2)))
//val result = nbre.aggregateByKey(("",Int.MinValue)) ({case((m,n),(v1,v2))=>(if(n>v2) m else v1 , n max v2)},
//          {case((m,n),(v1,v2))=>(if(n>v2) m else v1 , n max v2)})
//result.take(10).foreach(println)
var maxValue = (k:(String,Int), v:(String,Int)) => if ((k._2 max v._2)==k._2 )k else v
var freqByDept= (prenoms.
filter (p=> ((p._3)>1999)).
map(p => ((p._2, p._4),p._5)).
reduceByKey(_+_).
map(p=>(p._1._2, (p._1._1,p._2))).
aggregateByKey(("NONE",Int.MinValue))(maxValue,maxValue)
)
val depart = sc.textFile("dpts.txt")
val result= depart.join(freqByDept).map(d =>(d._2._1._5, (d._2._2._1, d._2._2._2)))
result.take(10).foreach(println)


<console>: 48: error: value join is not a member of org.apache.spark.rdd.RDD[String]