In [13]:
import pyspark        

In [14]:
"""
en pyspark il faut commencer par un sparksession.
Le point d'entrée pour programmer Spark avec l'API Dataset et DataFrame.
Une SparkSession peut être utilisée pour créer DataFrame, s'enregistrer en DataFrametant que tables, exécuter du SQL sur 
des tables, mettre en cache des tables et lire des fichiers parquet
"""
from pyspark.sql import SparkSession      

In [15]:
spark=SparkSession.builder.appName('DataFrame').getOrCreate()   

In [16]:
spark

In [17]:
#lire un dataset 
#inferSchema = définir les types de données        (rq: au bebut tout les données sont de types String par defaut)
df_pyspark=spark.read.option('header','true').csv('data.csv',inferSchema=True) 

In [18]:
#vérification des types de données des colonnes
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- salaire;: string (nullable = true)



In [19]:
df_pyspark.show()          #visualiser le dataset 

+----+----+----------+--------+
|Name| Age|Experience|salaire;|
+----+----+----------+--------+
| aaa|  11|         1|    100;|
| bbb|  22|         2|    200;|
| ccc|  33|         3|    300;|
| ddd|  44|         4|    400;|
| eee|  55|         5|    500;|
| fff|  66|         6|    600;|
| ggg|null|      null|    700;|
|null|  88|         8|    800;|
|null|  99|      null|    null|
+----+----+----------+--------+



In [20]:
#type de dataset : DataFrame c'est une structure de données qu'on applique sur elle différentes types d'opérations 
type (df_pyspark)  

pyspark.sql.dataframe.DataFrame

In [21]:
df_pyspark.head(9)   #les lignes de dataset

[Row(Name='aaa', Age=11, Experience=1, salaire;='100;'),
 Row(Name='bbb', Age=22, Experience=2, salaire;='200;'),
 Row(Name='ccc', Age=33, Experience=3, salaire;='300;'),
 Row(Name='ddd', Age=44, Experience=4, salaire;='400;'),
 Row(Name='eee', Age=55, Experience=5, salaire;='500;'),
 Row(Name='fff', Age=66, Experience=6, salaire;='600;'),
 Row(Name='ggg', Age=None, Experience=None, salaire;='700;'),
 Row(Name=None, Age=88, Experience=8, salaire;='800;'),
 Row(Name=None, Age=99, Experience=None, salaire;=None)]

In [22]:
#les colonnes de dataset
df_pyspark.columns

['Name', 'Age', 'Experience', 'salaire;']

In [23]:
#vérifier les types de données
df_pyspark.dtypes

[('Name', 'string'),
 ('Age', 'int'),
 ('Experience', 'int'),
 ('salaire;', 'string')]

In [24]:
#sélection de colonnes (Name et Experience )et indexation
select=df_pyspark.select(['Name','Experience'])
select.show()

+----+----------+
|Name|Experience|
+----+----------+
| aaa|         1|
| bbb|         2|
| ccc|         3|
| ddd|         4|
| eee|         5|
| fff|         6|
| ggg|      null|
|null|         8|
|null|      null|
+----+----------+



In [25]:
#ajout de colonnes en dataframe
ajout=df_pyspark.withColumn('Experience aprés 2ans',df_pyspark['Experience']+2)
ajout.show()

+----+----+----------+--------+---------------------+
|Name| Age|Experience|salaire;|Experience aprés 2ans|
+----+----+----------+--------+---------------------+
| aaa|  11|         1|    100;|                    3|
| bbb|  22|         2|    200;|                    4|
| ccc|  33|         3|    300;|                    5|
| ddd|  44|         4|    400;|                    6|
| eee|  55|         5|    500;|                    7|
| fff|  66|         6|    600;|                    8|
| ggg|null|      null|    700;|                 null|
|null|  88|         8|    800;|                   10|
|null|  99|      null|    null|                 null|
+----+----+----------+--------+---------------------+



In [26]:
#Supprimer de colonnes en dataframe
supp=df_pyspark.drop('Experience aprés 2ans')
supp.show()

+----+----+----------+--------+
|Name| Age|Experience|salaire;|
+----+----+----------+--------+
| aaa|  11|         1|    100;|
| bbb|  22|         2|    200;|
| ccc|  33|         3|    300;|
| ddd|  44|         4|    400;|
| eee|  55|         5|    500;|
| fff|  66|         6|    600;|
| ggg|null|      null|    700;|
|null|  88|         8|    800;|
|null|  99|      null|    null|
+----+----+----------+--------+



In [27]:
#renommer les colonnes
ren=df_pyspark.withColumnRenamed('Name','New Name')
ren.show()

+--------+----+----------+--------+
|New Name| Age|Experience|salaire;|
+--------+----+----------+--------+
|     aaa|  11|         1|    100;|
|     bbb|  22|         2|    200;|
|     ccc|  33|         3|    300;|
|     ddd|  44|         4|    400;|
|     eee|  55|         5|    500;|
|     fff|  66|         6|    600;|
|     ggg|null|      null|    700;|
|    null|  88|         8|    800;|
|    null|  99|      null|    null|
+--------+----+----------+--------+



In [28]:
#gestion des valeurs manquantes
#supprimer les lignes qui ont des valeurs null
sup_nul=df_pyspark.na.drop()
sup_nul.show()

+----+---+----------+--------+
|Name|Age|Experience|salaire;|
+----+---+----------+--------+
| aaa| 11|         1|    100;|
| bbb| 22|         2|    200;|
| ccc| 33|         3|    300;|
| ddd| 44|         4|    400;|
| eee| 55|         5|    500;|
| fff| 66|         6|    600;|
+----+---+----------+--------+



In [29]:
#how=all : supprimer les lignes dont toutes les valeurs sont nulls 
df_pyspark.na.drop(how="all").show()

+----+----+----------+--------+
|Name| Age|Experience|salaire;|
+----+----+----------+--------+
| aaa|  11|         1|    100;|
| bbb|  22|         2|    200;|
| ccc|  33|         3|    300;|
| ddd|  44|         4|    400;|
| eee|  55|         5|    500;|
| fff|  66|         6|    600;|
| ggg|null|      null|    700;|
|null|  88|         8|    800;|
|null|  99|      null|    null|
+----+----+----------+--------+



In [30]:
#how=any : supprimer les lignes qui contient au moins une valeur null
df_pyspark.na.drop(how="any").show()

+----+---+----------+--------+
|Name|Age|Experience|salaire;|
+----+---+----------+--------+
| aaa| 11|         1|    100;|
| bbb| 22|         2|    200;|
| ccc| 33|         3|    300;|
| ddd| 44|         4|    400;|
| eee| 55|         5|    500;|
| fff| 66|         6|    600;|
+----+---+----------+--------+



In [31]:
#threshold : supprimer les lignes qui ne contient pas au moins 3 valeurs non nulls
df_pyspark.na.drop(how="any",thresh=3).show()

+----+---+----------+--------+
|Name|Age|Experience|salaire;|
+----+---+----------+--------+
| aaa| 11|         1|    100;|
| bbb| 22|         2|    200;|
| ccc| 33|         3|    300;|
| ddd| 44|         4|    400;|
| eee| 55|         5|    500;|
| fff| 66|         6|    600;|
|null| 88|         8|    800;|
+----+---+----------+--------+



In [32]:
#Subset : supprimer les lignes qui contiennent des valeurs nulls dans une colonne donnée
df_pyspark.na.drop(how="any",subset=['Name']).show()

+----+----+----------+--------+
|Name| Age|Experience|salaire;|
+----+----+----------+--------+
| aaa|  11|         1|    100;|
| bbb|  22|         2|    200;|
| ccc|  33|         3|    300;|
| ddd|  44|         4|    400;|
| eee|  55|         5|    500;|
| fff|  66|         6|    600;|
| ggg|null|      null|    700;|
+----+----+----------+--------+



In [33]:
#fill : remplir les valeurs manquantes
df_pyspark.na.fill('valeur manquante').show()

+----------------+----+----------+----------------+
|            Name| Age|Experience|        salaire;|
+----------------+----+----------+----------------+
|             aaa|  11|         1|            100;|
|             bbb|  22|         2|            200;|
|             ccc|  33|         3|            300;|
|             ddd|  44|         4|            400;|
|             eee|  55|         5|            500;|
|             fff|  66|         6|            600;|
|             ggg|null|      null|            700;|
|valeur manquante|  88|         8|            800;|
|valeur manquante|  99|      null|valeur manquante|
+----------------+----+----------+----------------+



In [34]:
#fill : remplir les valeurs manquantes de colonne (Name)
df_pyspark.na.fill('valeur manquante','Name').show()

+----------------+----+----------+--------+
|            Name| Age|Experience|salaire;|
+----------------+----+----------+--------+
|             aaa|  11|         1|    100;|
|             bbb|  22|         2|    200;|
|             ccc|  33|         3|    300;|
|             ddd|  44|         4|    400;|
|             eee|  55|         5|    500;|
|             fff|  66|         6|    600;|
|             ggg|null|      null|    700;|
|valeur manquante|  88|         8|    800;|
|valeur manquante|  99|      null|    null|
+----------------+----+----------+--------+



In [35]:
#filtrage des données 
#les personnes dont leurs ages inférieur ou egal à 44 
df_pyspark.filter("Age<=44").show()

+----+---+----------+--------+
|Name|Age|Experience|salaire;|
+----+---+----------+--------+
| aaa| 11|         1|    100;|
| bbb| 22|         2|    200;|
| ccc| 33|         3|    300;|
| ddd| 44|         4|    400;|
+----+---+----------+--------+



In [36]:
#les personnes dont leurs ages supérieur à 44            (la négation (~))
df_pyspark.filter(~(df_pyspark['Age']<=44)).show()

+----+---+----------+--------+
|Name|Age|Experience|salaire;|
+----+---+----------+--------+
| eee| 55|         5|    500;|
| fff| 66|         6|    600;|
|null| 88|         8|    800;|
|null| 99|      null|    null|
+----+---+----------+--------+



In [37]:
#selection les noms et l'expérience des personnes agé moins de 44 ans 
df_pyspark.filter("Age<=44").select('Name','Experience').show()


+----+----------+
|Name|Experience|
+----+----------+
| aaa|         1|
| bbb|         2|
| ccc|         3|
| ddd|         4|
+----+----------+



In [38]:
#selection les personnes qui ont une expérience entre 2 et 4 ans (multiple condition)
df_pyspark.filter((df_pyspark['Experience']>=2) &
                 (df_pyspark['Experience']<=4)).show()

+----+---+----------+--------+
|Name|Age|Experience|salaire;|
+----+---+----------+--------+
| bbb| 22|         2|    200;|
| ccc| 33|         3|    300;|
| ddd| 44|         4|    400;|
+----+---+----------+--------+

