# 0) vérifier [sc] et [spark]

In [1]:
sc

<pyspark.context.SparkContext at 0x100706590>

In [2]:
# le spark context détermine les ressources 
# disponibles pour l'application
assert sc==sc

In [3]:
# la variable spark permet de gérer les
# dataframes (dépend de pyspark.sql)
assert spark==spark

In [4]:
spark

<pyspark.sql.session.SparkSession at 0x107082410>

In [210]:
data_dir = "/Users/romain/Informatique/notebooks/spark_scala/velib/logs/"

# 1) loader un fichier en mémoire

In [213]:
path  = "hdfs://spark52:8020/cycliste_debug_2.csv"
infos_cyclistes = os.path.join(data_dir,"cycliste_cyclistes.csv")
infos_trajets = os.path.join(data_dir,"cycliste_debug.csv")

In [214]:
cyclistes = sc.textFile(infos_cyclistes)
trajets   = sc.textFile(infos_trajets)

In [215]:
def head_and_count(rdd, name="", head=2):
    print ("="*10,name)
    print ("count = %s"%rdd.count())
    print ("head for %s ligne : "%(head))
    print ("-"*3)
    for indice, ligne in enumerate(rdd.take(head)):
        print ("%03i - %s"%(indice, ligne))
    print ("-"*3)
    print ("="*10)


In [216]:
head_and_count(trajets,"trajets")

count = 582974
head for 2 ligne : 
---
000 - cycliste,n_message,time,message
001 - cycliste_azey2,0442, 1486038900.39,velo rendu sur la station azgb6 ( a la maison = False) (elem de station_travail)
---


In [56]:
head_and_count(cyclistes,"cyclistes")

count = 3301
head for 2 ligne : 
---
000 - cycliste,sportif,age,sexe,nb_km,vitesse,attente
001 - cycliste_azthv,0,34,femme,9.86503821812,15.4581576429,0.5
---


# 2) Création d'une pairedRDD sur les cyclistes

## 2.1) on split sur les virgules

In [169]:
splitted_cyclistes =  cyclistes.map(lambda x: x.split(","))
head_and_count(splitted_cyclistes, "splitted_cyclistes")

count = 3301
head for 2 ligne : 
---
000 - [u'cycliste', u'sportif', u'age', u'sexe', u'nb_km', u'vitesse', u'attente']
001 - [u'cycliste_azthv', u'0', u'34', u'femme', u'9.86503821812', u'15.4581576429', u'0.5']
---


## 2.2) on récupère le header et l'enlève

In [170]:
head = splitted_cyclistes.take(1)[0]
splitted_cyclistes = splitted_cyclistes.filter(lambda x: x!=head)

## 2.3) on crée un tableau d'info par cycliste

In [185]:
infos_par_cyclistes = splitted_cyclistes.map(lambda x: (x[0], zip(head[1:], x[1:])))
infos_par_cyclistes = splitted_cyclistes.map(lambda x: (x[0], x[1:]))
head_and_count(infos_par_cyclistes)

count = 3301
head for 2 ligne : 
---
000 - (u'cycliste', [u'sportif', u'age', u'sexe', u'nb_km', u'vitesse', u'attente'])
001 - (u'cycliste_azthv', [u'0', u'34', u'femme', u'9.86503821812', u'15.4581576429', u'0.5'])
---


# 3) On peut créer des fonctions

In [186]:
def get_first_line(rdd):
    return rdd.take(1)[0]

def remove_first_line(rdd):
    head         = get_first_line(rdd)
    headless_rdd = rdd.filter(lambda x: x!= head)
    return headless_rdd    

def split(rdd, sep=","):
    return rdd.map(lambda ligne : ligne.split(sep))

def create_paired_rdd_on_first_column(array_rdd):
    return array_rdd.map(lambda x: (x[0], x[1:]))

In [194]:
head             = get_first_line(trajets).split(",")
headless_trajets = remove_first_line(trajets)
array_trajets    = split(headless_trajets)
pairedTrajets    = create_paired_rdd_on_first_column(array_trajets)

# 4) Joindre 2 paired RDD

In [217]:
jointure = pairedTrajets.join(infos_par_cyclistes)

In [218]:
jointure.take(2)

[(u'cycliste_azfk3',
  ([u'0009',
    u' 1486391580.93',
    u'velo pris sur station azuxn (elem de station travail)'],
   [u'4', u'42', u'femme', u'12.0849164506', u'18.1675557286', u'0.5'])),
 (u'cycliste_azfk3',
  ([u'0008',
    u' 1486391567.04',
    u'velo rendu sur la station azuxn ( a la maison = False) (elem de station_travail)'],
   [u'4', u'42', u'femme', u'12.0849164506', u'18.1675557286', u'0.5']))]

## 4.1) regarder les valeurs par clef

In [198]:
for indice, k_v in enumerate(valeurs_par_clefs.take(1)):
    print ("%03i - %s" % (indice, k_v[0]))
    for infos in k_v[1]:
        print (infos)

000 - cycliste_aze70
([u'0009', u' 1486391577.48', u'False a pris le velo velo_aeufv_1486391531.03 sur la station azuf0'], [u'-0.5', u'69', u'homme', u'15.7032986473', u'10.4313587952', u'0.5'])
([u'0008', u' 1486391577.48', u'velo pris sur station aep80 (elem de station travail)'], [u'-0.5', u'69', u'homme', u'15.7032986473', u'10.4313587952', u'0.5'])
([u'0007', u' 1486391564.72', u'velo rendu sur la station azsf3 ( a la maison = False) (elem de station_travail)'], [u'-0.5', u'69', u'homme', u'15.7032986473', u'10.4313587952', u'0.5'])
([u'0006', u' 1486391564.71', u'self.a_la_maison = False'], [u'-0.5', u'69', u'homme', u'15.7032986473', u'10.4313587952', u'0.5'])
([u'0005', u' 1486391555.19', u'True a pris le velo velo_aerkc_1486391529.5 sur la station azrmx'], [u'-0.5', u'69', u'homme', u'15.7032986473', u'10.4313587952', u'0.5'])
([u'0004', u' 1486391555.19', u'velo pris sur station azu12 (elem de station maison)'], [u'-0.5', u'69', u'homme', u'15.7032986473', u'10.4313587952', u

In [219]:
jointure = jointure.partitionBy(10)
jointure = jointure.cache()
jointure.count()

620334

In [220]:
jointure = jointure.partitionBy(20)
jointure = jointure.cache()
jointure.count()

620334

# 5) Creating a Dataframe from an RDD

In [224]:
df = spark.createDataFrame(cyclistes)

TypeError: Can not infer schema for type: <type 'unicode'>

In [222]:
df = spark.createDataFrame(splitted_cyclistes)

In [233]:
df.take(2)

[Row(_1=u'cycliste', _2=u'sportif', _3=u'age', _4=u'sexe', _5=u'nb_km', _6=u'vitesse', _7=u'attente'),
 Row(_1=u'cycliste_azthv', _2=u'0', _3=u'34', _4=u'femme', _5=u'9.86503821812', _6=u'15.4581576429', _7=u'0.5')]

## 5.1) préparer le schéma

In [260]:
from collections import OrderedDict
from pyspark.sql.types import *

champs = OrderedDict([("cycliste"           , StringType() ) ,
                      ("sportif"            , FloatType()  ) ,
                      ("age"                , IntegerType() ) ,
                      ("sexe"               , StringType()  ) ,
                      ("nb_km"              , FloatType()  ) ,
                      ("vitesse"            , FloatType()  ) ,
                      ("attente"            , FloatType() ) ])

fields = [StructField(champ, _type, nullable = True) for champ, _type in champs.iteritems()]
schema = StructType(fields)
for i in schema:
    print i

In [264]:
sans_header = remove_first_line(splitted_cyclistes)
sans_header.take(1)

[[u'cycliste_azthv',
  u'0',
  u'34',
  u'femme',
  u'9.86503821812',
  u'15.4581576429',
  u'0.5']]

## 5.2 ) caster les strings en valeurs numériques python

In [265]:
from pyspark.sql import Row

In [266]:
rdd    = sans_header.map(lambda p: Row( cycliste        =       p[0], 
                                        sportif         = float(p[1]), 
                                        age             = int  (p[2]), 
                                        sexe            =       p[3] , 
                                        nb_km           = float(p[4]), 
                                        vitesse         = float(p[5]), 
                                        attente         = float(p[6])))

## 5.3) lier les numériques python à une dataframe spark

In [267]:
df = spark.createDataFrame(rdd, schema)

In [268]:
df.take(3)

[Row(cycliste=u'cycliste_azthv', sportif=0.0, age=34, sexe=u'femme', nb_km=9.86503791809082, vitesse=15.458157539367676, attente=0.5),
 Row(cycliste=u'cycliste_azqs2', sportif=4.0, age=63, sexe=u'homme', nb_km=4.089626789093018, vitesse=24.407756805419922, attente=0.5),
 Row(cycliste=u'cycliste_azyuo', sportif=0.0, age=55, sexe=u'femme', nb_km=11.663058280944824, vitesse=7.8741774559021, attente=0.5)]

In [None]:
df.

In [231]:
champs = OrderedDict([("station_depart"     , StringType() ) ,
                      ("cycliste"           , StringType() ) ,
                      ("velo"               , StringType() ) ,
                      ("heure_de_depart"    , FloatType()  ) ,
                      ("heure_de_fin"       , FloatType()  ) ,
                      ("duree"              , FloatType()  ) ,
                      ("distance"           , FloatType()  ) ,
                      ("station_arrivee"    , StringType() ) ])

In [None]:
fields = [StructField(champ, _type, nullable = True) for champ, _type in champs.iteritems()]
schema = StructType(fields)


In [226]:
df.take(2)

[Row(_1=u'cycliste', _2=u'sportif', _3=u'age', _4=u'sexe', _5=u'nb_km', _6=u'vitesse', _7=u'attente'),
 Row(_1=u'cycliste_azthv', _2=u'0', _3=u'34', _4=u'femme', _5=u'9.86503821812', _6=u'15.4581576429', _7=u'0.5')]