In [1]:
println("Hello World!")

Hello World!


In [2]:
new java.io.File("/home/jovyan/work/data/shakespeare").list

Array(merrywivesofwindsor, twelfthnight, midsummersnightsdream, loveslabourslost, asyoulikeit, comedyoferrors, muchadoaboutnothing, tamingoftheshrew)

# Ce qu'il faut de Scala pour Spark

Dean Wampler, Ph.D. [@deanwampler](http://twitter.com/deanwampler) ([email](mailto:deanwampler@gmail.com))

Bienvenue. Ce notebook vous enseigne les principes fondamentaux de [Scala](http://scala-lang.org) nécessaires à l'utilisation de l'API Scala d'[Apache Spark](http://spark.apache.org). Spark utilise les meilleures fonctionnalités de Scala et évite d'utiliser celles qui sont plus difficiles et obscures.

## Introduction : Pourquoi Scala ?
Spark vous laisse le choix entre Scala, Java, Python, R et SQL pour vos travaux. Les _data engineers_ préfèreront Scala, Java ou SQL et ce sont eux qui sont chargés de construire des infrastructures scalables et résilientes pour le _Big Data_. Les _data scientists_ vont préferer Python, R et SQL et vont pouvoir construire des modèles pour analyser la donnée à l'aide du machine learning. Ils utiliseront aussi du SQL lors de l'exploration des données.

Bien sûr certains data engineers ne se limiteront pas à Scala et Java et pourront aussi utiliser Python et R et vice-versa.

Quelques avantages à utiliser Scala pour Spark :
* **Performance:** Spark étant écrit en Scala, vous obtiendrez une meilleure performance et une API plus complète en utilisant Scala. Il est vrai qu'avec les [DataFrames](http://spark.apache.org/docs/latest/sql-programming-guide.html), la performance ne dépend pas du langage choisi. Si vous avez besoin d'utiliser les [RDD](http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds), alors Scala vous donnera de meilleures performances avec Java arrivant en deuxième position.
* **Debugging:** Quand vous êtes face à une erreur de runtime, comprendre la stack d'erreur vous sera plus facile si vous connaissez Scala.
* **Code concis et expressif:** Comparé à Java, coder en Scala est beaucoup plus concis. Cela vous permet d'être plus productif et de pouvoir écrire vos idées en code sans avoir à batailler avec des API moins flexibles qui reflètent des contraintes de langage idiomatiques (vous pourrez observer cela au fur et à mesure)
* **Type Safety:** Comparé à Python et R, coder en Scala vous permet de bénéficier du _static typing_ avec un _type inference_. _Static typing_ signifie que le parser Scala pourra trouver beaucoup plus d'erreur dans vos expressions lors de la compilation au lieu de les découvrir au runtime. Cependant le _type inference_ vous permet de ne pas à avoir à écrire explicitement certaines informations sur le type car ce sera Scala qui s'en chargera pour vous.

### Pourquoi pas Scala ?
Scala n'est pas un langage parfait, deux désavantages :
* **Librairies:** Python et R ont tous les deux un riche ecosystème de librairies dédiées à l'analyse de données. Bien que Scala s'améliore sur ce point, Python et R restent devant.
* **Fonctionnalités avancées:** Maîtriser des fonctionnalités avancées d'un langage vous donne beaucoup de possibilités dans l'élaboration de vos programmes. Mais si vous ne comprenez pas ces fonctionnalités, elles peuvent vous rendre la tâche plus ardue tant bien que votre unique but était de finir votre tâche coûte que coûte. Scala a des concepts compliqués, particulièrement dans son _type system_. Heureusement, Spark cache ces concepts avancés.

### Pour plus en savoir plus sur Scala
Nous ne pourrons qu'aborder qu'une toute petite partie des concepts de Scala ici. Vous en apprendrez assez pour les utiliser mais éventuellement vous allez devoir en savoir un peu plus.

Quand vous aurez besoin de plus d'informations, allez voir ces ressources :

* [Programming Scala, Second Edition](http://shop.oreilly.com/product/0636920033073.do): Introduction à Scala
* [Scala Language Website](http://scala-lang.org/): Comment télécharger Scala, où trouver de la documentation(e.g., [Scaladoc](http://www.scala-lang.org/api/current/#package): documentation de la librairie Scala comme [Javadocs](https://docs.oracle.com/javase/8/docs/api/)), et autres informations.
* [Lightbend Scala Services](http://www.lightbend.com/services/) training, consulting, et support pour Scala.
* [Lightbend Fast Data Platform](http://www.lightbend.com/fast-data-platform/): Notre nouvelle distribution pour la _Fast Data_ (stream processing), incluant Spark, Flink, Kafka, Akka Streams, Kafka Streams, HDFS, et notre outil de gestion de production et de monitoring, tournant sur Mesosphere DC/OS.

Pour l'instant je vous recommande d'ouvrir la page du Scaladoc et de l'API Scala pour Spark. Vous pouvez y accéder en cliquant sur ces deux liens :
* Scaladocs for <a href="http://www.scala-lang.org/api/current/#package" target="scala_scaladocs">Scala</a>.
* Scaladocs for <a href="http://spark.apache.org/docs/latest/api/scala/index.html#package" target="spark_scaladocs">Spark</a>.

> **Astuce pour utiliser Scaladoc:**
* Utilisez la barre de recherche dans le corner en haut à gauche pour trouver un _type_ particulier. (Par exemple, essayez de trouver RDD) 
* Pour chercher une _méthode_ particulière, cliquez sur un caractère en dessous de la barre de recherche pour la première lettre de la méthode et scrollez y.

### Pré-requis
Sera assumé une expérience avec un langage de programmation quelconque ainsi qu'une familiarité avec Java mais si vous ne connaissez pas Java, une simple recherche vous suffira à comprendre.
Ceci n'est pas une introduction à Spark, une expérience avec Spark est utile mais certains concepts seront expliqués brièvement.

Tout au long vous pourrez trouver plus d'informations sur les sujets importants.

## A propos des Notebooks
Vous utilisez [Jupyter](http://jupyter.org/) [All Spark Notebook Docker image](https://hub.docker.com/r/jupyter/all-spark-notebook/). Comme décrit dans le [GitHub README](https://github.com/deanwampler/JustEnoughScalaForSpark) vous importez ce notebook dans Jupyter qui tourne dans un container Docker.

Les notebooks vous permettent de faire un mélange de documentation comme cette [Markdown](https://daringfireball.net/projects/markdown/) "cellule", avec des cellules qui contiennent du code, des graphs, etc. Dans la vie réelle, cela correspondrait à un cahier qu'un étudiant ou scientifique pourrait utiliser lorsqu'il travaille en laboratoire.

Le menu et la barre d'outils en haut vous donne des options pour évaluer une cellule, créer et supprimer des cellules, etc. Pensez à retenir les principaux raccourcis car vous les utiliserez souvent.

> **Astuces:**

> Vous pouvez utiliser le menu _Help > Keyboard Shortcuts_, puis capturer la page en tant qu'image. Apprenons quelques raccourcis chaque jour.

> Pour le moment, sachez juste que vous pouvez cliquer dans n'importe quelle cellule pour changer de focus. Quand vous êtes dans une cellule, `shift+enter` évalue la cellule (parcourt la cellule et affiche le Markdown ou bien fait tourner le code), puis passe à la cellule suivante. Essayez cela pour quelques cellules.

Une fonctionnalité utile est de pouvoir éditer une cellule que vous avez lancé précedemment et la relancer. C'est très utile lorsque vous expérimentez avec votre code.

### L'environment
Configurons l'environnement pour qu'il nous montre toujours les types des expressions.

In [3]:
%showTypes on

Types will be printed.


Quand vous commencez ce notebook, le plugin Jupyter Spark plugin crée un [SparkContext](http://spark.apache.org/docs/latest/programming-guide.html#initializing-spark) pour vous. C'est le point d'entrée de n'importe quelle application Spark (quand bien même vous utilisez la plus récente `SparkSession`). Ce SparkContext ou SparkSession sait comment se connecter à votre cluster (ou tourner localement sur la même JVM), comment configuer les propriétés, etc. Il lance aussi une interface web qui vous permet de monitorer vos jobs lancés. L'instance du `SparkContext` est appelée `sc`. La prochaine cellule confirme simplement qu'il existe.

In [4]:
sc

org.apache.spark.SparkContext = org.apache.spark.SparkContext@190942b2


Quelques informations utiles :

In [5]:
println("Spark version:      " + sc.version)
println("Spark master:       " + sc.master)
println("Running 'locally'?: " + sc.isLocal)

Spark version:      2.4.0
Spark master:       local[*]
Running 'locally'?: true


## Chargeons des données (et commençons à apprendre le Scala)
Nous allons écrire de vrai programmes Spark et les utiliser pour apprendre Scala et Spark en même temps. 

Mais tout d'abord, nous allons avoir besoin d'utiliser quelques fichiers texte qui contiennent des pièces de Shakespeare. Les prochaines cellules définissent des méthodes helper pour configurer le tout. Nous apprendrons des concepts Scala à la volée.

> **Note:** "méthode" vs. "fonction"

> Scala utilise une convention commune provenant l'orienté objet, où le terme _méthode_ est utilisé pour une fonction qui est attachée à une classe ou instance. Contrairement à Java, tout du moins avant Java 8, Scala possède aussi des _fonctions_ qui ne sont pas associés à une classe ou instance particulière.

> Dans notre prochain exemple de code, nous définissons plusieurs _méthodes_ helper pour afficher des informations, mais vous ne verrez pas de définition de classe ici. Ainsi, quelle est la classe associée à ces méthodes ? Quand vous utilisez Scala dans un notebook, vous utilisez en fait l'interpréteur Scala, qui prend chaque expression et définition que nous avons écrit et les met dans une classe cachée. L'interpréteur fait cela pour générer du byte code valide pour la JVM.

> Malheuresement, cela peut-être assez confus de savoir quand il faut utiliser une méthode ou une fonction et cela reflète la nature hybride de Scala comme un langage orienté objet et un langage fonctionnel. Heureusement, dans la majorité des cas, nous pouvons utiliser des méthodes et fonctions interchangeablement, donc ne vous préoccupez pas trop de la distinction à partir de maintenant.

> Nous allons maintenant définir des méthodes. Nous allons voir ce qu'est une véritable _fonction_ bientôt.

Voici deux méthodes pour printer un message d'erreur ou un simple message d'information. Nous allons expliquer la syntaxe dans la cellule qui suit.

In [6]:
/*
 * "info" takes a single String argument, prints it on a line,
 * and returns it. 
 */
def info(message: String): String = {
    println(message)

    // The last expression in the block, message, is the return value. 
    // "return" keyword not required.
    // Do no additional formatting for the return string.
    message  
}

info: (message: String)String


In [7]:
/*
 * "error" takes a single String argument, prints a formatted error message,
 * and returns the message. 
 */
def error(message: String): String = {   
    
    // Print the string passed to "println" and add a linefeed ("ln"):
    // See the next cell for an explanation of how the string is constructed.
    val fullMessage = s"""
        |********************************************************************
        |
        |  ERROR: $message
        |
        |********************************************************************
        |""".stripMargin
    println(fullMessage)
    
    fullMessage
}

error: (message: String)String


Essayons les.

In [8]:
val infoString = info("All is well.")

All is well.


infoString = All is well.


All is well.

In [8]:
val errorString = error("Uh oh...")


********************************************************************

  ERROR: Uh oh...

********************************************************************



errorString: String = 


"
********************************************************************
  ERROR: Uh oh...
********************************************************************
"


errorString: String = 


In [9]:
errorString

"
********************************************************************
  ERROR: Uh oh...
********************************************************************
"


String = 


Les définitions de méthodes possèdent ces éléments suivants dans cet ordre :
* Le mot clé `def` 
* Le nom de la méthode (`error` et `info` ici)
* La liste des arguments entre parenthèses. S'il n'y a pas d'arguments, les parenthèses vides peuvent être omises. Commun pour `toString` et les getter qui retournent un champ dans une instance, etc.
* Deux points suivi par le type de retour de la méthode. Ce type est souvent inferé par Scala et il n'est donc pas obligatoire mais il est fortement recommandé de le mettre tout le temps !
* Un signe `=` qui sépare la _signature_ de la méthode du _corps_
* Le corps entre accolades `{ ... }`, cependant si le corps consiste d'une seule expression alors les accolades sont optionnels
* La dernière expression dans le corps est utilisé comme valeur de retour. Le mot clé `return` est déconseillé
* Les point-virgules `;` sont inferés et ne sont pas utilisés dans la grande majorité des cas

Regardez la liste des arguments pour `error`. Il s'agit de `(message: String)`, où `message` est le nom de l'argument et son type est `String`. La convention pour les _annotations de type_, `name: Type`, est aussi utilisé pour le type de retour, `error(...): String`. Les annotations de type sont requis par Scala pour les arguments des méthodes. Elles sont optionnelles dans la majorité des cas pour le type de retour. Nous allons voir que Scala peut inférer les types de beaucoup d'expressions et de déclarations de variables.

Scala utilise les mêmes conventions de commentaires que Java, `// ...` pour une ligne seule, et `/* ... */` pour un bloc de commentaire.

> **Note:** Expression vs. Déclaration

> Une _expression_ a une valeur, tandis qu'une _déclaration_ n'en a pas. Ainsi lorsque nous assignons une expression à une variable, la valeur que l'expression retourne est assignée à une variable.

Dans `error`, nous utilisons une combinaison d'interpolation de string avec la syntaxe `s"""..."""` :
* **String avec trois guillemets :** `"""..."""`. Utile lorsque la string contient des retours à la ligne, comme dans `error`. (Nous allons aussi voir un autre avantage plus tard)
* **Interpolation de String :** Utilisé en mettant `s` au début de la string `s"..."` ou `s"""..."""`. Cela nous permet de mettre des références à des variables et des expressions où la conversion en string sera insérée automatiquement. Par exemple : 

In [10]:
s"""Utilisez des accolades pour les expressions : ${sc.version}.
Elles sont optionnelles quand vous utilisez juste une variable : $sc
Faites attention dans les cas comme : ${sc}etunautretruc"""

Utilisez des accolades pour les expressions : 2.4.0.
Elles sont optionnelles quand vous utilisez juste une variable : org.apache.spark.SparkContext@190942b2
Faites attention dans les cas comme : org.apache.spark.SparkContext@190942b2etunautretruc


String = 


Une autre fonctionnalité que nous utilisons pour les strings entre trois guillemets est la possiblité d'enlever le premier espace de chaque ligne. La méthode `stripMargin` enlève tous les espaces avant en incluant `|`. Cela vous permet d'indenter ces lignes pour que votre code soit formaté proprement mais sans avoir d'espace dans votre string. Dans l'exemple suivant, la string résultante possède des espaces en début de ligne et à la fin de la ligne. Observez ce qui arrive avec les espaces avant `line2` et `line3` quand la string entière est affichée :

In [11]:
s"""
    |line 1
    |  line 2
    |  |  line 3
    |""".stripMargin

"
line 1
  line 2
  line 3
"


String = 


Les caractères 'littéraux' sont indiqués avec des apostrophes, '/', tandis que les string sont indiquées avec des guillemets, "/".

In [12]:
'/'

Char = /


In [13]:
"/"

String = /


### Variables mutable vs. valeurs immutables
Regardons comment déclarer une valeur immutable avec `val` :
* `val immutableValue = ...`: Une fois initialisée, nous ne pouvons pas attribuer une valeur _différente_ à `immutableValue`.
* `var mutableVariable = ...`: Nous pouvons attribuer de nouvelles valeurs à `mutableVariable` autant de fois qu'on le souhaite.

Il est _très recommandé_ de n'utiliser que des `vals` à moins que vous ayez une très bonne raison d'avoir recours à de la mutabilité, qui est une source très commune de bugs !

> Un `val immutableValue` peut pointer vers une instance qui _elle_ est mutable par exemple un [Array](http://www.scala-lang.org/api/current/#scala.Array). Dans ce cas là, même si nous ne pouvons pas assigner un nouveau Array à `immutableValue`, nous pouvons changer les éléments dans l'Array ! Dis d'une autre façon, l'immutabilité n'est pas _transitif_.

### Préparer les fichiers
Ce notebook possède déjà les fichiers de données nécessaires, à savoir plusieurs pièces de Shakespeare. Elles sont dans
le dossier `/home/jovyan/work/data/shakespeare` dans le container (`data/shakespeare` dans le projet git). Il y a un fichier par pièce.

Nous allons écrire du code Scala pour vérifier que les fichiers sont bien présents et en même temps apprendre un peu de Scala.

Beaucoup de types utilisés en Scala proviennent de la librairie Java (JDK). Parce que Scala compile vers du byte code JVM, vous pouvez utiliser n'importe quelle librairie Java en Scala. Nous étions en train d'utiliser [java.lang.String](https://docs.oracle.com/javase/8/docs/api/java/lang/String.html). Nous allons maintenant utiliser [java.io.File](https://docs.oracle.com/javase/8/docs/api/java/io/File.html) pour travailler avec les différents fichiers et dossiers.

Comme avant, nous allons utiliser des commentaires pour expliquer quelques nouvelles notions de Scala.

In [15]:
// Import File. Unlike Java, the semicolon ';' is not required.
import java.io.File

Voici le dossier où les fichiers devraient être situés.

In [16]:
val shakespeare = new File("/home/jovyan/work/data/shakespeare")

shakespeare = /home/jovyan/work/data/shakespeare


/home/jovyan/work/data/shakespeare

Le `if` en Scala est en fait une expression (en Java ce sont des _déclarations_). L'expression `if` retourne `true` ou `false` et l'assigne à un `success` que nous allons utiliser juste après.

In [17]:
val success = if (shakespeare.exists == false) {   // doesn't exist already?
    error(s"Data directory path doesn't exist! $shakespeare")  // ignore returned string
    false
} else {
    info(s"$shakespeare exists")
    true
}
println("success = " + success)

/home/jovyan/work/data/shakespeare exists
success = true


success = true


true

Vérifions maintenant que les fichiers sont bien là.

In [18]:
val pathSeparator = File.separator
val targetDirName = shakespeare.toString
val plays = Seq(
    "tamingoftheshrew", "comedyoferrors", "loveslabourslost", "midsummersnightsdream",
    "merrywivesofwindsor", "muchadoaboutnothing", "asyoulikeit", "twelfthnight")

if (success) {
    println(s"Checking that the plays are in $shakespeare:")
    val failures = for {
        play <- plays
        playFileName = targetDirName + pathSeparator + play
        playFile = new File(playFileName)
        if (playFile.exists == false)
    } yield {
        s"$playFileName:\tNOT FOUND!"
    }
  
    println("Finished!")
    if (failures.size == 0) {
        info("All plays found!")
    } else {
        println("The following expected plays were not found:")
        failures.foreach(play => error(play))
    }
}

Checking that the plays are in /home/jovyan/work/data/shakespeare:
Finished!
All plays found!


pathSeparator = /
targetDirName = /home/jovyan/work/data/shakespeare
plays = List(tamingoftheshrew, comedyoferrors, loveslabourslost, midsummersnightsdream, merrywivesofwindsor, muchadoaboutnothing, asyoulikeit, twelfthnight)


All plays found!

Nous utilisons ici un `for` _comprehension_. Ce sont des _expressions_, et non pas des _déclarations_ comme les `for` Java. Elles ont la forme :

```
for {
  play <- plays
  ...
} yield { block_of_final_expressions }
```
Nous itérons sur une collection `plays`, et assignons chacun à la variable `play` (qui est en fait une valeur immutable pour chaque itération). 

Après avoir assigné à `play`, les prochaines étapes dans la `for` comprehension l'utilise. Premièrement, une instance de  [java.io.File](https://docs.oracle.com/javase/8/docs/api/java/io/File.html) `playFile`, est crée. Ensuite `playFile` est utilisé pour évaluer un prédicat - le fichier existe t'il déjà ? (Il devrait !)

Si le fichier existe déjà alors `false` est retourné, ce qui casse la boucle et nous passons à la prochaine itération.
Si le fichier n'existe pas, alors le mot clé `yield` dit à Scala que nous voulons utiliser l'expression qui suit pour construire un nouvel élément, une string _interpolée_ pour les pièces manquantes. Parmi ces éléments retournés, pouvant aller de zéro à un nombre, une nouvelle collection est construite. Le bloc final `if` détermine si la nouvelle collection a zéro éléments (attendu), puis affiche un message `info`. Si des fichiers manquaient, alors un message `error` serait affiché pour chacun des fichiers manquant.

## Utiliser des fonctions comme argument

Notez comment nous avons affiché les `successes`. L'idiome `collection.foreach(println)` est utile pour itérer sur des éléments et les afficher, un par ligne. Mais comment cela marche t'il exactement ? (Nous utiliserons `plays` au lieu de `failures`, parce que le deuxième est normalement toujours vide !)

In [19]:
println("Pass println as the function to use for each element:")
plays.foreach(println)

println("\nUsing an anonymous function that calls println: `str => println(str)`")
println("(Note that the type of the argument `str` is inferred to be String.)")
plays.foreach(str => println(str))

println("\nAdding the argument type explicitly. Note that the parentheses are required.")
plays.foreach((str: String) => println(str))

println("\nWhy do we need to name this argument? Scala lets us use _ as a placeholder.")
plays.foreach(println(_))

println("\nFor longer functions, you can use {...} instead of (...).")
println("Why? Because it gives you the familiar multiline block syntax with {...}")
plays.foreach {
  (str: String) => println(str)
}

println("\nThe _ placeholder can be used *once* for each argument in the list.")
println("As an assume, use `reduceLeft` to sum some integers.")
val integers = 0 to 10   // Return a "range" from 0 to 10, inclusive
integers.reduceLeft((i,j) => i+j)
integers.reduceLeft(_+_)

Pass println as the function to use for each element:
tamingoftheshrew
comedyoferrors
loveslabourslost
midsummersnightsdream
merrywivesofwindsor
muchadoaboutnothing
asyoulikeit
twelfthnight

Using an anonymous function that calls println: `str => println(str)`
(Note that the type of the argument `str` is inferred to be String.)
tamingoftheshrew
comedyoferrors
loveslabourslost
midsummersnightsdream
merrywivesofwindsor
muchadoaboutnothing
asyoulikeit
twelfthnight

Adding the argument type explicitly. Note that the parentheses are required.
tamingoftheshrew
comedyoferrors
loveslabourslost
midsummersnightsdream
merrywivesofwindsor
muchadoaboutnothing
asyoulikeit
twelfthnight

Why do we need to name this argument? Scala lets us use _ as a placeholder.
tamingoftheshrew
comedyoferrors
loveslabourslost
midsummersnightsdream
merrywivesofwindsor
muchadoaboutnothing
asyoulikeit
twelfthnight

For longer functions, you can use {...} instead of (...).
Why? Because it gives you the familiar multiline

integers = Range(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


55

# Notre premier programme Spark
Ouf ! Nous avons déjà appris beaucoup de Scala en faisant des tâches typiques de data science (par exemple récupérer de la donnée)
Maintenant nous pouvons implémenter un algorithme en utilisant Spark, _Inverted Index_

## Inverted Index - Quand vous ne voulez plus comptez les mots...

Vpus allez avoir besoin d'utiliser _Inverted Index_ quand vous aller créer votre prochain "Google killer". Il prend en entrée un corpus de documents (des pages webs par exemple), tokenize les mots et sort pour chaque mot une liste des documents qui contienne ce mot avec à côté le nombre de fois que ce mot apparaît.

C'est un algorithme un peu plus intéressant que le _Word Count_, qui est en quelque sorte le "hello world" que tout le monde fait quand la personne apprend Spark.

Le terme _inverted_  signifie que nous commencons avec des mots comme valeurs d'entrée tandis que les clés sont des identifiants de documents et que nous allons inverser le fait d'utiliser les mots comme clé et les identifiants de document comme valeur.

Voici notre première version dans son entièreté. C'est une _unique, longue expression_. Notez les points `.` à la fin des sous expressions.

In [20]:
val iiFirstPass1 = sc.wholeTextFiles(shakespeare.toString).
    flatMap { location_contents_tuple2 => 
        val words = location_contents_tuple2._2.split("""\W+""")
        val fileName = location_contents_tuple2._1.split(pathSeparator).last
        words.map(word => ((word, fileName), 1))
    }.
    reduceByKey((count1, count2) => count1 + count2).
    map { word_file_count_tup3 => 
        (word_file_count_tup3._1._1, (word_file_count_tup3._1._2, word_file_count_tup3._2)) 
    }.
    groupByKey.
    sortByKey(ascending = true).
    mapValues { iterable => 
        val vect = iterable.toVector.sortBy { file_count_tup2 => 
            (-file_count_tup2._2, file_count_tup2._1)
        }
        vect.mkString(",")
    }

iiFirstPass1 = MapPartitionsRDD[9] at mapValues at <console>:44


MapPartitionsRDD[9] at mapValues at <console>:44

Maintenant cherchons à décomposer en étapes, en assignant chaque étape à une variable. La verbosité supplémentaire que cela apporte nous permettra de voir ce que Scala infère pour le type retourné de chaque expression, dans des buts d'apprentissage.

C'est une des fonctionnalités très agréable de Scala. Nous n'avons pas à mettre d'information de type manuellement la plupart du temps comme nous aurions du le faire avec du code Java. Au contraire, nous laissons le compilateur nous donner un retour sur ce que nous venons de créer. C'est très utile lorsque vous apprenez une nouvelle API, comme Spark.

In [21]:
val fileContents = sc.wholeTextFiles(shakespeare.toString)
fileContents   // force the notebook to print the type.

fileContents = /home/jovyan/work/data/shakespeare MapPartitionsRDD[11] at wholeTextFiles at <console>:30


/home/jovyan/work/data/shakespeare MapPartitionsRDD[11] at wholeTextFiles at <console>:30

La deuxième ligne, avec `fileContents` tout seul, est là pour que le notebook nous montre les informations de type.(Essayez de l'enlever et de réévaluer la cellule, rien ne sera affiché)

La sortie nous indique que `fileContents` possède le type [RDD[(String,String)]](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), cependant `RDD` est une classe de base qui est en fait une instance de `MapPartitionsRDD`, une implémentation "privée" de la sous classe `RDD`. 

Un nom suivi de crochets, `[...]`, signifie que le `RDD[...]` a besoin d'un ou plusieurs paramètres de type dans les crochets. Dans ce cas là, il y a un unique paramètre de type qui représente le type des enregistrements du `RDD`. 

Le paramètre de type unique est donné par `(String,String)`, qui est un raccourci bien utile pour [Tuple2[String,String]](http://www.scala-lang.org/api/current/index.html#scala.Tuple2).
Cela signifie que nous avons des tuples de deux éléments comme enregistrements, où le premier élément est une `String` représentant le chemin absolu d'un fichier et que le second élément est aussi une `String` qui représente le contenu du fichier. C'est ce que retourne `SparkContext.wholeTextFiles` pour nous. Nous utiliserons le chemin du fichier pour se rappeler où nous avons trouvé des mots, tandis que le contenu contiendra les mots même.

Pour résumer, ces deux types sont équivalents :
* `RDD[(String,String)]` - Notez les parenthèses entre les crochets, `[(...)]`.
* `RDD[Tuple2[String,String]]` - Notez les crochets dans les crochets `[...[...]]`, et non pas `[(...)]`.

Nous allons voir sous peu que nous pouvons aussi écrire des _instances_ de [Tuple2[T1,T2]](http://www.scala-lang.org/api/current/index.html#scala.Tuple2) avec la même syntaxe, e.g., `("foo", 101)`, pour un tuple `(String,Int)`, et similairement pour des tuples d'_arité supérieur_ (jusqu'à 22 elements...), e.g., `("foo", 101, 3.14159, ("bar", 202L))`. Faites tourner la prochaine cellule pour voir le type du tuple.

In [22]:
("foo", 101, 3.14159, ("bar", 202L))

(foo,101,3.14159,(bar,202))

Avez-vous compris ? Voyez-vous qu'il s'agit d'un tuple de quatre élément et non pas d'un tuple de cinq élément ? C'est parce que `("bar", 202L)` est un tuple imbriqué. C'est le quatrième élément du tuple extérieur.

**Exercice:** Essayez de créer des tuples avec des éléments de type différent, utilisez la prochaine cellule.

In [23]:
(1,2)

(1,2)

Combien de `fileContents` avons-nous ? Pas énormément. Nous devrions normalement avoir le même nombre que le nombre de fichier que nous avons chargé en haut.

In [24]:
fileContents.count

8

> **NOTE:** Nous avons utilisé la méthode`RDD.count`, alors que la majorité des collections Scala ont une méthode `size`.

Maintenant pour la prochaine étape dans notre calcul. En premier, nous allons tokenizer le contenu des mots en les séparant sur les caractères non alphanumériques, donc toutes les instances d'espaces, de retour à la lignes, de ponctuations, etc.

Ensuite, le chemin absolu étant verbeux et le même préfixe étant répété pour tous les fichiers, extrayons juste le dernier élément, le nom unique du fichier.

Ainsi nous allons former des tuples avec des mots et des noms de fichier.

> **Note:** Cette tokenization est très brute. Elle ne prend pas en compte les contractions comme `it's` et les mots séparés par un trait d'union comme `world-changing`. Lorsque vous aller tuer Google, soyez sûr d'utiliser une vraie technique de tokenization NLP, NLP pour Natural Language Processing ou Traitement de Langage Naturel.

In [25]:
val wordFileNameOnes = fileContents.flatMap { location_contents_tuple2 => 
    // example input record: (file_path, "all the words in the file")
    // mytuple._2 => give me the 2nd element
    val words = location_contents_tuple2._2.split("""\W+""")              
    // mytuple._1 => give me the 1st element
    val fileName = location_contents_tuple2._1.split(pathSeparator).last  
    // create a new tuple to return. Note how we structured it!
    words.map(word => ((word, fileName), 1))
}
wordFileNameOnes

wordFileNameOnes = MapPartitionsRDD[12] at flatMap at <console>:34


MapPartitionsRDD[12] at flatMap at <console>:34

Je trouve que cela est dur à lire et nous allons bientôt voir une solution plus élegante avec une syntaxe alternative.

Essayons de comprendre la différence entre `map` et `flatMap`. Si j'appellais `fileContents.map`, cela retournerait exactement _un_ nouvel enregistrement pour chaque enregistrement dans _fileContents_. Ce que nous voulons en réalité sont des nouveaux enregistrements pour chaque couple de mot-nomDeFichier, qui correspondrait à un nombre plus large (mais la donnée dans chaque enregistrement serait beaucoup plus petite)

Utiliser `fileContents.flatMap` nous donne ce que nous voulons. Au lieu de retourner en sortie un enregistrement pour chaque enregistrement en entrée, un `flatMap` retourne une _collection_ de nouveaux enregistrements, de taille supérieure à 0, pour _chaque_ enregistrement d'entrée. Ces collections sont ensuite _aplaties_ dans une grosse collection, un autre `RDD` dans notre cas.

Que devrais faire `flatMap` pour chaque enregistrement ? Nous passons une _fonction_ pour définir le comportement. Nous utilisons donc une fonction sans nom ou _anonyme_. La syntaxe est `liste_des_arguments => body`:

```scala
location_contents_tuple2 => 
    val words = ...
    ...
}
```

Nous avons un argument unique, l'enregistrement, que nous avons nommé `location_contents_tuple2`, une façon verbeuse de dire que nous avons à faire à un tuple de deux éléments avec en entrée le chemin du fichier et son contenu. Nous n'avons pas besoin d'un paramètre de type après `location_contents_tuple2` car c'est inféré par Scala. La flèche `=>` sépare la liste des arguments du corps.

Quand une fonction prend plus qu'un seul argument ou que vous ajoutez des annotations de type explicite (`: (String,Int,Double)`), alors vous aurez besoin de parenthèses. Voici trois exemples :

```scala
(some_tuple3: (String,Int,Double)) => ...
(arg1, arg2, arg3) => ...
(arg1: String, arg2: Int, arg3: Double) => ...
```
Nous laissons Scala inférer le type des arguments, et dans notre cas c'est `(String,String)`.

Une seconde, nous avons précedemment dit que nous passions une fonction en tant qu'argument de `flatMap`. Si c'est le cas, pourquoi utilisons nous des accolades `{...}` autour de l'argument de cette fonction au lieu de parenthèses `(...)` qu'il serait normal de retrouver lorsque vous passez des arguments à une méthode comme `flatMap`? 

C'est parce que Scala nous permet d'utiliser des accolades au lieu de parenthèses pour avoir la syntaxe familière du bloc `{...}` que nous connaissons tous et aimons pour les expressions `if` et `for`. Nous pourrions utiliser soit des accolades ou des parenthèses ici. La convention dans la communauté Scala est d'utiliser des accolades pour les fonctions anonymes qui font plusieurs lignes et des parenthèses pour les expressions tenant sur une ligne.

Maintenant pour chaque `location_contents_tuple2`, nous accédons au _premier_ élément en utilisant la méthode `_1` et le _deuxième_ élément en utilisant `_2`.

Le fichier `contents` est dans le deuxième élément. Nous les séparons en appelant la méthode Java `String.split`, qui prend une string correspondant à une _expression régulière_. Ici nous spécifions une expression régulière pour un ou plusieurs caractères non alphanumériques. `String.split` retourne un `Array[String]` des mots. 

```scala
val words = location_contents_tuple2._2.split("""\W+""")
```

Pour le premier élément du tuple, nous extrayons le nom du fichier en fin de path. Ce n'est pas nécessaire, mais ça permet d'avoir une sortie plus lisible si nous retirons le long préfixe commun du path.

```scala
val fileName = location_contents_tuple2._1.split(pathSeparator).last
```

Finalement, nous utilisons `Array.map` (et non pas `RDD.map`) dans la fonction anonyme passé au `flatMap` pour transformer chaque `word` en un tuple de la forme `((word, fileName), 1)`.

```scala
words.map(word => ((word, fileName), 1))
```

Pourquoi avons nous imbriqué un tuple de `(word, fileName)` dans un tuple "extérieur" avec un `1` comme deuxième élément ? Pourquoi ne pas seulement avoir crée un tuple de trois éléments `(word, fileName, 1)`? C'est parce que nous utilisons `(word, fileName)` comme une _clé_ dans la prochaine étape, où nous allons trouver des combinaisons uniques de word-fileName (en utilisant l'équivalent du `group by`). Ainsi, utiliser le `(word, fileName)` imbriqué comme la clé _key_ est plus pratique. La _valeur_ `1` _value_ est un count "seed", que nous allons utiliser pour compter les occurencese uniques des paires de `(word, fileName)`.

> **Notes:**
> * Pour des raisons historiques, les indices de tuple commencent à 1 et non 0. Les Arrays et autre collections de Scala ont un index commencant à 0.
> * Nous avons précédemment dit que les arguments d'une _méthode_ ont besoin d'être déclarés avec des types. Ce n'est pas nécessairement requis pour les arguments de _fonctions_ comme ici.
> * Un autre bénéfice d'une string avec trois guillemets qui les rend sympathique pour les expressions régulières est que vous n'avez pas à échapper vos caractères comme `\W`. Si nous avions utilisé un seul guillemet alors nous aurions du écrire `"\\W+"`. A vous de faire votre choix

Comptons le nombre de lignes que nous avons et regardons quelques lignes. Nous utiliserons la méthode `RDD.take` pour prendre les 10 premières lignes, itérer dessus et les afficher.

In [26]:
wordFileNameOnes.count

173336

In [27]:
wordFileNameOnes.take(10).foreach(println)

((,merrywivesofwindsor),1)
((THE,merrywivesofwindsor),1)
((MERRY,merrywivesofwindsor),1)
((WIVES,merrywivesofwindsor),1)
((OF,merrywivesofwindsor),1)
((WINDSOR,merrywivesofwindsor),1)
((DRAMATIS,merrywivesofwindsor),1)
((PERSONAE,merrywivesofwindsor),1)
((SIR,merrywivesofwindsor),1)
((JOHN,merrywivesofwindsor),1)


Nous avons demandé des résultats, nous forçons donc Spark à lancer un job pour calculer le résultat. Les pipelines Spark comme `iiFirstPass1` sont _lazy_; rien n'est calculé tant que nous ne demandons pas de résultat. 

Quand vous apprenez, il est utile d'afficher des données pour comprendre mieux ce qui se passe. Faites attention toutefois car cela utilise plus de ressources.

Le premier enregistrement nous montre "" (vide) comme mot :

```
((,asyoulikeit),1)
```

Aussi certains mots sont en majuscules :
```
((DRAMATIS,asyoulikeit),1)
```
(Vous pouvez voir que ces mots en majuscules apparaissent si vous regardez dans les fichiers sources) Plus tard nous allons filtrer tous ces mots vide et passer tous les mots en minuscule.

Maintenant joignons toutes les paires uniques de `(word,fileName)`. 

In [28]:
val uniques = wordFileNameOnes.reduceByKey((count1, count2) => count1 + count2)
uniques

uniques = ShuffledRDD[13] at reduceByKey at <console>:36


ShuffledRDD[13] at reduceByKey at <console>:36

En SQL vous pouvez utiliser `GROUP BY` pour ça (en incluant les requêtes SQL que vous pourriez écrire avec l'API Spark [DataFrame](http://spark.apache.org/docs/latest/sql-programming-guide.html)). Néanmoins, dans l'API `RDD`, c'est trop couteux pour nos besoins car nous ne nous préoccupons pas des groupes eux-mêmes, la longue liste des paires répétées de `(word,fileName)`. Nous nous préoccupons juste de combien d'éléments il y a dans chaque groupe, c'est à dire leur _size_. C'est la raison du `1` dans les tuples et pourquoi nous utilisons `RDD.reduceByKey`. Cela ramène ensemble toutes les lignes avec la même clé, les paires unique de `(word,fileName)`, et applique ensuite une fonction anonyme pour "réduire" les valeurs, les`1`. Nous faisons juste une somme après pour calculer le count des groupes.

Notez que la fonction anonyme `reduceByKey` attend deux arguments, nous avons donc besoin de parenthèses autour de la liste des arguments. Puisque la fonction tient sur une seule ligne, nous utilisons des parenthèses au lieu d'accolades.

> **Note:** Toutes les méthodes `*ByKey` opèrent sur des tuples de deux éléments et traitent le premier élément comme la clé par défaut.

Combien y en a t-il ?

In [29]:
uniques.count

27276

Comme attendu d'un `GROUP BY`, le nombre d'enregistrements est plus petit qu'avant. Il y a environ 1/6 de lignes comparé à avant, cela signifie qu'en moyenne chaque `(word,fileName)` apparaît 6 fois.

In [30]:
uniques.take(30).foreach(println)

((dexterity,merrywivesofwindsor),1)
((force,muchadoaboutnothing),2)
((whole,comedyoferrors),2)
((lamb,muchadoaboutnothing),2)
((blunt,tamingoftheshrew),3)
((letter,merrywivesofwindsor),19)
((crest,asyoulikeit),1)
((bestow,asyoulikeit),1)
((rear,midsummersnightsdream),1)
((crossing,tamingoftheshrew),1)
((wronged,merrywivesofwindsor),4)
((S,tamingoftheshrew),10)
((HIPPOLYTA,midsummersnightsdream),19)
((revolve,twelfthnight),1)
((er,merrywivesofwindsor),11)
((renown,asyoulikeit),1)
((cubiculo,twelfthnight),1)
((All,twelfthnight),3)
((power,loveslabourslost),8)
((Albeit,asyoulikeit),1)
((lips,tamingoftheshrew),3)
((upshot,twelfthnight),1)
((approach,midsummersnightsdream),4)
((mean,muchadoaboutnothing),5)
((embossed,asyoulikeit),1)
((varnish,loveslabourslost),2)
((Apollo,midsummersnightsdream),1)
((spangled,midsummersnightsdream),1)
((gentlemen,comedyoferrors),1)
((Rebuke,loveslabourslost),1)


Pour _inverted index_, nous voulons que les dernières clés soient des mots, il faut donc restructurer les tuples de `((word,fileName),count)` vers `(word,(fileName,count))`. Maintent, nous allons encore sortir des tuples à deux éléments mais le `word` sera la clé et le `(fileName,count)` sera la valeur.

In [31]:
val words = uniques.map { word_file_count_tup3 => 
    (word_file_count_tup3._1._1, (word_file_count_tup3._1._2, word_file_count_tup3._2)) 
}

words = MapPartitionsRDD[14] at map at <console>:38


MapPartitionsRDD[14] at map at <console>:38

Les méthodes de tuple `_1._2` sont assez difficiles à lire et la logique devient obscure. Nous allons voir par la suite une alternative beaucoup plus élégante.

Nous allons utiliser l'opération `group by`, car nous en avons besoin pour garder les groupes. Appeller `RDD.groupByKey` 
utilise le premier élément du tuple, présentement juste les `words`, pour ramener ensemble toutes les occurences de mots uniques. Ensuite nous allons trier les mots par ordre alphabétique.

In [32]:
val wordGroups = words.groupByKey.sortByKey(ascending = true)
wordGroups

wordGroups = ShuffledRDD[18] at sortByKey at <console>:40


ShuffledRDD[18] at sortByKey at <console>:40

Notez que chaque groupe est en fait un [Iterable](http://www.scala-lang.org/api/current/index.html#scala.collection.Iterable), i.e., une abstraction pour une certaine forme de collection. (C'est actuellement une collection privée définie par Spark appelée `CompactBuffer`.)

In [33]:
wordGroups.count

11951

In [34]:
wordGroups.take(30).foreach(println)

(,CompactBuffer((tamingoftheshrew,1), (asyoulikeit,1), (merrywivesofwindsor,1), (comedyoferrors,1), (midsummersnightsdream,1), (twelfthnight,1), (loveslabourslost,1), (muchadoaboutnothing,1)))
(A,CompactBuffer((loveslabourslost,78), (midsummersnightsdream,39), (muchadoaboutnothing,31), (merrywivesofwindsor,38), (comedyoferrors,42), (asyoulikeit,34), (twelfthnight,47), (tamingoftheshrew,59)))
(ABOUT,CompactBuffer((muchadoaboutnothing,18)))
(ACT,CompactBuffer((asyoulikeit,22), (comedyoferrors,11), (tamingoftheshrew,12), (loveslabourslost,9), (muchadoaboutnothing,17), (twelfthnight,18), (merrywivesofwindsor,23), (midsummersnightsdream,9)))
(ADAM,CompactBuffer((asyoulikeit,16)))
(ADO,CompactBuffer((muchadoaboutnothing,18)))
(ADRIANA,CompactBuffer((comedyoferrors,85)))
(ADRIANO,CompactBuffer((loveslabourslost,111)))
(AEGEON,CompactBuffer((comedyoferrors,20)))
(AEMELIA,CompactBuffer((comedyoferrors,16)))
(AEMILIA,CompactBuffer((comedyoferrors,3)))
(AEacides,CompactBuffer((tamingoftheshrew,1)

Finalement, nettoyons ces `CompactBuffers`. Nous pouvons les convertir en [Vector](http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Vector) (une collection avec une performance de _O(1)_ pour la plupart des opérations), puis les ordonner par ordre décroissant de count, afin que les lieux qui mentionnent le mot correspondant au _plus_ apparaissent en _premier_ dans la liste. (Réfléchissez à comment vous voudriez qu'un moteur de recherche fonctionne...) 

Notez que nous utilisons `Vector.sortBy`, et pas un sort provenant de `RDD`. Ce sortBy prend une function qui accepte tous les éléments d'une collection et retourne quelque chose utilisé pour trier la collection. En retournant `(-fileNameCountTuple2._2, fileNameCountTuple2)`, je dis, "triez par count _décroissant_ en premier, puis triez par le nom du fichier". `-fileNameCountTuple2._2` cause les counts à être triés par ordre décroissant car je retourne une valeur négative donc les counts plus larges seront inférieurs aux counts plus petits, par exemple `-3 < -2`.

Finalement, je prend le `Vector` résultant et je crée un CSV avec les éléments en utilisant la méthode helper `mkString`.

Qu'est donc `RDD.mapValues` ? Je pourrais utiliser `RDD.map`,mais je ne change pas les clés (les mots), donc au lieu d'avoir à faire au tuple avec les deux éléments, `mapValues` passe seulement la valeur du tuple et reconstruit un nouveau tuple `(clé,valeur)` avec la nouvelle valeur que ma fonction retourne. Ainsi, `mapValues` est plus pratique à utiliser que `map` quand j'ai un tuple de deux éléments et que je ne modifie pas les clés.

In [35]:
val iiFirstPass2 = wordGroups.mapValues { iterable => 
    val vect = iterable.toVector.sortBy { file_count_tup2 => 
        (-file_count_tup2._2, file_count_tup2._1)
    }
    vect.mkString(",")
}

iiFirstPass2 = MapPartitionsRDD[19] at mapValues at <console>:42


MapPartitionsRDD[19] at mapValues at <console>:42

C'est bon ! Le nombre de records est le même que `wordGroups` (comprenez-vous pourquoi ?), observons donc quelques records.

In [36]:
iiFirstPass2.take(30).foreach(println)

(,(asyoulikeit,1),(comedyoferrors,1),(loveslabourslost,1),(merrywivesofwindsor,1),(midsummersnightsdream,1),(muchadoaboutnothing,1),(tamingoftheshrew,1),(twelfthnight,1))
(A,(loveslabourslost,78),(tamingoftheshrew,59),(twelfthnight,47),(comedyoferrors,42),(midsummersnightsdream,39),(merrywivesofwindsor,38),(asyoulikeit,34),(muchadoaboutnothing,31))
(ABOUT,(muchadoaboutnothing,18))
(ACT,(merrywivesofwindsor,23),(asyoulikeit,22),(twelfthnight,18),(muchadoaboutnothing,17),(tamingoftheshrew,12),(comedyoferrors,11),(loveslabourslost,9),(midsummersnightsdream,9))
(ADAM,(asyoulikeit,16))
(ADO,(muchadoaboutnothing,18))
(ADRIANA,(comedyoferrors,85))
(ADRIANO,(loveslabourslost,111))
(AEGEON,(comedyoferrors,20))
(AEMELIA,(comedyoferrors,16))
(AEMILIA,(comedyoferrors,3))
(AEacides,(tamingoftheshrew,1))
(AEgeon,(comedyoferrors,7))
(AEgle,(midsummersnightsdream,1))
(AEmilia,(comedyoferrors,4))
(AEsculapius,(merrywivesofwindsor,1))
(AGUECHEEK,(twelfthnight,2))
(ALL,(midsummersnightsdream,2),(tamingof

Cela paraît raisonnable.

Nous allons maintenant améliorer le code en utilisant une feature très puissante, le _pattern matching_, qui permet de rendre le code à la fois plus concis et facile à comprendre. C'est ma fonctionnalité *préferée* de Scala. 

Avant cela, essayez de faire d'améliorer le code par vous même.

**Exercices:**

* Ajoutez un filter pour enlever le mot vide "". Vous pouvez faire cela de deux manières différentes, en utilisant [RDD.filter](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD) (cherchez dans [Scaladoc page]((http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD) pour la méthode `filter`), _ou_ en utilisant la méthode similaire implémentée par Scala, [scala.collection.Seq.filter](http://www.scala-lang.org/api/current/index.html#scala.collection.Seq). Les deux versions prennent une fonction _prédicat_, fonction qui retourne  `true` si le record doit être _conservée_ et `false` dans le cas contraire. Pensez-vous qu'un des deux choix est meilleur que l'autre ? Pourquoi ? Ou ces deux choix sont-ils au final similaires ? Les raisons peuvent inclure la compréhension du code et la performance.

* Convertissez tous les mots en minuscule. Il faut juste appeler `toLowerCase` sur une string. A quel endroit est-il judicieux de placer cet appel ?

J'implémenterais ces deux changements dans les améliorations à suivre en bas.

> **NOTE:** Si vous préférez faire une copie du code dans une nouvelle cellule, utilisez le menu _Insert_ en haut pour ajoutez des cellules. Ou vous pouvez apprendre un autre raccourci clavier `ESC`, puis `A` pour insérer avant ou `B` pour insérer après. Vous pouvez ensuite appuyer sur entrée pour éditer la cellule. Notez la barre d'outil pour configurer le format de la cellule. Cette cellule que vous lisez est par exemple en _Markdown_. Utilisez _Code_ pour les cellules contenant votre code source.

## Pattern Matching

Nous avons étudié un vrai programme et nous avons appris pas mal de Scala. Améliorons ça avec ma fonctionnalité préférée de Scala : _le pattern matching_.

Voici la version "premier passage" à nouveau.

In [37]:
val iiFirstPass1b = sc.wholeTextFiles(shakespeare.toString).
    flatMap { location_contents_tuple2 => 
        val words = location_contents_tuple2._2.split("""\W+""")
        val fileName = location_contents_tuple2._1.split(pathSeparator).last
        words.map(word => ((word, fileName), 1))
    }.
    reduceByKey((count1, count2) => count1 + count2).
    map { word_file_count_tup3 => 
        (word_file_count_tup3._1._1, (word_file_count_tup3._1._2, word_file_count_tup3._2)) 
    }.
    groupByKey.
    sortByKey(ascending = true).
    mapValues { iterable => 
        val vect = iterable.toVector.sortBy { file_count_tup2 => 
            (-file_count_tup2._2, file_count_tup2._1)
        }
        vect.mkString(",")
    }

iiFirstPass1b = MapPartitionsRDD[29] at mapValues at <console>:52


MapPartitionsRDD[29] at mapValues at <console>:52

Maintenant, voici une nouvelle implémentation qui utilise le _pattern matching_.

J'y ai fait deux autres ajouts : les solutions des derniers exercices, qui retire les mots wides "" et corrige les majuscules mélangées, en utilisant les ajouts suivants :

* `filter(word => word.size > 0)` pour retirer les mots vides. (En Spark et dans les collections Scala, `filter` a un sens positif : "qu'est-ce qui doit être retenu ?"). C'est indiqué par le commentaire `// #1`.
* `word.toLowerCase` pour convetir tous les mots uniformément en minuscule, tel que des mots comme HAMLET, Hamlet et hamlet dans le texte original sont traités comme étant le même mot, puisque nous comptons les occurences des mots. Voir le commentaire `// #2`.

In [8]:
val ii1 = sc.wholeTextFiles(shakespeare.toString).
    flatMap {
        case (location, contents) => 
            val words = contents.split("""\W+""").
                filter(word => word.size > 0)                      // #1
            val fileName = location.split(pathSeparator).last
            words.map(word => ((word.toLowerCase, fileName), 1))   // #2
    }.
    reduceByKey((count1, count2) => count1 + count2).
    map { 
        case ((word, fileName), count) => (word, (fileName, count)) 
    }.
    groupByKey.
    sortByKey(ascending = true).
    mapValues { iterable => 
        val vect = iterable.toVector.sortBy { 
            case (fileName, count) => (-count, fileName) 
        }
        vect.mkString(",")
    }

Name: Compile Error
Message: <console>:27: error: not found: value shakespeare
val ii1 = sc.wholeTextFiles(shakespeare.toString).
                            ^
<console>:30: error: value split is not a member of Any
            val words = contents.split("""\W+""").
                                 ^
<console>:32: error: value split is not a member of Any
            val fileName = location.split(pathSeparator).last
                                    ^
<console>:32: error: not found: value pathSeparator
            val fileName = location.split(pathSeparator).last
                                          ^
<console>:40: error: not found: value ascending
    sortByKey(ascending = true).
              ^

StackTrace: 

Comparez avec les solutions d'exercice plus haut. J'ai ajouté le filtrage dans la foncrtion passée à `flatMap`. Mon choix réduit le nombre d'enregistrement en sortie de `flatMap` d'au plus un enregistrement par ligne en entrée, ce qui ne devrait pas avoir un impact significatif sur les performance. Le filtrage en lui-même ajoute un peu de surcharge.

La façon dont Spark implémente des étapes comme `map`, `flatMap`, `filter` entraine la même surcharge que si j'utilisais `RDD.filter`. Notez que nous pouvons aussi faire un filtrage plus tard dans la pipeline, après `groupByKey`, par exemple. Donc, quelque soit l'approche que vous implémentez, c'est probablement bien. Vous pouvez réaliser un profiling de performancedes différentes approches, mais vous ne trouverez pas de différence significative à moins d'utiliser un data set d'entrée très important.

Vérifions si nous obtenons toujours des résultats raisonable. Maintenant je vais utiliser l'API [DataFrame](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame) de Spark pour ses options d'affichage pratique. `DataFrames` font partis de [Spark SQL](http://spark.apache.org/docs/latest/sql-programming-guide.html). 

Tout d'abord, nous avons besoin de créer une instance de [SQLContext](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLContext) qui nous permettra d'accéder à ces fonctionnalités.

In [39]:
import org.apache.spark.sql.SQLContext

In [40]:
val sqlContext = new SQLContext(sc)

sqlContext = org.apache.spark.sql.SQLContext@27a9da47




org.apache.spark.sql.SQLContext@27a9da47

Maintenant, nous convertissons le `RDD` en `DataFrame` avec `sqlContext.createDataFrame` et nous utilisons `toDF` (convertir vers un autre `DataFrame` ?) avec un nouveau pour chaque "colonne".

In [41]:
val ii1DF = sqlContext.createDataFrame(ii1).toDF("word", "locations_counts")

ii1DF = [word: string, locations_counts: string]


[word: string, locations_counts: string]

La _cellule magique_ `%%dataframe` founit un sympathique affichage tabulaire.

In [42]:
%%dataframe
ii1DF

word,locations_counts
a,"(loveslabourslost,507),(merrywivesofwindsor,494),(muchadoaboutnothing,492),(asyoulikeit,461),(tamingoftheshrew,445),(twelfthnight,416),(midsummersnightsdream,281),(comedyoferrors,254)"
abandon,"(asyoulikeit,4),(tamingoftheshrew,1),(twelfthnight,1)"
abate,"(loveslabourslost,1),(midsummersnightsdream,1),(tamingoftheshrew,1)"
abatement,"(twelfthnight,1)"
abbess,"(comedyoferrors,8)"
abbey,"(comedyoferrors,9)"
abbominable,"(loveslabourslost,1)"
abbreviated,"(loveslabourslost,1)"
abed,"(asyoulikeit,1),(twelfthnight,1)"
abetting,"(comedyoferrors,1)"


OK, maintenant explorons la nouvelle implémentation. Je commence comme précédemment, en appelant `wholeTextFiles` :

```scala
val ii = sc.wholeTextFiles(shakespeare.toString).
```

La fonction que je passe à `flatMap` maintenant ressemble à ça :

```scala
flatMap { 
    case (location, contents) => 
        val words = contents.split("""\W+""").
            filter(word => word.size > 0)                      // #1
        val fileName = location.split(pathSeparator).last
        words.map(word => ((word.toLowerCase, fileName), 1))   // #2
}.
```

Comparez la avec la version précédente (en ignorant les améliorations pour les mots vides et les majuscules, marqués avec les commentaires \#1 et \#2) :

```scala
flatMap { location_contents_tuple2 => 
    val words = location_contents_tuple2._2.split("""\W+""")
    val fileName = location_contents_tuple2._1.split(pathSeparator).last
    words.map(word => ((word, fileName), 1))
}.
```

À la place de `location_contents_tuple2` un nom de variable pour tout le tuple, J'ai écrit `case (location, contents)`. Le mot-clé `case` indique que j'effectue un _pattern match_ sur l'objet passé à la fonction. Si c'est un tuple de deux éléments (et je sais que nous serons toujours dans ce cas), alors on _extrait_ le premier élément et l'assigner à une variable nommée `location` puis on extrait le second élément et on l'assigne à un variable nommée `contents`.

Maintenant, au lieu d'accèder à la position (`location`) et au contenu (`content`) avec la syntaxe quelque peu obscure et verbeuse `location_contents_tuple2._1` et `location_contents_tuple2._2`, respectivement, j'utilise des noms éloquents, `location` et `content`. Le code devient plus concis et plus lisible.

Je vais ci-dessous explorer d'avantage le pattern matching.

L'étape `reduceByKey` reste inchangé :

```scala
reduceByKey((count1, count2) => count1 + count2).
```

Pour plus de clarté, ceci n'est pas une expression de pattern-matching ; il n'y a de mot-clé `case`. C'est juste une fonction "régulière" qui prend deux paramètres, pour les deux éléments que je veux additionner.

Mon amélioration préféré est dans la ligne suivante :

```scala
map { 
    case ((word, fileName), count) => (word, (fileName, count)) 
}.
```

Comparez la avec l'obscure version précédente :

```scala
map { word_file_count_tup3 => 
    (word_file_count_tup3._1._1, (word_file_count_tup3._1._2, word_file_count_tup3._2)) 
}.
```

La nouvelle implémentation apporte de la clarté sur ce que je suis en train de faire : juste déplacer des parenthèses ! C'est tout ce que ça prend pour aller des clés `(word, fileName)` avec `count` en tant que valeur à des clés `word` et `(fileName, count)` en tant que valeur. Notez que le pattern matchimg fonctionne très bien avec les structures imbriquées, comme `((word, fileName), count)`.

J'espère que vous pouvez apprécier à quel point cette expression est élégante et concise ! NOtez comment je pense à la transformation suivante que j'ai besoin de faire en préparation pour le group-by final, de basculer de `((word, fileName), count)` à `(word, (fileName, count))` et _je l'ai simplement écrit exactement tel que je me l'étais représenté !_

Du code comme celui-ci fait de l'écriture de code Scala Spark une expérience sublime pour moi. J'espère que c'est le cas pour vous aussi ;)

Les deux prochaines expressions sont inchangées :

```scala
groupByKey.
sortByKey(ascending = true).
```

Le `mapValues` final utilise maintenant le pattern matching pour trier le `Vector` dans chaque enregistrement :

```scala
mapValues { iterable => 
    val vect = iterable.toVector.sortBy { 
        case (fileName, count) => (-count, fileName) 
    }
    vect.mkString(",")
}
```

Comparé le à la version originale, c'est à nouveau plus facile à lire :

```scala
mapValues { iterable => 
    val vect = iterable.toVector.sortBy { file_count_tup2 => 
        (-file_count_tup2._2, file_count_tup2._1)
    }
    vect.mkString(",")
}
```

La fonction que j'ai passé à `sortBy` retourne un tuple utilisé pour trié, avec `-count` pour forcé le trie numérique _descendant_ (le plus grand en premier) et `fileName` pour trié dans un deuxième temps par nom de fichier, pour les décomptes identiques. Je pourrais ignorer le trie par nom de fichier et simplement retourner `-count` (sans tuple). Cependant, si vous avez besoin d'une sortie reproductible dans un système réparti comme Spark, par exemple pour la validation par tests unitaires, alors le trie secondaire par nom de fichier utile.

## Our Final Version: Supporting SQL Queries
To play with some more Spark, let's write SQL queries to explore the resulting data. 

To do this, let's first refine the output. Instead of creating a string for the list of `(location,count)` pairs, which is opaque to our SQL schema (i.e., just a string), let's "unzip" the collection into two arrays, one for the `locations` and one for the `counts`. That way, if we ask for the first element of each array, we'll have nicely separate fields that work better with Spark SQL queries.

"Zipping" and "unzipping" work like a mechanical zipper. If I have a collection of tuples, say `List[(String, Int)]`, I convert this single collection of "zippered" values into two collections (in a tuple) of single values, `(List[String], List[Int])`. Zipping is the inverse operation.

Here is our final implementation, `ii1` rewritten with this change.

In [43]:
val ii = sc.wholeTextFiles(shakespeare.toString).
    flatMap {
        case (location, contents) => 
            val words = contents.split("""\W+""").
                filter(word => word.size > 0)                      // #1
            val fileName = location.split(pathSeparator).last
            words.map(word => ((word.toLowerCase, fileName), 1))   // #2
    }.
    reduceByKey((count1, count2) => count1 + count2).
    map { 
        case ((word, fileName), count) => (word, (fileName, count)) 
    }.
    groupByKey.
    sortByKey(ascending = true).
    map {                         // Must use map now, because we'll format new records. 
      case (word, iterable) =>    // Hence, pattern match on the whole input record.

        val vect = iterable.toVector.sortBy { 
            case (fileName, count) => (-count, fileName) 
        }

        // Use `Vector.unzip`, which returns a single, two element tuple, where each
        // element is a collection, one for the locations and one for the counts. 
        // I use pattern matching to extract these two collections into variables.
        val (locations, counts) = vect.unzip  
        
        // Lastly, I'll compute the total count across all locations and return 
        // a new record with all four fields. The `reduceLeft` method takes a function
        // that knows how to "reduce" the collection down to a final value, working 
        // from the left.
        val totalCount = counts.reduceLeft((n1,n2) => n1+n2)
        
        (word, totalCount, locations, counts)
    }

ii = MapPartitionsRDD[54] at map at <console>:55


MapPartitionsRDD[54] at map at <console>:55

We've changed the ending `mapValues` call to a `map` call, because we'll construct entirely new records, not just new values with the same keys. Hence the full records, two-element tuples are passed in, rather than just the values, so we'll pattern match on the tuple:


```scala
    map {                         // Must use map now, because we'll format new records.
      case (word, iterable) =>    // Hence, pattern match on the whole input record.

        val vect = iterable.toVector.sortBy { 
            case (fileName, count) => (-count, fileName) 
        }
```


We have a `Vector[String, Int]` of two-element tuples `(fileName, count)`. We use `Vector.unzip` to create a single, two element tuple, where each element is now a collection, one for the locations and one for the counts. The type is `(Vector[String], Vector[Int])`.

We can also use pattern matching with assignment! We immediately decompose the two-element tuple:

```scala
        // I use pattern matching to extract these two collections into variables.
        val (locations, counts) = vect.unzip  
```

Finally, it's convenient to know how many locations and counts we have, so we'll compute another new column for the their count and format a four-element tuple as the final output.

```scala
        // Lastly, I'll compute the total count across all locations and return 
        // a new record with all four fields. The `reduceLeft` method takes a function
        // that knows how to "reduce" the collection down to a final value, working 
        // from the left.
        val totalCount = counts.reduceLeft((n1,n2) => n1+n2)

        (word, totalCount, locations, counts)
    }
```    

Okay! Now let's create a [DataFrame](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame) with this data. The `toDF` method just returns the same `DataFrame`, but with appropriate names for the columns, instead of the synthesized names that `createDataFrame` generates (e.g., `_c1`, `_c2`, etc.)

Caching the `DataFrame` in memory prevents Spark from recomputing `ii` from the input files _every time_ I write a query!

Finally, to use SQL, I need to "register" a temporary table.

In [44]:
val iiDF = sqlContext.createDataFrame(ii).toDF("word", "total_count", "locations", "counts")
iiDF.cache
iiDF.registerTempTable("inverted_index")

iiDF = [word: string, total_count: int ... 2 more fields]




[word: string, total_count: int ... 2 more fields]

Let's remind ourselves of the schema:

In [45]:
iiDF.printSchema

root
 |-- word: string (nullable = true)
 |-- total_count: integer (nullable = false)
 |-- locations: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- counts: array (nullable = true)
 |    |-- element: integer (containsNull = false)



The following SQL query extracts the top location by count for each word, as well as the total count across all locations for the word. The Spark SQL dialect supports Hive SQL syntax for extracting elements from arrays, maps, and structs ([details](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-CollectionFunctions)). Here I access the first element (index zero) from each array. 

In [46]:
%%SQL
SELECT word, total_count, locations[0] AS top_location, counts[0] AS top_count 
FROM inverted_index 

+-----------+-----------+--...


+-----------+-----------+----------------+---------+
|       word|total_count|    top_location|top_count|
+-----------+-----------+----------------+---------+
|          a|       3350|loveslabourslost|      507|
|    abandon|          6|     asyoulikeit|        4|
|      abate|          3|loveslabourslost|        1|
|  abatement|          1|    twelfthnight|        1|
|     abbess|          8|  comedyoferrors|        8|
|      abbey|          9|  comedyoferrors|        9|
|abbominable|          1|loveslabourslost|        1|
|abbreviated|          1|loveslabourslost|        1|
|       abed|          2|     asyoulikeit|        1|
|   abetting|          1|  comedyoferrors|        1|
+-----------+-----------+----------------+---------+
only showing top 10 rows



Unfortunately, the output formatting for the `%%SQL` "cell magic" is not configurable. The `%%DataFrame` magic handles variable width layout and also provides more display options. First, to see its options:

In [47]:
%%dataframe

%%dataframe [arguments]
DATAFRAME_CODE

DATAFRAME_CODE can be any numbered lines of code, as long as the
last line is a reference to a variable which is a DataFrame.
    Option    Description                       
------    -----------                       
--limit   The number of records to return   
            (default: 10)                   
--output  The type of the output: html, csv,
            json (default: html)            


Now here's the previous query again, with the a `WHERE` clause added for good measure:

In [48]:
val topLocations = sqlContext.sql("""
    SELECT word,  total_count, locations[0] AS top_location, counts[0] AS top_count
    FROM inverted_index 
    WHERE word LIKE '%love%' OR word LIKE '%hate%'
""")

topLocations = [word: string, total_count: int ... 2 more fields]


[word: string, total_count: int ... 2 more fields]

Now use the `%%dataframe` _magic_.

In [49]:
%%dataframe --limit 100
topLocations

word,total_count,top_location,top_count
beloved,11,tamingoftheshrew,4
cloven,1,loveslabourslost,1
cloves,1,loveslabourslost,1
glove,3,loveslabourslost,2
glover,1,merrywivesofwindsor,1
gloves,5,merrywivesofwindsor,3
hate,22,midsummersnightsdream,9
hated,6,midsummersnightsdream,4
hateful,5,midsummersnightsdream,3
hates,5,asyoulikeit,2


A _natural language processing_ (NLP) expert might tell you that _love_, _loved_, _loves_, etc. are really the same word, because they are different conjugations of the verb _to love_ and _love_ is a noun, too. Similarly, should _gloves_ (plural) and _glove_ (singular) be handled differently?

What we really should do is extract the _stems_ of these words and use those instead. NLP toolkits handle this _stemming_ for you.

There's also a useful `show` method on `DataFrames`.

In [50]:
topLocations.show

+-------+-----------+--------------------+---------+
|   word|total_count|        top_location|top_count|
+-------+-----------+--------------------+---------+
|beloved|         11|    tamingoftheshrew|        4|
| cloven|          1|    loveslabourslost|        1|
| cloves|          1|    loveslabourslost|        1|
|  glove|          3|    loveslabourslost|        2|
| glover|          1| merrywivesofwindsor|        1|
| gloves|          5| merrywivesofwindsor|        3|
|   hate|         22|midsummersnightsd...|        9|
|  hated|          6|midsummersnightsd...|        4|
|hateful|          5|midsummersnightsd...|        3|
|  hates|          5|         asyoulikeit|        2|
| hateth|          1|midsummersnightsd...|        1|
|   love|        662|    loveslabourslost|      121|
|  loved|         38|         asyoulikeit|       13|
| lovely|         15|midsummersnightsd...|        7|
|  lover|         33|         asyoulikeit|       14|
| lovers|         31|midsummersnightsd...|    

By default, it truncates column widths and only prints 20 rows. You can override both:

In [51]:
topLocations.show(numRows = 40, truncate = false)

+--------+-----------+---------------------+---------+
|word    |total_count|top_location         |top_count|
+--------+-----------+---------------------+---------+
|beloved |11         |tamingoftheshrew     |4        |
|cloven  |1          |loveslabourslost     |1        |
|cloves  |1          |loveslabourslost     |1        |
|glove   |3          |loveslabourslost     |2        |
|glover  |1          |merrywivesofwindsor  |1        |
|gloves  |5          |merrywivesofwindsor  |3        |
|hate    |22         |midsummersnightsdream|9        |
|hated   |6          |midsummersnightsdream|4        |
|hateful |5          |midsummersnightsdream|3        |
|hates   |5          |asyoulikeit          |2        |
|hateth  |1          |midsummersnightsdream|1        |
|love    |662        |loveslabourslost     |121      |
|loved   |38         |asyoulikeit          |13       |
|lovely  |15         |midsummersnightsdream|7        |
|lover   |33         |asyoulikeit          |14       |
|lovers  |

> **Note:** Named Parameters

> I used _named parameters_ here, `show(numRows = 40, truncate = false)`, for legibility. They are optional in Scala, as long as you pass the values in the same order as the parameters are declared. You can also use named parameters to write the arguments in any order you want, not just declaration order. So, I could have just written `(40, false)`, but then you would rightly wonder what `false` means in this context.

**Exercises:** 

See the <a href="#ExerciseSolutions">Appendix</a> for the solutions to the first two exercises.

* The `glove`, `gloves`, `whate` and `whatever` aren't really the `love` and `hate` we wanted ;) How might you change the query so be more specific.
* Modify the query to return the top two locations and counts.
* Before moving on, try writing other queries. Edit the query in the following cell:

In [52]:
val sql1 = sqlContext.sql("""
    SELECT * FROM inverted_index
""")
sql1.show(10, false)

+-----------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------+
|word       |total_count|locations                                                                                                                                       |counts                                  |
+-----------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------+
|a          |3350       |[loveslabourslost, merrywivesofwindsor, muchadoaboutnothing, asyoulikeit, tamingoftheshrew, twelfthnight, midsummersnightsdream, comedyoferrors]|[507, 494, 492, 461, 445, 416, 281, 254]|
|abandon    |6          |[asyoulikeit, tamingoftheshrew, twelfthnight]                                                                                  

sql1 = [word: string, total_count: int ... 2 more fields]


[word: string, total_count: int ... 2 more fields]

#### Removing the "Stop Words"
Did you notice that one record we saw above was for the word "a". Not very useful if you're using this data for text searching, _sentiment mining_, etc. So called _stop words_, like _a_, _an_, _the_, _he_, _she_, _it_, etc., could also be removed.

Recall the `filter` logic I added to remove "", `word => word.size > 0`. I could replace it with `word => keep(word)`, where `keep` is a method that does any additional filtering I want, like removing stop words.

**Exercise:**

* Implement the `keep(word: String):Boolean` method and change the `filter` function to use it. Have `keep` return `false` for a small, hard-coded list of stop words (make up your own list or search for one). (See the <a href="#ExerciseSolutions">Appendix</a> for the solution.)

## Plus sur la syntaxe de Pattern Matching
Nous avons seulement gratté la surface du pattern matching. Découvrons la plus en détail.

Ici nous avons une autre fonction anonyme qui utilise du pattern matching qui étend la fonction précédente que nous avons passé à `flatMap` :

```scala
{
    case (location, "") => 
        Array.empty[((String, String), Int)]  // Return an empty array
    case (location, contents) => 
        val words = contents.split("""\W+""")
        val fileName = location.split(pathSep).last
        words.map(word => ((word, fileName), 1))
}.
```

Vous pouvez avoir plusieurs clauses `case`, certains d'eux pouvant correspondre sur des valeurs littérales spécifiques ("" dans ce cas-là) et d'autres qui sont plus générales. La première clause case s'occupe des fichiers sans contenu. La seconde clause est la même qu'avant.

Le pattern matching est _eager_. L'ordre est important car le premier match réalisé sera celui écrit en premier. Si vous inversez l'ordre ici, alors la `case (location, "")` ne marchera jamais et le compilateur vous affichera un avertissement "unreachable code".

Notez que vous n'avez pas à mettre les lignes après le `=>` dans les accolades `{...}` (bien que vous pouvez le faire). Les mots-clés `=>` et `case` (ou le dernier `}`) sont suffisants pour délimiter ces blocs. Aussi pour un bloc contenant une seule expression, comme pour la première clause, vous pouvez mettre l'expression sur la même ligne après le `=>` si vous voulez (et si ça rentre).

FInalement si aucun des case n'est réalisé, alors une exception [MatchError](http://www.scala-lang.org/api/current/index.html#scala.MatchError) exception est lancée. Dans notre cas, nous savons _toujours_ que nous avons des tuples à deux éléments, ainsi les exemples jusqu'ici sont corrects.

Voici un exemple final imaginé pour illustrer ce qui est possible, en utilisant une séquence d'objets de types différent :

In [53]:
val stuff = Seq(1, 3.14159, 2L, 4.4F, ("one", 1), (404F, "boo"), ((11, 12), 21, 31), "hello")

stuff.foreach {
    case i: Int               => println(s"Found an Int:   $i")
    case l: Long              => println(s"Found a Long:   $l")
    case f: Float             => println(s"Found a Float:  $f")
    case d: Double            => println(s"Found a Double: $d")
    case (x1, x2) => 
        println(s"Found a two-element tuple with elements of arbitrary type: ($x1, $x2)")
    case ((x1a, x1b), _, x3) => 
        println(s"Found a three-element tuple with 1st and 3th elements: ($x1a, $x1b) and $x3")
    case default              => println(s"Found something else: $default")
}

Found an Int:   1
Found a Double: 3.14159
Found a Long:   2
Found a Float:  4.4
Found a two-element tuple with elements of arbitrary type: (one, 1)
Found a two-element tuple with elements of arbitrary type: (404.0, boo)
Found a three-element tuple with 1st and 3th elements: (11, 12) and 31
Found something else: hello


stuff = List(1, 3.14159, 2, 4.4, (one,1), (404.0,boo), ((11,12),21,31), hello)


List(1, 3.14159, 2, 4.4, (one,1), (404.0,boo), ((11,12),21,31), hello)

Quelques notes.
* Un littéral comme `1`est inféré en tant que `Int`, tandis que `3.14159` est inféré en tant que `Double`. Ajoutez `L` ou `F`, pour inférer respectivement `Long` ou `Float` à la place.
* Notez comment nous avons mélangé la vérifaction de types spécifiques, par exemple `i: Int` avec des types moins typés comme `(x1, x2)`, qui attend un tuple à deux éléments. Tous les mots `i`, `l`, `f`, `d`, `x1`, `x2`, `x3`, et `default` sont des noms de variables arbitraires. `default` n'est pas un mot-clé, mais un choix arbitraire pour le nom d'une variable. Nous pourrions utiliser ce que nous voulons.
* La dernière clause `default` définit une variable sans information de type. Ainsi, cela match _tout_, ce qui est la raison pour laquelle cette clause doit apparaître en dernier. C'est la syntaxe utilisée quand vous n'êtes pas sûr du type des choses que vous être en train de faire correspondre et que vous voulez éviter une possible [MatchError](http://www.scala-lang.org/api/current/index.html#scala.MatchError).
* Si vous voulez matcher quelque chose qui _existe_, mais dont vous n'avez pas besoin de mettre dans une variable, alors utilisez `_` comme dans l'exemple du tuple à trois éléments.
* L'exemple du tuple à trois éléments montre que "l'imbriquage" arbitraire d'expressions est supportée, où le premier élément attendu est un tuple de deux éléments.

Toutes les fonctions anonymes que nous avons vues qui utilisent ces clauses de pattern matching ont ce format :

```scala
{ 
    case firstCase => ...
    case secondCase => ...
    ... 
}```

Ce format a un nom spécial. Il est appelé _fonction partielle_. Tout ce que cela veut dire est seulement que nous "promettons" d'accepter des arguments qui correspondent à au moins une de nos clauses `case` et non pas n'importe quelle entrée.

L'autre type de fonction anonyme que nous avons vu est la _fonction totale_.

Souvenez-vous lorsque il a été dit que les fonctions totales peuvent utiliser soit `(...)` ou `{...}` autour d'elles, en fonction du "look" que vous voulez leur donner. Pour les _fonction partielles_, vous _devez_ utiliser `{...}`.

Souvenez-vous que nous avons utilisé du pattern matching avec affectation :

```scala
val (locations, counts) = vect.unzip  
```
[Vector.unzip](http://www.scala-lang.org/api/current/#scala.collection.immutable.Vector) retourne un tuple de deux éléments, où chaque élément est une collection. Nous matchons sur ce tuple et affectons chaque pièce à une variable. Voici un autre exemple imaginé avec un tuple comportant des éléments imbriqués.

In [54]:
val (a, (b, (c1, c2), d)) = ("A", ("B", ("C1", "C2"), "D"))
println(s" $a, $b, $c1, $c2, $d")

 A, B, C1, C2, D


a = A
b = B
c1 = C1
c2 = C2
d = D


D

Essayez d'ajouter un élément `"E"` au côté droit du tuple sans changer le côté gauche. Que se passe-t-il ?
Essayez d'enlever le `"D"` et le `"E"`. Que se passe-t-il maintenant ?

Nous reviendrons à un dernier exemple de pattern matching quand nous discuterons des _case classes_.

## Le modèle Objet de Scala
Scala est un langage de prommation _hybride_, orienté objet et fonctionnel. La philosophie de Scala est que nous exploitons l'orienté objet pour l'encapsulation des détails, c-à-d _la modularité_, mais nous utilisons la programmation fonctionnelle pour sa précision logique lorsque nous implémentons ces détails. La majorité de ce que nous avons vu jusque-là relève plus de la programmation fonctionnelle. Une grande partie de la manipulation et l'analyse des données est en fait des mathématiques. La programmation fonctionnelle essaye de rester le plus proche possible du fonctionnement des fonctions et valeurs des mathématiques.

Cependant en codant des programmes Spark non-triviaux, il est parfois utile d'exploiter les fonctionnalités orientés objet.

### Classes vs. Instances
Scala utilise la même différenciation entre les classes et les instances que vous trouverez en Java. Les classes sont comme des _templates_ utilisées pour créer des instances.

Nous avons parlé des _types_ de diverses choses, comme `word` qui est un `String` et `totalCount` qui est un `Int`. Une classe définit un _type_ aussi.

Voici un exemple d'une classe que nous pourrions utiliser pour représenter l'index inversé des lignes que nous venons de créer :

In [55]:
class IIRecord1(
    word: String, 
    total_count: Int, 
    locations: Array[String], 
    counts: Array[Int]) {
    
    /** CSV formatted string, but use [a,b,c] for the arrays */
    override def toString: String = {
        val locStr = locations.mkString("[", ",", "]")  // i.e., "[a,b,c]"
        val cntStr = counts.mkString("[", ",", "]")  // i.e., "[1,2,3]"
        s"$word,$total_count,$locStr,$cntStr"
    }
}

new IIRecord1("hello", 3, Array("one", "two"), Array(1, 2))

defined class IIRecord1


hello,3,[one,two],[1,2]

Quand nous définissons une classe, la liste des arguments après le nom de la classe est la liste des arguments pour le _constructeur primaire_. Vous pouvez définir des constructeurs secondaires aussi mais ce n'est pas très commun de le faire pour des raisons que nous allons voir sous peu.

Notez que quand vous overridez une méthode qui est définie dans une classe parent comme `Object.toString` de Java, Scala a besoin que vous ajoutiez le mot-clé `override`.

Nous avons crée une _instance_ de `IIRecord1` en utilisant `new` comme en Java.

Finalement, remarque supplémentaire, nous avons utilisé jusqu'à ici beaucoup de `Int` (entiers) pour les différents counts, mais en réalité dans le "big data" nous devrions probablement utiliser des `Long`.

### Objets

Nous avons intentionnellement utilisé le mot _instance_ pour les choses que nous créeons des classes. Cela est dû au fait que Scala possède la fonctionnalité de créer seulement une instance de la classe avec [Singleton Design Pattern](https://en.wikipedia.org/wiki/Singleton_pattern). Dans ce cas, il faudra utiliser le mot-clé `object`.

En Java par exemple vous devez définir une classe avec la méthode `static void main(String[] arguments)` comme point d'entrée de votre programme. En Scala vous pouvez utiliser un `object` qui contiendra le `main` comme montré dans l'exemple suivant :

In [56]:
object MySparkJob {

    val greeting = "Hello Spark!"
    
    def main(arguments: Array[String]) = {
        println(greeting)
        
        // Create your SparkContext, etc., etc.
    }
}

defined object MySparkJob


Comme pour les classes, le nom de l'objet peut être ce que vous voulez. Il n'y a pas de mot-clé `static` en Scala. A la place d'ajouter des méthodes et champs `static` dans les classes comme en Java, vous pouvez les mettre dans des objets comme montré ici.

> **NOTE:** Parce que le compilateur Scala doit générer du byte code valide pour la JVM, ces définitions sont converties en un équivalent Java qui contient des définitions static.

### Case Classes
Les tuples sont utiles pour représenter des lignes et les décomposer avec du pattern matching. Cependant, il serait agréable si les champs étaient _nommés_ ainsi que _typés_. Une bonne utilisation pour une classe comme `IIRecord1` pour représenter cette structure et nous donner des champs nommés. Affinons donc la définition de la classe afin d'exploiter des fonctionnalités supplémentaires très utile en Scala.

Considérons la définition suivante d'une _case class_ qui représente notre type d'enregistrement final.

In [57]:
case class IIRecord(
    word: String, 
    total_count: Int = 0, 
    locations: Array[String] = Array.empty, 
    counts: Array[Int] = Array.empty) {

    /** 
     * Different than our CSV output above, but see toCSV.
     * Array.toString is useless, so format these ourselves. 
     */
    override def toString: String = 
        s"""IIRecord($word, $total_count, $locStr, $cntStr)"""
    
    /** CSV-formatted string, but use [a,b,c] for the arrays */
    def toCSV: String = 
        s"$word,$total_count,$locStr,$cntStr"
        
    /** Return a JSON-formatted string for the instance. */
    def toJSONString: String = 
        s"""{
        |  "word":        "$word", 
        |  "total_count": $total_count, 
        |  "locations":   ${toJSONArrayString(locations)},
        |  "counts"       ${toArrayString(counts, ", ")}
        |}
        |""".stripMargin

    private def locStr = toArrayString(locations)
    private def cntStr = toArrayString(counts)

    // "[_]" means we don't care what type of elements; we're just
    // calling toString on them!
    private def toArrayString(array: Array[_], delim: String = ","): String = 
        array.mkString("[", delim, "]")  // i.e., "[a,b,c]"

    private def toJSONArrayString(array: Array[String]): String =
        toArrayString(array.map(quote), ", ")
    
    private def quote(word: String): String = "\"" + word + "\""  
}

defined class IIRecord


Nous avons dit que définir des constructeurs secondaires n'était pas très commun. En partie c'est parce que nous avons utilisé une fonctionnalité pratique, la capacité de définir des valeurs par défaut pour les arguments de méthodes, en incluant le constructeur primaire. La possibilité d'utiliser des valeurs par défaut signifie que nous pouvons créer des instances sans avoir à donner tous les arguments explicitement tant qu'il y a une valeur par défaut définie et de même pour l'appel des méthodes. Considérez ces deux exemples suivant :

In [58]:
val hello = new IIRecord("hello")
val world = new IIRecord("world!", 3, Array("one", "two"), Array(1, 2))

println("\n`toString` output:")
println(hello)
println(world)

println("\n`toJSONString` output:")
println(hello.toJSONString)
println(world.toJSONString)

println("\n`toCSV` output:")
println(hello.toCSV)
println(world.toCSV)


`toString` output:
IIRecord(hello, 0, [], [])
IIRecord(world!, 3, [one,two], [1,2])

`toJSONString` output:
{
  "word":        "hello", 
  "total_count": 0, 
  "locations":   [],
  "counts"       []
}

{
  "word":        "world!", 
  "total_count": 3, 
  "locations":   ["one", "two"],
  "counts"       [1, 2]
}


`toCSV` output:
hello,0,[],[]
world!,3,[one,two],[1,2]


hello = IIRecord(hello, 0, [], [])
world = IIRecord(world!, 3, [one,two], [1,2])


IIRecord(world!, 3, [one,two], [1,2])

Nous avons ajouté `toJSONString` pour illustrer l'ajout de méthodes _publiques_, la visibilité par défaut et les méthodes _privées_ à la définition d'une classe. Quand il n'y a pas de méthodes ou de variables sans champs à définir, nous pouvons omettre le corps entièrement, pas de `{}` nécessaire.

Souvenez-vous du mot-clé `override` qui est utilisé lorsque nous redéfinissons `toString`.

Qu'en est-il du mot-clé `case`? Ce mot-clé dit au compilateur de faire plusieurs choses utiles pour nous, éliminant ainsi pas mal de code superflu que nous pourrions trouver dans d'autres langages, comme Java :

1. Traitez chaque argument de constructeur comme un champ privé immutable (`val`) de l'instance.
1. Générez une méthode de lecteur publique pour le champ avec le même nom (par exemple `word`).
1. Générez des implémentations _correctes_ des méthodes `equals` et `hashCode`, souvent implémentées incorrectement, ainsi que la méthode par défaut `toString`. Vous pouvez utiliser vos propres définitions en les ajoutant explicitement dans le corps. Nous avons fait cela pour `toString` afin de formatter les tableaux de manière plus agréable que la méthode par défaut `Array[_].toString`.
1. Générez un `object IIRecord`, donc avec le même nom. L'objet sera appelé _objet compagnon_.
1. Générez une méthode "factory" dans l'objet compagnon qui prend la même liste d'argument et qui instancie une instance.
1. Générez des méthodes helper dans l'objet compagnon qui supportent le pattern matching.

Les points 1 et 2 font que chaque argument se comporte comme s'ils étaient public, en lecture seule des champs de l'instance mais ils sont implémentés comme décrit.

Le troisième point est important pour comprendre le comportement standard. Les instances de case class sont souvent utilisés comme clés dans [Maps](http://www.scala-lang.org/api/current/index.html#scala.collection.Map) et [Sets](http://www.scala-lang.org/api/current/index.html#scala.collection.Set), les RDD Spark RDD et méthodes de DataFrame, etc. En fait vous devriez utiliser _uniquement_ vos case classes ou les types inclus dans Scala possédant des bonnes définitions des méthodes `hashCode` et `equals` (comme `Int` et les autres types numériques, `String`, tuples, etc.) comme clés.

Pour le quatrième point, _l'objet compagnon_ est généré automatiquement par le compilateur. Le compilateur ajoute une méthode "factory" mentioné dans le cinquième point, et des méthodes qui supportent le pattern matching, expliqué dans le sixième point. Vous pouvez définir ces méthodes explicitement par vous même ainsi que les champs pour retenir un état. Le compilateur insèrera toujours ces méthodes. Cependant jetez un oeil à <a href="#Ambiguities">Ambiguities with Companion Objects</a>. La conclusion est que vous ne devriez pas définir des case classes dans un notebook comme celui ci avec des méthodes en plus dans l'objet compagnon à cause de problèmes de parsing que cela peut engendrer.

Le cinquième point signifie que `new` est rarement utilisé quand des instances sont crées. Cependant, ces deux lignes suivantes sont équivalentes :

In [59]:
val hello1 = new IIRecord("hello1")
val hello2 = IIRecord("hello2")

hello1 = IIRecord(hello1, 0, [], [])
hello2 = IIRecord(hello2, 0, [], [])


IIRecord(hello2, 0, [], [])

Qu'arrive t'il en réalité dans le second cas sans `new` ? La méthode "factory" est en fait appelée `apply`. En Scala quand vous mettez une liste d'argument après n'importe quelle _instance_, en incluant les `objets` comme dans le cas de `hello2`, une méthode `apply` va être recherchée par Scala pour être appelée. Les arguments devront correspondre à la liste des argument à apply (nombre d'arguments, types des arguments, prise en compte des valeurs par défaut des arguments, etc.). Ainsi en réalité la déclaration de `hello2` est :

In [60]:
val hello2b = IIRecord.apply("hello2b")

hello2b = IIRecord(hello2b, 0, [], [])


IIRecord(hello2b, 0, [], [])

Vous pouvez exploiter cette fonctionnalité dans vos autres classes aussi. Nous avon parlé du mot "stemming" en haut. Supposons que vous écriviez une libraire de stemming et déclariez un objet comme point d'entrée. Ici nous allons faire quelque chose de simple; nous assumons qu'un "s" à la fin du mot signifie que le mot est au pluriel et nous allons l'enlever (c'est une mauvaise présomption...) :

In [61]:
object stem {
    def apply(word: String): String = word.replaceFirst("s$", "") // insert real implementation!
}

println(stem("dog"))
println(stem("dogs"))

defined object stem


dog
dog


Notez que l'appel à `stem` ressemble à un appel de fonction ou de méthode. Scala permet aux objets et aux classes d'avoir un nom qui commence avec une lettre minuscule.

Finalement, le 6ème point signifie que nous pouvons utiliser nos case classes spécifique dans des expressions de pattern matching. Nous n'allons pas regarder les méthodes implémentées dans l'objet compagnon et comment ils supportent le pattern matching. Nous allons seulement utiliser de la "magie" dans l'exemple suivant qui "parcourt" nos instances définies précedemment, `hello` et `world`.

In [62]:
Seq(hello, world).map {
    case IIRecord(word, 0, _, _) => s"$word with no occurrences."
    case IIRecord(word, cnt, locs, cnts) => 
        s"$word occurs $cnt times: ${locs.zip(cnts).mkString(", ")}"
}

List(hello with no occurrences., world! occurs 3 times: (one,1), (two,2))

La première clause case ignore les locations et les counts car nous savons que ce seront des arrays vides si le count total est 0 !

La seconde clause case utilise la méthode `zip` pour mettre les locations et les counts ensemble. Souvenez-vous que nous avons utilisé `unzip` pour créer des collections séparées.

## Datasets et DataFrames
Jusque là, nous avons principalement utilisé l'API RDD de Spark. Les case classes sont souvent utilisées pour représenter des "schema" de records quand on utilise des RDD. Elles le sont également avec un nouveau type, [Dataset[T]](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset), qui est similaire à `RDD[T]`, où `T` représente un type de records.

Les [DataFrames](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame) possèdent un problème qui est que les champs sont non-typés jusqu'à leur accès. Les `Datasets` rétablissent l'assurance du type des `RDD` en utilisant une case class comme la définition d'un schema.

Les `Datasets` ont été introduits avec Spark 1.6.0, mais elles étaient quelque peu incomplètes dans les versions 1.6.X. Dans Spark 2.0.0, `Dataset` devient la classe "parente" de `DataFrame`. Cela signifie qu'il est recommandé d'utiliser la plus grande assurance de type de `Dataset`, mais vous pouvez toujours utiliser `DataFrame` si vous le souhaitez. Maintenant, `DataFrame` est l'équivalent de `Dataset[Row]`, où [Row](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row) est la représentation grossière des row et columns.


Essayons cela. Mais d'abord, nous devons importer du code venant de SparkSQL. Scala vous laisse importer du code n'importe où, alors qu'avec Java les imports se font au début du fichier source. Scala vous laisse aussi importer des membres des instances, pas juste des imports statiques supportés par Java.

La prochaine cellule importe donc des "implicits" de l'instance [SQLContext](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLContext) qui est déjà dans le scope. Malheureusement, dû à une ambiguïté du scope impliquant les notebooks et l'interpréteur Scala, nous devons assigner `sqlContext` à une nouvelle variable et _ensuite_ importer avec celle-ci :

In [63]:
val sqlc = sqlContext
import sqlc.implicits._

sqlc = org.apache.spark.sql.SQLContext@27a9da47


org.apache.spark.sql.SQLContext@27a9da47

We'll explain what "implicits" are <a href="#implicits">later</a>. For now, suffice it to say that they are used to "allow" us to call the `as` method on our `iiDF` `DataFrame`, which converts it to a `Dataset[IIRecord]`.

Nous allons expliquer ce que sont les `implicits` <a href="#implicits">plus tard</a>. Pour le moment, ils nous "permettent" d'appeler la méthode `as` sur notre `DataFrame` `iiDF`, ce qui la convertit en `Dataset[IIRecord]`. 

In [64]:
val iiDS = iiDF.as[IIRecord]
iiDS

iiDS = [word: string, total_count: int ... 2 more fields]


[word: string, total_count: int ... 2 more fields]

In [65]:
iiDS.show

+-----------+-----------+--------------------+--------------------+
|       word|total_count|           locations|              counts|
+-----------+-----------+--------------------+--------------------+
|          a|       3350|[loveslabourslost...|[507, 494, 492, 4...|
|    abandon|          6|[asyoulikeit, tam...|           [4, 1, 1]|
|      abate|          3|[loveslabourslost...|           [1, 1, 1]|
|  abatement|          1|      [twelfthnight]|                 [1]|
|     abbess|          8|    [comedyoferrors]|                 [8]|
|      abbey|          9|    [comedyoferrors]|                 [9]|
|abbominable|          1|  [loveslabourslost]|                 [1]|
|abbreviated|          1|  [loveslabourslost]|                 [1]|
|       abed|          2|[asyoulikeit, twe...|              [1, 1]|
|   abetting|          1|    [comedyoferrors]|                 [1]|
|abhominable|          1|  [loveslabourslost]|                 [1]|
|      abhor|          5|[asyoulikeit, com...|  

# "Scala pour Spark avancé"
Nous avons déjà abordé beaucoup de choses dans ce notebook, en se concentrant sur les sujets essentiels que vous devez savoir de Scala pour un usage quotidien. Appelons ces choses "Scala pour Spark débutant".

Dès maintenant, nous vous conseillons de créer un nouveau notebook et de jouer avec Spark en utilisant ce que vous avez appris jusqu'à maintenant et de revenir ici si vous tombez sur quelque chose qui n'a pas été encore abordé jusque là. Il y a une chance pour que ce que vous devez savoir se trouve dans la partie qui suit.

### Tout importer dans un package
En Java, `import foo.bar.*;` signifie que nous allons tout importer du package `bar`.

En Scala, `*` est en fait un nom de méthode légal; par exemple lorsque vous définissez multiplication pour un type numérique crée comme `Matrix`. Ainsi un import avec une `*` serait ambigu. Scala utilise donc `_` au lieu de `*`, `import foo.bar._` (avec le point-virgule qui est inféré)

À quoi ressemblerait la définition de la méthode `*` ? Elle ressemblerait à quelque chose comme :

```scala
case class Matrix(rows: Array[Array[Double]]) {  // Each row is an Array[Double]

    /** Multiple this matrix by another. */
    def *(other: Matrix): Matrix = ...
    
    /** Add this matrix by another. */
    def +(other: Matrix): Matrix = ...
    
    ...
}

val row1: Array[Array[Double]] = ...
val row2: Array[Array[Double]] = ...
val m1 = Matrix(rows1)
val m2 = Matrix(rows2)
val m1_times_m2 = m1 * m2
val m1_plus_m2 = m1 + m2
```

### Syntaxe des opérateurs

Une seconde ! Que signifie `m1 * m2` ? Ne devrait-ce pas être `m1.*(m2)` ? Cela serait vraiment pratique pour utiliser la "syntaxe opérateur", plus précisément appelé la _notation d'opérateur infixe_ pour des méthodes diverses comme `*` et `+` ici. Le parser Scala supporte cela avec un simple relâchement des règles; quand une méthode prend en paramètre un argument unique, alors vous pouvez omettre le point `.` et les parenthèses `(...)`. Ces deux lignes sont équivalentes :

```scala
val m1_times_m2 = m1.*(m2)
val m1_times_m2 = m1 * m2
```

Cela peut amener à du code confus, particulièrement pour les débutants à Scala, utilisez la donc avec précaution.

### Traits
Les _traits_ sont similaires aux _interfaces_ Java 8, utilisés pour définir des abstractions, mais avec la possibilité de fournir des implémentations par "défaut" des méthodes déclarées. Contrairement aux interfaces de Java 8, les traits peuvent aussi avoir des champs qui représentes "l'était" de l'information à propos des instances. Il y a une ligne fine entre les _traits_ et les _classes abstraites_, là encore avec des méthodes membres ou champs ne sont pas défini.
Dans les deux cas, les sous-types de trait et/ou d'une classe abstraite doivent définir tous les membres non définis si vous voulez pouvoir construire des instances de celles-ci.

Pouruqoi avoir traits et classes abstraites ? C'est parce que Java ne permet _qu'un seul héritage_; ile ne peut y avoir qu'un type _parent_, qio est normalement utilisé là où vous voudriez utiliser une classe abstraite, mais Scala vous permet de "mixer" des traits (ou d'utiliser un trait comme la classe parente - assez confus). Un bon exemple de trait "mélangé" est celui qui implémente le logging. Tous les "services" peuvent ajouter le trait de logging pour avoir un accès "instantané" à cette fonctionnalité réutilisable. Cela ressemblerait au suivant :

```scala
// Assume severity `Level` and `Logger` types defined elsewhere...
trait Logging {

    def log(level: Level, message: String): Unit = logger.log(level, message)
    
    private logger: Logger = ...
}

abstract class Service {
    def run(): Unit   // No body, so abstract!
}

class MyService extends Service with Logging {
    def run(): Unit = {
        log(INFO, "Staring MyService...")
        ...
        log(INFO, "Finished MyService")
    }
}
```

`Unit` en Scala est l'équivalent Java de `void`. C'est un vrai type avec une valeur de retour unique contrairement à `void`, et nous l'utilisons dans le sens "rien d'utile va être retourné".

### Intervalles
Et si vous vouliez des nombres entre une valeur de départ et de fin ? Utilisez un [Range](http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Range), qui possède une syntaxe littérale `1 until 100`, `2 to 200 by 3`. 

`Range` comprend toujours la borne inférieure. Utiliser `to` dans `Range` fait en sorte que la borne supérieure soit _incluse_. Utiliser `until` le rend _exclusif_. Utilisez `by` pour spécifier un delta, qui par défaut est de `1`.

In [66]:
1 until 10

Range(1, 2, 3, 4, 5, 6, 7, 8, 9)

In [67]:
1 to 10

Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

In [68]:
1 to 10 by 3

Range(1, 4, 7, 10)

Quand vous avez besoin d'un jeu de données pour jouer avec Spark, les intervalles peuvent être pratique.

In [69]:
val rdd7 = sc.parallelize(1 to 50).
    map(i => (i, i%7)).
    groupBy{ case (i, seven) => seven }.
    sortByKey()
rdd7.take(7).foreach(println)

(0,CompactBuffer((7,0), (14,0), (21,0), (28,0), (35,0), (42,0), (49,0)))
(1,CompactBuffer((1,1), (8,1), (15,1), (22,1), (29,1), (36,1), (43,1), (50,1)))
(2,CompactBuffer((2,2), (9,2), (16,2), (23,2), (30,2), (37,2), (44,2)))
(3,CompactBuffer((3,3), (10,3), (17,3), (24,3), (31,3), (38,3), (45,3)))
(4,CompactBuffer((4,4), (11,4), (18,4), (25,4), (32,4), (39,4), (46,4)))
(5,CompactBuffer((5,5), (12,5), (19,5), (26,5), (33,5), (40,5), (47,5)))
(6,CompactBuffer((6,6), (13,6), (20,6), (27,6), (34,6), (41,6), (48,6)))


rdd7 = ShuffledRDD[101] at sortByKey at <console>:39


ShuffledRDD[101] at sortByKey at <console>:39

[SparkContext](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext) a aussi une méthode `range` qui fait la même chose que `sc.parallelize(some_range)`.

### Interpréteur Scala (REPL) vs. Notebooks vs. Compilateur Scala
<a name="REPL"></a>
Ce notebook est en train d'utiliser un interprèteur Scala, le _REPL_ ("read, eval, print, loop") pour parser le code Scala. La distribution Spark est fournie avec un script `spark-shell` qui vous permet aussi d'utiliser l'interpréteur depuis une ligne de commande mais sans la belle interface utilisateur d'un notebook.

Si vous utilisez le `spark-shell`, il y a quelques différences de comportement que vous devez savoir.

#### Utiliser le mode :paste
Par défaut, l'interpréteur Scala traite _chaque ligne_ que vous entrez de manière séparée. Cela peut engendrer des surprises comparé à la manière de fonctionner du _compilateur_ Scala, où le code sera traité dans le même fichier et le même contexte.

Par exemple, dans le code suivant, l'expression continue sur la seconde ligne, est comprise par le compilateur mais pas par l'interpréteur.

```scala
(1 to 100)
.map(i => i*i)
```

L'interpréteur pense qu'il a fini d'analyser l'expression quand il arrive à une nouvelle ligne après le littéral [Range](http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Range), `1 to 100`. L'interpréteur lance ensuite une erreur sur le `.` de la ligne suivante. En parallèle, le compilateur continue de compiler et ignore les nouvelles lignes dans ce cas-là.

Ce notebook fait la même chose que l'interpréteur mais dans quelques cas, les notebooks vont utiliser une commande, `:paste` qui indique au parser d'analyser toutes les lignes qui suivent ensemble, comme ce que le compilateur ferait jusqu'à "end of input" que vous pouvez indiquer avec `CTRL-D`.

Vous ne pouvez pas l'expérimenter avec ce notebook, mais votre session ressemblerait à quelque chose comme ça :

```scala
scala> :paste
// Entering paste mode (ctrl-D to finish)

(1 to 10)
.map(i => i*i)
<CTRL-D>

// Exiting paste mode, now interpreting.

res0: scala.collection.immutable.IndexedSeq[Int] = Vector(1, 4, 9, 16, 25, 36, 49, 64, 81, 100)

scala>
```

#### Ambiguïtés avec les objets compagnons
<a name="Ambiguïtés"></a>
Lors de la rédaction de ce notebook, nous _voulions_ démontrer en utilisant l'objet compagnon `IIRecord` de définir une méthode explicitement, mais cela mène à une ambiguïté plus tard dans ce notebook si vous voulez utiliser cette méthode. Le notebook confondra la case class et l'objet.

C'est regrettable mais il est vrai que lorsque vous commencez à définir des case classes plus complexes, avec plus que des méthodes triviales et des additions explicites à l'objet compagnon par défaut, vous devriez vraiment définir ces types en dehors du notebook dans une librairie compilée que vous pourrez utiliser dans ce notebook.

Les détails sont en dehors de notre cadre ici, mais vous pouvez créer un projet avec votre code Scala et le build avec votre outil de build favori. [SBT](http://www.scala-sbt.org/) est un choix populaire pour Scala, mais Maven, Gradle, etc peuvent être utilisés.

Vous devez générer un fichier _jar_ avec les artefacts compilés, puis quand vous lancez votre `spark-shell`, soumettez votre job Spark avec un `spark-submit` ou utilisez un environnement de notebook comme celui-ci en spécifiant les jars pour les inclure. Pour `spark-shell` et `spark-submit`, invoquez le avec l'option `--jars myproject.jar`. Pour Toree with Jupyter, regardez la discussion sur [FAQ page](https://toree.incubator.apache.org/documentation/user/faq.html).

### Hiérarchie de type Scala
La hiérarchie de type dans Scala est similaire à celle de Java mais avec des différences notables.

![Scala Type Hierarchy](http://docs.scala-lang.org/resources/images/classhierarchy.img_assist_custom.png)

En Java, tous les _références de types_ proviennent de [java.lang.Object](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html). Le nom _référence de type_ reflète le fait que les instances de tous ces types sont alloués sur le _heap_ et que les variables du programmes sont des références à ces heap.

Les types primitifs, `int`, `long`, etc ne sont pas considérés comme faisant partie de la hiérarchie de type et sont traités spécialement. C'est en partie pour optimiser les performances car les instances de ces types tiennent dans le registre du processeur et les valeurs sont envoyées dans la pile d'éxécution. Cependant, ces valeurs sont types "boxed", par exemple `Interger`, `Long`, etc et font parti de l'hiérarchie de type que vous êtes obligé d'utiliser avec les collections Java par exemple (avec l'exception des tableaux).

Scala traite les primitives au niveau du code comme des types de références. Vous n'utilisez pas `new Int(100)` par exemple, mais vous pouvez appeler des méthodes sur des instances de `Int`. Le code généré utilise en général des primitives de la JVM optimisés.

Ainsi, l'hiérarchie de type Scala définit un type [Any](http://www.scala-lang.org/api/current/#scala.Any) qui est un parent des types de références et des types de valeurs (pour les primitives). Chacun de ces sous-hiérarchies ont des types parent, [AnyRef](http://www.scala-lang.org/api/current/#scala.AnyRef) qui est la même chose que [java.lang.Object](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html), et [AnyVal](http://www.scala-lang.org/api/current/#scala.AnyVal) est le parent des types de valeur.

Finalement, pour plus de "solidité", le système de type Scala définit un type réel qui représente [Null](http://www.scala-lang.org/api/current/#scala.Null) et [Nothing](http://www.scala-lang.org/api/current/#scala.Nothing). En définissant `Null` comme étant le sous-type de tous les types de références `AnyRefs` (mais non pas `AnyVals`), Scala supporte au niveau du typage la pratique regrettable d'utiliser `null` pour une valeur de référence.

Cependant, `null` n'est pas autorisé pour un `AnyVal`, donc le vrai "type au fond" de la hiérarchie est `Nothing`. Nous allons expliquer l'utilité de cela dans la prochaine section.

<a name="TryOptionNull"></a>
### Try vs. Option vs. null

Rappelez-vous de la signature de notre méthode `curl` au début de ce notebook.

```scala
def curl(sourceURLString: String, targetDirectoryString: String): File = ...
```

Elle retourne un `File` quand tout va bien, mais elle peut aussi renvoyer une exception. Une alternative est de retourner un `Try[File]` où [Try](http://www.scala-lang.org/api/current/index.html#scala.util.Try) encapsule les deux cas dans la valeur de retour, comme nous le verrons plus tard. Nous verrons également une autre alternative, [Option](http://www.scala-lang.org/api/current/index.html#scala.Option).

Supposons maintenant que nous déclarions `curl` pour qu'elle retourne [util.Try[T]](http://www.scala-lang.org/api/current/index.html#scala.util.Try), où `T` est un `java.io.File`. La seule chose à changer dans le corps de la méthode est de tout simplement ajouter un `Try` avant la parenthèse ouvrante :

```
def curl(sourceURLString: String, targetDirectoryString: String): Try[File] = Try {...}
```

Maintenant avec cette signature, le lecteur sait que la méthode peut potentiellement échouer. Si un appel échoue, une exception adéquate sera retournée enveloppée dans une sous-classe de `Try` appelée [util.Failure[T]](http://www.scala-lang.org/api/current/index.html#scala.util.Failure). Toutefois, si `curl` réussi, le `File` sera retourné enveloppé dans une autre sous-classe de `Try` : [util.Success[T]](http://www.scala-lang.org/api/current/index.html#scala.util.Success).

Grâce à l'assurance de type de Scala, l'appelant de `curl` doit déterminer quel résultat est retourné et le gérer de manière adéquate. Cela signifie que l'appelant doit déterminer si `Success` ou `Failure` a été retourné, et le traiter de manière adéquate.

Scala ne possède pas de déclarations d'exceptions comme Java. Donc en regardant la version orginiale de notre signature, il n'y a pas de manière évidente de voir si la méthode renvoit une exception _ou_ un `null` en cas d'échec.

```scala
def curl(sourceURLString: String, targetDirectoryString: String): File = {...}
```

Si nous choisissons d'attraper les exceptions en interne et retournons `null`, l'appelant doit se rappeler de vérifier le cas pour `null`. Sinon, le célèbre [NullPointerException](https://docs.oracle.com/javase/8/docs/api/java/lang/NullPointerException.html) peut apparaître occasionellement si l'appelant suppose qu'une valeur non-`null` est retournée. Donc utiliser `Try[T]` nous protège de cette faille. _Cela aide l'utilisateur à faire la bonne chose !_

Utiliser `Try` au lieu de simplement renvoyer une exception signifie également que `curl` retourne toujours, "normalement", quelque chose, pour que l'appelant puisse avoir un contrôle total sur la stack d'appel et pour ne pas qu'une logique spéciale d'exception-catching soit requise.

Quelles sont les sous-classes valides de `Try` ? Il n'y en a que deux : `Success` et `Failure`. Ce serait une erreur d'autoriser les utilisateurs de définir d'autres sous-classes comme `PeutEchouerMaisQuiSait`, car les utilisateurs de `Try` voudront toujours savoir qu'il n'y a que deux possibilités en pattern matching. Scala ajoute un mot-clé pour renforcer ce comportement logique. `Try` se déclare en fait de la façon suivante :

```scala
sealed abstract class Try[+T] extends AnyRef
```

(`AnyRef` est identique au supertype `Object` de Java.) Le mot-clé `sealed` fait qu'on ne peut déclarer _aucune_ sous-classe de `Try`; _sauf_ si c'est dans le même fichier source que cette déclaration (qui est dans la bibliothèque que l'auteur a écrit). Ainsi, les utilisateurs de `Try` ne peuvent pas déclarer leur propre sous-classes, ce qui altèrerait la structure logique de cette hiérarchie de type et autres code d'utilisateurs qui seraient dépendants de cette structure.

Et si nous avons une situtation dans laquelle il est illogique d'inclure une exception mais que nous voulons le même traitement logique ? C'est là où vient [Option[T]](http://www.scala-lang.org/api/current/index.html#scala.Option).

`Option` est similaire à `Try`, c'est un `sealed` abstract type avec deux possibles sous-types :

* [Some[T]](http://www.scala-lang.org/api/current/index.html#scala.None): J'ai une instance `T` pour votre résultat, à l'intérieur de `Some[T]`
* [None](http://www.scala-lang.org/api/current/index.html#scala.None): Je n'ai pas de valeur pour votre résultat, désolé.

Notez qu'un hash map est un bon example où nous avons ou pas une valeur pour une clé donnée. Par conséquent, pour l'abstraction  [Map[K,V]](http://www.scala-lang.org/api/current/index.html#scala.collection.Map) de Scala où `K` est le type de la clé et `V` est le type de la valeur, la méthode `get` a cette signature :

```scala
def get(key: K): Option[V]
```

Encore une fois, vous voyez avec cette signature que vous allez avoir ou non une instance de valeur pour la clé donnée en entrée, _et_ vous **devez** déterminer si vous obtenez un `Some[V]` ou `None` comme résultat. Encore une fois, nous évitons de retourner `null` et de risquer un `NullPointerExceptions` si nous oublions de gérer ce cas.

Comment déterminons-nous alors quel `Option[T]` a été retourné ? Voyons quelques exemples utilisant `Option`. Pouvez vous deviner ce qu'ils font ? Vérifier l'[Option Scaladocs](http://www.scala-lang.org/api/current/#scala.Option) pour confirmer. `Try` peut être utilisé de manière similaire, avec d'autres moyens disponibles que nous ne discuterons pas ici (mais regardez le [Try Scaladocs](http://www.scala-lang.org/api/current/#scala.util.Try)).

In [70]:
val options = Seq(None, Some(2), Some(3), None, Some(5))

options.foreach { o =>
    println(o.getOrElse("None"))
}

None
2
3
None
5


options = List(None, Some(2), Some(3), None, Some(5))


List(None, Some(2), Some(3), None, Some(5))

In [71]:
options.foreach {
    case None    => println(None)
    case Some(i) => println(i)  // Note how we extract the enclosed value.
}

None
2
3
None
5


Si vous voulez juste ignorer les valeurs `None`, utilisez un _for comprehension_ :

In [72]:
for {
    option <- options  // loop through the options, assign each to "option"
    value  <- option   // extract the value from the Some, or if None, skip to the next loop
} println(value)

2
3
5


Finalement, vous vous demandez peut-être comment `None` est déclaré. Considérez l'exemple suivant :

In [73]:
val opts: Seq[Option[String]] = Seq(Some("hello"), None, Some("world!"))
opts.foreach(println)

Some(hello)
None
Some(world!)


opts = List(Some(hello), None, Some(world!))


List(Some(hello), None, Some(world!))

Cela marche, donc cela veut dire que `None` est une sous-classe valide d'`Option[String]`. C'est en fait vrai pour tout `Option[T]`. Comment un seul object peut-il être un sous-type valide de _tous_ ces options ? Voilà comment il est déclaré (en ignorant quelques détails) : 

```scala
object None extends Option[Nothing] {...}

```

`None` ne porte aucune information d'"état" car il n'enveloppe pas d'instance comme le fait `Some[T]`. Ainsi, nous n'avons besoin que d'une instance pour toutes ces utilisations, donc il est déclaré comme un objet. Rappelez-vous que nous avons mentionné au-dessus que le système de type possède un type [Nothing](http://www.scala-lang.org/api/current/#scala.Nothing) qui est un sous-type de tout les autres types. Sans rentrer dans les détails, si une variable est de type `Option[String]`, alors vous pouvez utiliser un `Option[Nothing]` pour elle (i.e., ce dernier est un sous-type du premier). C'est pour cela que `Nothing` est utile : pour les cas comme `None`, pour que nous puissons avoir une instance de cela, mais qui obéit quand meme aux règles du système de types orienté objet de Scala.

### Implicits
<a name="implicits"></a>  
Scala possède un méchanisme puissant appelé _implicits_ qui est utilisé dans l'API Spark Scala. Les implicits représentent un sujet vaste, nous nous attarderons donc seulement sur les usages de ceux-ci qui sont important de comprendre. 

#### Conversion de types
Au-dessus nous avons utilisé des méthodes de `RDD` comme `reduceByKey`, mais si vous cherchez cette méthode dans la [page RDD de Scaladoc](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), vous ne la trouverez pas. À la place, elle est définie dans le type [PairRDDFunctions](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) (avec les autres méthodes de la forme `*ByKey`). Comment alors pouvons nous utiliser ces méthodes comme si elles étaient définies pour des `RDD` ??

Quand le compilateur Scala voit un bout de code appeler une méthode qui n'existe pas pour un type, il cherche une _conversion implicit_ dans le scope, ce qui transforme l'instance dans un autre type possédant la méthode requise (c-à-d en l'enveloppant). La signature complète déduite pour la méthode doit correspondre à la définition de la classe enveloppante.

> **Note:** Si vous ne trouvez pas une méthode dans la [Scaladocs Spark](http://spark.apache.org/docs/latest/api/scala/index.html#package) pour un type où vous pensez qu'elle devrait être définie, pensez à regarder les helper types associés à la méthode.

Voici un petit exemple Scala de ce fonctionnement :


In [74]:
// A sample class. Note it doesn't define a `toJSON` method:
case class Person(name: String, age: Int = 0)

defined class Person


In [75]:
// To scope them, define implicit conversions within an object
object implicits {

    // `implicit` keyword tells the compiler to consider this conversion.
    // It takes a `Person`, returning a new instance of `PersonToJSONString`,
    // then resolves the invocation of `toJSON`.
    implicit class PersonToJSONString(person: Person) {
        def toJSON: String = s"""{"name": ${person.name}, "age": ${person.age}}"""
    }
}

import implicits._        // Now it is visible in the current scope.

val p = Person("Dean Wampler", 39)

// Magic conversion to `PersonToJSONString`, then `toJSON` is called.
p.toJSON

defined object implicits
p = Person(Dean Wampler,39)


{"name": Dean Wampler, "age": 39}

Pour les `RDDs`, les conversions implicites vers `PairRDDFunctions` et autres types supports sont gérés pour vous. Cependant, lorsque vous utilisez Spark SQL et l'API [DataFrame](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame), vous devrez vous-même importer quelques-unes de ces conversions comme cela :

In [76]:
val sqlc = sqlContext
import sqlc.implicits._  

sqlc = org.apache.spark.sql.SQLContext@27a9da47


org.apache.spark.sql.SQLContext@27a9da47

In [77]:
val wtc = iiDF.select($"word", $"total_count")
wtc.show

+-----------+-----------+
|       word|total_count|
+-----------+-----------+
|          a|       3350|
|    abandon|          6|
|      abate|          3|
|  abatement|          1|
|     abbess|          8|
|      abbey|          9|
|abbominable|          1|
|abbreviated|          1|
|       abed|          2|
|   abetting|          1|
|abhominable|          1|
|      abhor|          5|
|     abhors|          2|
|      abide|          5|
|     abides|          1|
|    ability|          2|
|     abject|          2|
|     abjure|          1|
|    abjured|          2|
|       able|          9|
+-----------+-----------+
only showing top 20 rows



wtc = [word: string, total_count: int]


[word: string, total_count: int]

La syntaxe de référence de colonne `$"name"` est implémentée en utilisant le même méchanisme de la librairie Scala qui implémente les _interpolated strings_, `s"$foo"`. Le `import sqlc.implicits._` la rend utilisable.

Notez que nous avons importé quelque chose d'une _instance_, au lieu d'un package ou un type comme dans Java. Cela peut être une fonctionnalité utile dans Scala, mais également délicate. Si vous essayez `import sqlContext.implicits._`, vous aurez une erreur de compilation qui requiert un "stable identifier". Il s'avère qu'assigner la valeur `val sqlc = sqlContext` en premier permet de remplir cette condition. Cela est unique à l'environnement du notebook. Normalement, vous ne rencontrerez pas ce problème si vous utilisez le `spark-shell` qui vient avec la distribution Spark, ou si vous écrivez un programme Spark et le compilez avec le compilateur Scala.

Cependant, ce serait mieux si Spark définissait cet objet `implicits` dans l'object compagnon de `SQLContext` à la place d'instances de celui-ci !

Pour compléter, mais sans rapport avec les implicits, l'API `DataFrame` vous permet d'écrire des requêtes à la SQL avec une API programmatique. Si vous voulez utiliser des fonctions intégrées comme `min`, `max`, etc. sur les colonnes, vous devez faire l'`import` suivant :

In [78]:
import org.apache.spark.sql.functions._

Maintenant nous pouvons utiliser `min`, `max`, `avg`, etc.

In [79]:
val mma = iiDF.select(min("total_count"), max("total_count"), avg("total_count"))
mma.show

+----------------+----------------+------------------+
|min(total_count)|max(total_count)|  avg(total_count)|
+----------------+----------------+------------------+
|               1|            5208|16.651743683350947|
+----------------+----------------+------------------+



mma = [min(total_count): int, max(total_count): int ... 1 more field]


[min(total_count): int, max(total_count): int ... 1 more field]

#### Implicit Method Arguments
Une autre utilisation des implicits qui mérite d'être su est les _arguments implicites_ de méthodes. Vous rencontrerez ce méchanisme en lisant le Spark Scaladocs, même si vous pourriez ne jamais comprendre qu'en fait vous l'utilisez dans votre code !

Souvenez vous que, précédemment, j'ai mentionné le fait de pouvoir définir des valeurs par défaut pour les arguments de méthodes. Je l'ai utilisé pour l'argument `age` dans `Person`:

```scala
case class Person(name: String, age: Int = 0)
```

Parfois nous avons besoin de quelque chose de plus sophistiqué. Par exemple, notre librairie possède un groupe de méthodes ayant besoin chacun d'un argument spécial qui leur prodigue une information de "contexte" utile, mais vous ne voulez pas que l'utilisateur soit obligé de passer cet argument à chaque fois. À d'autres moments vous pourriez utiliser les arguments implicites pour que l'API soit plus "propre", tout en gardant le contrôle sur ce qui est autorisé de mettre.

Voici un exemple qui est en partie inspiré de la méthode [Seq.sum](http://www.scala-lang.org/api/current/#scala.collection.Seq) de Scala. Ne serait-ce pas formidable si en ayant une collection de choses que nous pouvons "ajouter" ensemble, il suffirait d'appeller la méthode `sum` pour cette collection ? Faisons cela de manière un peu différente, avec une méthode helper `sum` en dehors de `Seq`.

In [80]:
trait Add[T] {
    def add(t1: T, t2: T): T
}

// Nested implicits so they don't conflict with the previous object implicits.
object Adder {
    object implicits {
        implicit val intAdd = new Add[Int] { 
            def add(i1: Int, i2: Int): Int = i1+i2 
        }
        implicit val doubleAdd = new Add[Double] { 
            def add(d1: Double, d2: Double): Double = d1+d2 
        }
        implicit val stringAdd = new Add[String] { 
            def add(s1: String, s2: String): String = s1+s2 
        }
        // etc...
    }
}

import Adder.implicits._

// NOTE: TWO argument lists!
def sum[T](ts: Seq[T])(implicit adder: Add[T]): T = {
    ts.reduceLeft((t1, t2) => adder.add(t1, t2))
}

defined trait Add
defined object Adder


sum: [T](ts: Seq[T])(implicit adder: Add[T])T


In [81]:
sum(0 to 10)

55

In [82]:
sum(0.0 to 5.5 by 0.3)

51.29999999999999

In [83]:
sum(Seq("one", "two", "three"))

onetwothree

In [84]:
// Will fail, because there's no Add[Char] in scope:
sum(Seq('a', 'b', 'c'))   // Characters

Name: Compile Error
Message: <console>:60: error: could not find implicit value for parameter adder: Add[Char]
       sum(Seq('a', 'b', 'c'))   // Characters
          ^

StackTrace: 

Les valeurs implicites `intAdd`, `doubleAdd`, et `stringAdd` ont donc été utilisées par l'interpréteur Scala pour l'argument `adder` dans la seconde `liste d'arguments` de `sum`. Notez que vous devez utiliser une seconde liste d'arguments et que tous les arguments de cette liste doivent être implicites.

Nous aurions pu éviter d'utiliser des arguments implicites si nous avions défini des méthodes custom `sum` pour chaque type. Cela aurait été plus simple dans ce cas trivial, mais pour des méthodes non-trivials les arguments implicites valent plus le coup pour éviter de la duplication. Un autre avantage de ce méchanisme est que l'utilisateur peut definir sa propre instance implicite `Add[T]` pour des domain types (par exemple `Money`) et ils "marcheraient".

L'API collections de Scala utilise ce fonctionnement pour savoir comment construire une nouvelle collection du même type que la collection mise en entrée quand on utilise `map`, `flatMap`, `reduceLeft`, etc.

Spark utilise ce pattern pour les [Encoders](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Encoder) dans Spark SQL. Les Encoders sont utilisés pour sérialiser des valeurs dans le nouveau et compact encodage de mémoire introduite par le projet _Tungsten_ (par exemple [ici](https://spark-summit.org/2015/events/deep-dive-into-project-tungsten-bringing-spark-closer-to-bare-metal/)). Voici un exemple pour créer un [Dataset](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset), où la méthode `toDS` est "ajoutée" en premier dans un [Seq](http://www.scala-lang.org/api/current/#scala.collection.Seq) à l'aide d'une conversion implicite (en particulier [SQLImplicits.localSeqToDatasetHolder](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLImplicits) qui est utilisable dans le scope grâce à l'import `import sqlc.implicits._`), et ensuite `toDS` utilise `Encoders` en interne.

In [85]:
(0 to 10).toDS()

[value: int]

# Conclusions
I appreciate the effort you put into studying this notebook. I hope you enjoyed it as much as I enjoyed writing it. Please post issues on how I can improve it to the [GitHub repo](https://github.com/deanwampler/JustEnoughScalaForSpark). 

Now you know the core elements of Scala that you need for using the Spark Scala API. I hope you can appreciate the power and elegance of Scala. I hope you will choose to use it for all of your data engineering tasks, not just for Spark. 

What about data science? There are many people who use Scala for data science in Spark, but today Python and R have much richer libraries for Mathematics and Machine Learning. That will change over time, but for now, you'll need to decide which language best fits your needs.

As you use Scala, there will be more things you'll want to understand that we haven't covered, including common idioms, conventions, and tools used in the Scala community. The references at the beginning of the notebook will give you the information you need.

Best wishes.

[Dean Wampler, Ph.D.](mailto:deanwampler@gmail.com)<br/>
[@deanwampler](http://twitter.com/deanwampler)

## Appendix: Exercise Solutions
<a name="ExerciseSolutions"></a>
Let's discuss the solutions to exercises that weren't already solved earlier in the notebook.

### Filter for Plays that Have "of" in the Name
You can add the condition (comment `// <== here`) immediate after defining `play`. You could do it later, after either of the subsequent two expressions, but then you're doing needless computation. Change `true` to `false` to print plays that don't contain "of".

In [86]:
val list2 = for {
    play <- plays 
    if (play.contains("of") == true)                            // <== here
    playFileString = targetDirName + pathSeparator + play
    playFile = new File(playFileString)
} yield {
    val successString = if (playFile.exists) "Success!" else "NOT FOUND!!"
    "%-40s\t%s".format(playFileString, successString)
}
list2.foreach(println)

/home/jovyan/work/data/shakespeare/tamingoftheshrew	Success!
/home/jovyan/work/data/shakespeare/comedyoferrors	Success!
/home/jovyan/work/data/shakespeare/merrywivesofwindsor	Success!


list2 = List(/home/jovyan/work/data/shakespeare/tamingoftheshrew	Success!, /home/jovyan/work/data/shakespeare/comedyoferrors	Success!, /home/jovyan/work/data/shakespeare/merrywivesofwindsor	Success!)


List(/home/jovyan/work/data/shakespeare/tamingoftheshrew	Success!, /home/jovyan/work/data/shakespeare/comedyoferrors	Success!, /home/jovyan/work/data/shakespeare/merrywivesofwindsor	Success!)

### More Specific "Love" and "Hate" Words
One reasonable choice to prevent seeing `glove`, `whatever`, etc. is to only find words that start with `love` and `have`. Let's also keep `unlove`:

In [87]:
val topLocationsLoveHate = sqlContext.sql("""
    SELECT word,  total_count, locations[0] AS top_location, counts[0] AS top_count
    FROM inverted_index 
    WHERE word LIKE 'love%' OR word LIKE 'unlove%' OR word LIKE 'hate%'
""")
topLocationsLoveHate.show(40)

+-------+-----------+--------------------+---------+
|   word|total_count|        top_location|top_count|
+-------+-----------+--------------------+---------+
|   hate|         22|midsummersnightsd...|        9|
|  hated|          6|midsummersnightsd...|        4|
|hateful|          5|midsummersnightsd...|        3|
|  hates|          5|         asyoulikeit|        2|
| hateth|          1|midsummersnightsd...|        1|
|   love|        662|    loveslabourslost|      121|
|  loved|         38|         asyoulikeit|       13|
| lovely|         15|midsummersnightsd...|        7|
|  lover|         33|         asyoulikeit|       14|
| lovers|         31|midsummersnightsd...|       17|
|  loves|         51| muchadoaboutnothing|       10|
| lovest|          8|    tamingoftheshrew|        3|
| loveth|          2|    loveslabourslost|        1|
|unloved|          1|midsummersnightsd...|        1|
+-------+-----------+--------------------+---------+



topLocationsLoveHate = [word: string, total_count: int ... 2 more fields]


[word: string, total_count: int ... 2 more fields]

### Return the Top Two Locations and Counts
We used the `DataFrame` API to write a SQL query that returned the top location and count. Adding the next one is straightforward. What do you observe is returned when there isn't a second location and count?

In [88]:
val topTwoLocations = sqlContext.sql("""
    SELECT word, total_count, 
        locations[0] AS first_location,  counts[0] AS first_count,
        locations[1] AS second_location, counts[1] AS second_count
    FROM inverted_index 
    WHERE word LIKE '%love%' OR word LIKE '%hate%'
""")

topTwoLocations = [word: string, total_count: int ... 4 more fields]


[word: string, total_count: int ... 4 more fields]

In [89]:
topTwoLocations.show(100)

+--------+-----------+--------------------+-----------+--------------------+------------+
|    word|total_count|      first_location|first_count|     second_location|second_count|
+--------+-----------+--------------------+-----------+--------------------+------------+
| beloved|         11|    tamingoftheshrew|          4|         asyoulikeit|           3|
|  cloven|          1|    loveslabourslost|          1|                null|        null|
|  cloves|          1|    loveslabourslost|          1|                null|        null|
|   glove|          3|    loveslabourslost|          2|        twelfthnight|           1|
|  glover|          1| merrywivesofwindsor|          1|                null|        null|
|  gloves|          5| merrywivesofwindsor|          3|         asyoulikeit|           1|
|    hate|         22|midsummersnightsd...|          9|         asyoulikeit|           6|
|   hated|          6|midsummersnightsd...|          4|         asyoulikeit|           2|
| hateful|

### Removing Stop Words
Recall you were asked to implement a `keep(word: String):Boolean` method that filters stop words.

First, let's implement `keep`. You can find lists of stop words on the web. One such list for English can be found [here]( * From http://norm.al/2009/04/14/list-of-english-stop-words/). It includes many words that you might not consider stop words. Nevertheless, I'll just use a smaller list here.

Note that I'll use a Scala [Set](http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Set) to hold the stop words. We want _O(1)_ look-up performance. We just want to know if the word is in the set or not.

I'll also add "", so I can remove the explicit test for it.

Finally, we'll embed the whole thing in a new Scala `object`. This extra encapsulation is a way to work around occasional problems with "task not serializable" errors.

**WARNING**: The definition in the next cell may trigger a `Task not serializable` error in the cell that follows, where it is used. Of so, this is "quirk" of the Scala interpreter running with the notebook environment. This code should work without issues in Spark applications that you write, i.e., that you compile into applications with `scalac`.

In [90]:
object IIStopWords {
    val stopWords = Set("", "a", "an", "and", "I", "he", "she", "it", "the")

    /**
     * If the set contains the word, we return false - we don't want to keep it!
     * Note we assume the word has already been converted to lower case!
     */
    def keep(word: String): Boolean = stopWords.contains(word) == false  
    
    def compute(sc: org.apache.spark.SparkContext, input: String) = {
        sc.wholeTextFiles(input).
        flatMap {
            case (location, contents) => 
                val words = contents.split("""\W+""").
                    map(word => word.toLowerCase).  // Do this early, before keep()
                    filter(word => keep(word))      // <== filter here
                val fileName = location.split(java.io.File.separator).last
                words.map(word => ((word, fileName), 1))
        }.
        reduceByKey((count1, count2) => count1 + count2).
        map { 
            case ((word, fileName), count) => (word, (fileName, count)) 
        }.
        groupByKey.
        sortByKey(ascending = true).
        map { 
            case (word, iterable) => 
                val vect = iterable.toVector.sortBy { 
                    case (fileName, count) => (-count, fileName) 
                }
                val (locations, counts) = vect.unzip  
                val totalCount = counts.reduceLeft((n1,n2) => n1+n2)        
                (word, totalCount, locations, counts)
        }
    }
}

defined object IIStopWords


In [92]:
val iiStopWords = IIStopWords.compute(sc, "/home/jovyan/work/data/shakespeare")

lastException = null


Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace:   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
  at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:380)
  at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:379)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.flatMap(RDD.scala:379)
  at IIStopWords$.compute(<console>:80)
  ... 52 elided
Caused by: java.io.NotSerializableException: IIStopWords$
Serialization stack:
	- object not serializable (class: IIStopWords$, value: IIStopWords$@a4424d)
	- field (class: IIStopWords$$anonfun

In [93]:
iiStopWords.take(100).foreach(println)

Name: Unknown Error
Message: lastException: Throwable = null
<console>:58: error: not found: value iiStopWords
       iiStopWords.take(100).foreach(println)
       ^

StackTrace: 

One last thing, we now have `filter(word => keep(word))`, but note how we used `println` in the previous cell to see results. We can do something similar with `filter` and instead write `filter(keep)`. 

What does this mean exactly? It tells the compiler "convert the _method_ `keep` to a _function_ and pass that to `filter`." This works because `keep` already does what `filter` wants, take a single string argument and return a boolean result.

Passing `keep` is actually different than passing `word => keep(word)`, which is an _anonymous_ function that _calls_ keep. We are using `keep` as the function itself, rather than constructing a function that uses `keep`.