# 0) chargement de spark

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# 1) lire de la donnée

## 1.1) lecture brute

In [5]:
path  = "./data/Villes/ville_1.csv"
# lecture d'un fichier de manière la plus brute
df    = spark.read.load(path, format="csv")
df.dtypes

[('_c0', 'string'),
 ('_c1', 'string'),
 ('_c2', 'string'),
 ('_c3', 'string'),
 ('_c4', 'string'),
 ('_c5', 'string'),
 ('_c6', 'string'),
 ('_c7', 'string'),
 ('_c8', 'string'),
 ('_c9', 'string'),
 ('_c10', 'string'),
 ('_c11', 'string'),
 ('_c12', 'string')]

## 1.2) lecture avec les entêtes

In [None]:
# l'option 'header' permet de rajouter les noms des colonnes 


## 1.3) lecture avec les types détectés automatiquement

In [None]:
# l'option 'inferSchema' permet de transformer les colonnes en 
# types plus précis : entier  / booléens / chaines de caractères...
# bien sûr spark trouve les types uniquement si le fichier d'origine
# permet de les trouver de manière simple


## 1.4) lecture avec schéma

# 2) écriture de la dataframe sur le disque

## 2.1) choix du format : csv

## 2.2) choix du format : parquet

## 2.3) choix du format : json

In [None]:
!ls ./data/Villes/ville/

## 2.4) lecture de différents formats

In [4]:
spark.read.load("./../data/Villes/ville/", format="json")

AnalysisException: Path does not exist: file:/home/jovyan/data/Villes/ville;

In [None]:
spark.read.load("./../data/Cyclistes/", format="csv").count()

# 3) Calculer des résultats : les actions 

## 3.1) nombre de lignes : count

In [None]:
df = spark.read.load("./../data/Cyclistes/", format="csv", header=True, inferSchema="True")

## 3.2) moyenne : agg + colonne + mean

## 3.3) quantile approximatifs pour gagner du temps de calcul

In [None]:
import time

In [None]:
def calcul_quantile(df, erreur_acceptee):
    debut            = time.time()
    colonne          = "vitesse"
    quantiles_voulus = [0.25, 0.50, 0.75]
    resultat         =  df.approxQuantile(colonne, quantiles_voulus , erreur_acceptee )
    fin              = time.time()
    delais           = fin -debut
    print ("delais =%.2f sec, quantiles = %s"%(delais, resultat))

In [None]:
calcul_quantile(df, 0.05)

In [None]:
calcul_quantile(df, 0.01)

In [None]:
calcul_quantile(df, 0)

## Reload de la dataframe

In [None]:
villes =spark.read.load("./../data/Villes/", format="csv", header=True, inferSchema="True")
villes.printSchema()

## 3.4) corrélation

## 3.5) covariance

## 3.6) sample

## 3.7) filter 

# 4) Transformer la données : les transformations!

## Transformations : demandent à être suivi par un collect ou une action (count par exemple)

## 4.1) obtenir des statistiques sur les colonnes numériques

## 4.2) groupby

## 4.3) summary 

## 4.4) union de dataframe

#### ajouter les colonnes les unes à côté des autres : join

#### ajouter les lignes les unes sous les autres : union

## 4.5) filtre

## 4.6) concaténation de colonne : F.concat

In [None]:
from pyspark.sql.types import *
from pyspark.sql       import functions as F

In [None]:
path       = "./../data/Cyclistes/*.csv" 

# 5) udf 
Il est possible d'enregistrer des fonctions python que l'on écrit nous même pour les appliquer sur une colonne d'une dataframe, c'est ce qu'on appelle les udf, pour User Defined Functions

In [None]:
from pyspark.sql.types     import *
from pyspark.sql.functions import udf

@udf(returnType = FloatType())
def cube(colonne):
    return colonne*colonne

In [None]:
villes.select(cube("salaire")).collect()

# 6)	Etude de cas : analyse des fichiers de logs des cyclistes

In [None]:
from pyspark.sql.types import *
from pyspark.sql       import functions as F
from pyspark.sql.types     import *
from pyspark.sql.functions import udf

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

## 6.1)  Charger la donnée

In [None]:
path       = "./../data/Cyclistes/*.csv" 


## 6.2) vérifier le nombre de cycles

## 6.3) transformer les timestamp en date

In [None]:
from pyspark.sql.types import TimestampType

@udf(returnType = TimestampType())
def transform_timestamp_in_date(timestamp):
    """
    Divise un timestamp entier en sous partie 
    pour calculer sa signification en terme de date.
    
    Base la date retournée sur le 1er janvier 2018
    
    Return : datetime
    """
    from datetime import datetime
    # définition de constantes pour les mathématiques de date
    une_minute = 60
    une_heure  = 60 * une_minute
    un_jour    = 24 * une_heure
    un_mois    = 30 * un_jour
    un_an      = 12 * un_mois
    # défintion de la date de base :
    timestamp_de_base = 2018 * un_an + 1 * un_mois + 1 * un_jour 
    # ajout de la date de base au timestamp
    timestamp     = timestamp_de_base + timestamp
    # calculs des composants : an / mois / jours / heure / secondes
    nb_an     = timestamp    // (un_an) 
    timestamp = timestamp - nb_an * un_an
    nb_mois   = (timestamp ) //  un_mois % 12
    timestamp = timestamp -   nb_mois *un_mois
    nb_jour   = (timestamp ) //   un_jour % 30
    timestamp = timestamp -   nb_jour *un_jour
    nb_heure  = timestamp    //    une_heure % 24
    timestamp = timestamp -   nb_heure *une_heure
    nb_min    = timestamp    //    une_minute % 60
    timestamp = timestamp -   nb_min * une_minute
    nb_sec    = timestamp % 60 
    # formatage en chaine de caractère
    format_date = "{jour:02d}/{mois:02d}/{an} {heure:02d}:{minute:02d}:{secondes:02d}"
    date = format_date.format(jour  = nb_jour  , mois   = nb_mois , an       = nb_an , 
                              heure = nb_heure , minute = nb_min  , secondes = nb_sec )
    # conversion de la chaine de caractère en objet datetime
    return datetime.strptime(date, "%d/%m/%Y %H:%M:%S")

In [None]:
tous_les_cyclistes = tous_les_cyclistes.withColumn("date", transform_timestamp_in_date("timestamp"))

In [None]:
tous_les_cyclistes.printSchema()

## vérifier les comptes

## 6.4) trouver les dates min/max par état de "sur un vélo"

## 6.5) détecter les changements d'états "sur un vélo" ou pas : window et lag

In [None]:
from pyspark.sql.functions import udf

In [None]:
@udf(returnType = IntegerType())
def changement(etat_actuel, etat_precedent):
    """
    Détecte si les deux états sont différent.
    
    Parametres :
        etat_actuel : valeur sur la ligne courante
                      renvoyée par F.lag (0)
        etat_precedent : valeur sur la ligne précédente
                      renvoyée par F.lag(1)
    Return: 0 s'ils sont égaux, 1 s'il y a une différence
    """
    if etat_precedent == None:
        return 0
    if etat_precedent == etat_actuel:
        return 0
    if etat_actuel != etat_precedent:
        return 1

In [None]:
from pyspark.sql.window import Window
w = Window.orderBy(["id", "date"])

## 6.6) somme partielle par sous groupe : windows

In [None]:
if False:
    @udf(returnType = IntegerType())
    def somme(indice_actuel, indice_precedent):
        if indice_precedent == None:
            return 0
        return indice_actuel + indice_precedent

In [None]:
windowval = Window.orderBy(["id", "date"])
windowval = windowval.partitionBy("id")
windowval = windowval.rangeBetween(Window.unboundedPreceding, 0)

## 6.7) Calculer la durée du trajet

# 7) datavisualisation