<a href="https://colab.research.google.com/github/pierredumontel/subway_validations-/blob/main/Notebook/Traitement_des_validations_du_re%CC%81seau_de_transport_d'Ile_de_France_a%CC%80_comple%CC%81ter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Mini-projet Apache Spark

Merci d'indiquer les noms composant le binôme :

|Nom | Prénom|
|---|---|
| DUMONTEL | Pierre |
| ROUET | William |

L'objectif est de collecter les données de validations quotidiennes de titres de transport de la région parisienne, disponibles en Open Data sur le site de Mobilités Ile-de-France.

On se concentrera sur le **réseau ferré** exclusivement pour cet exercice.

## Installation de Spark

Apache Spark n'est pas disponible en standard sur Google Colab.
Procéder à son installation, ainsi qu'à son initialisation pour réaliser le traitement à venir.

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 45.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=adb57d8310472bc2c515338f6e38538a5c2d38364b67c07f69ce831f0dea4d05
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
import pyspark

sc = pyspark.SparkContext()
sql_sc = pyspark.SQLContext(sc)



In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
spark = (pyspark.sql.SparkSession
         .builder
         .appName('GasPrice')
         .getOrCreate()
)

## Récupération des données

Sur le site https://data.iledefrance-mobilites.fr, récupérer les données de validation par jour.

Exemple :
https://data.iledefrance-mobilites.fr/explore/dataset/validations-sur-le-reseau-ferre-nombre-de-validations-par-jour-2e-sem/information/

Récupérer les données de S1 2020 et S2 2021 pour disposer d'une année complète.

On utilisera pour ce faire les commandes de téléchargement de fichiers depuis un site (pas de chargement manuel).

__Attention__ : prévoir une vingtaine de minutes pour le téléchargement, au moins une première fois, et donc une copie sur Google Drive si Google Colab est utilisée, afin d'éviter ce temps d'attente lors de sessions de travail successives.

In [5]:
!curl -O https://data.iledefrance-mobilites.fr/explore/dataset/validations-sur-le-reseau-ferre-nombre-de-validations-par-jour-2e-sem/download/?format=csv&timezone=Europe/Berlin&lang=fr&use_labels_for_header=true&csv_separator=%3B

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 47.4M    0 47.4M    0     0   140k      0 --:--:--  0:05:45 --:--:--  203k


In [6]:
!ls -lh

total 48M
drwx------ 5 root root 4.0K Mar 11 10:17 drive
drwxr-xr-x 1 root root 4.0K Mar  9 14:48 sample_data
-rw-r--r-- 1 root root  48M Mar 11 10:23 S_TWO_Twenty.csv


In [None]:
!curl -O https://data.iledefrance-mobilites.fr/explore/dataset/validations-sur-le-reseau-ferre-nombre-de-validations-par-jour-1er-sem/download/?format=csv&timezone=Europe/Berlin&lang=fr&use_labels_for_header=true&csv_separator=%3B

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 6766k    0 6766k    0     0   128k      0 --:--:--  0:00:52 --:--:--  191k

In [18]:
!ls -lh

total 79M
drwx------ 5 root root 4.0K Mar 11 10:17 drive
drwxr-xr-x 1 root root 4.0K Mar  9 14:48 sample_data
-rw-r--r-- 1 root root  31M Mar 11 10:35 S_ONE_twenty_one.csv
-rw-r--r-- 1 root root  48M Mar 11 10:23 S_TWO_Twenty.csv


### Commentaires sur les données disponibles sur ce portail

Investiguer les données diponibles sur le portail.

Question : peut-on constituer un historique de données s'étendant sur les trois dernières années (2019 à 2021) ?

**A cette adresse https://data.iledefrance-mobilites.fr/explore/dataset/histo-validations/table/?sort=types_de_validations, il y a les données de validation du S1 2019, mais rien pour le S2 2019. Rien n'est renseigné pour le S1 2020 également. **

## Lecture des fichiers dans Spark

Lire les fichiers en choisissant les bonnes options de lecture.
Concaténer les données en une seule table.

In [9]:
S_TWO_twenty =spark.read.csv('/content/S_TWO_Twenty.csv', sep=';', header = True)
S_ONE_twenty_one =spark.read.csv('/content/S_ONE_twenty_one.csv', sep=';', header = True)

Concaténation des fichiers

In [12]:
validations_ddf = S_ONE_twenty_one.union(S_TWO_twenty)

In [16]:
S_TWO_twenty.show(5)

+----------+--------------+-------------+---------------+--------------------+-----------+---------------+-------+
|      jour|code_stif_trns|code_stif_res|code_stif_arret|       libelle_arret|id_refa_lda|categorie_titre|nb_vald|
+----------+--------------+-------------+---------------+--------------------+-----------+---------------+-------+
|2020-08-01|           100|          110|            691|PORTE DE LA CHAPELLE|      72064|            TST|    449|
|2020-08-01|           100|          110|            692|PORTE DE LA VILLETTE|      72430|      IMAGINE R|    624|
|2020-08-01|           100|          110|            692|PORTE DE LA VILLETTE|      72430|         NAVIGO|   2232|
|2020-08-01|           100|          110|            692|PORTE DE LA VILLETTE|      72430|            TST|    729|
|2020-08-01|           100|          110|            693|  PORTE DE MONTREUIL|      71710|      IMAGINE R|    493|
+----------+--------------+-------------+---------------+--------------------+--

## Validation

Si vous avez appelé votre dataframe `validations_ddf`, le test suivant ne doit pas générer d'erreur.

In [13]:
assert validations_ddf.count() == 1_316_287, "Le nombre de lignes ne correspond pas"

AssertionError: ignored

Dans le cas où ce code génère une erreur, il s'agit probablement d'un problème de récupération ou de lecture des deux fichiers.

## Préparation des données

Réaliser les transformations nécessaires pour exploiter ces données :
- préparation des dates
- transformation du nombre de validation

### Explication pour le nombre de validations

Analyser les valeurs prises par ce champ et déterminer le problème.
Présenter votre stratégie pour remédier à ce choix de codage par Mobilités Ile-de-France.

In [None]:
validations_ddf = (validations_ddf
 .withColumn("nb_vald",validations_ddf.nb_vald.cast('int'))
 .withColumn("jour",validations_ddf.jour.cast('timestamp'))
)

validations_ddf.printSchema()

In [None]:
validations_ddf.show(5)

Choix de codage de Mobilités Ile-de-France : le nombre de validation est renseigné par catégories de titre sur une journée. C'est à dire que pour obtenir le nombre total de validations par jour sur une seule ligne, il faudrait sommer le nombre de validations par catégories de titre pour un jour. 
Ou regrouper toutes les mêmes dates ensemble

## Détermination des principales catégories de titre

Différentes catégories de titre sont utilisées sur le réseau.

Déterminer les deux catégories principalement utilisées. Seules ces catégories seront utilisées dans les travaux ci-après (les utiliser comme filtre sur les validations dans la suite).

In [None]:
import pyspark.sql.functions as F 
validations_ddf.groupBy('categorie_titre').agg(F.count('nb_vald').alias('effectif')).sort('effectif', ascending = False).show(2)

In [None]:
(validations_ddf
 .filter((validations_ddf.categorie_titre == "NAVIGO") | (validations_ddf.categorie_titre == "IMAGINE R"))
)

## Visualisation du trafic dans une station

Visualiser le trafic à la gare de Lyon pour les deux catégories de titre principales.

Attention à gérer le cas des gares (comme la gare de Lyon) présentes sur plusieurs lignes et dont le libellé apparaît donc sur plusieurs lignes. Investiguer ce cas avant de déterminer la bonne façon de calculer le nombre de validations pour la gare de Lyon.

In [None]:
# Votre code mettant en évidence le cas des gares sur plusieurs lignes

# La colonne "code_stif_arret" donne le code de l'arret et/ou de la station, on voit que la Gare de Lyon accueille plusieurs arrets : 
(validations_ddf
 .filter((validations_ddf.categorie_titre == "NAVIGO") | (validations_ddf.categorie_titre == "IMAGINE R"))
.filter(validations_ddf.libelle_arret == "GARE DE LYON")
 .groupBy('code_stif_arret').agg(F.sum('nb_vald').alias('nb de vald par arret'))
 .show()
)

In [None]:
# Votre code visualisant le trafic à la gare de Lyon
(validations_ddf
 .filter((validations_ddf.categorie_titre == "NAVIGO") | (validations_ddf.categorie_titre == "IMAGINE R"))
 .filter(validations_ddf.libelle_arret == "GARE DE LYON")
 .groupBy('libelle_arret','categorie_titre','code_stif_arret').agg(F.sum('nb_vald').alias('effectif'))
 .show()
)

In [None]:
(validations_ddf
 .filter((validations_ddf.categorie_titre == "NAVIGO") | (validations_ddf.categorie_titre == "IMAGINE R"))
 .filter(validations_ddf.libelle_arret == "GARE DE LYON")
 .groupBy('libelle_arret','categorie_titre').agg(F.sum('nb_vald').alias('nb_de_vald_par_arret'))
 .show()
)

## Fluctuation du trafic hebdomadaire

Calculer le trafic total et le pourcentage par jour de la semaine sur l'ensemble du réseau.

Trier le résultat par ordre décroissant de validations.

Note : considérer l'usage d'une fonction analytique (`Window.partitionBy()`).

dayofweek = sunday is 1, monday is 2,…, Saturday is 7 

In [None]:
from pyspark.sql.functions import col, round

prt_df = (validations_ddf
            .filter((validations_ddf.categorie_titre == "NAVIGO") | (validations_ddf.categorie_titre == "IMAGINE R"))
            .withColumn('jour_semaine', F.dayofweek(F.col('jour')))
            .withColumn('num_semaine', F.weekofyear(F.col('jour')))
            .toPandas()
            )

(validations_ddf
 .filter((validations_ddf.categorie_titre == "NAVIGO") | (validations_ddf.categorie_titre == "IMAGINE R"))
 .withColumn('jour_semaine', F.dayofweek(F.col('jour')))
 .withColumn('num_semaine', F.weekofyear(F.col('jour')))
 .groupBy('jour_semaine').agg(F.sum('NB_VALD').alias('nb_vald_sumcum_byday')).sort('nb_vald_sumcum_byday', ascending = False)
 .withColumn('% of total', round((F.col('nb_vald_sumcum_byday')/prt_df['nb_vald'].sum()),2))
 .show()
)

In [None]:
(validations_ddf
 .filter((validations_ddf.categorie_titre == "NAVIGO") | (validations_ddf.categorie_titre == "IMAGINE R"))
 .agg(F.sum(F.col('nb_vald')))
 ).show()

## Analyse de l'impact du reconfinement d'octobre 2020

Mettre en évidence graphiquement l'impact du reconfinement.

N'utiliser que les catégories de titre _IMAGINE R_ et _Navigo_.

Le reconfinement d'octobre 2020 fut acté le 29 octobre 2020. L'idée serait de visualiser le nombre de validation par jour avant et après cette date.

In [None]:
import seaborn as sns 
import plotly.graph_objects as go
sns.set()


prt2_df = (validations_ddf
            .filter((validations_ddf.categorie_titre == "NAVIGO") | (validations_ddf.categorie_titre == "IMAGINE R"))
            .groupBy('jour').agg(F.sum('nb_vald').alias('tot_vald'))
            .toPandas()
            )


data_prt = [go.Scatter(x = prt2_df.sort_values('jour')['jour'],
                         y = prt2_df.sort_values('jour')['tot_vald'])]


    # Disposition (layout)
layout = dict(
        title='Analyse quotidienne du trafic 07/2020-05/2021',
        xaxis=dict(
            rangeselector=dict(
                buttons=list([
                    dict(count=1,
                         label='1d',
                         step='day',
                         stepmode='backward'),
                    dict(count=6,
                         label='6d',
                         step='day',
                         stepmode='backward'),
                    dict(step='all')
                ])
            ),
            rangeslider=dict(
                visible = True
            ),
            type='date'
        )
)

    # Affichage (version 4 de plotly nécessaire pour Colab)
fig = go.Figure(
        data=data_prt,
        layout=layout)
    
fig.show()

#### Bonus

Calculer la moyenne glissante sur 7 jours par categorie de titre pour réduire les variations hebdomadaires.

In [None]:
rolling_prt = (validations_ddf
 .filter((validations_ddf.categorie_titre == "NAVIGO") | (validations_ddf.categorie_titre == "IMAGINE R"))
 .groupBy('JOUR').agg(F.sum('NB_VALD').alias('nb_vald_sumcum_byday'))
 .toPandas()
)

rolling_prt = rolling_prt.sort_values('JOUR', ascending=True)
rolling_prt["MA"] = rolling_prt['nb_vald_sumcum_byday'].rolling(window=7, center = False).mean()
rolling_qui_glisse = [go.Scatter(x = rolling_prt.sort_values('JOUR')['JOUR'],
                         y = rolling_prt.sort_values('JOUR')['MA'])]


    # Disposition (layout)
layout = dict(
        title='Moyenne glissante par période de 7 jours avec rolling',
        xaxis=dict(
            rangeselector=dict(
                buttons=list([
                    dict(count=1,
                         label='1d',
                         step='day',
                         stepmode='backward'),
                    dict(count=6,
                         label='6d',
                         step='day',
                         stepmode='backward'),
                    dict(step='all')
                ])
            ),
            rangeslider=dict(
                visible = True
            ),
            type='date'
        )
)

    # Affichage (version 4 de plotly nécessaire pour Colab)
fig = go.Figure(
        data=rolling_qui_glisse,
        layout=layout)
    
fig.show()

# Modélisation avec Apache Spark

On essaie de faire un modèle basique de prévision du trafic dans les 7 prochains jours, pour une station.

Apache Spark MLlib n'intègre pas de modèle pour les séries chronologiques.

L'approche classique est alors d'utiliser une technique de régression classique (régression linéaire bien sûr, mais aussi RandomForestRegressor par exemple).

Pour une première version simple, utiliser un vecteur constituer des validations sur les 14 jours précédents (X) pour prédire les validations du jour (y). Dans cette version, on utilisera une `LinearRegression` ou un `RandomForestRegressor`, au choix.

Le code doit comporter :
- la préparation des _features_ (X)
- la constitution d'un ensemble d'apprentissage et de test
- l'entrainement d'un modèle
- le mesure de la performance du modèle : RMSE

Rappel : ne travailler que sur les deux catégories de titre principales.

## Première version simple 

Hypothèses : 
0n filtre le jeu de données sur les 2 principales catégories de titres et sur une seule station (via "LIBELLE_ARRET"). 
On somme le nombre de validation par jour (dans le cas où il y a plusieurs arrets dans une même station)

In [None]:
import pyspark.sql.functions as F 
first_one = (validations_ddf
 .filter((validations_ddf.categorie_titre == "NAVIGO") | (validations_ddf.categorie_titre == "IMAGINE R"))
 .filter(validations_ddf.libelle_arret == "GARE DE LYON")
 .groupBy('jour').agg(F.sum('nb_vald').alias('tot_vald'))
 .sort('jour', ascending = True)
 .toPandas()
)

first_one.head()

#### Préparations des features (X)



*   On construit un jeu de données avec 14 colonnes représentant le nombre total de validations pour une station les 14 jours précédents la variable à prédire (dans la première colonne, la valeur de la veille de la variable dépendante).
*   Le premier élément du vecteur à prédire est la première observation avec un historique complet de 14 jours (soit le 15 juillet 2020).

In [None]:
import numpy as np
import pandas as pd 
def rolling_window(a, window):
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)


X = pd.DataFrame(rolling_window(first_one['tot_vald'].to_numpy(), 14), 
                  columns=['lag14','lag13','lag12','lag11','lag10',
                           'lag9','lag8','lag7','lag6','lag5',
                           'lag4','lag3','lag2','lag1'])

X = X[['lag1','lag2','lag3','lag4','lag5',
         'lag6','lag7','lag8','lag9','lag10',
         'lag11','lag12','lag13','lag14']]
X = X.drop(index=321)
X.head()

In [None]:
dep_mask = (first_one['jour'] > "2020-07-14") & (first_one['jour'] <= "2021-05-31")
y = first_one.loc[dep_mask]
y = y[["tot_vald"]]
y = y.reset_index()
y = y.drop(columns='index')
y.head()

In [None]:
first_one.shape, y.shape, X.shape

#### Constitution d'ensembles d'apprentissage et de test 

On conserve les deux premiers tiers du jeu de données pour entrainer le modèle, le dernier tier fait office d'échantillon test.

In [None]:
Xtrain = X.iloc[:224]
Xtest = X.iloc[224:]
ytrain = y.iloc[:224]
ytest = y.iloc[224:]

y_test = ytest.reset_index()
y_test = y_test.drop(columns='index')
Xtrain.shape, ytrain.shape, Xtest.shape, ytest.shape

#### Entrainement du modèle 

On entraine le modèle via une régression linéaire et une régression par forêts aléatoires

In [None]:
from sklearn.linear_model import LinearRegression

linear_model = LinearRegression()
lm = linear_model.fit(X=Xtrain, y=ytrain)
y_pred = lm.predict(Xtest)
lm.coef_, lm.intercept_