# Calcul distribué (Map-Reduce Spark) - ESIPE, "INFO 3-Opt° Logiciel"
# Evaluation 2019-2020
<style type="text/css">
    .question {
        background-color: yellow;
    }
</style>

In this evaluation, we will analyse the parking meters of Paris for the year 2014. The dataset consists of two parts:
* The parking meter devices
* The transactions

All the data needed for this evaluation are located in the directory `data`.

### Evaluation process

This notebook is divided into 5 parts:
* PART 1: Initiate the environment (/1)
* PART 2: Get and analyse the device dataset (/3)
* PART 3: Get and analyse the transaction dataset (/7)
* PART 4: Joining devices and transactions (/5)
* PART 5: Analytics on a map (/4)

All questions are highlighted in <span style="background-color: yellow">yellow</span>. They must be answered using Spark Core / SQL / ML features. An indicative mark is given for each question (total / 20). Correct answers, code clarity, and good use of Spark's computing capabilities are taken into account for the final mark.

During this evaluation, you may use any resource, including the internet, course lectures, and labs. The use of online messaging and shared drives is not permitted during this session.

<span style="background-color: #ffbbaa;">**Do not forget to save your whole notebook often.**</span>

At the end of the session, rename the notebook using the format `"evaluation_<name>.ipynb"`. Then send it by email to `francois@univalence.io; bernarith@univalence.io`. Wait for the examiners to approve your notebook before leaving the class.

## PART 1: Initiate the environment (/1)
To do our analysis, we will use Spark SQL.

<span style="background-color: yellow;">Create a SparkSession and assign it to the variable `spark`.</span>

In [1]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
    .appName("ParkingMetterAnalytics")
    .master("local[*]")
    .getOrCreate()
val sc = spark.sparkContext

println(spark.version)

2.4.0


spark = org.apache.spark.sql.SparkSession@c9d0cba
sc = org.apache.spark.SparkContext@46ba03db



If necessary, the Spark UI is available at http://localhost:4040/ or http://localhost:4041/.

We will also need several Spark SQL tools. Run the cell below.

In [2]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._

## PART 2: Get and analyse the device dataset (/3)

### Read parking meter files (/0.5)
The parking meter devices are stored in a JSON file of 4.5MB.

<span style="background-color: yellow;">Read file `data/horodateurs-mobiliers.json` and store it in a variable named `raw_parkmeters`. Display its content by using the method `.show()`.</span>

In [3]:
// read "data/horodateurs-mobiliers.json"
val raw_parkmeters = spark.read.json("data/horodateurs-mobiliers.json")
raw_parkmeters.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|           datasetid|              fields|            geometry|    record_timestamp|            recordid|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|horodateurs-mobil...|[18 RUE DU GENERA...|[[2.2621510006342...|2019-07-26T14:20:...|2ac791554242ed16b...|
|horodateurs-mobil...|[44 AVENUE GEORGE...|[[2.2799760006304...|2019-07-26T14:20:...|29eb91e68c848e4a4...|
|horodateurs-mobil...|[6 RUE GIFFARD, S...|[[2.3725550003135...|2019-07-26T14:20:...|aa09cbb9d1e7cd8d6...|
|horodateurs-mobil...|[13 bis RUE FELIX...|[[2.4105978799268...|2019-07-26T14:20:...|5f959c793e8aa2865...|
|horodateurs-mobil...|[3 RUE LOUIS LOUC...|[[2.3258659994146...|2019-07-26T14:20:...|fc6e0c7d81b7051e7...|
|horodateurs-mobil...|[3 vis-à-vis RUE ...|[[2.3585040002808...|2019-07-26T14:20:...|17fd1a8389f9c1050...|
|horodateurs-mobil...|[25 RUE DES MAT

raw_parkmeters = [datasetid: string, fields: struct<adresse: string, alim: string ... 10 more fields> ... 3 more fields]



### Display schema (/0.5)
The file comes with nested records. We will need to simplify its structure.

To understand its structure, <span style="background-color: yellow;">display the schema of `raw_parkmeters`.</span>

In [4]:
raw_parkmeters.printSchema()

root
 |-- datasetid: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- adresse: string (nullable = true)
 |    |-- alim: string (nullable = true)
 |    |-- arrondt: long (nullable = true)
 |    |-- geo_point_2d: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- geo_shape: struct (nullable = true)
 |    |    |-- coordinates: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |-- modele: string (nullable = true)
 |    |-- numhoro: long (nullable = true)
 |    |-- objectid: long (nullable = true)
 |    |-- regime: string (nullable = true)
 |    |-- tarif: string (nullable = true)
 |    |-- tarifhor: double (nullable = true)
 |    |-- zoneres: string (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-

### Simplify dataframe (/2)
Here we are only interested in the following fields:
* `numhoro`: parking meter number (it must be renamed to `parkmeter_id`)
* `arrondt`: district number in Paris (it must be renamed to `district`)
* `regime`: pricing mode (it must be renamed to `type`). `MIX` means that a specific rule applies to inhabitants (_résident_), `ROT` means that everyone follows the same rules
* `zoneres`: residential area (it must be renamed to `area`)

<span style="background-color: yellow;">Create a new dataframe from `raw_parkmeters` named `parkmeters`, that includes only the fields shown above.</span>

In [5]:
val parkmeters = raw_parkmeters.select(
    col("fields.numhoro").alias("parkmeter_id"),
    col("fields.arrondt").alias("district"),
    col("fields.zoneres").alias("area"),
    col("fields.regime").alias("type")
)

parkmeters = [parkmeter_id: bigint, district: bigint ... 2 more fields]



## PART 3: Get and analyse the transaction dataset (/7)

### Read all the files (/0.5)
<span style="background-color: yellow;">Read all the files in `data/horodateurs-transactions-de-paiement` directory in a single command to create a dataframe named `raw_transactions`.</span>

Note that the files have a header and that the semicolon (`;`) is used as the field delimiter. For the latter, use the option `.option("delimiter", ";")`.

In [6]:
val raw_transactions = spark.read
  .option("delimiter", ";")
  .option("header", true)
  .csv("data/horodateurs-transactions-de-paiement")

raw_transactions = [horodateur: string, date horodateur: string ... 6 more fields]



### Display the content (/0.5)
<span style="background-color: yellow;">Display its content by using the method `.show()`.</span>

In [7]:
raw_transactions.show(truncate=false)

+----------+-------------------+--------+-----------------+-------------+---------------+-------------------+-------------------+
|horodateur|date horodateur    |usager  |moyen de paiement|montant carte|dur�e pay�e (h)|d�but stationnement|fin stationnement  |
+----------+-------------------+--------+-----------------+-------------+---------------+-------------------+-------------------+
|1050      |31/01/2014 15:09:33|R�sident|CB               |3,25         |50,00          |31/01/2014 15:09:33|07/02/2014 15:09:33|
|1050      |24/01/2014 13:41:24|Rotatif |CB               |0,90         |0,25           |24/01/2014 13:41:24|24/01/2014 13:56:24|
|20301     |21/01/2014 23:56:27|R�sident|CB               |0,65         |10,00          |21/01/2014 23:56:27|22/01/2014 19:00:00|
|20301     |23/01/2014 22:51:32|R�sident|CB               |0,65         |10,00          |23/01/2014 22:51:32|24/01/2014 19:00:00|
|20301     |27/01/2014 07:45:12|R�sident|CB               |0,65         |10,00          |2

Note: `usager` identifies the user of the parking meter. A user is either `Résident` (displayed here as `R�sident`), meaning an inhabitant, or `Rotatif`, meaning an occasional visitor.
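
The `�` characters in the output above are typical of text decoded with the wrong charset. The following plain Scala sketch (no Spark involved) reproduces the effect under the assumption, not confirmed by the dataset, that the files are encoded in ISO-8859-1 and read as UTF-8. The value names are illustrative only.

```scala
import java.nio.charset.StandardCharsets

// "Résident", written with a unicode escape so that the example does not
// depend on the encoding of this source text.
val original = "R\u00e9sident"

// Assumption: the file stores one byte per character (ISO-8859-1).
val latin1Bytes: Array[Byte] = original.getBytes(StandardCharsets.ISO_8859_1)

// Decoding those bytes as UTF-8 fails on the accented letter, which is
// substituted by the replacement character U+FFFD.
val misread = new String(latin1Bytes, StandardCharsets.UTF_8)

// Decoding with the charset used for writing restores the text.
val restored = new String(latin1Bytes, StandardCharsets.ISO_8859_1)

println(misread)
println(restored)
```

If the assumption holds, passing `.option("encoding", "ISO-8859-1")` to the CSV reader would likely yield clean column names. The rest of this notebook keeps the names exactly as they were read.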

### The schema (/0.5)
<span style="background-color: yellow;">Now, display the schema of the dataset.</span>

In [8]:
raw_transactions.printSchema()

root
 |-- horodateur: string (nullable = true)
 |-- date horodateur: string (nullable = true)
 |-- usager: string (nullable = true)
 |-- moyen de paiement: string (nullable = true)
 |-- montant carte: string (nullable = true)
 |-- dur�e pay�e (h): string (nullable = true)
 |-- d�but stationnement: string (nullable = true)
 |-- fin stationnement: string (nullable = true)



### Cleaning (/5)
The dataset comes with some inconveniences:
* Everything is a string in this schema
* Some column names contain strange characters
* Numbers are in French format
* Timestamps are in French format too

To improve the dataset, we will provide two functions:
* `toDouble`, which takes a column representing a number, replaces "," by "." and casts it to DoubleType (you will need the Spark SQL function `translate`)
* `toTimestamp`, which takes a column representing a timestamp with the format `"dd/MM/yyyy HH:mm:ss"` and converts it to a Unix timestamp (you will need the Spark SQL function `unix_timestamp` with two parameters). A Unix timestamp is expressed in seconds.
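
The two conversions described above can be sketched in plain Scala, without Spark, to make the expected results concrete. The helpers `translateChars`, `frenchToDouble` and `frenchToEpochSeconds` are hypothetical names introduced for this illustration. The timestamp helper assumes that dates are interpreted in UTC, which is consistent with the expected values used in the unit tests of this notebook.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper mimicking a character-by-character translation:
// each character found in `matching` is replaced by the character at the
// same position in `replace`; characters without a counterpart are dropped.
def translateChars(input: String, matching: String, replace: String): String = {
  val out = new StringBuilder
  input.foreach { c =>
    val i = matching.indexOf(c)
    if (i < 0) out.append(c)
    else if (i < replace.length) out.append(replace.charAt(i))
  }
  out.toString
}

// French decimal comma to dot, then parse as a Double.
def frenchToDouble(text: String): Double =
  translateChars(text, ",", ".").toDouble

val frenchFormat = DateTimeFormatter.ofPattern("dd/MM/yyyy HH:mm:ss")

// Assumption: the timestamp is interpreted in UTC.
def frenchToEpochSeconds(text: String): Long =
  LocalDateTime.parse(text, frenchFormat).toEpochSecond(ZoneOffset.UTC)

println(frenchToDouble("0,65"))
println(frenchToEpochSeconds("31/01/2014 15:09:33"))
```

In Spark, the same logic is expressed on `Column` values rather than on strings, so that the conversion runs on the whole dataset.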

But first, run the cell below, which creates a function that simplifies the writing of unit tests.

In [9]:
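// Applies `function` to the first column of `de` and compares the result
// with the second column, which holds the expected value.
def test_function(function: Column => Column, de: DataFrame): Unit = {
    val text_df = de.toDF("data", "expected")
    val result = text_df
        .withColumn("result", function(col("data")))
        .withColumn("succeed", col("expected") === col("result"))
    result.show()
}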

test_function: (function: org.apache.spark.sql.Column => org.apache.spark.sql.Column, de: org.apache.spark.sql.DataFrame)Unit


#### ToDouble function (/1)
<span style="background-color: yellow;">Complete the function `toDouble`.</span>

In [10]:
def toDouble(column: Column): Column = {
    // Replace the French decimal comma with a dot, then cast to a double
    translate(column, ",", ".").cast(DoubleType)
}

// Unit test
val data_expected = Seq(
    ("1,0", 1.0),
    ("3,4", 3.4),
    ("0,65", 0.65)
).toDF()
test_function(toDouble, data_expected)

+----+--------+------+-------+
|data|expected|result|succeed|
+----+--------+------+-------+
| 1,0|     1.0|   1.0|   true|
| 3,4|     3.4|   3.4|   true|
|0,65|    0.65|  0.65|   true|
+----+--------+------+-------+



data_expected = [_1: string, _2: double]


toDouble: (column: org.apache.spark.sql.Column)org.apache.spark.sql.Column



#### ToTimestamp function (/1)
<span style="background-color: yellow;">Complete the function `toTimestamp`.</span>

In [11]:
def toTimestamp(column: Column): Column = {
    unix_timestamp(column, "dd/MM/yyyy HH:mm:ss")    
}

// Unit tests
val data_expected = Seq(
    ("31/01/2014 15:09:33", 1391180973),
    ("24/01/2014 13:56:24", 1390571784),
    ("26/01/2014 19:21:09", 1390764069)
).toDF
test_function(toTimestamp, data_expected)

+-------------------+----------+----------+-------+
|               data|  expected|    result|succeed|
+-------------------+----------+----------+-------+
|31/01/2014 15:09:33|1391180973|1391180973|   true|
|24/01/2014 13:56:24|1390571784|1390571784|   true|
|26/01/2014 19:21:09|1390764069|1390764069|   true|
+-------------------+----------+----------+-------+



data_expected = [_1: string, _2: int]


toTimestamp: (column: org.apache.spark.sql.Column)org.apache.spark.sql.Column



#### Cleaning process (/3)
Now do the cleaning:
* `horodateur` needs to be renamed to `parkmeter_id`
* `montant carte` needs to be converted to a number and renamed to `amount`
* `d�but stationnement` needs to be converted to a timestamp and renamed to `parking_start`
* `fin stationnement` needs to be converted to a timestamp and renamed to `parking_end`

You will also add a column `duration`, computed as `parking_end` minus `parking_start`. Make sure that `duration` is expressed in hours, knowing that `parking_start` and `parking_end` are in seconds.

We only want transactions for users marked as `Rotatif`.
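
As a quick sanity check of the seconds-to-hours conversion, here is a small worked example in plain Scala. It uses the `Rotatif` row of the raw data shown earlier (start 24/01/2014 13:41:24, end 24/01/2014 13:56:24), with both timestamps converted to seconds under the assumption that they are interpreted in UTC.

```scala
// 24/01/2014 13:41:24 and 24/01/2014 13:56:24 as Unix timestamps (UTC assumed)
val parkingStart = 1390570884L
val parkingEnd = 1390571784L

// Dividing by a Double keeps the fractional part of the result.
val secondsPerHour = 60.0 * 60.0
val durationHours = (parkingEnd - parkingStart) / secondsPerHour

// Dividing two integers would truncate the result to whole hours.
val truncatedHours = (parkingEnd - parkingStart) / 3600

println(durationHours)
println(truncatedHours)
```

The 15 minutes of parking give 0.25 hours, whereas the integer division yields 0, which is why the divisor is written as a floating-point value.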

<span style="background-color: yellow;">Start from `raw_transactions` and apply all the cleaning rules above to create the dataframe `transactions`.</span>

In [12]:
val transactions = raw_transactions
    // Keep only occasional visitors; filter before `usager` is dropped by the select
    .where(col("usager") === "Rotatif")
    .withColumn("parking_start", toTimestamp(col("d�but stationnement")))
    .withColumn("parking_end", toTimestamp(col("fin stationnement")))
    .select(
        col("horodateur").alias("parkmeter_id"),
        toDouble(col("montant carte")).alias("amount"),
        toDouble(col("dur�e pay�e (h)")).alias("payed_duration"),
        col("parking_start"),
        col("parking_end"),
        // Timestamps are in seconds: divide by 3600 to get hours
        ((col("parking_end") - col("parking_start")) / (60.0 * 60.0)).alias("duration")
    )

transactions = [parkmeter_id: string, amount: double ... 4 more fields]



<span style="background-color: yellow;">Use the `.show()` method to display the content of `transactions`.</span>

In [13]:
transactions.show()

+------------+------+--------------+-------------+-----------+------------------+
|parkmeter_id|amount|payed_duration|parking_start|parking_end|          duration|
+------------+------+--------------+-------------+-----------+------------------+
|        1050|   0.9|          0.25|   1390570884| 1390571784|              0.25|
|       20301|   0.9|          0.25|   1390570800| 1390571700|              0.25|
|       20301|   0.9|          0.25|   1390832331| 1390833231|              0.25|
|       20301|   0.9|          0.25|   1391089833| 1391090733|              0.25|
|       20301|   0.9|          0.25|   1391188067| 1391188967|              0.25|
|       20301|   0.9|          0.25|   1391189808| 1391190708|              0.25|
|       20301|   1.8|           0.5|   1390401545| 1390403345|               0.5|
|       20301|   1.8|           0.5|   1390556079| 1390557879|               0.5|
|       20301|   1.8|           0.5|   1391105707| 1391107507|               0.5|
|       20301|  

### Number of records (/0.5)

<span style="background-color: yellow;">Display the number of records in `transactions`.</span>

In [14]:
transactions.count()

19033897

## PART 4: Joining devices and transactions (/5)
Now that we have the device locations and the transactions, we can merge the two datasets and run several analyses.

### Joining (/2)
<span style="background-color: yellow;">Create a dataframe named `parkmeter_transactions`, that joins the dataframes `parkmeters` and `transactions`.</span>

* Keep only these columns: `"parkmeter_id", "district", "area", "duration", "parking_start", "parking_end", "amount"`
* Beware! Some columns are defined in both `transactions` and `parkmeters`. Depending on how you reference such a column, Spark may not be able to tell which one you mean, and the query fails.
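
To make the semantics of the join concrete, here is a minimal sketch of an inner join on `parkmeter_id` using plain Scala collections instead of Spark. The case classes and the sample values are made up for the illustration. Only transactions with a matching device are kept, which may explain why a joined dataset can hold fewer rows than the transaction dataset.

```scala
// Hypothetical, simplified records; field names follow the notebook's columns.
case class Parkmeter(parkmeterId: Long, district: Long, area: String)
case class Transaction(parkmeterId: Long, amount: Double)
case class Joined(parkmeterId: Long, district: Long, area: String, amount: Double)

// Made-up sample data: the last transaction has no matching device.
val sampleParkmeters = Seq(Parkmeter(101L, 1L, "1A"), Parkmeter(202L, 2L, "2E"))
val sampleTransactions = Seq(
  Transaction(101L, 0.5),
  Transaction(202L, 1.5),
  Transaction(999L, 2.0)
)

// Index the devices by their identifier to look them up quickly.
val devicesById: Map[Long, Parkmeter] =
  sampleParkmeters.map(p => p.parkmeterId -> p).toMap

// Inner join: a transaction without a matching device produces no row.
val joined: Seq[Joined] = for {
  t <- sampleTransactions
  p <- devicesById.get(t.parkmeterId).toSeq
} yield Joined(t.parkmeterId, p.district, p.area, t.amount)

joined.foreach(println)
```

The join key appears only once in `Joined`, which mirrors what happens when the two dataframes are joined on a column name.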

In [16]:
// Joining on the column name (rather than on an equality expression such as
// transactions("parkmeter_id") === parkmeters("parkmeter_id")) avoids ending up
// with two `parkmeter_id` columns in the result.
val parkmeter_transactions = transactions
    .join(parkmeters, "parkmeter_id")
    .select(
        transactions("parkmeter_id"),
        $"district",
        $"area",
        $"duration",
        $"parking_start",
        $"parking_end",
        $"amount")

parkmeter_transactions = [parkmeter_id: string, district: bigint ... 5 more fields]



### Save the join (/1)
Before going further, given the size of the data, the relative cost of the processing, and the limited power of the machine you are working on, it is preferable to store the data in a Parquet file first.

Once written, this file will be used as a checkpoint. So, **if something goes wrong in your notebook, you can restart from the cell below that reads the Parquet file.**

<span style="background-color: yellow;">Store the `parkmeter_transactions` dataframe in the Parquet file `parkmeter_transactions.parquet`.</span>

In [17]:
parkmeter_transactions.write.mode("overwrite").parquet("parkmeter_transactions.parquet")

<span style="background-color: yellow;">Now load the file in `parkmeter_transactions`.</span>

In [18]:
val parkmeter_transactions = spark.read.parquet("parkmeter_transactions.parquet")

parkmeter_transactions = [parkmeter_id: string, district: bigint ... 5 more fields]



### First analysis of parkmeter_transactions (/2)

We will now analyse the dataframe `parkmeter_transactions` with the `.describe()` method available on dataframes. It returns a dataframe with summary statistics (count, mean, standard deviation, min, max) for each column.

<span style="background-color: yellow;">Use `.describe()` on `parkmeter_transactions` and display its result.</span>

In [19]:
%%dataframe
parkmeter_transactions.describe()

| summary | parkmeter_id | district | area | duration | parking_start | parking_end | amount |
|---|---|---|---|---|---|---|---|
| count | 18880331.0 | 18880331.0 | 18880331 | 18137113.0 | 18880331.0 | 18137113.0 | 18880331.0 |
| mean | 50660524.60244151 | 12.701171976275203 | 13.944135404050764 | 2.874938438858208 | 1403890575.7585852 | 1403473042.1784124 | 2.6736536404118287 |
| stddev | 28897308.100392148 | 4.6756002880304575 | 7.552287449180098 | 8.855951946794098 | 9307864.260225637 | 9248501.709550656 | 1.864034619949784 |
| min | 10030101.0 | 1.0 | 10E | 0.0041666666666666 | 1388534441.0 | 1388653275.0 | 0.0 |
| max | 99980113.0 | 20.0 | 9G | 760.0 | 1420070214.0 | 1420196400.0 | 7.2 |


In [27]:
parkmeter_transactions.select($"parking_end").orderBy("parking_end").show

+-----------+
|parking_end|
+-----------+
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
+-----------+
only showing top 20 rows



The `count` row shows the number of non-null elements for each column.

What can you identify from the result of `.describe()`?
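
One way to read the `count` row is to compare it across columns: a column with a lower count contains missing values. The plain Scala sketch below illustrates the idea with `Option` values standing in for nullable cells (the sample values are made up), and computes the gap visible in the `.describe()` output above between `parkmeter_id` and `parking_end`.

```scala
// Made-up sample: `None` stands for a null `parking_end`.
val parkingEnds: Seq[Option[Long]] =
  Seq(Some(1390571784L), None, Some(1390571700L), None)

// Total number of rows versus number of non-null values.
val totalRows = parkingEnds.size
val nonNullRows = parkingEnds.count(_.isDefined)

// Dropping the missing values keeps only the usable rows.
val usableEnds: Seq[Long] = parkingEnds.flatten

// Counts taken from the describe() output of this notebook.
val missingInNotebook = 18880331L - 18137113L

println(s"$nonNullRows non-null values out of $totalRows")
println(s"$missingInNotebook rows without parking_end in the notebook")
```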

<span style="background-color: yellow;">Update `parkmeter_transactions` to remove rows with undesirable values.</span>

In [20]:
val parkmeter_transactions_updated = parkmeter_transactions.where(col("parking_end").isNotNull)

parkmeter_transactions_updated = [parkmeter_id: string, district: bigint ... 5 more fields]



## PART 5: Analytics (/4)

### Number of transactions (/2)
<span style="background-color: yellow;">Find the number of transactions per district on the map of Paris. The columns must be "district" and "count_transactions".</span>
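
The expected result is a group-and-count aggregation. As a rough illustration of the logic, here is the same computation on a small in-memory collection in plain Scala. The `Tx` case class and its values are made up, and this is not Spark code.

```scala
// Hypothetical, simplified transaction record with made-up values.
case class Tx(district: Int, amount: Double)

val sampleTxs = Seq(Tx(1, 0.5), Tx(1, 1.5), Tx(2, 2.0))

// Group the transactions by district, then count the rows in each group.
val countPerDistrict: Map[Int, Int] =
  sampleTxs.groupBy(_.district).map { case (district, txs) => district -> txs.size }

// Print the result sorted by district, as an ordered dataframe would be.
countPerDistrict.toSeq.sortBy(_._1).foreach { case (district, n) =>
  println(s"$district -> $n")
}
```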

In [22]:
val count_transactions = parkmeter_transactions_updated
    .groupBy("district")
    .agg(count(lit(1)).alias("count_transactions"))

count_transactions.orderBy("district").show()

+--------+------------------+
|district|count_transactions|
+--------+------------------+
|       1|            217918|
|       2|            150775|
|       3|            332353|
|       4|            297668|
|       5|            671297|
|       6|            522417|
|       7|            844408|
|       8|           1224097|
|       9|            641778|
|      10|            418927|
|      11|            918664|
|      12|           1181645|
|      13|           1612113|
|      14|           1278967|
|      15|           1861138|
|      16|           2023056|
|      17|           1447280|
|      18|            775634|
|      19|            844784|
|      20|            872194|
+--------+------------------+



count_transactions = [district: bigint, count_transactions: bigint]



<span style="background-color: yellow;">Find the number of transactions per area on the map of Paris. The columns must be "area" and "count_transactions".</span>

In [24]:
val count_transactions = parkmeter_transactions_updated.groupBy("area").agg(count(lit(1)).alias("count_transactions"))

count_transactions.show()

+----+------------------+
|area|count_transactions|
+----+------------------+
| 20E|             59020|
| 17R|            195511|
| 13G|             96797|
| 17H|            143629|
| 14G|             89070|
| 16H|             83797|
| 12K|             80208|
|  2E|            150775|
| 16W|            113714|
| 20L|             69274|
| 11G|            155370|
| 17F|            162810|
| 11L|            178897|
| 15M|            146438|
| 19L|             79961|
| 16I|            102528|
| 20H|            135256|
| 18H|             37052|
| 14N|            261768|
|  7E|            133919|
+----+------------------+
only showing top 20 rows



count_transactions = [area: string, count_transactions: bigint]



### Average transaction amount (/2)
<span style="background-color: yellow;">Find the average transaction amount per district in Paris. The columns must be "district" and "avg_amount".</span>
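
An average per group is the sum of the amounts of the group divided by the number of rows in it. The plain Scala sketch below shows that arithmetic on a made-up sample. The `Payment` case class is hypothetical and the code does not use Spark.

```scala
// Hypothetical, simplified record with made-up amounts.
case class Payment(district: Int, amount: Double)

val samplePayments = Seq(Payment(1, 0.5), Payment(1, 1.5), Payment(2, 2.0))

// For each district: sum of the amounts divided by the number of payments.
val avgPerDistrict: Map[Int, Double] =
  samplePayments.groupBy(_.district).map { case (district, payments) =>
    district -> payments.map(_.amount).sum / payments.size
  }

avgPerDistrict.toSeq.sortBy(_._1).foreach { case (district, avg) =>
  println(s"$district -> $avg")
}
```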

In [25]:
val avg_amount = parkmeter_transactions.groupBy("district").agg(avg("amount").alias("avg_amount"))

avg_amount.orderBy("district").show()

+--------+------------------+
|district|        avg_amount|
+--------+------------------+
|       1| 4.301764246339528|
|       2| 4.480481584776363|
|       3| 4.305446529342117|
|       4| 4.117026393682926|
|       5|  4.11636519485262|
|       6| 4.091528713153557|
|       7| 4.358567822283758|
|       8|3.7411084164675112|
|       9| 3.285424385216695|
|      10|3.0349363138361745|
|      11|3.0225273033470357|
|      12|2.2972604344204712|
|      13|1.9482355792491894|
|      14| 2.054451960621803|
|      15|1.9789072250383324|
|      16|2.3930864311867337|
|      17|2.3881802450173786|
|      18|1.9813573437728245|
|      19|1.6529570147163235|
|      20|1.7255606331221356|
+--------+------------------+



avg_amount = [district: bigint, avg_amount: double]



<span style="background-color: yellow;">Find the average transaction amount per area in Paris. The columns must be "area" and "avg_amount".</span>

In [23]:
val avg_amount = parkmeter_transactions.groupBy("area").agg(avg("amount").alias("avg_amount"))

avg_amount.show()

+----+------------------+
|area|        avg_amount|
+----+------------------+
| 20E|1.8331724677103984|
| 17R|2.0945348911531583|
| 13G|1.8544097995547602|
| 17H|2.8468915847870147|
| 14G|1.7036216822272703|
| 16H|4.4431372571414265|
| 12K|1.6725680547076067|
|  2E| 4.480481584776363|
| 16W| 1.554992893761908|
| 20L|1.6929231895876082|
| 11G|3.2125187141951104|
| 17F|2.1540502423191694|
| 11L|2.8811642590074285|
| 15M|1.6813064576330332|
| 19L|1.5279515326018853|
| 16I| 4.223896085728048|
| 20H| 1.755981560666388|
| 18H| 1.874756749774015|
| 14N|2.7749364197021045|
|  7E| 4.527142202837942|
+----+------------------+
only showing top 20 rows



avg_amount = [area: string, avg_amount: double]



## End

At the end of the session, rename the notebook using the format `"evaluation_<name>.ipynb"`. Then send it by email to `francois@univalence.io; bernarith@univalence.io`. Wait for the examiners to approve your notebook before leaving the class.