In [None]:
%%capture

!pip install -q pyspark findspark

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
!git clone https://github.com/javieraespinosa/big-data-analytics-practicals
!mv big-data-analytics-practicals/data/* .
!ls *.csv

In [None]:
sc = spark.sparkContext
sc

# Introduction à SPARK

## 1 Présentation

SPARK est un framework qui peut fonctionner de manière autonome ou être intégré à un cluster hadoop.
Ce qui est intéressant avec SPARK, le développeur peut se concentrer sur son application métier sans se soucier des problématiques lié aux traitements distribués.

Vous pouvez voir  SPARK comme une machine virtuelle composée de nombreuses ressources informatiques (CPU et RAM) qui exécute votre code.

Pour rendre transparente la logique cachée derrière l'informatique distribuée, SPARK utilise le concept de Resilient Distributed Dataset (RDD).
Un RDD est une API qui vous permet d'interagir avec vos données de manière unifiée et fournit de nombreuses fonctions pour faciliter la programmation.

Nous pouvons diviser l'API RDD en deux parties:
* Transformation:
Comme son nom l'indique, une transformation est un moyen de transformer vos données en quelque chose d'autre. Par exemple, si vous devez filtrer des données, nous transformons la source de données en filtrant les données, ce qui donnera un nouveau RDD.
Notez qu'une source de données (RDD) est immuable, vous ne pouvez donc pas en modifier le contenu. Pour modifier un RDD, l'astuce est d'appliquer vos modifications via des transformations qui seront contenues dans un nouveau RDD que vous pourrez sauvegarder.
La liste des transformations possibles est disponible sur le site d'apache Spark :
https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

* Action :
SPARK est fénéant/lazyness, cela signifie que SPARK ne fait rien tant que le programme n’exécute pas une  action. Par exemple, la fonction count()  est une action qui comptabilie le nombre de n-uplets dans le RDD.
Vous trouverez l'ensemble des actions possibles sur le site de Spark :
https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

Ce laboratoire vous présentera comment développer une application SPARK avec Python.
La librairie python pour Spark est détaillée ici :
https://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD

Bien entendu, vous pouvez consulter le site Web de SPARK pour obtenir plus de détails: https://spark.apache.org/docs/latest/index.html


Alors, commençons maintenant quelques exemples pour devenir familier avec SPARK

Dans ce TP, nous explorerons les transformations et actions courantes fournies par Spark:

## 2 - Débuter avec Spark

Pour faciliter la prise en main de Spark, nous allons travailler avec de petits jeux de données.
Nous utiliserons des RDD contenant de 1 à 5 valeurs numériques mais il faut considérer que chaque valeur peut être une ligne d'un fichier.

### Collect / take / first / last

Renvoye tous / certains / le premier  éléments du RDD au driver dans une simple liste.

In [None]:
nums= sc.parallelize([1,2,3,4,5])
print(type(nums))

print( 'Collecte tous les éléments', nums.collect())
print( 'Collecte une partie des éléments', nums.take(3))
print( 'Collecte le 1er élément', nums.first())

Pair RDDs contain elements that are key-value pairs.  Keys and values can be any type.

# Les Transformations

## MAP

La méthode map prend une fonction en entrée et l'applique à chaque élément du RDD source pour créer un nouveau RDD.
La fonction d'entrée doit prendre un seul paramètre d'entrée et renvoyer une valeur.

In [None]:
nums= sc.parallelize([1,2,3])

print("Type de nums : ", type(nums))
pluOneRDD = nums.map(lambda x : x+1)
print("plusOneRDD is new RDD")
print("Type de pluOneRDD : ", type(pluOneRDD))
pluOneRDD.collect()

Une autre manière d'écrire ce programme :

In [None]:
nums= sc.parallelize([1,2,3])

def plus1(x):
    if( x > 2 ):
        s = x*x
    else:
        s = 1
    return s

# on  appelle la fonction plus1 directement dans la phase map
pluOneRDD = nums.map(lambda x : plus1(x))

print("map with lambda function")
print(pluOneRDD.collect())

## Exercice 1

Tranformer la collection suivante [“John”, “Fred”, “Anna”, “James”] afin de retourner le nombre de caractères de chaque élément :

In [None]:
x = sc.parallelize(["John", "Fred", "Anna", "James"])
# Complétez le code


## FILTER

Le filtre prend une fonction booléenne en entrée et l'applique à chaque élément du RDD source pour créer un nouveau RDD.
Une fonction booléenne prend une entrée et renvoie true ou false.
La méthode de filtrage renvoie un nouveau RDD formé en sélectionnant uniquement les éléments pour lesquels la fonction booléenne en entrée a renvoyé la valeur true.
Ainsi, le nouveau RDD contient un sous-ensemble des éléments du RDD d'origine.

In [None]:
nums= sc.parallelize([1,2,3])
myFilteredRDD = nums.filter(lambda x : x > 1)
myFilteredRDD.collect()

## Exercise 2:

Changez la collection pour ne conserver que les nombres pairs et calculez le carré:

In [None]:
nums= sc.parallelize([1,2,3])
# Complétez le code


## FLATMAP

FlatMap prend une fonction d'entrée, qui renvoie une séquence pour chaque élément d'entrée qui lui est transmis.
La méthode flatMap renvoie un nouveau RDD formé en aplatissant cette collection de séquences.

In [None]:
nums= sc.parallelize([1,2,3])
mapRDD = nums.map(lambda x : (x,x*x,x*x*x))
flatMapRDD = mapRDD.flatMap(lambda x : x )
print("Display MapRDD :")
print(mapRDD.collect())

print("\nDisplay flatMapRDD :")
print(flatMapRDD.collect())

## Exercice

Comptez le nombre de mots contenu dans l'ensemble des lignes faisant référence au "spam":


In [None]:
# Indice: vous utiliserez la fonction flatMap
data = sc.parallelize(["Spark c'est bien","il peut nous aider à trouver les spams","spam viagra.com"])


## UNION

Union prend un RDD en entrée et retourne un nouveau RDD contenant l'union des éléments dans le RDD source et le RDD qui lui est transmis en tant qu'entrée.

In [None]:
x = sc.parallelize([[1,2],3])
y = sc.parallelize([1,2,3])
z = x.union(y)
z.collect()

## INTERSECTION

La méthode intersection prend un RDD en entrée et retourne un nouveau RDD contenant l'intersection des éléments du RDD source et du RDD qui lui est transmis en tant qu'entrée.

In [None]:
x = sc.parallelize([1,2,3])
y = sc.parallelize([4,6,3])
z = x.intersection(y)
z.collect()

## La transformation subtract

La méthode subtract prend un RDD en entrée et retourne un nouveau RDD contenant des éléments dans le RDD source mais pas dans le RDD en entrée.

In [None]:
x = sc.parallelize([1,2,3])
y = sc.parallelize([4,6,3])
z = x.subtract(y)
z.collect()

## La transformation Distinct

La méthode distincte d'un RDD renvoie un nouveau RDD contenant les éléments distincts dans le RDD source.

In [None]:
x = sc.parallelize([1,2,3,4,5,3,2,4])
y = x.distinct()
y.collect()

## La transformation Join


La méthode de "join" prend un RDD de paires clé-valeur en entrée et effectue une jointure interne sur les RDD source et en entrée.

In [None]:
x = sc.parallelize([("TWTR", "Twitter"), ("GOOG", "Google"), ("AAPL", "Apple")])
y = sc.parallelize([("TWTR", 36), ("GOOG", 532), ("AAPL", 127)])
z = x.join(y).map(lambda x : x[1])
z.collect()

## La transformation KeyBy

Créez un RDD Pair,  formant une paire pour chaque article du RDD original.
La clé de la paire est calculée à partir de la valeur via une fonction fournie par l'utilisateur.

In [None]:
x = sc.parallelize(range(0,5)).keyBy(lambda x: x*x)
x.collect()

## Exercice 4


Créez un RDD à partir de cette liste, puis utilisez .keyBy pour créer une paire RDD avec:
['New York, NY', 'Philadelphia, PA', 'Denver, CO', 'San Francisco, CA'])

Résultat :
[('New York', ' NY'), ('Philadelphia', ' PA'), ('Denver', ' CO'), ('San Francisco', ' CA')]

In [None]:
y = sc.parallelize(['New York, NY', 'Philadelphia, PA', 'Denver, CO', 'San Francisco, CA'])

# Complétez le code


## Transformation leftOuterJoin / rightOuterJoin

Effectue une jointure externe gauche / droite à l'aide de deux RDD clé-valeur. Veuillez noter que les touches doivent être généralement comparables pour que cela fonctionne correctement.

In [None]:
x = sc.parallelize([("a", 1), ("b", 2), ("c", 2)])
y = sc.parallelize([("a", 3), ("a", 4), ("b", 5)] )
z = x.leftOuterJoin(y)
print(z.collect())

## Exercice 5


Créer une paire RDD renvoyant tous les mots par longueur pour les deux listes:

list1 = ["dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"]
list2 = [“dog”, “salmon”, “salmon”, “rat”, “elephant”]

Notez que le programme devra renvoyer au minimum tous les éléments de la list1.

In [None]:
list2 = sc.parallelize(['dog', 'salmon', 'salmon', 'rat', 'elephant'])
list1 = sc.parallelize(['dog','cat','gnu','salmon','rabbit','turkey','wolf','bear','bee'])

# Complétez le code
z = list1.keyBy(lambda x : len(x))
v = list2.keyBy(lambda x : len(x))
w = z.leftOuterJoin(v)
w.collect()

## La transformation PartitionBy

Renvoie un nouveau RDD avec le nombre spécifié de partitions, en plaçant les éléments d'origine dans la partition renvoyée par un partitionneur fourni par l'utilisateur.

In [None]:
rdd = sc.parallelize([('J', "James"), ('F', "Fred"), ('A', "Anna"), ('J', "John")])

rdd1 = rdd.partitionBy(2)
rdd2 = rdd.partitionBy(3)

print("2 partitions : ",rdd1.glom().collect())
print("3 partitions : ",rdd2.glom().collect())


Renvoye un nouveau fichier RDD contenant des paires dont la clé est l’élément du fichier original, et dont la valeur correspond à l’élément correspondant de cet élément (même partition, même index) dans un deuxième fichier RDD.

![title](http://i.imgur.com/5J0lg6g.png)



In [None]:
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))
x.zip(y).collect()
[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

# Les actions

Les actions calculent un résultat (par exemple, des données numériques ou créer une structure de données non RDD) ou produire un effet secondaire, tel que l'écriture d'une sortie sur un disque.
___________________

## L'action Reduce

Comptez le nombre de mots pour cette liste avec les fonctions map et reduce:

[‘Spark s est vraiement facile’,‘Qu en pensez vous ?’]

In [None]:
from operator import add
x = sc.parallelize(['Spark s est vraiement facile','Qu en pensez vous ?'])
y = x.map(lambda x : x.split(' ')).flatMap(lambda x : x).map(lambda x : 1)
y.reduce(add)

## Les actions : Count, Max, Min, Sum, Mean, Variance, Stdev, stats

In [None]:
x = sc.parallelize(range(10))

print("count",x.count())
print("max",x.max())
print("min",x.min())
print("sum",x.sum())
print("mean",x.mean())
print("variance",x.variance())
print("stdev",x.stdev())
print("stat", x.stats())

## L'action CountByKey

CountByKey : Retourne une liste des clés et compte leurs occurrences dans le RDD


In [None]:
x = sc.parallelize([('J', "James"), ('F', "Fred"), ('A', "Anna"), ('J', "John")])
y = x.countByKey()

print(y)


## ReduceByKey

Regroupe tous les éléments du RDD en appliquant une fonction utilisateur par paire aux éléments et aux résultats partiels et renvoyez un résultat au pilote.

In [None]:
from operator import add
z = sc.parallelize([(1,20),(1,30),(3,60),(3,20),(3,20)])

result = z.reduceByKey(add)
result.collect()


## Exercice 6

Comptez le nombre de mots pour cette liste avec la fonction map et reduce :

In [None]:
from operator import add
x = sc.parallelize(['Spark s est vraiement facile','Qu en pensez vous ?'])
# Complétez le code


Vous pouvez à présent fermer le SparkContext.

In [None]:
sc.stop()