In [118]:
import luigi
import pandas as pd
import os


## Création du dossier pour stocker les données nettoyées

In [119]:
# Vérifiez si le dossier n'existe pas déjà

dossier = 'CleanData'

if not os.path.exists(dossier):
    # Créez le dossier
    os.makedirs(dossier, mode=0o777)
    os.makedirs(os.path.join(dossier, "2017"), mode=0o777)
    os.makedirs(os.path.join(dossier, "2022"), mode=0o777)
    print("Dossier créé avec succès.")


Dossier créé avec succès.


## Extraire les données sur la délinquance

In [120]:
class ExtractCommunes(luigi.Task):
    file_path = luigi.Parameter()
    file_path1 = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget("./CleanData/2017/communes.csv")


    def complete(self):
        return self.output().exists()
    
    def run(self):
        
        
        # Chargement des dataset
        communes = pd.read_csv(self.file_path, sep=",", low_memory=False)
        df_partis = pd.read_csv('./RawData/Partis.csv', sep=",")

        # Sélectionner les colonnes nécessaires
        colonnes = ['Code du département', 'Libellé du département', 'Code de la commune', 'Libellé de la commune', 'Inscrits', 'Abstentions', '% Abs/Ins', 'Votants', '% Vot/Ins', 'Blancs', '% Blancs/Ins', '% Blancs/Vot', 'Nuls', '% Nuls/Ins', '% Nuls/Vot', 'Exprimés', '% Exp/Ins', '% Exp/Vot']

        # Créer un nouveau DataFrame pour les candidats distincts
        df_candidats = pd.DataFrame()

        # Parcourir les colonnes par groupes de 8 (car chaque candidat occupe 8 colonnes)
        for i in range(18, len(communes.columns), 7):
            # Sélectionner les colonnes du candidat actuel
            colonnes_candidat = communes.columns[i:i+7]

            # Renommer les colonnes du candidat
            df_candidat = communes[colonnes_candidat].copy()
            
            df_candidat = pd.DataFrame(df_candidat.values, columns=['N°Panneau','Sexe','Nom','Prenom','Voix','% Voix/Ins','% Voix/Exp'])

            # Ajouter les colonnes supplémentaires (Code du département, Libellé du département, etc.)
            df_candidat = pd.concat([communes[colonnes], df_candidat], axis=1)            
            
            # Ajouter les données du candidat au DataFrame des candidats distincts
            df_candidats = pd.concat([df_candidats, df_candidat])

        df_candidats = pd.merge(df_candidats, df_partis, on=['Nom', 'Prenom'], how='left')

        df_candidats = df_candidats.drop(["Abstentions","% Abs/Ins","Votants","% Vot/Ins","% Blancs/Ins","% Blancs/Vot","% Nuls/Ins","% Nuls/Vot","Exprimés","% Exp/Ins","% Exp/Vot", "N°Panneau","Sexe","Nom","Prenom", "% Voix/Ins","% Voix/Exp"], axis=1)
        # Réinitialiser les index du DataFrame des candidats distincts
        df_candidats = df_candidats.reset_index(drop=True)
        # Enregistrer le DataFrame des candidats distincts dans un nouveau fichier CSV
        df_candidats.to_csv(self.output().path, index=False)




In [121]:
class ExtractRSA(luigi.Task):
    file_path = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget("./CleanData/2017/rsa.csv")


    def complete(self):
        return self.output().exists()
    
    def run(self):
        
        # Chargement des dataset
        rsa = pd.read_csv(self.file_path, sep=",")

        #Filtre sur l'année 2017
        rsa = rsa.query('an == 2017')

        #Suppresion des valeurs null
        rsa = rsa.dropna()

        #renomes les colonnes
        rsa = rsa.rename(columns={"libgeo": "Libellé de la commune", 'alloc_rsa_com' : 'rsa'})
        
        median = rsa['rsa'].median()
        iqr = rsa['rsa'].quantile(0.75) - rsa['rsa'].quantile(0.25)
        lower_bound = rsa['rsa'].quantile(0.25) - (1.5 * iqr)
        upper_bound = rsa['rsa'].quantile(0.75) + (1.5 * iqr)

        rsa = rsa[(rsa['rsa'] >= lower_bound) & (rsa['rsa'] <= upper_bound)]
        
        rsa = rsa.drop(columns={"an"})
        
        rsa.to_csv(self.output().path, index=False)


In [122]:
class MergeCommuneRSA(luigi.Task):
    file_path = luigi.Parameter()
    file_path1 = luigi.Parameter()

    def requires(self):
        return [ExtractCommunes(file_path="./RawData/communes.csv", file_path1="./RawData/partis.csv"),
                ExtractRSA(file_path="./RawData/rsa.csv")]
    
    def output(self):
        return luigi.LocalTarget("./CleanData/2017/data.csv")


    def complete(self):
        return self.output().exists()
    
    def run(self):
        
        # Chargement des dataset
        rsa = pd.read_csv(self.file_path, sep=",")
        communes = pd.read_csv(self.file_path1, sep=",")

        merge_data = pd.merge(communes, rsa, on='Libellé de la commune', how='inner')

        merge_data.to_csv(self.output().path, index=False)

In [123]:
class ExtractRevenusPauvretes(luigi.Task):
    file_path = luigi.Parameter()

    
    def output(self):
        return luigi.LocalTarget("./CleanData/2017/revenus_pauvretes.csv")


    def complete(self):
        return self.output().exists()
    
    def run(self):
        
        # Chargement des dataset
        revenus = pd.read_csv(self.file_path, sep=";")

        #renomes les colonnes
        revenus = revenus.rename(columns={"CODGEO": "codgeo", "MED17" : "Niveau de vie", "NBMENFISC17" : "Nombre de ménages fiscaux"})
        
        #Suppresion des valeurs null
        revenus = revenus.dropna(subset=["Niveau de vie", "Nombre de ménages fiscaux"])

        colonnes = ["codgeo","Niveau de vie", "Nombre de ménages fiscaux"]
        revenus = revenus[colonnes]
        
        revenus.to_csv(self.output().path, index=False)


In [124]:
class MergeRevenusCommunes(luigi.Task):
    file_path = luigi.Parameter()
    file_path1 = luigi.Parameter()

    def requires(self):
        return [MergeCommuneRSA(file_path="./CleanData/2017/communes.csv", file_path1="./CleanData/2017/rsa.csv"), ExtractRevenusPauvretes(file_path="./RawData/revenus_pauvretes.csv")]
    
    def output(self):
        return luigi.LocalTarget("./CleanData/2017/data.csv")


    def complete(self):
        return self.output().exists()
    
    def run(self):
        
        # Chargement des dataset
        revenus = pd.read_csv(self.file_path, sep=",")
        communes = pd.read_csv(self.file_path1, sep=",")

        merge_data = pd.merge(communes, revenus, on='codgeo', how='inner')

        merge_data.to_csv(self.output().path, index=False)

In [125]:
class ExtractEmploi(luigi.Task):
    file_path = luigi.Parameter()

    
    def output(self):
        return luigi.LocalTarget("./CleanData/2017/emploi.csv")


    def complete(self):
        return self.output().exists()
    
    def run(self):
        
        # Chargement des dataset
        emploi = pd.read_csv(self.file_path, sep=";")

        #renomes les colonnes
        emploi = emploi.rename(columns={"CODGEO": "codgeo", "P17_CHOM1564" : "Nombre de chômeurs de 15 à 64 ans"})
        
        #Suppresion des valeurs null
        emploi = emploi.dropna(subset=["Nombre de chômeurs de 15 à 64 ans"])

        colonnes = ["codgeo", "Nombre de chômeurs de 15 à 64 ans"]
        emploi = emploi[colonnes]
        
        emploi.to_csv(self.output().path, index=False)


In [126]:
class MergeEmloiCommunes(luigi.Task):
    file_path = luigi.Parameter()
    file_path1 = luigi.Parameter()

    def requires(self):
        return [MergeRevenusCommunes(file_path="./CleanData/2017/revenus_pauvretes.csv", file_path1="./CleanData/2017/data.csv"), ExtractEmploi(file_path="./RawData/emploi.csv")]
    
    def output(self):
        return luigi.LocalTarget("./CleanData/2017/data.csv")


    def complete(self):
        return self.output().exists()
    
    def run(self):
        
        # Chargement des dataset
        emploi = pd.read_csv(self.file_path, sep=",")
        communes = pd.read_csv(self.file_path1, sep=",")

        merge_data = pd.merge(communes, emploi, on='codgeo', how='inner')

        merge_data.to_csv(self.output().path, index=False)

In [127]:
class ExtractAide(luigi.Task):
    file_path = luigi.Parameter()

    
    def output(self):
        return luigi.LocalTarget("./CleanData/2017/aide.csv")


    def complete(self):
        return self.output().exists()
    
    def run(self):
        
        # Chargement des dataset
        aide = pd.read_csv(self.file_path, sep=";")

        display(aide)
        #renomes les colonnes
        aide = aide.rename(columns={"NUMCOM": "codgeo"})
        
        #Suppresion des valeurs null
        aide = aide.dropna(subset=["total_allocataires"])

        colonnes = ["codgeo", "total_allocataires"]

        aide = aide[colonnes]
        
        aide.to_csv(self.output().path, index=False)

In [128]:
class MergeAideCommunes(luigi.Task):
    file_path = luigi.Parameter()
    file_path1 = luigi.Parameter()

    def requires(self):
        return [MergeEmloiCommunes(file_path="./CleanData/2017/emploi.csv", file_path1="./CleanData/2017/data.csv"), ExtractAide(file_path="./RawData/aide.csv")]
    
    def output(self):
        return luigi.LocalTarget("./CleanData/2017/data.csv")


    def complete(self):
        return self.output().exists()
    
    def run(self):
        
        # Chargement des dataset
        aide = pd.read_csv(self.file_path, sep=",")
        communes = pd.read_csv(self.file_path1, sep=",")

        merge_data = pd.merge(communes, aide, on='codgeo', how='inner')

        merge_data.to_csv(self.output().path, index=False)

## Exécuter toutes les tâches

In [129]:
class ReadAllcommunes(luigi.Task):
    def requires(self):
        return [MergeAideCommunes(file_path="./CleanData/2017/aide.csv", file_path1="./CleanData/2017/data.csv")]
    
    def run(self):
        print("lancement")

    def output(self):
        return luigi.LocalTarget('result.txt')

In [130]:

config = luigi.configuration.get_config()
config.set('core', 'no_lock', 'False')

dossier = "./Cleancommunes/"
Restart = True

if Restart is False:
    # Parcourir tous les fichiers du dossier
    for fichier in os.listdir(dossier):
        chemin_fichier = os.path.join(dossier, fichier)
        # Supprimer le fichier
        os.remove(chemin_fichier)

luigi.build([ReadAllcommunes()], local_scheduler=False, no_lock=True)

DEBUG: Checking if ReadAllcommunes() is complete
DEBUG: Checking if MergeAideCommunes(file_path=./CleanData/2017/aide.csv, file_path1=./CleanData/2017/data.csv) is complete
INFO: Informed scheduler that task   ReadAllcommunes__99914b932b   has status   PENDING
DEBUG: Checking if MergeEmloiCommunes(file_path=./CleanData/2017/emploi.csv, file_path1=./CleanData/2017/data.csv) is complete
DEBUG: Checking if ExtractAide(file_path=./RawData/aide.csv) is complete
INFO: Informed scheduler that task   MergeAideCommunes___CleanData_2017___CleanData_2017_7995b5b5c4   has status   PENDING
INFO: Informed scheduler that task   ExtractAide___RawData_aide_c_f762f3e7ee   has status   PENDING
DEBUG: Checking if MergeRevenusCommunes(file_path=./CleanData/2017/revenus_pauvretes.csv, file_path1=./CleanData/2017/data.csv) is complete
DEBUG: Checking if ExtractEmploi(file_path=./RawData/emploi.csv) is complete
INFO: Informed scheduler that task   MergeEmloiCommunes___CleanData_2017___CleanData_2017_fe2071318

Unnamed: 0,NOMNIVGE,NUMCOM,total_allocataires,total_ALF,total_ALS,total_APL,locataire_ALF,locataire_ALS,locataire_APL,proprietaire_ALF,proprietaire_ALS,proprietaire_APL,total_locataire,total_proprietaire,total_allocataires_logement
0,NOMNIVGE,NUMCOM,total_allocataires,total_ALF,total_ALS,total_APL,locataire_ALF,locataire_ALS,locataire_APL,proprietaire_ALF,proprietaire_ALS,proprietaire_APL,total_locataire,total_proprietaire,total_allocataires_logement
1,RESIDENCE A L'ETRANGER,99999,6760,318,723,273,304,,,14,,,1294,20,1314
2,COMMUNES INCONNUES INSEE,XXXXX,5217,488,509,784,315,478,720,173,31,64,1513,268,1781
3,L'ABERGEMENT-CLEMENCIAT,1001,96,,,8,,,,,,,,,15
4,L'ABERGEMENT-DE-VAREY,1002,38,,,,,,,,,,,,6
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
35393,PAMANDZI,97615,1031,36,,,,,,,,,,,44
35394,SADA,97616,1111,,,,,,,,,,,,39
35395,TSINGONI,97617,914,,,,,,,,,,45,5,50
35396,SAINT-BARTHELEMY,97701,12,,,,,,,,,,,,


INFO: [pid 14588] Worker Worker(salt=6377367430, workers=1, host=DESKTOP-PBH195U, username=Marin, pid=14588) done      ExtractAide(file_path=./RawData/aide.csv)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   ExtractAide___RawData_aide_c_f762f3e7ee   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 9
INFO: [pid 14588] Worker Worker(salt=6377367430, workers=1, host=DESKTOP-PBH195U, username=Marin, pid=14588) running   ExtractEmploi(file_path=./RawData/emploi.csv)
  emploi = pd.read_csv(self.file_path, sep=";")
INFO: [pid 14588] Worker Worker(salt=6377367430, workers=1, host=DESKTOP-PBH195U, username=Marin, pid=14588) done      ExtractEmploi(file_path=./RawData/emploi.csv)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   ExtractEmploi___RawData_emploi_805348f591   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 8
INFO: [pid 14588] Worker Worker(s

False