In [None]:
import requests
import os

# The relative path of the folder containing your data
path_data = './data'
filename = 'usagers-2022.csv'
url = 'https://www.data.gouv.fr/fr/datasets/r/62c20524-d442-46f5-bfd8-982c59763ec8'

def load_file(filename, path_data, url):
    filepath = os.path.join(path_data, filename)
    if os.path.exists(filepath):
        print(f'The file {filename} already exists in folder {path_data}/.')
    else:
        r = requests.get(url)
        with open(filepath, 'wb') as f:
            f.write(r.content)
        print(f'Downloaded file {filename} in folder {path_data}/.')

In [None]:
# The relative path of the folder containing your data
path_data = './data'
filename = 'usagers-2022.csv'
url = 'https://www.data.gouv.fr/fr/datasets/r/62c20524-d442-46f5-bfd8-982c59763ec8'

load_file(filename, path_data, url)

In [None]:
filename = 'vehicules-2022.csv'
url = 'https://www.data.gouv.fr/fr/datasets/r/c9742921-4427-41e5-81bc-f13af8bc31a0'

load_file(filename, path_data, url)

In [None]:
filename = 'carcteristiques-2022.csv'
url = 'https://www.data.gouv.fr/fr/datasets/r/5fc299c0-4598-4c29-b74c-6a67b0cc27e7'


load_file(filename, path_data, url)

In [None]:
filename = 'lieux-2022.csv'
url = 'https://www.data.gouv.fr/fr/datasets/r/a6ef711a-1f03-44cb-921a-0ce8ec975995'

load_file(filename, path_data, url)

In [None]:
filename = 'usagers-2021.csv'
url = 'https://www.data.gouv.fr/fr/datasets/r/ba5a1956-7e82-41b7-a602-89d7dd484d7a'

load_file(filename, path_data, url)

In [None]:
filename = 'vehicules-2021.csv'
url = 'https://www.data.gouv.fr/fr/datasets/r/0bb5953a-25d8-46f8-8c25-b5c2f5ba905e'

load_file(filename, path_data, url)


In [None]:
filename = 'lieux-2021.csv'
url = 'https://www.data.gouv.fr/fr/datasets/r/8a4935aa-38cd-43af-bf10-0209d6d17434'

load_file(filename, path_data, url)


In [None]:
filename = 'carcteristiques-2021.csv'
url = 'https://www.data.gouv.fr/fr/datasets/r/85cfdc0c-23e4-4674-9bcd-79a970d7269b'

load_file(filename, path_data, url)


## Initialisation de la Session Spark

In [None]:
import numpy as np
import pandas as pd
import pyspark.pandas as ps

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField



types = ['usagers', 'lieux', 'vehicules', 'carcteristiques']
spark = SparkSession.builder.getOrCreate()

## Librairie de fonction utiles au nettoyage

In [None]:

from pyspark.sql.functions import *
import re

def bad_float_convertion(x, col):
    dk = x.withColumn(col, regexp_replace(col, ",", "."))
    dk = dk.withColumn(col, dk[col].cast(DoubleType()))
    return dk

def bad_date_convertion(x, col_day, col_month, col_year, col_hour):
    dk = x.withColumn("date_str",
                        concat(x[col_year], lit("-"), x[col_month], lit("-"), x[col_day], lit(" "), x[col_hour]))   
    dk = dk.withColumn("date_complete", to_timestamp(dk["date_str"], "yyyy-MM-dd HH:mm"))
    dk = dk.drop("date_str")
    dk = dk.drop(col_year)
    dk = dk.drop(col_month)
    dk = dk.drop(col_day)
    dk = dk.drop(col_hour)
    return dk   

def replace_with (df, column, value, new_value):
    return df.withColumn(column, when(col(column).isNull() | (col(column) == value), new_value).otherwise(col(column)))

def replace_empty_with_value(df, column, new_value):
    return replace_with(df, column, "", new_value)

def replace_empty_with_zero(df, value):
    return replace_empty_with_value(df, value, "0")

def replace_empty_with_null(df, column_name):
    return replace_empty_with_value(df, column_name, None)

def replace_minus_one_with_null(df, column_name):
    return replace_with(df, column_name, "-1", None)

def trim_cast_integer(df, column):
    return df.withColumn(column, trim(df[column]).cast(IntegerType()))

def trim_replace_minus_one (df, column):
    res = trim_cast_integer(df, column)
    res = replace_minus_one_with_null(res, column)
    return res

def get_type_columns(df , type_):
    integer_columns = [col_name for col_name, col_type in df.dtypes if col_type == type_]
    return integer_columns

def get_integer_columns(df):
    return get_type_columns(df, "int")

def remove_spaces(s):
    return re.sub(r'\s+', '', s)

def get_unique_column_values(df, column_name):
   
    unique_values_df = df.select(column_name).distinct()
    unique_values_list = [row[column_name] for row in unique_values_df.collect()]
    return unique_values_list

def show_unique_column_values(df, column_name):
    unique_num_acc_values = df.select(column_name).distinct().rdd.flatMap(lambda x: x).collect()
    print("unique :", unique_num_acc_values)

def remove_spaces_column(df, column_name):
    remove_spaces_udf = udf(remove_spaces, StringType())
    return df.withColumn(column_name, remove_spaces_udf(df[column_name]).cast(LongType()))

spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")



## Constructeur de dataframes "caracteristiques"

Note: les variables qui sont soit une valeur soit une valeur vide sont directement converti.
Mais les variables catégoriques sont nettoyé (truncate et -1 ou "" changé en NULL par exemple) et puis converti. Une valeur hors des catégories spécifiées peuvent être intéressantes à étudier. 

In [None]:
def caracteristique_df(path):
    caracteristique_schema = StructType([
    StructField("Num_Acc", LongType(), False),
    StructField("jour", StringType(), False),
    StructField("mois", StringType(), False),
    StructField("an", StringType(), False),  
    StructField("hrmn", StringType(), False),  
    StructField("lum", StringType(), True),  
    StructField("dep", StringType(), True),  
    StructField("com", StringType(), True),      
    StructField("agg", IntegerType(), True), 
    StructField("int", IntegerType(), True),  
    StructField("atm", IntegerType(), True),  
    StructField("col", StringType(), True),      
    StructField("adr", StringType(), True),
    StructField("lat", StringType(), True),
    StructField("long", StringType(), True)
    ])
    


    df_caract = spark.read.csv(path, header=True, sep=';', schema=caracteristique_schema)

    df_caract = bad_float_convertion(df_caract, 'lat')
    df_caract = bad_float_convertion(df_caract, 'long')
    df_caract = bad_date_convertion(df_caract, "jour", "mois", "an", "hrmn")
    df_caract = df_caract.withColumn("agg", 
                                           when(col("agg") == 1, True)
                                           .when(col("agg") == 2, False)
                                           .cast(BooleanType()))
    
    dk = get_integer_columns(df_caract)
    for column in dk:
        df_caract = replace_minus_one_with_null(df_caract, column)
    df_caract = trim_replace_minus_one(df_caract, "col")

    return df_caract

df_caract = caracteristique_df('./data/carcteristiques-2022.csv')
df_caract.show(5)
df_caract.dtypes

## Constructeur de dataframes "usagers"

In [None]:
def usager_df(path):

    schema_usager = StructType([
        StructField("Num_Acc", LongType(), True),
        StructField("id_usager", StringType(), False),
        StructField("id_vehicule", StringType(), False),
        StructField("num_veh", StringType(), False),
        StructField("place", IntegerType(), False),
        StructField("catu", StringType(), False),
        StructField("grav", StringType(), False),
        StructField("sexe", StringType(), False),
        StructField("an_nais", IntegerType(), False),
        StructField("trajet", StringType(), True),
        StructField("secu1", StringType(), True),
        StructField("secu2", StringType(), True),
        StructField("secu3", StringType(), True),
        StructField("locp", StringType(), True),
        StructField("actp", StringType(), True),
        StructField("etatp", StringType(), True)
    ])

    df_usager = spark.read.csv(path, header=True, sep=';', schema=schema_usager) 
    df_usager = df_usager.withColumn("sexe", 
                                            when(col("sexe") == '1', 0)
                                            .when(col("sexe") == '2', 1)
                                            .cast(IntegerType()))

    to_remove_space = ["id_usager", "id_vehicule"]
    for column in to_remove_space:
        df_usager = remove_spaces_column(df_usager, column)

    to_trim_replace_minus_one = ["secu1", "secu2", "trajet", "locp", "actp", "etatp", "secu3", "grav"]
    for column in to_trim_replace_minus_one:
        df_usager = trim_replace_minus_one(df_usager, column)
    
    return df_usager


df_usager = usager_df('./data/usagers-2022.csv')
df_usager.show(5)
df_usager.dtypes


## Constructeur de dataframes "lieux"

In [None]:
def lieux_df(path):
    schema_lieux = StructType([
        StructField("Num_Acc", StringType(), True),
        StructField("catr", StringType(), True),
        StructField("voie", StringType(), True),
        StructField("v1", StringType(), True),
        StructField("v2", StringType(), True),
        StructField("circ", StringType(), True),
        StructField("nbv", StringType(), True),
        StructField("vosp", StringType(), True),
        StructField("prof", StringType(), True),
        StructField("pr", StringType(), True),
        StructField("pr1", StringType(), True),
        StructField("plan", StringType(), True),
        StructField("lartpc", IntegerType(), True),
        StructField("larrout", IntegerType(), True),
        StructField("surf", StringType(), True),
        StructField("infra", StringType(), True),
        StructField("situ", StringType(), True),
        StructField("vma", StringType(), True)
    ])

    df_lieux = spark.read.csv(path, header=True, sep=';', schema=schema_lieux)
    to_replace_empty = ["voie", "v1", "v2", "lartpc"]
    for column in to_replace_empty:
        df_lieux = replace_empty_with_null(df_lieux, column)
        df_lieux = replace_with(df_lieux, column, "N/A", None)

    to_trim_replace_minus_one = ["catr", "circ", "nbv", "vosp", "prof", "pr", "pr1", "plan", "surf", "infra", "situ", "vma", "v1"]
    for column in to_trim_replace_minus_one:
        df_lieux = trim_replace_minus_one(df_lieux, column)

    return df_lieux

df_lieux = lieux_df('./data/lieux-2022.csv')
df_lieux.show(200)
df_lieux.dtypes



## Constructeur de dataframes "vehicule"

In [None]:


def vehicule_df(path):
    schema_vehicule = StructType([
        StructField("Num_Acc", LongType(), True),
        StructField("id_vehicule", StringType(), False),
        StructField("num_veh", StringType(), False),
        StructField("senc", IntegerType(), False),
        StructField("catv", IntegerType(), False),
        StructField("obs", IntegerType(), False),
        StructField("obsm", IntegerType(), False),
        StructField("choc", IntegerType(), False),
        StructField("manv", IntegerType(), False),
        StructField("motor", IntegerType(), False),
        StructField("occutc", StringType(), False),
    ])

    df_veh = spark.read.csv(path, header=True, sep=';', schema=schema_vehicule)
    df_veh = replace_empty_with_zero(df_veh, "occutc")
    df_veh = df_veh.withColumn("occutc", df_veh["occutc"].cast(IntegerType()))     
    df_veh = remove_spaces_column(df_veh, "id_vehicule")
    
    return df_veh


df_veh = vehicule_df('./data/vehicules-2022.csv')
df_veh.show(5)
df_veh.dtypes

## Construction tableau data_frames gardant les dataframes

In [None]:
years = ['2021', '2022']
data_frames = {}
for y in years:
    for type_ in types:
        df_name = type_ + ('-') + y
        filename = df_name + '.csv'
        filepath = os.path.join(path_data, filename)
        prefix = df_name.split('-')[0]
        if prefix == 'usagers':
            data_frames [df_name] = usager_df(filepath)
        elif prefix == 'lieux':
            data_frames [df_name] = lieux_df(filepath)
        elif prefix == 'vehicules':
            data_frames [df_name] = vehicule_df(filepath)
        elif prefix == 'carcteristiques':
            data_frames [df_name] = caracteristique_df(filepath)

## Chargement des dataframes union de 2021-2O22

In [None]:
lieux_22 = data_frames['lieux-2022']
lieux_21 = data_frames['lieux-2021']

lieux = lieux_21.union(lieux_22)

In [None]:
usagers_22 = data_frames['usagers-2022']
usagers_21 = data_frames['usagers-2021']

usagers = usagers_21.union(usagers_22)

In [None]:
vehicules_22 = data_frames['vehicules-2022']
vehicules_21 = data_frames['vehicules-2021']

vehicules = vehicules_21.union(vehicules_22)

In [None]:
caracteristiques_22 = data_frames['carcteristiques-2022']
caracteristiques_21 = data_frames['carcteristiques-2021']

caracteristiques = caracteristiques_21.union(caracteristiques_22)

In [None]:
def compute_statistics(df, numerical_columns):
    statistics = {}
    for col_name in numerical_columns:
        column_stats = {}
        column_stats['Mean'] = df.selectExpr(f'avg({col_name})').collect()[0][0]
        column_stats['Median'] = df.approxQuantile(col_name, [0.5], 0.01)[0]
        column_stats['Q1'], column_stats['Q3'] = df.approxQuantile(col_name, [0.25, 0.75], 0.01)
        column_stats['Standard Deviation'] = df.selectExpr(f'stddev_pop({col_name})').collect()[0][0]
        column_stats['Skewness'] = df.selectExpr(f'skewness({col_name})').collect()[0][0]
        column_stats['Kurtosis'] = df.selectExpr(f'kurtosis({col_name})').collect()[0][0]
        column_stats['IQR'] = column_stats['Q3'] - column_stats['Q1']
        statistics[col_name] = column_stats

    return statistics

def show_statistics(statistics):
    print("Statistiques des colonnes numériques:")
    for col_name, stats in statistics.items():
        print(f"Colonne: {col_name}")
        for stat_name, value in stats.items():
            print(f"{stat_name}: {value}")
        print()

In [None]:
usagers_statistics = compute_statistics(usagers,['trajet'])

In [None]:
show_statistics(usagers_statistics)

In [None]:
import plotly.graph_objects as go

# Extract column names and statistics
def plot_statistics(statistics):
    column_names = list(statistics.keys())
    mean_values = [stats['Mean'] for stats in statistics.values()]
    median_values = [stats['Median'] for stats in statistics.values()]
    q1_values = [stats['Q1'] for stats in statistics.values()]
    q3_values = [stats['Q3'] for stats in statistics.values()]
    std_dev_values = [stats['Standard Deviation'] for stats in statistics.values()]
    skewness_values = [stats['Skewness'] for stats in statistics.values()]
    kurtosis_values = [stats['Kurtosis'] for stats in statistics.values()]
    iqr_values = [stats['IQR'] for stats in statistics.values()]

    # Create traces
    mean_trace = go.Bar(x=column_names, y=mean_values, name='Mean')
    median_trace = go.Bar(x=column_names, y=median_values, name='Median')
    q1_trace = go.Bar(x=column_names,y=q1_values, name='Q1')
    q3_trace = go.Bar(x=column_names,y=q3_values, name='Q3')
    std_dev_trace = go.Bar(x=column_names,y=std_dev_values, name='Standard Deviation')
    skewness_trace = go.Bar(x=column_names,y=skewness_values, name='Skewness')
    kurtosis_trace = go.Bar(x=column_names,y=kurtosis_values, name='Kurtosis')
    iqr_trace = go.Bar(x=column_names,y=iqr_values, name='IQR')

    # Create layout
    layout = go.Layout(title='Box Plot des colonnes numériques', xaxis=dict(title='Colonne'),
                   yaxis=dict(title='Valeur'))

    # Create figure
    fig = go.Figure(data=[mean_trace, median_trace, q1_trace, q3_trace, std_dev_trace, skewness_trace, kurtosis_trace, iqr_trace], layout=layout)

    # Show the plot
    fig.show()

In [None]:
lieux_statistics = compute_statistics(lieux,['pr1'])
plot_statistics(lieux_statistics)

## Heatmap de la répartition des accidents de la route selon l'heure et le jour de la semaine

In [None]:
import plotly.express as px 


df_c = caracteristiques
df_v = vehicules.select("Num_Acc", "obsm")
df_pieton = df_v.where(col("obsm") == 1)


df_caracteristique_time = df_c.withColumn("day", dayofweek("date_complete")).withColumn("hour", hour("date_complete")).withColumn("minute", minute("date_complete")).withColumn("month", month("date_complete")).select("Num_Acc","day", "hour", "minute", "month")
df_pieton = df_pieton.join(df_caracteristique_time, "Num_Acc")


name_pieton = 'Répartition des accidents impliquant des piétons par jour de la semaine et heure de la journée'

def plot_day(df, name):
    df_pd = df.toPandas()

    fig = px.density_heatmap(df_pd, x='hour', y='day',
                            title=name,
                            labels={'hour': 'Heure de la journée', 'day': 'Jour de la semaine'},
                            nbinsx=24, nbinsy=7, marginal_x='histogram', marginal_y='histogram')

    jour_labels = {1: 'Lundi', 2: 'Mardi', 3: 'Mercredi', 4: 'Jeudi', 5: 'Vendredi', 6: 'Samedi', 7: 'Dimanche'}
    fig.update_yaxes(ticktext=[jour_labels[i] for i in range(1, 8)], tickvals=list(range(1, 8)))

    fig.show()



plot_day(df_pieton, name_pieton)


## Graphique de la répartition des accidents sur les de l'année

In [None]:
def plot_month(df):
   df_monthly = df.groupBy('month').count().orderBy('month')
   month_names = {
      1: 'Janvier', 2: 'Février', 3: 'Mars', 4: 'Avril', 5: 'Mai', 6: 'Juin',
      7: 'Juillet', 8: 'Août', 9: 'Septembre', 10: 'Octobre', 11: 'Novembre', 12: 'Décembre'
   }
 
   df_monthly_pd = df_monthly.toPandas()

   df_monthly_pd['month'] = df_monthly_pd['month'].map(month_names)

   
   fig = px.line(df_monthly_pd, x='month', y='count',
               title='Répartition des accidents par mois',
               labels={'month': 'Mois', 'count': 'Nombre d\'accidents'})
   fig.show()

plot_month(df_caracteristique_time)



## Graphique de la distribution de l'âge des usagers qui ont eu un accident dans une route de métropole

In [None]:

annee_acc = caracteristiques.select("Num_Acc", "date_complete").withColumn("year", year("date_complete")).drop("date_complete")

joined_usagers_lieux_df = usagers.join(lieux, 'Num_Acc')
usagers_urbains = joined_usagers_lieux_df.select("Num_Acc", "id_usager", "sexe","An_nais").where("catr=7")
usagers_urbains = usagers_urbains.join(annee_acc, "Num_Acc")
usagers_urbains = usagers_urbains.withColumn("age", usagers_urbains["year"] - usagers_urbains["An_nais"])
usagers_urbains.show(5)

title = 'Distribution de l\'âge des usagers qui ont eu un accident dans une route de métropole'

def plot_age(df, title):
    df_pd = df.toPandas()
    fig = px.histogram(df_pd, x='age', title=title, labels={'age': 'Âge'})
    fig.show()

plot_age(usagers_urbains, title)

## Distribution de l'âge des usagers qui ont eu un accident dans les autres types de route

In [None]:
usagers_non_urbains = joined_usagers_lieux_df.select("Num_Acc", "id_usager", "sexe","An_nais").where("catr!=7")
usagers_non_urbains = usagers_non_urbains.join(annee_acc, "Num_Acc")
usagers_non_urbains = usagers_non_urbains.withColumn("age", usagers_non_urbains["year"] - usagers_non_urbains["An_nais"])
title = 'Distribution de l\'âge des usagers qui ont eu un accident dans les autres types de route'
plot_age(usagers_non_urbains, title)

## Conclusion
- Les jeunes dans la vingtaine sont toujours les plus touché par les accidents

## Révisions des noms et des colonnes inutiles pour sauvegarde

In [None]:
def clean_names_caract(df):
    df = df.withColumnRenamed("lum", "lumière")
    df = df.withColumnRenamed("dep", "département")
    df = df.withColumnRenamed("com", "commune")
    df = df.withColumnRenamed("agg", "est_agglomération")
    df = df.withColumnRenamed("int", "intersection")
    df = df.withColumnRenamed("atm", "conditions_météorologiques")
    df = df.withColumnRenamed("col", "collision")
    df = df.withColumnRenamed("adr", "lieu")
    df = df.withColumnRenamed("lat", "latitude")
    df = df.withColumnRenamed("long", "longitude")
    return df

def clean_names_usagers(df):
    df = df.withColumnRenamed("num_veh", "numéro_vehicule")
    df = df.withColumnRenamed("catu", "catégorie_usager")
    df = df.withColumnRenamed("grav", "gravité")
    df = df.withColumnRenamed("an_nais", "année_naissance")
    df = df.withColumnRenamed("secu1", "sécurité_1")
    df = df.withColumnRenamed("secu2", "sécurité_2")
    df = df.withColumnRenamed("secu3", "sécurité_3")
    df = df.withColumnRenamed("locp", "localisation_piéton")
    df = df.withColumnRenamed("actp", "action_piéton")
    df = df.withColumnRenamed("etatp", "état_piéton")
    return df

def clean_names_lieux(df):
    df = df.withColumnRenamed("catr", "catégorie_route")
    df = df.withColumnRenamed("circ", "circulation")
    df = df.withColumnRenamed("nbv", "nombre_voies")
    df = df.withColumnRenamed("vosp", "voie_réservée")
    df = df.withColumnRenamed("prof", "profil_route")
    df = df.withColumnRenamed("pr", "numéro_pr")
    df = df.withColumnRenamed("pr1", "indice_pr")
    df = df.withColumnRenamed("plan", "tracé_route")
    df = df.withColumnRenamed("lartpc", "largeur_terre_plein_central")
    df = df.withColumnRenamed("larrout", "largeur_route")
    df = df.withColumnRenamed("surf", "surface_route")
    df = df.withColumnRenamed("infra", "infrastructure")
    df = df.withColumnRenamed("situ", "situation_accident")  
    df = df.withColumnRenamed("vma", "vitesse_max_autorisée")
    return df


def clean_names_vehicules(df):
    df = df.withColumnRenamed("num_veh", "numéro_vehicule")
    df = df.withColumnRenamed("senc", "sens_circulation")
    df = df.withColumnRenamed("catv", "catégorie_vehicule")
    df = df.withColumnRenamed("obs", "obstacle_fixe_heurté")
    df = df.withColumnRenamed("obsm", "obstacle_mobile_heurté")
    df = df.withColumnRenamed("choc", "point_choc_initial")
    df = df.withColumnRenamed("manv", "manoeuvre_principale_avant_accident")
    df = df.withColumnRenamed("motor", "type_motorisation")
    df = df.withColumnRenamed("occutc", "nombre_occupants")
    return df


def caracteristique_df_trunc_useless (df):
    df = df.drop("adr")
    df = df.drop("lat")
    df = df.drop("long")
    df = df.drop("agg")
    df = df.drop("dep")
    df = df.drop("com")
    return df

def usagers_df_trunc_useless (df):
    df = df.drop("num_veh")
    return df

def caracteristique_df_trunc(df):
    df = df.drop("date_complete").drop("lum").drop("int").drop("atm").drop("col").drop("adr")
    return df

def info_lieux (df_lieux, df_caract):
    df = df_lieux.join(caracteristique_df_trunc(df_caract), "Num_Acc")
    return df




## Création des nouveaux dataframes à sauvegarder

In [None]:
df_new_l = info_lieux(lieux, caracteristiques)
df_new_l = clean_names_lieux(df_new_l)
df_new_l.show(5)

df_new_c = caracteristique_df_trunc_useless(caracteristiques)
df_new_c = clean_names_caract(df_new_c) 
df_new_c.show(5)

df_new_u = usagers_df_trunc_useless(usagers)
df_new_u = clean_names_usagers(df_new_u)
df_new_u.show(5)

df_new_v = clean_names_vehicules(vehicules)
df_new_v.show(5)

## Choix Partitionnement 

Le partitionnement reste le même, mais "caractéristiques" donne des colonnes à lieux. 
Maintenant:
- "caractéristiques" donne l'environnement et la date de l'accident.
- "lieux" donne l'état du lieu à l'instant de l'accident ainsi que les détails à l'endroit précis (sur une rue on peut avoir deux dénivelés différents et on enlève cette ambiguité avec la latitude et la longitude)

"caractérisques" et lieux ne peuvent donc pas être fusionnés car plusieurs accident peuvent survenir dans le même endroit.

"usagers" ne peut pas être fusionner avec caractérisque car un id_usager peut être associé au même Num_Acc, on évitera donc de répéter des lignes avec le join. 

"vehicules" devrait rester seul car cela permettra d'avoir les informations sur le véhicule et son comportement lors de l'accident. 
De plus, il est possible d'avoir plusieurs accidents avec le même véhicule dans la même année. Cependant "usagers" perd num_veh car on associe déjà avec le couple (Num_Acc, id_vehicule).

"vehicules" et "usagers" gardent leur colonne id_vehicule car dans un accident on peut avoir plusieurs véhicules avec plusieurs usagers.

Tous les dataframes sont l'union de l'année 2021 et 2022. 

## Création de sauvegardes Parquet

In [None]:
from pathlib import Path
import shutil
def create_save_parquet(df, filename):
    if Path(filename).exists():
        shutil.rmtree(filename)
        df.write.parquet(filename)
        print("Fichier Parquet remplacé avec succès.")
    else:
        df.write.parquet(filename)
        print("Fichier Parquet créé avec succès.")

create_save_parquet(df_new_l, './saves/lieux.parquet')
create_save_parquet(df_new_c, './saves/caracteristiques.parquet')
create_save_parquet(df_new_u, './saves/usagers.parquet')
create_save_parquet(df_new_v, './saves/vehicules.parquet')
