# Use case with SPARK

## Prerequisite

Start your HDFS cluster :

Start your Spark cluster :

Check all the following containers are up :

smaster, sworker1, sworker2 


### Data download

This practice is based on a dataset from data.gouv.fr which you uploaded into hdfs while you studied HDFS.

If you don't, please run the following cell :


In [1]:
!$HADOOP_HOME/bin/hdfs dfs -mkdir /data/permanent/rawdata/trade/
!$HADOOP_HOME/bin/hdfs dfs -put /home/jovyan/data/2021-mutations-immobilieres.csv /data/permanent/rawdata/trade/

--2022-03-24 21:24:06--  https://files.data.gouv.fr/geo-dvf/latest/csv/2021/full.csv.gz
Resolving files.data.gouv.fr (files.data.gouv.fr)... 51.38.54.240
Connecting to files.data.gouv.fr (files.data.gouv.fr)|51.38.54.240|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 34657631 (33M) [application/octet-stream]
Saving to: ‘/dataspark/2021-mutations-immobilieres.csv.gz’


2022-03-24 21:24:07 (41.8 MB/s) - ‘/dataspark/2021-mutations-immobilieres.csv.gz’ saved [34657631/34657631]



The dataset represents all France real estate transfer in 2021.

Here is the description of each column : 

```
id_mutation : Identifiant de mutation (non stable, sert à grouper les lignes)
date_mutation : Date de la mutation au format ISO-8601 (YYYY-MM-DD)
numero_disposition : Numéro de disposition
nature_mutation : Nature de la mutation
valeur_fonciere : Valeur foncière (séparateur décimal = point)
adresse_numero : Numéro de l'adresse
adresse_suffixe : Suffixe du numéro de l'adresse (B, T, Q)
adresse_code_voie : Code FANTOIR de la voie (4 caractères)
adresse_nom_voie : Nom de la voie de l'adresse
code_postal : Code postal (5 caractères)
code_commune : Code commune INSEE (5 caractères)
nom_commune : Nom de la commune (accentué)
ancien_code_commune : Ancien code commune INSEE (si différent lors de la mutation)
ancien_nom_commune : Ancien nom de la commune (si différent lors de la mutation)
code_departement : Code département INSEE (2 ou 3 caractères)
id_parcelle : Identifiant de parcelle (14 caractères)
ancien_id_parcelle : Ancien identifiant de parcelle (si différent lors de la mutation)
numero_volume : Numéro de volume
lot_1_numero : Numéro du lot 1
lot_1_surface_carrez : Surface Carrez du lot 1
lot_2_numero : Numéro du lot 2
lot_2_surface_carrez : Surface Carrez du lot 2
lot_3_numero : Numéro du lot 3
lot_3_surface_carrez : Surface Carrez du lot 3
lot_4_numero : Numéro du lot 4
lot_4_surface_carrez : Surface Carrez du lot 4
lot_5_numero : Numéro du lot 5
lot_5_surface_carrez : Surface Carrez du lot 5
nombre_lots : Nombre de lots
code_type_local : Code de type de local
type_local : Libellé du type de local
surface_reelle_bati : Surface réelle du bâti
nombre_pieces_principales : Nombre de pièces principales
code_nature_culture : Code de nature de culture
nature_culture : Libellé de nature de culture
code_nature_culture_speciale : Code de nature de culture spéciale
nature_culture_speciale : Libellé de nature de culture spéciale
surface_terrain : Surface du terrain
longitude : Longitude du centre de la parcelle concernée (WGS-84)
latitude : Latitude du centre de la parcelle concernée (WGS-84)    
```

You are now ready to analyze data.

Note that the following documentation may be a great help to assist you in this homework.

https://sparkbyexamples.com/pyspark-tutorial/


## 1 - Dataframe

Open a connection to your Spark cluster :

In [2]:
# N'oubliez pas de fermer la connexion à la fin du TP
# spark.stop()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("spark://spark-master:7077").appName("TP03").getOrCreate()

Check your connection :

In [3]:
spark

<font color='red'>Q1 - Open and infer the dataset into the mutationsDF dataframe ?
    
Note that the dataset schema should be automatically included into the dataframe.  

</font>

In [6]:
import datetime

# ouverture du fichier csv en utilisant todayStr pour accéder au dossier du jour
# l'inférence du schéma s'appuie sur une échantillonage et est par conséquent plus lourd
# que l'ouverture simple du fichier 

pathFile = "/data/permanent/rawdata/trade/2021-mutations-immobilieres.csv"
mutationsDF = spark.read.load(pathFile, format="csv", sep=",", inferSchema="true", header="true")

<font color='red'>Q2 - Display the data schema of mutationsDF dataframe ?  
</font>

In [7]:
mutationsDF.printSchema()

root
 |-- id_mutation: string (nullable = true)
 |-- date_mutation: string (nullable = true)
 |-- numero_disposition: integer (nullable = true)
 |-- nature_mutation: string (nullable = true)
 |-- valeur_fonciere: double (nullable = true)
 |-- adresse_numero: integer (nullable = true)
 |-- adresse_suffixe: string (nullable = true)
 |-- adresse_nom_voie: string (nullable = true)
 |-- adresse_code_voie: string (nullable = true)
 |-- code_postal: integer (nullable = true)
 |-- code_commune: string (nullable = true)
 |-- nom_commune: string (nullable = true)
 |-- code_departement: string (nullable = true)
 |-- ancien_code_commune: integer (nullable = true)
 |-- ancien_nom_commune: string (nullable = true)
 |-- id_parcelle: string (nullable = true)
 |-- ancien_id_parcelle: string (nullable = true)
 |-- numero_volume: string (nullable = true)
 |-- lot1_numero: string (nullable = true)
 |-- lot1_surface_carrez: double (nullable = true)
 |-- lot2_numero: string (nullable = true)
 |-- lot2_surfa

<font color='red'>Q3 - Display one row of your mutationsDF dataframe :
    </font>

In [8]:

# On affiche les 10 premières lignes tel qu'elles sont enregistrées dans le fichier
mutationsDF.show(1)

+-----------+-------------+------------------+---------------+---------------+--------------+---------------+----------------+-----------------+-----------+------------+-------------+----------------+-------------------+------------------+--------------+------------------+-------------+-----------+-------------------+-----------+-------------------+-----------+-------------------+-----------+-------------------+-----------+-------------------+-----------+---------------+----------+-------------------+-------------------------+-------------------+--------------+----------------------------+-----------------------+---------------+---------+--------+
|id_mutation|date_mutation|numero_disposition|nature_mutation|valeur_fonciere|adresse_numero|adresse_suffixe|adresse_nom_voie|adresse_code_voie|code_postal|code_commune|  nom_commune|code_departement|ancien_code_commune|ancien_nom_commune|   id_parcelle|ancien_id_parcelle|numero_volume|lot1_numero|lot1_surface_carrez|lot2_numero|lot2_surface_

<font color='red'>Q4 - Create from mutationsDF dataframe a new Dataframe CleanMutationDF which matches the following constraints : 
* type_local is not null
* type_local is 'Maison' or 'Appartement'
* only the following attributes should be selected : id_mutation , nature_mutation, type_local , date_mutation , valeur_fonciere


</font>

In [10]:
CleanMutationDF = mutationsDF.filter("type_local is not null AND ( type_local = 'Appartement' OR  type_local = 'Maison')").select("id_mutation","nature_mutation","type_local","date_mutation","valeur_fonciere")


In [11]:
CleanMutationDF.show(5)

+-----------+---------------+-----------+-------------+---------------+
|id_mutation|nature_mutation| type_local|date_mutation|valeur_fonciere|
+-----------+---------------+-----------+-------------+---------------+
|     2021-1|          Vente|     Maison|   2021-01-05|       185000.0|
|     2021-3|          Vente|     Maison|   2021-01-04|       204332.0|
|     2021-4|          Vente|     Maison|   2021-01-06|       320000.0|
|     2021-6|          Vente|Appartement|   2021-01-04|       176000.0|
|     2021-9|          Vente|     Maison|   2021-01-04|       226700.0|
+-----------+---------------+-----------+-------------+---------------+
only showing top 5 rows



<font color='red'>Q5 - Save your CleanMutationDF dataframe into a parquet file ( /dataspark/mutations-immobilieres.parquet ) ?
</font>

In [12]:
CleanMutationDF.write.parquet('/data/permanent/processeddata/parquet/mutations-immobilieres.parquet') 

If you need to remove the parquet file you can run the following cell :

In [18]:
!rm -rf /dataspark/mutations-immobilieres.parquet

<font color='red'>Q6 - Load  the parquet file ('/dataspark/mutations-immobilieres.parquet') into a mutationsPDF dataframe  ?</font>

In [13]:
#Chargement des données dans un dataframe 

mutationsPDF = spark.read.parquet('/data/permanent/processeddata/parquet/mutations-immobilieres.parquet') 

In [14]:
mutationsPDF.printSchema

<bound method DataFrame.printSchema of DataFrame[id_mutation: string, nature_mutation: string, type_local: string, date_mutation: string, valeur_fonciere: double]>

<font color='red'>Q7 - How many rows do you have in mutationsPDF ?</font>

In [15]:
mutationsPDF.count()

1109023



<div class="alert alert-block alert-info">
 Please note that there may be several rows for the same transaction. All the rows part of a single transaction have the same identifier (i.e. the same value) in the id_mutation column. For instance, there are two rows with the value 2021-887 in the id_mutation column.
</div>

<font color='red'>Q8 - Select all rows concerning the id_mutation 2021-15481 into the singleTrDF dataframe :
</font>


In [16]:
singleTrDF = mutationsPDF.filter(mutationsPDF.id_mutation == '2021-15481')



In [17]:
singleTrDF.show()

+-----------+---------------+----------+-------------+---------------+
|id_mutation|nature_mutation|type_local|date_mutation|valeur_fonciere|
+-----------+---------------+----------+-------------+---------------+
| 2021-15481|          Vente|    Maison|   2021-06-01|        88340.0|
+-----------+---------------+----------+-------------+---------------+





<font color='red'>Q9 - The singleTrDF contains 3 lines for the same transaction. How could you filtered out duplicated rows ? 
</font>

In [25]:
singleTrDF.distinct().show()

+-----------+---------------+----------+-------------+---------------+
|id_mutation|nature_mutation|type_local|date_mutation|valeur_fonciere|
+-----------+---------------+----------+-------------+---------------+
| 2021-15481|          Vente|    Maison|   2021-01-13|       900000.0|
+-----------+---------------+----------+-------------+---------------+



<font color='red'>Q10 -  From mutationsPDF dataframe, create a new dataframe mutationDistinctPDF matching the following constraints :
* no duplicated rows
* selecting only nature_mutation = 'Vente'
</font>

In [4]:
mutationDistinctPDF = mutationsPDF.filter("nature_mutation = 'Vente'").distinct()

In [27]:
mutationDistinctPDF.show(5)

+-----------+---------------+----------+-------------+---------------+
|id_mutation|nature_mutation|type_local|date_mutation|valeur_fonciere|
+-----------+---------------+----------+-------------+---------------+
|   2021-360|          Vente|    Maison|   2021-02-05|       285000.0|
|   2021-980|          Vente|    Maison|   2021-03-31|        56000.0|
|  2021-1139|          Vente|    Maison|   2021-04-02|       120000.0|
|  2021-1239|          Vente|    Maison|   2021-04-12|       434020.0|
|  2021-1719|          Vente|    Maison|   2021-01-29|       358000.0|
+-----------+---------------+----------+-------------+---------------+
only showing top 5 rows



<font color='red'>Q11 -  From mutationDistinctPDF dataframe, compare the sales amount (valeur_fonciere) between 'Maison' (House) and 'Appartment' by month
</font>


<div class="alert alert-block alert-info">
Note that you can use the month function included into the pyspark.sql.functions to extract the month value from a date.
</div>


In [5]:
from pyspark.sql.functions import month


#Some valeur_fonciere or date_mutation can be null
# so we can drop Rows with NULL Values on Selected Columns
CleanSales = mutationDistinctPDF.na.drop(subset=["valeur_fonciere","date_mutation"])

CleanSales.count()

341176

In [6]:
import pyspark.sql.functions as f 
# we are now ready to compute the sum

Sales = CleanSales.withColumn('mois', month("date_mutation")).groupBy("mois","type_local") \
    .agg( f.sum(mutationDistinctPDF['valeur_fonciere'].alias("total"))) 

Sales.show(truncate=False)

+----+-----------+-----------------------------+
|mois|type_local |sum(valeur_fonciere AS total)|
+----+-----------+-----------------------------+
|3   |Maison     |8.66192723233E9              |
|5   |Maison     |7.15093441882E9              |
|6   |Maison     |8.138101043709997E9          |
|3   |Appartement|6.707402151400001E9          |
|2   |Maison     |7.677523595549999E9          |
|6   |Appartement|6.682449287090001E9          |
|1   |Appartement|6.04043523954E9              |
|2   |Appartement|5.566652159379999E9          |
|5   |Appartement|5.737454823870001E9          |
|4   |Appartement|6.111317165559999E9          |
|4   |Maison     |8.158892162630002E9          |
|1   |Maison     |8.317504558539999E9          |
+----+-----------+-----------------------------+



<font color='red'>Q12 - Determine the month where sales amount is highest for 'Maison' and 'Appartement' ?
</font>

In [45]:
Sales.filter("type_local = 'Maison'").groupBy('type_local','mois').max("sum(valeur_fonciere)").show(1)

+----------+----+-------------------------+
|type_local|mois|max(sum(valeur_fonciere))|
+----------+----+-------------------------+
|    Maison|   3|          8.66192723233E9|
+----------+----+-------------------------+
only showing top 1 row



In [44]:
Sales.filter("type_local = 'Appartement'").groupBy('type_local','mois').max("sum(valeur_fonciere)").show(1)

+-----------+----+-------------------------+
| type_local|mois|max(sum(valeur_fonciere))|
+-----------+----+-------------------------+
|Appartement|   3|      6.707402151400001E9|
+-----------+----+-------------------------+
only showing top 1 row



# 3 - SQL

In this exercise, you will handle the dataset using the SQL language.


<font color='red'>Q13 - From the mutationDistinctPDF dataframe, create a view mutationSalesV ?</font>

In [107]:
# création de la vue 'mutationV' 
mutationDistinctPDF.createOrReplaceTempView("mutationSalesV")

<font color='red'>Q14 -  From the mutationSalesV view, compare the sales amount (valeur_fonciere) between 'Maison' (House) and 'Appartment' by month using SQL ?

</font>

In [113]:
spark.sql("select month(date_mutation), type_local, sum(valeur_fonciere) from mutationSalesV  group by month(date_mutation), type_local order by month").show(truncate=False)

+--------------------+-----------+--------------------+
|month(date_mutation)|type_local |sum(valeur_fonciere)|
+--------------------+-----------+--------------------+
|3                   |Maison     |8.66192723233E9     |
|5                   |Maison     |7.150934418820001E9 |
|6                   |Maison     |8.138101043709997E9 |
|3                   |Appartement|6.707402151400002E9 |
|2                   |Maison     |7.677523595550001E9 |
|6                   |Appartement|6.68244928709E9     |
|1                   |Appartement|6.040435239540002E9 |
|2                   |Appartement|5.566652159379999E9 |
|5                   |Appartement|5.737454823870001E9 |
|4                   |Appartement|6.11131716556E9     |
|4                   |Maison     |8.158892162629999E9 |
|1                   |Maison     |8.317504558539999E9 |
+--------------------+-----------+--------------------+



Close your Spark connection.

In [18]:
spark.stop()