# -1) install la librairie "findSpark"

In [1]:
!pip install findspark



# 0) configure 'findspark'

In [62]:
import findspark

In [63]:
spark_home = "/Users/romainjouin/Downloads/spark-3.1.2-bin-hadoop3.2"
findspark.init(spark_home=spark_home, )

# 1) configure spark

In [64]:
from pyspark         import SparkConf
from pyspark.context import SparkContext
from pyspark.sql     import SparkSession

## 1.1) création d'une configuration

In [65]:
master_url = "spark://romainjouin-macbookpro1.roam.corp.google.com:7077"
nb_cpu     = 4
App_name   = "Nom de l'application" 

[Propriétés de Spark](https://spark.apache.org/docs/latest/configuration.html#available-properties)

In [66]:
conf = SparkConf()
conf = conf.setMaster(master_url)
conf = conf.set("spark.cores.max", nb_cpu)
conf = conf.setAppName(App_name)


## 1.2) creation d'un spark context

In [67]:
try    : sc.stop()
except : pass

In [68]:
sc = SparkContext(conf=conf)

In [69]:
sc

# 2) lecture d'un fichier parquet file (slide 212)

In [77]:
spark_dir            = "./../Downloads/spark-3.1.2-bin-hadoop3.2"
path_to_parquet_file = f"{spark_dir}/examples/src/main/resources/users.parquet"
path_to_readme       = f"{spark_dir}/README.md"

# Coder une requête sql en map reduce
SELECT   mot, count(mot)  
FROM     fichier  
WHERE    len(mot)>3  
GROUP BY mot  
LIMIT    10

In [None]:
import re
from pyspark.sql import functions as F
from operator import add

In [109]:
fichier        = sc.textFile(path_to_readme)
mots           = fichier.flatMap(lambda ligne: re.split("\W+", ligne.lower().strip()))
mots_de_3_car  = mots.filter(lambda mot : len(mot)>3)
k_v            = mots_de_3_car.map(lambda mot: (mot, 1))
denombrement   = k_v.reduceByKey(add)
dix_examples   = denombrement.take(10)
dix_examples

[('unified', 1),
 ('analytics', 1),
 ('engine', 2),
 ('large', 1),
 ('provides', 1),
 ('high', 1),
 ('scala', 4),
 ('java', 1),
 ('python', 4),
 ('optimized', 1)]

# 3) création d'une spark session 

In [111]:
spark_Session = SparkSession.builder.getOrCreate()

In [114]:
spark_Session

## 3.1) utiliser la sparkSession pour lire des CSV avec headers

In [115]:
reader = spark_Session.read.option("header", "true")
type(reader)

pyspark.sql.readwriter.DataFrameReader

In [119]:
path_to_csv = "./2021_esilv_spark/cycliste_debug.csv"

In [120]:
df = reader.csv(path_to_csv)

In [122]:
df.head()

Row(cycliste='cycliste_azey2', n_message='0442', time=' 1486038900.39', message='velo rendu sur la station azgb6 ( a la maison = False) (elem de station_travail)')

## 3.2) possible de convertir la DataFrame en RDD 

Les dataframes sont bassées sur des RDD
En spark 1 les RDD étaient les objets de bases, maintenant on promeut plutôt les df => qui ont un schéma est sont plus optimisées

In [123]:
rdd = df.rdd

In [125]:
rdd.take(3)

[Row(cycliste='cycliste_azey2', n_message='0442', time=' 1486038900.39', message='velo rendu sur la station azgb6 ( a la maison = False) (elem de station_travail)'),
 Row(cycliste='cycliste_azey2', n_message='0441', time=' 1486038900.39', message='impossible de rendre sur '),
 Row(cycliste='cycliste_azey2', n_message='0440', time=' 1486038900.38', message='self.a_la_maison = False')]

# 4) Sql Context

In [128]:
from pyspark import SQLContext

## 4.1) création du sql context

In [130]:
sqlContext = SQLContext(sc)

## 4.2) lecture d'un fichier => création dataframe

In [134]:
df = sqlContext.read.load(path_to_parquet_file)

In [135]:
df.head(5)

[Row(name='Alyssa', favorite_color=None, favorite_numbers=[3, 9, 15, 20]),
 Row(name='Ben', favorite_color='red', favorite_numbers=[])]

## 4.3) enregistrement d'une table temporaire en mémoire acceptant ensuite des requêtes SQL

In [137]:
nom_de_la_table = "XY"
sqlContext.registerDataFrameAsTable(df, nom_de_la_table)

In [143]:
requete_sql = f"""
                    Select name, count(*)
                    FROM {nom_de_la_table}
                    group by name
"""
sql_result = sqlContext.sql(requete_sql)
sql_result.collect()


[Row(name='Ben', count(1)=1), Row(name='Alyssa', count(1)=1)]

In [18]:

path_to_parquet_file = "./../Downloads/spark-3.1.2-bin-hadoop3.2/examples/src/main/resources/people.csv"

In [22]:
with open(path_to_parquet_file) as f:
    for i in range(100):
        print(f.readline(), end="")

name;age;job
Jorge;30;Developer
Bob;32;Developer


In [38]:
reader = sqlcontext.read

In [40]:
reader = reader.option("delimiter", ";")

In [42]:
df = reader.load(path_to_parquet_file, format="csv", schema="nom STRING,age FLOAT,job STRING", )
df.head(5)

[Row(nom='name', age=None, job='job'),
 Row(nom='Jorge', age=30.0, job='Developer'),
 Row(nom='Bob', age=32.0, job='Developer')]