# Des classes pour représenter les données

## Pour les prénoms

In [4]:
case class Prenom(sexe: String, prenom: String, annee: Int, codeDept: Int, nombre: Int)

In [5]:
val ariane = Prenom("2", "ARIANE", 2007, 91, 10)
val cassiopee = Prenom("2", "CASSIOPEE", 2009, 91, 3)
val helios = Prenom("1", "HELIOS", 2012, 91, 1)
println(s"$ariane, $cassiopee, $helios")

Prenom(2,ARIANE,2007,91,10), Prenom(2,CASSIOPEE,2009,91,3), Prenom(1,HELIOS,2012,91,1)

## Pour les départements

In [18]:
case class Dept(region: Int, codeDept: String, chefLieu: String, typeNom: Int, nom: String, nomEnrichi: String)

In [20]:
val puyDeDome = Dept(84, "63", "63113", 2, "PUY-DE-DOME", "Puy-de-Dôme")
println(puyDeDome)

Dept(84,63,63113,2,PUY-DE-DOME,Puy-de-Dôme)


# Chargement des données

In [21]:
// Pour les conversions implicites de RDDs vers DataFrames
val sparkRO = spark // bricolage pour que cela fonctionne dans le notebool (inutile sinon)
import sparkRO.implicits._

## Chargement des prénoms

In [22]:
val lignes = spark.sparkContext.textFile("prenoms_sample.txt")
val prenomsRDD = lignes.map(_.split('\t')).map(a => Prenom(a(0), a(1), a(2).toInt, a(3).toInt, a(4).toDouble.toInt))
val prenoms = prenomsRDD.toDS()
prenoms.show() // échoue ici mais fonctionne dans spark-shell ?!?

Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost, executor driver): java.lang.ClassCastException: $line23.$read$$iw$$iw$Prenom cannot be cast to $line23.$read$$iw$$iw$Prenom
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.

## Chargement des départements

In [25]:
val lignesDept = spark.sparkContext.textFile("dpts.txt").filter(l => l.startsWith("REGION") == false)
val deptsRDD = lignesDept.map(_.split('\t')).map(a => Dept(a(0).toInt, a(1), a(2), a(3).toInt, a(4), a(5)))
val depts = deptsRDD.toDS()
depts.show() // échoue ici mais fonctionne dans spark-shell ?!?

Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): java.lang.ClassCastException: $line109.$read$$iw$$iw$Dept cannot be cast to $line109.$read$$iw$$iw$Dept
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.ap

# Sauve les données au format parquet

## Les prénoms partitionnés par départements et années et compressés (Snappy)

In [28]:
prenoms.write.partitionBy("codeDept", "annee").format("parquet").save("prenomsParDeptsEtAnnees.parquet")

Name: org.apache.spark.sql.AnalysisException
Message: path file:/home/jovyan/notebooks/prenomsParDeptsEtAnnees.parquet already exists.;
StackTrace: org.apache.spark.sql.AnalysisException: path file:/home/jovyan/notebooks/prenomsParDeptsEtAnnees.parquet already exists.;
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:106)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
  at org.apach

## Les prénoms partitionnés par départements et années et compressés (gzip)

In [30]:
prenoms.write.partitionBy("codeDept", "annee").option("compression", "gzip").format("parquet").save("prenomsParDeptsEtAnnees.gzip.parquet")

Name: org.apache.spark.SparkException
Message: Job aborted.
StackTrace:   at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
  at org.apache.spark.sql.execution.com

## Les départements

In [31]:
depts.write.format("parquet").save("depts.parquet")

Name: org.apache.spark.SparkException
Message: Job aborted.
StackTrace: org.apache.spark.SparkException: Job aborted.
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.sca