# CSC5003 Mini-projet 1
Alice ZHEN

## Objectif métier
On souhaite aider à sélectionner un point de vente de carburant à proximité d'un utilisateur automobiliste en fonction de la distance et du prix du carburant.

## Jeu de données utilisé
Prix des carburants en France le 9 janvier 2021 \
https://www.data.gouv.fr/fr/datasets/r/087dfcbc-8119-4814-8412-d0a387fac561

On peut observer que ce jeu de données est au format XML:

In [1]:
import scala.sys.process._
Process("cat PrixCarburants_quotidien_20210109.xml")!!

[32mimport [39m[36mscala.sys.process._
[39m
[36mres0_1[39m: [32mString[39m = [32m"""<?xml version="1.0" encoding="ISO-8859-1" standalone="yes"?>
<pdv_liste>
  <pdv id="1000001" latitude="4620114" longitude="519791" cp="01000" pop="R">
    <adresse>596 AVENUE DE TREVOUX</adresse>
    <ville>SAINT-DENIS-L�S-BOURG</ville>
    <services>
      <service>Station de gonflage</service>
      <service>Vente de gaz domestique (Butane, Propane)</service>
      <service>DAB (Distributeur automatique de billets)</service>
    </services>
    <prix nom="Gazole" id="1" maj="2021-01-08T10:20:52" valeur="1274"/>
    <prix nom="SP95" id="2" maj="2021-01-08T10:20:52" valeur="1388"/>
    <prix nom="SP98" id="6" maj="2021-01-09T09:25:53" valeur="1405"/>
    <rupture id="4" nom="GPLc" debut="2017-09-16T09:50:23" fin=""/>
    <rupture id="3" nom="E85" debut="2017-09-16T09:50:23" fin=""/>
    <rupture id="5" nom="E10" debut="2018-12-13T09:49:49" fin=""/>
  </pdv>
  <pdv id="1000002" latitude="4621842

## Préparation de l'environnement
On va utiliser Spark afin de traiter les données.

On importe dans un premier temps les dépendences nécessaires:

In [2]:
import $ivy.`org.apache.spark::spark-sql:2.4.0`
import $ivy.`sh.almond::almond-spark:0.10.9`
import $ivy.`com.databricks::spark-xml:0.11.0`
import $ivy.`com.lihaoyi::upickle:0.7.1`
import $ivy.`com.lihaoyi::requests:0.6.5`

[32mimport [39m[36m$ivy.$                                  
[39m
[32mimport [39m[36m$ivy.$                               
[39m
[32mimport [39m[36m$ivy.$                                 
[39m
[32mimport [39m[36m$ivy.$                           
[39m
[32mimport [39m[36m$ivy.$                            [39m

On configure le niveau des logs:

In [3]:
// Configure le niveau des logs
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.ERROR)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m

## Chargement des données avec Spark
On crée une session Spark:

In [4]:
import org.apache.spark.sql._

val spark = {
    NotebookSparkSession.builder()
        .master("local[*]")
        .getOrCreate()
}

Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[32mimport [39m[36morg.apache.spark.sql._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@1f56ee9d

On utilise la librarie [spark-xml](https://github.com/databricks/spark-xml) afin de parser les données XML avec Spark afin de pouvoir le charger en tant que structure DataFrame:

In [5]:
import com.databricks.spark.xml._

// Lire le jeu de données en format XML
val df = spark.read
    .option("rowTag", "pdv")
    .option("encoding", "UTF-8")
    .xml("PrixCarburants_quotidien_20210109.xml")

df.show()

+----+-------+-------------+-------------+----+--------------------+---------+--------+--------------------+--------------------+--------------------+--------------------+
| _cp|    _id|    _latitude|   _longitude|_pop|             adresse|fermeture|horaires|                prix|             rupture|            services|               ville|
+----+-------+-------------+-------------+----+--------------------+---------+--------+--------------------+--------------------+--------------------+--------------------+
|1000|1000001|    4620114.0|     519791.0|   R|596 AVENUE DE TRE...|     null|    null|[[, 1, 2021-01-08...|[[, 2017-09-16T09...|[[Station de gonf...|SAINT-DENIS-L�S-B...|
|1000|1000002|    4621842.0|     522767.0|   R| 16 Avenue de Marboz|     null|    null|[[, 1, 2021-01-06...|[[, 2017-09-07T11...|[[Vente de gaz do...|     BOURG-EN-BRESSE|
|1000|1000004|    4618800.0|     524500.0|   R|20 Avenue du Mar�...|     null|    null|                null|                null|           

[32mimport [39m[36mcom.databricks.spark.xml._

// Lire le jeu de données en format XML
[39m
[36mdf[39m: [32mDataFrame[39m = [_cp: bigint, _id: bigint ... 10 more fields]

## Nettoyer les données chargées
Nous considérons un utilisateur dont on connait les coordonnées géographiques qui va sélectionner le type de carburant qu'il recherche.

Dans notre exemple, il s'agit d'un véhicule au **19 place Marguerite Perey
à Palaiseau** qui souhaite se recharger en **SP95**.

In [6]:
// Informations sur le véhicule de l'utilisateur
val inputCarburantType = "SP95"
val inputLat = "48.712793"
val inputLong = "2.199441"

[36minputCarburantType[39m: [32mString[39m = [32m"SP95"[39m
[36minputLat[39m: [32mString[39m = [32m"48.712793"[39m
[36minputLong[39m: [32mString[39m = [32m"2.199441"[39m

Nous allons nettoyer dans un premier temps le jeu de données en ne conservant que les points de vente où le carburant sélectionné est vendu et disponible et en ne sélectionnant que les colonnes qui nous intéressent:

In [7]:
import org.apache.spark.sql.functions.{array_contains, col, explode}

val dfClean = df.withColumn("prix", explode(col("prix")))
    .select("_longitude", "_latitude", "adresse", "ville", "prix._valeur")
    .filter(col("prix._nom") === inputCarburantType)
    .filter(! array_contains(col("rupture._nom"), inputCarburantType))

[32mimport [39m[36morg.apache.spark.sql.functions.{array_contains, col, explode}

[39m
[36mdfClean[39m: [32mDataset[39m[[32mRow[39m] = [_longitude: double, _latitude: double ... 3 more fields]

L'affichage des données nettoyées nous donne le tableau ci-dessous:

In [8]:
dfClean.show()

+-------------+-------------+--------------------+--------------------+-------+
|   _longitude|    _latitude|             adresse|               ville|_valeur|
+-------------+-------------+--------------------+--------------------+-------+
|     519791.0|    4620114.0|596 AVENUE DE TRE...|SAINT-DENIS-L�S-B...|   1388|
|     522767.0|    4621842.0| 16 Avenue de Marboz|     BOURG-EN-BRESSE|   1383|
|     524500.0|    4618800.0|20 Avenue du Mar�...|     Bourg-en-Bresse|   1490|
|     524100.0|    4619900.0|Bd Charles de Gaulle|     BOURG-EN-BRESSE|   1388|
|     522935.0|    4619566.0|     56 Rue du Stand|     Bourg-en-Bresse|   1383|
|     566282.0|    4628404.0|     ROUTE DE DORTAN|              Arbent|   1429|
|     566000.0|    4627200.0|886 AVENUE JEAN C...|    ARBENT - OYONNAX|   1439|
|     559900.0|    4597800.0|         Rue Masonod|  HAUTEVILLE-LOMPNES|   1420|
|     569100.0|    4616700.0| 867 ROUTE DE GENEVE|LE POIZAT LALLEYRIAT|   1459|
|     527800.0|    4583900.0|Avenue Char

## Enrichir les données en calculant la distance des points de vente
Afin de pouvoir ordonner les points de vente en fonction de leur distance à l'utilisateur, nous allons ajouter une colonne dans le jeu de données qui va contenir la distance du point de vente à l'utilisateur.

Pour obtenir cette distance, nous utiliserons [l'API OpenRoute Service](https://openrouteservice.org/) qui permet entre autre de déterminer un itinéraire entre deux points.

In [None]:
import java.nio.file.{Paths, Files}
import java.nio.charset.StandardCharsets

// Définit les variables d'ORS utilisées
val orsToken = Files.readString(Paths.get(".ors_token"), StandardCharsets.UTF_8)
val orsProfile = "driving-car"


Cette API est gratuite, c'est pourquoi son utilisation est limitée par des quotas (max 40 requêtes / min et 2000 requêtes / jour). Afin de respecter ces quotas, nous choisissons de limiter notre démonstration à un échantillon de 200 points de ventes et nous attendons 1.5s entre chaque requête. Nous allons également mettre en cache les données résultant de ces requêtes afin de ne pas avoir à les refaire par la suite.

In [10]:
import java.lang.String
import java.time._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._
import ujson._


// Définit une fonction qui interroge l'API Openroute Service et qui renvoie la distance en km
def getDistanceFromUser(long:Double, lat:Double) : String = {
    var url = "https://api.openrouteservice.org/v2/directions/driving-car/geojson"
    var data = Obj("coordinates" -> Arr(Arr(inputLong,inputLat), Arr(long,lat)), 
                   "radiuses" -> Arr(350, 2000))
    var headers = Map("Authorization" -> orsToken, "Content-Type" -> "application/json; charset=utf-8")
    
    var req = requests.post(url = url, data = data.toString, headers = headers)
    
    var distance = -1.0
    if (req.statusCode == 200) {
        distance = ujson.read(req.text)("features")(0)("properties")("summary")("distance").num / 1000
    }
    // Pause pour ne pas dépasser le quota de 40 requests/min
    Thread.sleep(1500)
    return String.valueOf(distance)
}

// Définit une UDF
def getDistance = udf(getDistanceFromUser _, StringType)

// Ajoute la colonne `distance` aux données 
// (on prend un échantillon de 200 en raison des limitations de crédits API)
val dfDistance = dfClean.sample(true, 1D*200/df.count)
                .withColumn("longitude", col("_longitude") / 100000)
                .withColumn("latitude", col("_latitude") / 100000)
                .withColumn("distance", getDistance(col("longitude"), col("latitude")))
                .filter(col("distance") > 0)
                .select("adresse", "ville", "_valeur", "longitude", "latitude", "distance")
                .cache()

[32mimport [39m[36mjava.lang.String
[39m
[32mimport [39m[36mjava.time._
[39m
[32mimport [39m[36morg.apache.spark.sql.functions.udf
[39m
[32mimport [39m[36morg.apache.spark.sql.types._
[39m
[32mimport [39m[36mujson._


// Définit une fonction qui interroge l'API Openroute Service et qui renvoie la distance en km
[39m
defined [32mfunction[39m [36mgetDistanceFromUser[39m
defined [32mfunction[39m [36mgetDistance[39m
[36mdfDistance[39m: [32mDataset[39m[[32mRow[39m] = [adresse: string, ville: string ... 4 more fields]

On peut observer qu'une colonne `distance` a bien été ajoutée à nos données:

In [11]:
dfDistance.show()

+--------------------+--------------------+-------+------------------+------------------+------------------+
|             adresse|               ville|_valeur|         longitude|          latitude|          distance|
+--------------------+--------------------+-------+------------------+------------------+------------------+
|1000 rue jean jaures|    Fresnoy-le-Grand|   1371|         3.4304281|         49.957514|210.43370000000002|
|ZAC DE W� - 2 ave...|       CARIGNAN - W�|   1384|        5.16159956|       49.63718865|            283.01|
|   Route de Toulouse|        Saint-Lizier|   1389|   1.1307646674172|   43.002708124042|          748.4149|
|    335 rue du p�age|               AUXON|   1414|             3.917|             48.11|          167.3117|
|  BOULEVARD NAPOLEON|  Brienne-le-Ch�teau|   1385|          4.528256|        48.3965167|          209.9967|
|33 Boulevard du M...|           Marseille|   1391|           5.40524|          43.31339|          764.3521|
|chemin du puits d.

## Proposer des résultats en fonction des critères sélectionnés
Une fois la distance à l'utilisateur des points de vente de notre échantillon ajoutée à nos données, nous allons pouvoir proposer des résultats.

Pour cela, nous proposerons à l'utilisateur de sélectionner la **distance maximum** de la recherche (300 km dans notre exemple car nous avons pris seulement un échantillon des données).

Il doit ensuite choisir entre deux critères: la **distance** et le **prix**.

Le choix du critère `distance` va proposer à l'utilisateur les points de vente dans l'ordre de proximité par rapport à sa localisation.

Le choix du critère `price` quant à lui va ordonner les points de vente par ordre de prix croissant.

In [12]:
import org.apache.spark.sql.functions.asc
import scala.math.round

val round_ = udf((value: String) => "%.2f".format(value.toDouble))

// Définit une fonction pour prendre les meilleurs résultats selon nos critères
def getBestResults(order: String, maxDistance: Double, dataframe: Dataset[Row]) = {
    order match {
        case "distance" => dataframe.withColumn("distance", round_(col("distance")).cast(DoubleType))
                            .withColumn("prix", col("_valeur").cast(DoubleType) / 1000)
                            .filter(col("distance") < maxDistance)
                            .orderBy(col("distance").asc)
        case "price" => dataframe.withColumn("distance", round_(col("distance")).cast(DoubleType))
                        .filter(col("distance") < maxDistance)
                        .withColumn("prix", col("_valeur").cast(DoubleType) / 1000)
                        .orderBy(col("prix").asc)
        case _ => spark.emptyDataFrame
    }
}

// Sélectionne les critères selon lesquels on choisit de classer les points de vente
var maxDistance = 300

var order = "distance"
val bestResultsByDistance = getBestResults(order, maxDistance, dfDistance).select("longitude", "latitude", "adresse", "ville", "prix", "distance")

order = "price"
val bestResultsByPrice = getBestResults(order, maxDistance, dfDistance).select("longitude", "latitude", "adresse", "ville", "prix", "distance")

## Affichage des points de vente sélectionnés
Une fois les données ordonnées, nous pouvons afficher le choix des points de vente qui correspondent aux critères:

In [13]:
// Affiche les meilleurs résultats selon le critère distance
bestResultsByDistance.show()

+---------------+------------------+--------------------+--------------------+-----+--------+
|      longitude|          latitude|             adresse|               ville| prix|distance|
+---------------+------------------+--------------------+--------------------+-----+--------+
|          2.408|            48.743|8 Place Gaston Viens|                Orly|1.419|   21.41|
|          2.393|            48.296|35 rue du general...|         MALESHERBES|1.359|   75.05|
|          2.373|            49.137|  ROUTE DE ROYAUMONT|             VIARMES|1.377|   80.62|
|          2.798|            49.298|176, Avenue de la...|B�THISY-SAINT-PIERRE|1.395|  102.78|
|          1.157|            49.319|      ROUTE DE LYONS|            IGOVILLE|1.419|  129.66|
|          1.034|            49.451|     rue de Montigny|            CANTELEU|1.369|  148.47|
|1.2993805014435|    47.58142578404|1 rue de la Quini�re|               Blois|1.375|   164.2|
|1.2993805014435|    47.58142578404|1 rue de la Quini�re|   

In [14]:
// Affiche les meilleurs résultats selon le critère prix
bestResultsByPrice.show()

+---------------+------------------+--------------------+--------------------+-----+--------+
|      longitude|          latitude|             adresse|               ville| prix|distance|
+---------------+------------------+--------------------+--------------------+-----+--------+
|          2.393|            48.296|35 rue du general...|         MALESHERBES|1.359|   75.05|
|           3.41|            50.112|      Rue de la Gare|              CAUDRY|1.363|  222.36|
|          1.034|            49.451|     rue de Montigny|            CANTELEU|1.369|  148.47|
|    -0.28047954|       49.26294216| route de Ouistreham|Saint-Aubin-d'Arq...|1.369|  253.62|
|      3.4304281|         49.957514|1000 rue jean jaures|    Fresnoy-le-Grand|1.371|  210.43|
|1.2993805014435|    47.58142578404|1 rue de la Quini�re|               Blois|1.375|   164.2|
|1.2993805014435|    47.58142578404|1 rue de la Quini�re|               Blois|1.375|   164.2|
|          0.101|            49.506|8, rue Romain Rol...|   

On peut observer que les stations suggérées dans notre exemple sont très éloignées de l'utilisateur. Cela s'explique par l'échantillon restreint de points de vente sur lequel nous avons travaillé. 

Travailler sur le jeu de données en entier aurait permis que seuls les points de vente les plus proches soient suggérés à l'utilisateur.

## Visualisation
Supposons que l'utilisateur souhaite se rendre dans le point de vente le plus proche (8 Place Gaston Viens à Orly).

La réponse de l'API OpenRoute Service nous permet également d'obtenir un itinéraire pour aller de l'utilisateur au point de vente.

Pour cela, on peut sélectionner les coordonnées du point de vente le plus proche trouvé et effectuer une nouvelle requête à l'API ORS afin de récupérer l'itinéraire sous format GeoJSON:

In [15]:
// On récupère les coordonnées du point de vente le plus proche:
val destLong = bestResultsByDistance.select(col("longitude")).first().getDouble(0)
val destLat = bestResultsByDistance.select(col("latitude")).first().getDouble(0)
                    
// On construit la requête
var url = "https://api.openrouteservice.org/v2/directions/driving-car/geojson"
var data = Obj("coordinates" -> Arr(Arr(inputLong,inputLat), Arr(destLong,destLat)), 
                   "radiuses" -> Arr(350, 2000))
var headers = Map("Authorization" -> s"$orsToken", "Content-Type" -> "application/json; charset=utf-8")
    
// On effectue la requête
var req = requests.post(url = url, data = data.toString, headers = headers)
assert(req.statusCode == 200)

Files.write(Paths.get("pathGeo.json"), ujson.transform(req.text, BytesRenderer()).toBytes)

On a enregistré l'itinéraire sous format GeoJSON dans le fichier [pathGeo.json](pathGeo.json) et pouvons le visualiser sur [OpenStreetMap](https://umap.openstreetmap.fr/fr/map/new/#6/51.000/2.000) en utilisant la fonction `Importer des données`.

On observe qu'il s'agit bien d'un itinéraire pour aller de Palaiseau à Orly.
![](osm_path.png)