# **Projet NF26 - Estimation du BGES d'une organisation**

## **0 - Préliminaire**

Notre travail préliminaire se compose des étapes suivantes :
- Mettre en place l’environnement d’exécution du programme.
- Extraire les données à partir du fichier compressé.
- Charger les modules et bibliothèques requis.

In [66]:
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

In [67]:
# ---------- Bibliothèques standard ---------- #
import os  # Importation standard sans alias

from collections import defaultdict  # Importation sélective de classes sans alias
from datetime import date
from functools import reduce  # Importation sélective de fonctions sans alias

# ---------- Bibliothèques tierces ---------- #
# Importation standard sans alias
import numpy
import pandas as pd
import matplotlib.pyplot as plt
import folium

# Importation sélective de classes sans alias
from geopy.geocoders import Nominatim
from geopy.location import Location
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB

# Importation sélective de modules avec alias
from pyspark.sql import functions as spark_funcs
from pyspark.sql import types as spark_types
import matplotlib.dates as mdates

# Importation sélective de classes avec alias
from pyspark.sql import Column as SparkColumn
from pyspark.sql import Row as SparkRow
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import Window as SparkWindow
from pyspark.sql.functions import col, split, lower, regexp_replace, month, year, sum as Fsum

# Importation spécifique à Jupyter
from IPython.display import display

# ---------- Modules Python personnalisés ---------- #
from src import utils

import pandas as pd
import numpy as np
from datetime import datetime, date

import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql import Row

In [68]:
spark = SparkSession.builder.getOrCreate()

## **1 - Lecture des données**

# Lecture de Paris

In [69]:
psdf_person_paris=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_PARIS/PERSONNEL_PARIS.txt', sep=';')
psdf_person_paris.head(5)
#psdf_person_paris.shape[0]  :5000



Unnamed: 0,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DT_NAISS,VILLE_NAISS,PAYS_NAISS,NUM_SECU,IND_PAYS_NUM_TELP,NUM_TELEPHONE,NUM_VOIE,DSC_VOIE,CMPL_VOIE,CD_POSTAL,VILLE,PAYS,FONCTION_PERSONNEL,TS_CREATION_PERSONNEL,TS_MAJ_PPERSONNEL
0,KeyPers_Paris_1230000,Name0,FistName0,1992-03-13,Lille,France,NS000000000,,+336##0151713,22,NomVoie100,,#9423,Paris,France,Ingénieur Informaticien,2005-06-24 06:48:21,2005-06-24 06:48:21
1,KeyPers_Paris_1230001,Name1,FistName1,1955-02-17,Pekin,China,NS000000001,,+336##0797149,33,NomVoie731,,#2429,Paris,France,Ingénieur Data,2007-11-16 22:38:43,2007-11-16 22:38:43
2,KeyPers_Paris_1230002,Name2,FistName2,1982-10-06,Alger,Algeria,NS000000002,,+336##0319378,55,NomVoie622,,#7861,Paris,France,Ingénieur Informaticien,2004-07-09 14:25:16,2004-07-09 14:25:16
3,KeyPers_Paris_1230003,Name3,FistName3,1955-08-22,Lima,Peru,NS000000003,,+336##0205027,61,NomVoie363,,#4028,Paris,France,Ingénieur Data,2013-02-16 23:23:05,2013-02-16 23:23:05
4,KeyPers_Paris_1230004,Name4,FistName4,2017-04-07,Pekin,China,NS000000004,,+336##0456031,68,NomVoie914,,#0111,Paris,France,Cadre,2001-10-13 22:42:51,2001-10-13 22:42:51


In [70]:
psdf_informatique_paris=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_PARIS/BDD_BGES_PARIS_INFORMATIQUE', sep=';')
psdf_informatique_paris.head(5)
#psdf_informatique_paris.shape[0]  :1545



Unnamed: 0,ID_MATERIELINFO,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DATE_ACHAT,TYPE,MODELE
0,Paris_MATERIEL_INFO_202406250,KeyPers_Paris_1233515,Name3515,FistName3515,2024-06-25 11:37:24,Ecran,modèle par défaut
1,Paris_MATERIEL_INFO_202406251,KeyPers_Paris_1232092,Name2092,FistName2092,2024-06-25 10:48:59,Vidéo projecteur,modèle par défaut
2,Paris_MATERIEL_INFO_202406252,KeyPers_Paris_1231450,Name1450,FistName1450,2024-06-25 10:52:46,PC fixe sans ecran,"Elite (Tower, SFF, One)"
3,Paris_MATERIEL_INFO_202406253,KeyPers_Paris_1234268,Name4268,FistName4268,2024-06-25 11:55:34,imprimante,Laser A3 (40-99kg)
4,Paris_MATERIEL_INFO_202406254,KeyPers_Paris_1230872,Name872,FistName872,2024-06-25 16:44:37,PC portable,Moyenne 14/15pouces


In [71]:
psdf_mission_paris=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_PARIS/BDD_BGES_PARIS_MISSION', sep=';')
psdf_mission_paris.head(5)
#psdf_mission_paris.shape[0]  :4518



Unnamed: 0,ID_MISSION,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DATE_MISSION,TYPE_MISSION,VILLE_DEPART,PAYS_DEPART,VILLE_DESTINATION,PAYS_DESTINATION,TRANSPORT,ALLER_RETOUR
0,Paris_202405180,KeyPers_Paris_1233185,Name3185,FistName3185,2024-05-18 23:19:11,Réunion,Paris,France,Wellington,New Zealand,Avion,oui
1,Paris_202405181,KeyPers_Paris_1230183,Name183,FistName183,2024-05-18 10:54:36,Développement,Paris,France,Sidney,Australia,Avion,oui
2,Paris_202405182,KeyPers_Paris_1232953,Name2953,FistName2953,2024-05-18 06:43:42,Développement,Paris,France,Auckland,New Zealand,Avion,oui
3,Paris_202405183,KeyPers_Paris_1234798,Name4798,FistName4798,2024-05-18 22:28:46,Conférence,Paris,France,Oslo,Norvège,Avion,non
4,Paris_202405184,KeyPers_Paris_1231863,Name1863,FistName1863,2024-05-18 04:16:39,Rencontre entreprises,Paris,France,Lima,Peru,Avion,oui


**2.导入上海数据**

In [72]:
psdf_person_shanghai=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_SHANGHAI/PERSONNEL_SHANGHAI.txt', sep=';')
psdf_person_shanghai.head(5)



Unnamed: 0,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DT_NAISS,VILLE_NAISS,PAYS_NAISS,NUM_SECU,IND_PAYS_NUM_TELP,NUM_TELEPHONE,NUM_VOIE,DSC_VOIE,CMPL_VOIE,CD_POSTAL,VILLE,PAYS,FONCTION_PERSONNEL,TS_CREATION_PERSONNEL,TS_MAJ_PPERSONNEL
0,KeyPers_Shanghai_1230000,Name0,FistName0,2001-05-14,Rio de Janeiro,Brazil,NS000000000,,+336##0481414,86,NomVoie440,,#4474,Shanghai,China,Business Executive,2015-07-07 10:18:57,2015-07-07 10:18:57
1,KeyPers_Shanghai_1230001,Name1,FistName1,1995-06-01,Compiègne,France,NS000000001,,+336##0648537,86,NomVoie661,,#3428,Shanghai,China,Data Engineer,2015-02-14 08:32:07,2015-02-14 08:32:07
2,KeyPers_Shanghai_1230002,Name2,FistName2,1983-06-24,Compiègne,France,NS000000002,,+336##0641720,60,NomVoie722,,#2672,Shanghai,China,Computer Engineer,2011-03-24 21:17:01,2011-03-24 21:17:01
3,KeyPers_Shanghai_1230003,Name3,FistName3,1963-05-11,Shanghai,China,NS000000003,,+336##0405078,43,NomVoie773,,#5575,Shanghai,China,Data Engineer,2013-02-26 20:56:07,2013-02-26 20:56:07
4,KeyPers_Shanghai_1230004,Name4,FistName4,1988-10-08,Vancouver,Canada,NS000000004,,+336##0899659,94,NomVoie594,,#7804,Shanghai,China,Data Engineer,2004-08-18 06:54:54,2004-08-18 06:54:54


In [73]:
psdf_informatique_shanghai=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_SHANGHAI/BDD_BGES_SHANGHAI_INFORMATIQUE', sep=';')
psdf_informatique_shanghai.head(5)



Unnamed: 0,ID_MATERIELINFO,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DATE_ACHAT,TYPE,MODELE
0,Shanghai_MATERIEL_INFO_202409200,KeyPers_Shanghai_1230677,Name677,FistName677,2024-09-20 13:48:41,PC portable,modèle par défaut
1,Shanghai_MATERIEL_INFO_202409201,KeyPers_Shanghai_1230621,Name621,FistName621,2024-09-20 08:39:31,PC fixe sans ecran,modèle par défaut
2,Shanghai_MATERIEL_INFO_202409202,KeyPers_Shanghai_1230799,Name799,FistName799,2024-09-20 15:09:36,Vidéo projecteur,Pour salle
3,Shanghai_MATERIEL_INFO_202409203,KeyPers_Shanghai_1230479,Name479,FistName479,2024-09-20 15:02:34,PC portable,EliteBook 6xx
4,Shanghai_MATERIEL_INFO_202409204,KeyPers_Shanghai_1230452,Name452,FistName452,2024-09-20 13:19:18,,modèle par défaut


In [74]:
psdf_mission_shanghai=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_SHANGHAI/BDD_BGES_SHANGHAI_MISSION', sep=';')
psdf_mission_shanghai.head(5)



Unnamed: 0,ID_MISSION,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DATE_MISSION,TYPE_MISSION,VILLE_DEPART,PAYS_DEPART,VILLE_DESTINATION,PAYS_DESTINATION,TRANSPORT,ALLER_RETOUR
0,Shanghai_202406050,KeyPers_Shanghai_1230528,Name528,FistName528,2024-06-05 15:10:01,Development,Shanghai,China,Los Angeles,USA,Avion,oui
1,Shanghai_202406051,KeyPers_Shanghai_1230794,Name794,FistName794,2024-06-05 08:12:49,Vocational Training,Shanghai,China,Dubaï,Emirats,Avion,oui
2,Shanghai_202406052,KeyPers_Shanghai_1230681,Name681,FistName681,2024-06-05 09:43:46,Development,Shanghai,China,Pekin,China,Train,oui
3,Shanghai_202406053,KeyPers_Shanghai_1230807,Name807,FistName807,2024-06-05 15:55:40,Vocational Training,Shanghai,China,Marseille,France,Avion,oui
4,Shanghai_202406054,KeyPers_Shanghai_1230435,Name435,FistName435,2024-06-05 11:04:37,Conference,Shanghai,China,Alger,Algeria,Avion,non


**3.导入柏林数据**

In [75]:
psdf_person_berlin=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_BERLIN/PERSONNEL_BERLIN.txt', sep=';')
psdf_person_berlin.head(5)



Unnamed: 0,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DT_NAISS,VILLE_NAISS,PAYS_NAISS,NUM_SECU,IND_PAYS_NUM_TELP,NUM_TELEPHONE,NUM_VOIE,DSC_VOIE,CMPL_VOIE,CD_POSTAL,VILLE,PAYS,FONCTION_PERSONNEL,TS_CREATION_PERSONNEL,TS_MAJ_PPERSONNEL
0,KeyPers_Berlin_1230000,Name0,FistName0,1993-11-04,Osaka,Japan,NS000000000,,+336##0263188,57,NomVoie940,,#8830,Berlin,Germany,Dateningenieur,2010-09-09 11:22:17,2010-09-09 11:22:17
1,KeyPers_Berlin_1230001,Name1,FistName1,1932-11-22,Wellington,New Zealand,NS000000001,,+336##0401873,64,NomVoie711,,#9785,Berlin,Germany,Führungskraft,2017-07-24 16:51:18,2017-07-24 16:51:18
2,KeyPers_Berlin_1230002,Name2,FistName2,1990-08-12,Sidney,Australia,NS000000002,,+336##0524126,94,NomVoie322,,#4816,Berlin,Germany,Dateningenieur,1997-08-20 04:01:46,1997-08-20 04:01:46
3,KeyPers_Berlin_1230003,Name3,FistName3,1965-05-26,Rabat,Maroc,NS000000003,,+336##0418484,78,NomVoie593,,#3546,Berlin,Germany,Ökonom,1998-11-17 19:25:01,1998-11-17 19:25:01
4,KeyPers_Berlin_1230004,Name4,FistName4,1959-01-18,Shanghai,China,NS000000004,,+336##0986317,65,NomVoie404,,#6788,Berlin,Germany,Dateningenieur,2010-11-10 04:24:37,2010-11-10 04:24:37


In [76]:
psdf_informatique_berlin=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_BERLIN/BDD_BGES_BERLIN_INFORMATIQUE', sep=';')
psdf_informatique_berlin.head(5)



Unnamed: 0,ID_MATERIELINFO,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DATE_ACHAT,TYPE,MODELE
0,BERLIN_MATERIEL_INFO_202411050,KeyPers_Berlin_1233545,Name3545,FistName3545,2024-11-05 12:14:51,PC fixe sans ecran,"Elite (Tower, SFF, One)"
1,BERLIN_MATERIEL_INFO_202411051,KeyPers_Berlin_1233662,Name3662,FistName3662,2024-11-05 16:09:40,Serveur,Modèle par défaut
2,BERLIN_MATERIEL_INFO_202411052,KeyPers_Berlin_1231784,Name1784,FistName1784,2024-11-05 15:46:01,souris,modèle par défaut
3,BERLIN_MATERIEL_INFO_202411053,KeyPers_Berlin_1230638,Name638,FistName638,2024-11-05 10:21:49,PC fixe sans ecran,Precision tower 5xxx
4,BERLIN_MATERIEL_INFO_202411054,KeyPers_Berlin_1233852,Name3852,FistName3852,2024-11-05 16:47:30,PC fixe sans ecran,ZHAN


In [77]:
psdf_mission_berlin=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_BERLIN/BDD_BGES_BERLIN_MISSION', sep=';')
psdf_mission_berlin.head(5)



Unnamed: 0,ID_MISSION,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DATE_MISSION,TYPE_MISSION,VILLE_DEPART,PAYS_DEPART,VILLE_DESTINATION,PAYS_DESTINATION,TRANSPORT,ALLER_RETOUR
0,BERLIN_202411110,KeyPers_Berlin_1230238,Name238,FistName238,2024-11-11 13:34:45,Schulung,Berlin,Allemagne,Buenos Aires,Argentina,Avion,oui
1,BERLIN_202411111,KeyPers_Berlin_1230352,Name352,FistName352,2024-11-11 03:42:11,Schulung,Berlin,Allemagne,Berlin,Allemagne,Transports en commun,oui
2,BERLIN_202411112,KeyPers_Berlin_1233087,Name3087,FistName3087,2024-11-11 18:32:48,Entwicklung,New-York,USA,Vancouver,Canada,Avion,oui
3,BERLIN_202411113,KeyPers_Berlin_1234060,Name4060,FistName4060,2024-11-11 22:06:55,Entwicklung,Paris,France,Tokyo,Japan,Avion,oui
4,BERLIN_202411114,KeyPers_Berlin_1233427,Name3427,FistName3427,2024-11-11 02:12:52,Geschäftstreffen,Berlin,Allemagne,Los Angeles,USA,Avion,oui


**4.导入伦敦数据**

In [78]:
psdf_person_london=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_LONDON/PERSONNEL_LONDON.txt', sep=';')
psdf_person_london.head(5)



Unnamed: 0,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DT_NAISS,VILLE_NAISS,PAYS_NAISS,NUM_SECU,IND_PAYS_NUM_TELP,NUM_TELEPHONE,NUM_VOIE,DSC_VOIE,CMPL_VOIE,CD_POSTAL,VILLE,PAYS,FONCTION_PERSONNEL,TS_CREATION_PERSONNEL,TS_MAJ_PPERSONNEL
0,KeyPers_London_1230000,Name0,FistName0,2004-07-14,Melbourne,Australia,NS000000000,,+336##0612365,97,NomVoie120,,#1819,London,England,Data Engineer,2020-10-03 17:41:19,2020-10-03 17:41:19
1,KeyPers_London_1230001,Name1,FistName1,1954-10-23,Tunis,Tunisie,NS000000001,,+336##0487142,65,NomVoie351,,#1748,London,England,Data Engineer,2022-09-09 06:06:04,2022-09-09 06:06:04
2,KeyPers_London_1230002,Name2,FistName2,1937-04-09,Tokyo,Japan,NS000000002,,+336##0059989,71,NomVoie442,,#0669,London,England,Computer Engineer,2007-08-27 10:18:05,2007-08-27 10:18:05
3,KeyPers_London_1230003,Name3,FistName3,1970-08-07,Bordeaux,France,NS000000003,,+336##0366753,10,NomVoie743,,#9255,London,England,Data Engineer,2023-10-02 21:57:57,2023-10-02 21:57:57
4,KeyPers_London_1230004,Name4,FistName4,1994-11-26,Helsinki,Finlande,NS000000004,,+336##0422772,47,NomVoie314,,#7467,London,England,Computer Engineer,2013-01-24 14:22:05,2013-01-24 14:22:05


In [79]:
psdf_informatique_london=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_LONDON/BDD_BGES_LONDON_INFORMATIQUE', sep=';')
psdf_informatique_london.head(5)



Unnamed: 0,ID_MATERIELINFO,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DATE_ACHAT,TYPE,MODELE
0,LONDON_MATERIEL_INFO_202410030,KeyPers_London_1231533,Name1533,FistName1533,2024-10-03 16:48:26,,"Prodesk (Tower, SFF)"
1,LONDON_MATERIEL_INFO_202410031,KeyPers_London_1231342,Name1342,FistName1342,2024-10-03 09:05:51,PC portable,ProBook
2,LONDON_MATERIEL_INFO_202410032,KeyPers_London_1232922,Name2922,FistName2922,2024-10-03 15:55:28,PC fixe sans ecran,Wyse thin client
3,LONDON_MATERIEL_INFO_202410033,KeyPers_London_1233394,Name3394,FistName3394,2024-10-03 10:38:04,,Latitude 5xxx
4,LONDON_MATERIEL_INFO_202410034,KeyPers_London_1233566,Name3566,FistName3566,2024-10-03 12:11:29,PC portable,MacBook air pre-retina


In [80]:
psdf_mission_london=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_LONDON/BDD_BGES_LONDON_MISSION', sep=';')
psdf_mission_london.head(5)



Unnamed: 0,ID_MISSION,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DATE_MISSION,TYPE_MISSION,VILLE_DEPART,PAYS_DEPART,VILLE_DESTINATION,PAYS_DESTINATION,TRANSPORT,ALLER_RETOUR
0,LONDON_202410190,KeyPers_London_1231718,Name1718,FistName1718,2024-10-19 01:15:13,Vocational Training,London,England,Melbourne,Australia,Avion,non
1,LONDON_202410191,KeyPers_London_1230403,Name403,FistName403,2024-10-19 12:03:35,Development,London,England,Alger,Algeria,Avion,oui
2,LONDON_202410192,KeyPers_London_1233125,Name3125,FistName3125,2024-10-19 03:22:25,Vocational Training,London,England,Montreal,Canada,Avion,oui
3,LONDON_202410193,KeyPers_London_1231247,Name1247,FistName1247,2024-10-19 09:20:18,Team Meeting,London,England,Rabat,Maroc,Avion,oui
4,LONDON_202410194,KeyPers_London_1231939,Name1939,FistName1939,2024-10-19 06:26:02,Development,London,England,Los Angeles,USA,Avion,oui


**5.导入洛杉矶数据**

In [81]:
psdf_person_losangeles=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_LOSANGELES/PERSONNEL_LOSANGELES.txt', sep=';')
psdf_person_losangeles.head(5)



Unnamed: 0,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DT_NAISS,VILLE_NAISS,PAYS_NAISS,NUM_SECU,IND_PAYS_NUM_TELP,NUM_TELEPHONE,NUM_VOIE,DSC_VOIE,CMPL_VOIE,CD_POSTAL,VILLE,PAYS,FONCTION_PERSONNEL,TS_CREATION_PERSONNEL,TS_MAJ_PPERSONNEL
0,KeyPers_LA_1230000,Name0,FistName0,2012-03-20,Compiègne,France,NS000000000,,+336##0864857,80,NomVoie730,,#8378,Los Angeles,USA,Business Executive,2011-01-20 04:14:27,2011-01-20 04:14:27
1,KeyPers_LA_1230001,Name1,FistName1,1943-11-04,Auckland,New Zealand,NS000000001,,+336##0389143,72,NomVoie51,,#5530,Los Angeles,USA,Data Engineer,2001-04-12 16:39:19,2001-04-12 16:39:19
2,KeyPers_LA_1230002,Name2,FistName2,1993-10-25,Paris,France,NS000000002,,+336##0553330,6,NomVoie112,,#5279,Los Angeles,USA,Economist,2010-10-21 15:52:41,2010-10-21 15:52:41
3,KeyPers_LA_1230003,Name3,FistName3,1983-02-27,Washington,USA,NS000000003,,+336##0860769,18,NomVoie503,,#5909,Los Angeles,USA,Data Engineer,2021-08-09 11:11:53,2021-08-09 11:11:53
4,KeyPers_LA_1230004,Name4,FistName4,2013-11-05,London,England,NS000000004,,+336##0791470,23,NomVoie454,,#2837,Los Angeles,USA,Data Engineer,2022-05-23 18:50:27,2022-05-23 18:50:27


In [82]:
psdf_informatique_losangeles=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_LOSANGELES/BDD_BGES_LOSANGELES_INFORMATIQUE', sep=';')
psdf_informatique_losangeles.head(5)



Unnamed: 0,ID_MATERIELINFO,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DATE_ACHAT,TYPE,MODELE
0,LA_MATERIEL_INFO_202407060,KeyPers_LA_1231978,Name1978,FistName1978,2024-07-06 08:39:05,Vidéo projecteur,modèle par défaut
1,LA_MATERIEL_INFO_202407061,KeyPers_LA_1231609,Name1609,FistName1609,2024-07-06 08:10:36,PC fixe sans ecran,Optiplex small
2,LA_MATERIEL_INFO_202407062,KeyPers_LA_1230112,Name112,FistName112,2024-07-06 13:35:18,imprimante,Laser A3 (>100kg)
3,LA_MATERIEL_INFO_202407063,KeyPers_LA_1232436,Name2436,FistName2436,2024-07-06 15:32:55,Serveur,Modèle par défaut
4,LA_MATERIEL_INFO_202407064,KeyPers_LA_1232904,Name2904,FistName2904,2024-07-06 10:21:24,PC fixe sans ecran,"EliteDesk (Tower, SFF, One)"


In [83]:
psdf_mission_losangeles=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_LOSANGELES/BDD_BGES_LOSANGELES_MISSION', sep=';')
psdf_mission_losangeles.head(5)



Unnamed: 0,ID_MISSION,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DATE_MISSION,TYPE_MISSION,VILLE_DEPART,PAYS_DEPART,VILLE_DESTINATION,PAYS_DESTINATION,TRANSPORT,ALLER_RETOUR
0,LA_202405070,KeyPers_LA_1232039,Name2039,FistName2039,2024-05-07 07:03:44,Development,Los Angeles,USA,Lille,France,Avion,oui
1,LA_202405071,KeyPers_LA_1232529,Name2529,FistName2529,2024-05-07 22:57:21,Vocational Training,Los Angeles,USA,Los Angeles,USA,Transports en commun,non
2,LA_202405072,KeyPers_LA_1231329,Name1329,FistName1329,2024-05-07 06:27:09,Conference,Los Angeles,USA,Sao Paulo,Brazil,Avion,oui
3,LA_202405073,KeyPers_LA_1230475,Name475,FistName475,2024-05-07 12:06:31,Team Meeting,Paris,France,Bogota,Colombia,Avion,oui
4,LA_202405074,KeyPers_LA_1230188,Name188,FistName188,2024-05-07 19:41:53,Conference,New-York,USA,Rabat,Maroc,Avion,oui


**6.导入纽约数据**

In [84]:
psdf_person_newyork=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_NEWYORK/PERSONNEL_NEWYORK.txt', sep=';')
psdf_person_newyork.head(5)



Unnamed: 0,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DT_NAISS,VILLE_NAISS,PAYS_NAISS,NUM_SECU,IND_PAYS_NUM_TELP,NUM_TELEPHONE,NUM_VOIE,DSC_VOIE,CMPL_VOIE,CD_POSTAL,VILLE,PAYS,FONCTION_PERSONNEL,TS_CREATION_PERSONNEL,TS_MAJ_PPERSONNEL
0,KeyPers_NewYork_1230000,Name0,FistName0,2011-06-01,Tokyo,Japan,NS000000000,,+336##0264749,33,NomVoie790,,#9358,New-York,USA,Business Executive,2021-07-10 15:48:40,2021-07-10 15:48:40
1,KeyPers_NewYork_1230001,Name1,FistName1,1995-04-10,Oslo,Norvège,NS000000001,,+336##0035150,94,NomVoie871,,#1775,New-York,USA,Computer Engineer,2013-11-04 14:06:40,2013-11-04 14:06:40
2,KeyPers_NewYork_1230002,Name2,FistName2,1963-04-24,Stockholm,Suède,NS000000002,,+336##0076311,31,NomVoie472,,#4431,New-York,USA,Computer Engineer,2020-10-20 11:36:07,2020-10-20 11:36:07
3,KeyPers_NewYork_1230003,Name3,FistName3,1977-09-03,Osaka,Japan,NS000000003,,+336##0151475,41,NomVoie593,,#9525,New-York,USA,Data Engineer,2012-10-27 03:07:45,2012-10-27 03:07:45
4,KeyPers_NewYork_1230004,Name4,FistName4,1950-06-17,Mexico,Mexico,NS000000004,,+336##0394564,0,NomVoie794,,#3464,New-York,USA,Computer Engineer,2019-03-21 11:07:16,2019-03-21 11:07:16


In [85]:
psdf_informatique_newyork=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_NEWYORK/BDD_BGES_NEWYORK_INFORMATIQUE', sep=';')
psdf_informatique_newyork.head(5)



Unnamed: 0,ID_MATERIELINFO,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DATE_ACHAT,TYPE,MODELE
0,NewYork_MATERIEL_INFO_202406250,KeyPers_NewYork_1231314,Name1314,FistName1314,2024-06-25 09:01:53,PC fixe sans ecran,Station de travail (moy)
1,NewYork_MATERIEL_INFO_202406251,KeyPers_NewYork_1231458,Name1458,FistName1458,2024-06-25 09:41:42,Station d'accueil,modèle par défaut
2,NewYork_MATERIEL_INFO_202406252,KeyPers_NewYork_1231652,Name1652,FistName1652,2024-06-25 15:21:36,PC fixe tout-en-un,Mac pro
3,NewYork_MATERIEL_INFO_202406253,KeyPers_NewYork_1231180,Name1180,FistName1180,2024-06-25 08:28:16,PC fixe sans ecran,Precision tower 3xxx
4,NewYork_MATERIEL_INFO_202406254,KeyPers_NewYork_1230731,Name731,FistName731,2024-06-25 14:04:36,PC fixe sans ecran,Wyse thin client


In [86]:
psdf_mission_newyork=ps.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/BDD_BGES/BDD_BGES_NEWYORK/BDD_BGES_NEWYORK_MISSION', sep=';')
psdf_mission_newyork.head(5)



Unnamed: 0,ID_MISSION,ID_PERSONNEL,NOM_PERSONNEL,PRENOM_PERSONNEL,DATE_MISSION,TYPE_MISSION,VILLE_DEPART,PAYS_DEPART,VILLE_DESTINATION,PAYS_DESTINATION,TRANSPORT,ALLER_RETOUR
0,NewYork_202405010,KeyPers_NewYork_1230589,Name589,FistName589,2024-05-01 02:03:41,Conference,New-York,USA,Auckland,New Zealand,Avion,oui
1,NewYork_202405011,KeyPers_NewYork_1231060,Name1060,FistName1060,2024-05-01 21:53:59,Business Meeting,New-York,USA,Tunis,Tunisie,Avion,oui
2,NewYork_202405012,KeyPers_NewYork_1230977,Name977,FistName977,2024-05-01 02:25:02,Vocational Training,New-York,USA,Sao Paulo,Brazil,Avion,oui
3,NewYork_202405013,KeyPers_NewYork_1230284,Name284,FistName284,2024-05-01 05:31:53,Business Meeting,New-York,USA,Pekin,China,Avion,oui
4,NewYork_202405014,KeyPers_NewYork_1232661,Name2661,FistName2661,2024-05-01 05:55:53,Team Meeting,New-York,USA,London,England,Avion,oui


**转换为spark类数据**

In [87]:

sdf_person_paris=psdf_person_paris.to_spark()
sdf_informatique_paris=psdf_informatique_paris.to_spark()
sdf_mission_paris=psdf_mission_paris.to_spark()

sdf_person_shanghai=psdf_person_shanghai.to_spark()
sdf_informatique_shanghai=psdf_informatique_shanghai.to_spark()
sdf_mission_shanghai=psdf_mission_shanghai.to_spark()

sdf_person_berlin=psdf_person_berlin.to_spark()
sdf_informatique_berlin=psdf_informatique_berlin.to_spark()
sdf_mission_berlin=psdf_mission_berlin.to_spark()

sdf_person_london=psdf_person_london.to_spark()
sdf_informatique_london=psdf_informatique_london.to_spark() 
sdf_mission_london=psdf_mission_london.to_spark()

sdf_person_losangeles=psdf_person_losangeles.to_spark()
sdf_informatique_losangeles=psdf_informatique_losangeles.to_spark()
sdf_mission_losangeles=psdf_mission_losangeles.to_spark()

sdf_person_newyork=psdf_person_newyork.to_spark()
sdf_informatique_newyork=psdf_informatique_newyork.to_spark()
sdf_mission_newyork=psdf_mission_newyork.to_spark()





In [88]:
import pyspark.pandas as ps
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB


psdf_train = ps.concat([
    psdf_informatique_berlin, 
    psdf_informatique_paris, 
    psdf_informatique_london, 
    psdf_informatique_losangeles, 
    psdf_informatique_newyork, 
    psdf_informatique_shanghai
], axis=0)


psdf_train = psdf_train.dropna(subset=["TYPE", "MODELE"])


X_type_train = psdf_train["MODELE"].to_pandas()
y_type_train = psdf_train["TYPE"].to_pandas()

X_model_train = psdf_train["TYPE"].to_pandas()
y_model_train = psdf_train["MODELE"].to_pandas()


type_vectorizer = CountVectorizer().fit(X_type_train)
model_vectorizer = CountVectorizer().fit(X_model_train)

type_classifier = MultinomialNB().fit(type_vectorizer.transform(X_type_train), y_type_train)
model_classifier = MultinomialNB().fit(model_vectorizer.transform(X_model_train), y_model_train)






In [89]:

def fill_missing_values(df, type_classifier, type_vectorizer, model_classifier, model_vectorizer):
   
    # 转换为 Pandas DataFrame 进行处理
    df = df.toPandas()

    # 预测 TYPE 列
    typeindextopredict = df[(df['TYPE'].isnull()) | (df['TYPE'].str.strip() == "")].index
    typetopredict = df.loc[typeindextopredict, 'MODELE']

    if not typetopredict.empty:
        print(f"PREDICTING {len(typetopredict)} TYPE...")
        predicted_types = type_classifier.predict(type_vectorizer.transform(typetopredict))
        df.loc[typeindextopredict, 'TYPE'] = predicted_types
    else:
        print("NO NULL TYPE")

    # 预测 MODELE 列
    modelindextopredict = df[(df['MODELE'].isnull()) | (df['MODELE'].str.strip() == "")].index
    modeltopredict = df.loc[modelindextopredict, 'TYPE']

    if not modeltopredict.empty:
        print(f"PREDICTING {len(modeltopredict)} MODEL...")
        predicted_models = model_classifier.predict(model_vectorizer.transform(modeltopredict))
        df.loc[modelindextopredict, 'MODELE'] = predicted_models
    else:
        print("NO NULL MODEL")

    # 转回 Spark DataFrame
    return spark.createDataFrame(df)

# 列出所有要处理的 Spark DataFrame
sdf_list = [
    sdf_informatique_paris,
    sdf_informatique_berlin,
    sdf_informatique_london,
    sdf_informatique_losangeles,
    sdf_informatique_newyork,
    sdf_informatique_shanghai
]

# 对每个 DataFrame 执行填充操作

for i, sdf in enumerate(sdf_list):
    print(f"\n processing N'  {i + 1}:")
    sdf_list[i] = fill_missing_values(sdf, type_classifier, type_vectorizer, model_classifier, model_vectorizer)



 processing N'  1:
PREDICTING 327 TYPE...
PREDICTING 131 MODEL...

 processing N'  2:
PREDICTING 126 TYPE...
PREDICTING 53 MODEL...

 processing N'  3:
PREDICTING 296 TYPE...
PREDICTING 109 MODEL...

 processing N'  4:
PREDICTING 274 TYPE...
PREDICTING 120 MODEL...

 processing N'  5:
PREDICTING 293 TYPE...
PREDICTING 115 MODEL...

 processing N'  6:
PREDICTING 322 TYPE...
PREDICTING 116 MODEL...


**ETL_person  Unification des différentes langues**

In [90]:
sdf_person_paris.show(2)
print('before replace')
sdf_person_paris_1=sdf_person_paris.select(sdf_person_paris.FONCTION_PERSONNEL).distinct().show()
sdf_person_paris=sdf_person_paris.replace({'Ingénieur Data':'Data Engineer','Economiste':'Economist','DRH':'HRD','Ingénieur Informaticien':'Computer Engineer','Cadre':'Business Executive'},subset=['FONCTION_PERSONNEL'])
print('after replace')
sdf_person_paris_1=sdf_person_paris.select(sdf_person_paris.FONCTION_PERSONNEL).distinct().show()

+--------------------+-------------+----------------+----------+-----------+----------+-----------+-----------------+-------------+--------+----------+---------+---------+-----+------+--------------------+---------------------+-------------------+
|        ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|  DT_NAISS|VILLE_NAISS|PAYS_NAISS|   NUM_SECU|IND_PAYS_NUM_TELP|NUM_TELEPHONE|NUM_VOIE|  DSC_VOIE|CMPL_VOIE|CD_POSTAL|VILLE|  PAYS|  FONCTION_PERSONNEL|TS_CREATION_PERSONNEL|  TS_MAJ_PPERSONNEL|
+--------------------+-------------+----------------+----------+-----------+----------+-----------+-----------------+-------------+--------+----------+---------+---------+-----+------+--------------------+---------------------+-------------------+
|KeyPers_Paris_123...|        Name0|       FistName0|1992-03-13|      Lille|    France|NS000000000|             NULL|+336##0151713|      22|NomVoie100|     NULL|    #9423|Paris|France|Ingénieur Informa...|  2005-06-24 06:48:21|2005-06-24 06:48:21|
|KeyPers

In [91]:
sdf_person_shanghai.show(3)
sdf_person_shanghai_1=sdf_person_shanghai.select(sdf_person_shanghai.FONCTION_PERSONNEL).distinct().show(20)
print('no need to replace')


+--------------------+-------------+----------------+----------+--------------+----------+-----------+-----------------+-------------+--------+----------+---------+---------+--------+-----+------------------+---------------------+-------------------+
|        ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|  DT_NAISS|   VILLE_NAISS|PAYS_NAISS|   NUM_SECU|IND_PAYS_NUM_TELP|NUM_TELEPHONE|NUM_VOIE|  DSC_VOIE|CMPL_VOIE|CD_POSTAL|   VILLE| PAYS|FONCTION_PERSONNEL|TS_CREATION_PERSONNEL|  TS_MAJ_PPERSONNEL|
+--------------------+-------------+----------------+----------+--------------+----------+-----------+-----------------+-------------+--------+----------+---------+---------+--------+-----+------------------+---------------------+-------------------+
|KeyPers_Shanghai_...|        Name0|       FistName0|2001-05-14|Rio de Janeiro|    Brazil|NS000000000|             NULL|+336##0481414|      86|NomVoie440|     NULL|    #4474|Shanghai|China|Business Executive|  2015-07-07 10:18:57|2015-07-07 10:18:

In [92]:
sdf_person_berlin.show(3)
print('before replace ')
sdf_person_berlin.select("FONCTION_PERSONNEL").distinct().show()

sdf_person_berlin = sdf_person_berlin.replace(
    {
        'Dateningenieur': 'Data Engineer',
        'Ökonom': 'Economist',
        'Personalleiter': 'HRD',
        'Computeringenieur': 'Computer Engineer',
        'Führungskraft': 'Business Executive'
    },
    subset=['FONCTION_PERSONNEL']
)

print('after replace ：')
sdf_person_berlin.select("FONCTION_PERSONNEL").distinct().show()

+--------------------+-------------+----------------+----------+-----------+-----------+-----------+-----------------+-------------+--------+----------+---------+---------+------+-------+------------------+---------------------+-------------------+
|        ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|  DT_NAISS|VILLE_NAISS| PAYS_NAISS|   NUM_SECU|IND_PAYS_NUM_TELP|NUM_TELEPHONE|NUM_VOIE|  DSC_VOIE|CMPL_VOIE|CD_POSTAL| VILLE|   PAYS|FONCTION_PERSONNEL|TS_CREATION_PERSONNEL|  TS_MAJ_PPERSONNEL|
+--------------------+-------------+----------------+----------+-----------+-----------+-----------+-----------------+-------------+--------+----------+---------+---------+------+-------+------------------+---------------------+-------------------+
|KeyPers_Berlin_12...|        Name0|       FistName0|1993-11-04|      Osaka|      Japan|NS000000000|             NULL|+336##0263188|      57|NomVoie940|     NULL|    #8830|Berlin|Germany|    Dateningenieur|  2010-09-09 11:22:17|2010-09-09 11:22:17|
|Key

In [93]:
sdf_person_london.show(3)
sdf_person_london_1=sdf_person_london.select(sdf_person_london.FONCTION_PERSONNEL).distinct().show(20)
print('no need to replace')

+--------------------+-------------+----------------+----------+-----------+----------+-----------+-----------------+-------------+--------+----------+---------+---------+------+-------+------------------+---------------------+-------------------+
|        ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|  DT_NAISS|VILLE_NAISS|PAYS_NAISS|   NUM_SECU|IND_PAYS_NUM_TELP|NUM_TELEPHONE|NUM_VOIE|  DSC_VOIE|CMPL_VOIE|CD_POSTAL| VILLE|   PAYS|FONCTION_PERSONNEL|TS_CREATION_PERSONNEL|  TS_MAJ_PPERSONNEL|
+--------------------+-------------+----------------+----------+-----------+----------+-----------+-----------------+-------------+--------+----------+---------+---------+------+-------+------------------+---------------------+-------------------+
|KeyPers_London_12...|        Name0|       FistName0|2004-07-14|  Melbourne| Australia|NS000000000|             NULL|+336##0612365|      97|NomVoie120|     NULL|    #1819|London|England|     Data Engineer|  2020-10-03 17:41:19|2020-10-03 17:41:19|
|KeyPers

In [94]:
sdf_person_losangeles.show(3)
sdf_person_losangeles_1=sdf_person_losangeles.select(sdf_person_losangeles.FONCTION_PERSONNEL).distinct().show(20)
print('no need to replace')

+------------------+-------------+----------------+----------+-----------+-----------+-----------+-----------------+-------------+--------+----------+---------+---------+-----------+----+------------------+---------------------+-------------------+
|      ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|  DT_NAISS|VILLE_NAISS| PAYS_NAISS|   NUM_SECU|IND_PAYS_NUM_TELP|NUM_TELEPHONE|NUM_VOIE|  DSC_VOIE|CMPL_VOIE|CD_POSTAL|      VILLE|PAYS|FONCTION_PERSONNEL|TS_CREATION_PERSONNEL|  TS_MAJ_PPERSONNEL|
+------------------+-------------+----------------+----------+-----------+-----------+-----------+-----------------+-------------+--------+----------+---------+---------+-----------+----+------------------+---------------------+-------------------+
|KeyPers_LA_1230000|        Name0|       FistName0|2012-03-20|  Compiègne|     France|NS000000000|             NULL|+336##0864857|      80|NomVoie730|     NULL|    #8378|Los Angeles| USA|Business Executive|  2011-01-20 04:14:27|2011-01-20 04:14:27|
|Key

In [95]:
sdf_person_newyork.show(3)
sdf_person_newyork_1=sdf_person_newyork.select(sdf_person_newyork.FONCTION_PERSONNEL).distinct().show(20)
print('no need to replace')

+--------------------+-------------+----------------+----------+-----------+----------+-----------+-----------------+-------------+--------+----------+---------+---------+--------+----+------------------+---------------------+-------------------+
|        ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|  DT_NAISS|VILLE_NAISS|PAYS_NAISS|   NUM_SECU|IND_PAYS_NUM_TELP|NUM_TELEPHONE|NUM_VOIE|  DSC_VOIE|CMPL_VOIE|CD_POSTAL|   VILLE|PAYS|FONCTION_PERSONNEL|TS_CREATION_PERSONNEL|  TS_MAJ_PPERSONNEL|
+--------------------+-------------+----------------+----------+-----------+----------+-----------+-----------------+-------------+--------+----------+---------+---------+--------+----+------------------+---------------------+-------------------+
|KeyPers_NewYork_1...|        Name0|       FistName0|2011-06-01|      Tokyo|     Japan|NS000000000|             NULL|+336##0264749|      33|NomVoie790|     NULL|    #9358|New-York| USA|Business Executive|  2021-07-10 15:48:40|2021-07-10 15:48:40|
|KeyPers_New

**ETL_MISSION   Unification des différentes langues**

In [96]:
sdf_mission_paris.show(3)
print('before replace')
sdf_mission_paris_1=sdf_mission_paris.select(sdf_mission_paris.TYPE_MISSION).distinct().show(20)
print('after replace')
sdf_mission_paris=sdf_mission_paris.replace({'Conférence':'Conference','Réunion':'Team Meeting','Développement':'Development','Formation':'Vocational Training','Rencontre entreprises':'Business Meeting'},subset=['TYPE_MISSION'])
sdf_mission_paris.select(sdf_mission_paris.TYPE_MISSION).distinct().show(20)

+---------------+--------------------+-------------+----------------+-------------------+-------------+------------+-----------+-----------------+----------------+---------+------------+
|     ID_MISSION|        ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|       DATE_MISSION| TYPE_MISSION|VILLE_DEPART|PAYS_DEPART|VILLE_DESTINATION|PAYS_DESTINATION|TRANSPORT|ALLER_RETOUR|
+---------------+--------------------+-------------+----------------+-------------------+-------------+------------+-----------+-----------------+----------------+---------+------------+
|Paris_202405180|KeyPers_Paris_123...|     Name3185|    FistName3185|2024-05-18 23:19:11|      Réunion|       Paris|     France|       Wellington|     New Zealand|    Avion|         oui|
|Paris_202405181|KeyPers_Paris_123...|      Name183|     FistName183|2024-05-18 10:54:36|Développement|       Paris|     France|           Sidney|       Australia|    Avion|         oui|
|Paris_202405182|KeyPers_Paris_123...|     Name2953|    FistName2

In [97]:
sdf_mission_shanghai.show(3)
sdf_mission_shanghai_1=sdf_mission_shanghai.select(sdf_mission_shanghai.TYPE_MISSION).distinct().show(20)
print('no need to replace')

+------------------+--------------------+-------------+----------------+-------------------+-------------------+------------+-----------+-----------------+----------------+---------+------------+
|        ID_MISSION|        ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|       DATE_MISSION|       TYPE_MISSION|VILLE_DEPART|PAYS_DEPART|VILLE_DESTINATION|PAYS_DESTINATION|TRANSPORT|ALLER_RETOUR|
+------------------+--------------------+-------------+----------------+-------------------+-------------------+------------+-----------+-----------------+----------------+---------+------------+
|Shanghai_202406050|KeyPers_Shanghai_...|      Name528|     FistName528|2024-06-05 15:10:01|        Development|    Shanghai|      China|      Los Angeles|             USA|    Avion|         oui|
|Shanghai_202406051|KeyPers_Shanghai_...|      Name794|     FistName794|2024-06-05 08:12:49|Vocational Training|    Shanghai|      China|            Dubaï|         Emirats|    Avion|         oui|
|Shanghai_202406052|

In [98]:
sdf_mission_berlin.show(3)
print('before replace')
sdf_mission_berlin_1=sdf_mission_berlin.select(sdf_mission_berlin.TYPE_MISSION).distinct().show(20)
print('after replace')
sdf_mission_berlin=sdf_mission_berlin.replace({'Geschäftstreffen':'Business Meeting','Konferenz':'Conference','Schulung':'Vocational Training','Meeting':'Team Meeting','Entwicklung':'Development'},subset=['TYPE_MISSION'])
sdf_mission_berlin.select(sdf_mission_berlin.TYPE_MISSION).distinct().show()

+----------------+--------------------+-------------+----------------+-------------------+------------+------------+-----------+-----------------+----------------+--------------------+------------+
|      ID_MISSION|        ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|       DATE_MISSION|TYPE_MISSION|VILLE_DEPART|PAYS_DEPART|VILLE_DESTINATION|PAYS_DESTINATION|           TRANSPORT|ALLER_RETOUR|
+----------------+--------------------+-------------+----------------+-------------------+------------+------------+-----------+-----------------+----------------+--------------------+------------+
|BERLIN_202411110|KeyPers_Berlin_12...|      Name238|     FistName238|2024-11-11 13:34:45|    Schulung|      Berlin|  Allemagne|     Buenos Aires|       Argentina|               Avion|         oui|
|BERLIN_202411111|KeyPers_Berlin_12...|      Name352|     FistName352|2024-11-11 03:42:11|    Schulung|      Berlin|  Allemagne|           Berlin|       Allemagne|Transports en commun|         oui|
|BERLIN_20

In [99]:
sdf_mission_london.show(3)
sdf_mission_london_1=sdf_mission_london.select(sdf_mission_london.TYPE_MISSION).distinct().show(20)
print('no need to replace')

+----------------+--------------------+-------------+----------------+-------------------+-------------------+------------+-----------+-----------------+----------------+---------+------------+
|      ID_MISSION|        ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|       DATE_MISSION|       TYPE_MISSION|VILLE_DEPART|PAYS_DEPART|VILLE_DESTINATION|PAYS_DESTINATION|TRANSPORT|ALLER_RETOUR|
+----------------+--------------------+-------------+----------------+-------------------+-------------------+------------+-----------+-----------------+----------------+---------+------------+
|LONDON_202410190|KeyPers_London_12...|     Name1718|    FistName1718|2024-10-19 01:15:13|Vocational Training|      London|    England|        Melbourne|       Australia|    Avion|         non|
|LONDON_202410191|KeyPers_London_12...|      Name403|     FistName403|2024-10-19 12:03:35|        Development|      London|    England|            Alger|         Algeria|    Avion|         oui|
|LONDON_202410192|KeyPers_Lond

In [100]:
sdf_mission_losangeles.show(3)
sdf_mission_losangeles_1=sdf_mission_losangeles.select(sdf_mission_losangeles.TYPE_MISSION).distinct().show(20)
print('no need to replace')

+------------+------------------+-------------+----------------+-------------------+-------------------+------------+-----------+-----------------+----------------+--------------------+------------+
|  ID_MISSION|      ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|       DATE_MISSION|       TYPE_MISSION|VILLE_DEPART|PAYS_DEPART|VILLE_DESTINATION|PAYS_DESTINATION|           TRANSPORT|ALLER_RETOUR|
+------------+------------------+-------------+----------------+-------------------+-------------------+------------+-----------+-----------------+----------------+--------------------+------------+
|LA_202405070|KeyPers_LA_1232039|     Name2039|    FistName2039|2024-05-07 07:03:44|        Development| Los Angeles|        USA|            Lille|          France|               Avion|         oui|
|LA_202405071|KeyPers_LA_1232529|     Name2529|    FistName2529|2024-05-07 22:57:21|Vocational Training| Los Angeles|        USA|      Los Angeles|             USA|Transports en commun|         non|
|LA_2

In [101]:
sdf_mission_newyork.show(3)
sdf_mission_newyork_1=sdf_mission_newyork.select(sdf_mission_newyork.TYPE_MISSION).distinct().show(20)
print('no need to replace')

+-----------------+--------------------+-------------+----------------+-------------------+-------------------+------------+-----------+-----------------+----------------+---------+------------+
|       ID_MISSION|        ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|       DATE_MISSION|       TYPE_MISSION|VILLE_DEPART|PAYS_DEPART|VILLE_DESTINATION|PAYS_DESTINATION|TRANSPORT|ALLER_RETOUR|
+-----------------+--------------------+-------------+----------------+-------------------+-------------------+------------+-----------+-----------------+----------------+---------+------------+
|NewYork_202405010|KeyPers_NewYork_1...|      Name589|     FistName589|2024-05-01 02:03:41|         Conference|    New-York|        USA|         Auckland|     New Zealand|    Avion|         oui|
|NewYork_202405011|KeyPers_NewYork_1...|     Name1060|    FistName1060|2024-05-01 21:53:59|   Business Meeting|    New-York|        USA|            Tunis|         Tunisie|    Avion|         oui|
|NewYork_202405012|KeyPer

**1er object modilisation d'etoile**

Dimension Site

In [102]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f


cities = ["paris", "shanghai", "berlin", "london", "losangeles", "newyork"]

sdf_dimension_site = spark.createDataFrame(
    [(i + 1, city) for i, city in enumerate(cities)],
    ["ID_SITE", "SITE"]
)

sdf_dimension_site.show()

+-------+----------+
|ID_SITE|      SITE|
+-------+----------+
|      1|     paris|
|      2|  shanghai|
|      3|    berlin|
|      4|    london|
|      5|losangeles|
|      6|   newyork|
+-------+----------+



Dimension Personnel

In [103]:
from pyspark.sql import functions as f

cities = ["paris", "shanghai", "berlin", "london", "losangeles", "newyork"]
dataframes = [
    sdf_person_paris.drop(*[col for col in sdf_person_paris.columns if col.startswith("key_")]),
    sdf_person_shanghai.drop(*[col for col in sdf_person_shanghai.columns if col.startswith("key_")]),
    sdf_person_berlin.drop(*[col for col in sdf_person_berlin.columns if col.startswith("key_")]),
    sdf_person_london.drop(*[col for col in sdf_person_london.columns if col.startswith("key_")]),
    sdf_person_losangeles.drop(*[col for col in sdf_person_losangeles.columns if col.startswith("key_")]),
    sdf_person_newyork.drop(*[col for col in sdf_person_newyork.columns if col.startswith("key_")])
]

sdf_dimension_personnel = None

for city, df in zip(cities, dataframes):
    city_df = df.withColumn("SITE", f.lit(city.upper())) \
                .select("SITE", *df.columns)  
    
    if sdf_dimension_personnel is None:
        sdf_dimension_personnel = city_df
    else:
        sdf_dimension_personnel = sdf_dimension_personnel.unionByName(city_df)

#sdf_dimension_personnel.printSchema()
sdf_dimension_personnel.show(10)



+-----+--------------------+-------------+----------------+----------+-----------+-----------+-----------+-----------------+-------------+--------+----------+---------+---------+-----+------+------------------+---------------------+-------------------+
| SITE|        ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|  DT_NAISS|VILLE_NAISS| PAYS_NAISS|   NUM_SECU|IND_PAYS_NUM_TELP|NUM_TELEPHONE|NUM_VOIE|  DSC_VOIE|CMPL_VOIE|CD_POSTAL|VILLE|  PAYS|FONCTION_PERSONNEL|TS_CREATION_PERSONNEL|  TS_MAJ_PPERSONNEL|
+-----+--------------------+-------------+----------------+----------+-----------+-----------+-----------+-----------------+-------------+--------+----------+---------+---------+-----+------+------------------+---------------------+-------------------+
|PARIS|KeyPers_Paris_123...|        Name0|       FistName0|1992-03-13|      Lille|     France|NS000000000|             NULL|+336##0151713|      22|NomVoie100|     NULL|    #9423|Paris|France| Computer Engineer|  2005-06-24 06:48:21|2005-06-2

In [104]:
print(f"Total:{sdf_dimension_personnel.count()}")
sdf_dimension_personnel.groupBy("SITE").count().show()

Total:20572
+----------+-----+
|      SITE|count|
+----------+-----+
|     PARIS| 5000|
|  SHANGHAI| 1000|
|    BERLIN| 4243|
|    LONDON| 4219|
|LOSANGELES| 3027|
|   NEWYORK| 3083|
+----------+-----+



Dimension Modele Materiel

先检验Modele有没有空项

In [105]:
from pyspark.sql import functions as f

# 城市列表和对应的 DataFrame
cities = {
    "paris": sdf_informatique_paris,
    "shanghai": sdf_informatique_shanghai,
    "berlin": sdf_informatique_berlin,
    "london": sdf_informatique_london,
    "losangeles": sdf_informatique_losangeles,
    "newyork": sdf_informatique_newyork
}

total_null_count = 0

# 自动替换每个城市的 MODELE 列中的空值为 "NULL"
for city, df in cities.items():
    print(f"\n🚀 {city.capitalize()} - 检查并替换 MODELE 列中的空值：")
    
    # 检查空值：包括 NULL、空字符串和仅包含空格
    sdf_modele_null = df.filter(
        (f.col("MODELE").isNull()) | (f.trim(f.col("MODELE")) == "")
    )
    
    # 统计空值数量
    null_count = sdf_modele_null.count()
    total_null_count += null_count
    
    if null_count > 0:
        print(f" {city.capitalize()} 中有 {null_count} 行 MODELE 是空值。")
        
        # 替换空值为字符串 "NULL"
        cities[city] = df.withColumn(
            "MODELE",
            f.when(
                (f.col("MODELE").isNull()) | (f.trim(f.col("MODELE")) == ""), 
                "NULL"
            ).otherwise(f.col("MODELE"))
        )
    else:
        print(f"✅ {city.capitalize()} 中没有空值。")

# 最后显示总空值数量
print(f"\n🚀 总共有 {total_null_count} 行 MODELE 是空值，已替换为 'NULL'。")

# 显示每个城市的结果（示例）
for city, df in cities.items():
    print(f"\n🚀 {city.capitalize()} - 替换后的 DataFrame：")
    df.show(10, truncate=False)


🚀 Paris - 检查并替换 MODELE 列中的空值：
 Paris 中有 131 行 MODELE 是空值。

🚀 Shanghai - 检查并替换 MODELE 列中的空值：
 Shanghai 中有 116 行 MODELE 是空值。

🚀 Berlin - 检查并替换 MODELE 列中的空值：
 Berlin 中有 53 行 MODELE 是空值。

🚀 London - 检查并替换 MODELE 列中的空值：
 London 中有 109 行 MODELE 是空值。

🚀 Losangeles - 检查并替换 MODELE 列中的空值：
 Losangeles 中有 120 行 MODELE 是空值。

🚀 Newyork - 检查并替换 MODELE 列中的空值：
 Newyork 中有 115 行 MODELE 是空值。

🚀 总共有 644 行 MODELE 是空值，已替换为 'NULL'。

🚀 Paris - 替换后的 DataFrame：
+-----------------------------+---------------------+-------------+----------------+-------------------+------------------+-----------------------+
|ID_MATERIELINFO              |ID_PERSONNEL         |NOM_PERSONNEL|PRENOM_PERSONNEL|DATE_ACHAT         |TYPE              |MODELE                 |
+-----------------------------+---------------------+-------------+----------------+-------------------+------------------+-----------------------+
|Paris_MATERIEL_INFO_202406250|KeyPers_Paris_1233515|Name3515     |FistName3515    |2024-06-25 11:37:24|Ecran      

In [106]:
from pyspark.sql import functions as f
from pyspark.sql import Window

# 城市列表和对应的 DataFrame
cities = {
    "paris": sdf_informatique_paris,
    "shanghai": sdf_informatique_shanghai,
    "berlin": sdf_informatique_berlin,
    "london": sdf_informatique_london,
    "losangeles": sdf_informatique_losangeles,
    "newyork": sdf_informatique_newyork
}

# 合并所有城市的 MODELE 数据
sdf_all_modele = None

for city, df in cities.items():
    # 替换空值为字符串 "NULL"
    df = df.withColumn(
        "MODELE",
        f.when(
            (f.col("MODELE").isNull()) | (f.trim(f.col("MODELE")) == ""), 
            "NULL"
        ).otherwise(f.col("MODELE"))
    )
    
    # 合并所有城市的 MODELE 数据
    if sdf_all_modele is None:
        sdf_all_modele = df.select("MODELE")
    else:
        sdf_all_modele = sdf_all_modele.union(df.select("MODELE"))

# 删除重复项并确保 "NULL" 作为唯一项
sdf_dimension_modele_materiel = sdf_all_modele.distinct()

# 为每个 MODELE 添加唯一的 ID
window_spec = Window.orderBy("MODELE").partitionBy(f.lit(1))
sdf_dimension_modele_materiel = sdf_dimension_modele_materiel.withColumn(
    "ID_MODELE", f.row_number().over(window_spec)
).select("ID_MODELE", "MODELE")

# 显示总数和数据
modele_count = sdf_dimension_modele_materiel.count()
print(f"\n🚀 总共有 {modele_count} 种唯一的 MODELE（型号）。")
sdf_dimension_modele_materiel.show(modele_count, truncate=False)


🚀 总共有 65 种唯一的 MODELE（型号）。
+---------+---------------------------+
|ID_MODELE|MODELE                     |
+---------+---------------------------+
|1        |24pouces-31pouces          |
|2        |32pouces et plus           |
|3        |Chromebook 11-13pouces     |
|4        |Chromebook 14-15pouces     |
|5        |Elite (Tower, SFF, One)    |
|6        |Elite mini                 |
|7        |EliteBook 10xx             |
|8        |EliteBook 6xx              |
|9        |EliteBook 8xx              |
|10       |EliteDesk (Tower, SFF, One)|
|11       |EliteDesk DM               |
|12       |Laser A3 (40-99kg)         |
|13       |Laser A3 (>100kg)          |
|14       |Laser à poser (<40kg)      |
|15       |Latitude 3xxx              |
|16       |Latitude 5xxx              |
|17       |Latitude 7xxx              |
|18       |Mac mini                   |
|19       |Mac pro                    |
|20       |MacBook air                |
|21       |MacBook air pre-retina     |
|22       |Ma

In [107]:
# 查找 NULL（字符串 "NULL"） 在 sdf_dimension_modele_materiel 中的位置
sdf_null_position = sdf_dimension_modele_materiel.filter(f.col("MODELE") == "NULL")

# 显示 NULL 在模型列表中的位置
sdf_null_position.show(truncate=False)

+---------+------+
|ID_MODELE|MODELE|
+---------+------+
|29       |NULL  |
+---------+------+



不对，前面的dimension要重新弄一下。  有两个表有问题sdf_dimension_type和sdf_dimension_modele_materiel。

Dimension Type Materiel

In [108]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# 城市列表和对应的 DataFrame
cities = {
    "paris": sdf_informatique_paris,
    "shanghai": sdf_informatique_shanghai,
    "berlin": sdf_informatique_berlin,
    "london": sdf_informatique_london,
    "losangeles": sdf_informatique_losangeles,
    "newyork": sdf_informatique_newyork
}

# 合并所有城市的 TYPE 列
sdf_all_type = None

for df in cities.values():
    if sdf_all_type is None:
        sdf_all_type = df.select("TYPE")
    else:
        sdf_all_type = sdf_all_type.union(df.select("TYPE"))

# 替换空白或无效的 TYPE 为 'NULL'
sdf_all_type = sdf_all_type.withColumn(
    "TYPE", 
    f.when(f.col("TYPE").isNull() | (f.trim(f.col("TYPE")) == ""), "NULL")
     .otherwise(f.col("TYPE"))
)

# 删除重复项，并按顺序排序
sdf_dimension_type = sdf_all_type.distinct().orderBy("TYPE")

# 为每个 TYPE 添加唯一的 ID
window_spec = Window.orderBy("TYPE").partitionBy(f.lit(1))
sdf_dimension_type = sdf_dimension_type.withColumn(
    "ID_TYPE", f.row_number().over(window_spec)
).select("ID_TYPE", "TYPE")

# 显示总数和数据
type_count = sdf_dimension_type.count()
print(f"\n🚀 总共有 {type_count} 种唯一的 TYPE（类型）。")
sdf_dimension_type.show(type_count, truncate=False)


🚀 总共有 16 种唯一的 TYPE（类型）。
+-------+------------------+
|ID_TYPE|TYPE              |
+-------+------------------+
|1      |Borne wifi        |
|2      |Disque dur        |
|3      |Ecran             |
|4      |NULL              |
|5      |PC fixe sans ecran|
|6      |PC fixe tout-en-un|
|7      |PC portable       |
|8      |Serveur           |
|9      |Smartphone        |
|10     |Station d'accueil |
|11     |Tablette          |
|12     |Telephone IP      |
|13     |Vidéo projecteur  |
|14     |clavier           |
|15     |imprimante        |
|16     |souris            |
+-------+------------------+



Dimension Date

Dimension Date Achat

去掉了具体的时间： 2024-10-03 16:48:26 去除了后面的具体时间，如果留着会有很大的数据冗余，且没有必要

In [109]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# 城市列表和对应的 DataFrame
cities = {
    "paris": sdf_informatique_paris,
    "shanghai": sdf_informatique_shanghai,
    "berlin": sdf_informatique_berlin,
    "london": sdf_informatique_london,
    "losangeles": sdf_informatique_losangeles,
    "newyork": sdf_informatique_newyork
}

# 合并所有城市的 DATE_ACHAT 数据
sdf_all_date = None

for df in cities.values():
    # 提取 DATE_ACHAT，并只保留日期（YYYY-MM-DD）
    df = df.withColumn("DATE_ACHAT", f.to_date(f.col("DATE_ACHAT")))
    
    if sdf_all_date is None:
        sdf_all_date = df.select("DATE_ACHAT")
    else:
        sdf_all_date = sdf_all_date.union(df.select("DATE_ACHAT"))

# 删除重复项，过滤空值，并按日期排序
sdf_dimension_date_achat = sdf_all_date.distinct() \
    .filter(f.col("DATE_ACHAT").isNotNull()) \
    .orderBy("DATE_ACHAT")

# 为每个 DATE_ACHAT 添加唯一的 ID
window_spec = Window.orderBy("DATE_ACHAT").partitionBy(f.lit(1))
sdf_dimension_date_achat = sdf_dimension_date_achat.withColumn(
    "ID_DATE_ACHAT", f.row_number().over(window_spec)
).select("ID_DATE_ACHAT", "DATE_ACHAT")

# 显示总数和数据
date_count = sdf_dimension_date_achat.count()
print(f"\n🚀 总共有 {date_count} 种唯一的 DATE_ACHAT（购买日期）。")
sdf_dimension_date_achat.show(date_count, truncate=False)


🚀 总共有 200 种唯一的 DATE_ACHAT（购买日期）。
+-------------+----------+
|ID_DATE_ACHAT|DATE_ACHAT|
+-------------+----------+
|1            |2024-04-29|
|2            |2024-04-30|
|3            |2024-05-01|
|4            |2024-05-02|
|5            |2024-05-03|
|6            |2024-05-04|
|7            |2024-05-05|
|8            |2024-05-06|
|9            |2024-05-07|
|10           |2024-05-08|
|11           |2024-05-09|
|12           |2024-05-10|
|13           |2024-05-11|
|14           |2024-05-12|
|15           |2024-05-13|
|16           |2024-05-14|
|17           |2024-05-15|
|18           |2024-05-16|
|19           |2024-05-17|
|20           |2024-05-18|
|21           |2024-05-19|
|22           |2024-05-20|
|23           |2024-05-21|
|24           |2024-05-22|
|25           |2024-05-23|
|26           |2024-05-24|
|27           |2024-05-25|
|28           |2024-05-26|
|29           |2024-05-27|
|30           |2024-05-28|
|31           |2024-05-29|
|32           |2024-05-30|
|33           |2024-0

Dimension Date Mission

In [110]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# 城市列表和对应的 DataFrame
cities = {
    "paris": sdf_mission_paris,
    "shanghai": sdf_mission_shanghai,
    "berlin": sdf_mission_berlin,
    "london": sdf_mission_london,
    "losangeles": sdf_mission_losangeles,
    "newyork": sdf_mission_newyork
}

# 合并所有城市的 DATE_MISSION 数据
sdf_all_date_mission = None

for df in cities.values():
    # 提取 DATE_MISSION，并只保留日期（YYYY-MM-DD）
    df = df.withColumn("DATE_MISSION", f.to_date(f.col("DATE_MISSION")))
    
    if sdf_all_date_mission is None:
        sdf_all_date_mission = df.select("DATE_MISSION")
    else:
        sdf_all_date_mission = sdf_all_date_mission.union(df.select("DATE_MISSION"))

# 删除重复项，过滤空值，并按日期排序
sdf_dimension_date_mission = sdf_all_date_mission.distinct() \
    .filter(f.col("DATE_MISSION").isNotNull()) \
    .orderBy("DATE_MISSION")

# 为每个 DATE_MISSION 添加唯一的 ID
window_spec = Window.orderBy("DATE_MISSION").partitionBy(f.lit(1))
sdf_dimension_date_mission = sdf_dimension_date_mission.withColumn(
    "ID_DATE_MISSION", f.row_number().over(window_spec)
).select("ID_DATE_MISSION", "DATE_MISSION")

# 显示总数和数据
date_count_mission = sdf_dimension_date_mission.count()
print(f"\n🚀 总共有 {date_count_mission} 种唯一的 DATE_MISSION（任务日期）。")
sdf_dimension_date_mission.show(date_count_mission, truncate=False)


🚀 总共有 200 种唯一的 DATE_MISSION（任务日期）。
+---------------+------------+
|ID_DATE_MISSION|DATE_MISSION|
+---------------+------------+
|1              |2024-04-29  |
|2              |2024-04-30  |
|3              |2024-05-01  |
|4              |2024-05-02  |
|5              |2024-05-03  |
|6              |2024-05-04  |
|7              |2024-05-05  |
|8              |2024-05-06  |
|9              |2024-05-07  |
|10             |2024-05-08  |
|11             |2024-05-09  |
|12             |2024-05-10  |
|13             |2024-05-11  |
|14             |2024-05-12  |
|15             |2024-05-13  |
|16             |2024-05-14  |
|17             |2024-05-15  |
|18             |2024-05-16  |
|19             |2024-05-17  |
|20             |2024-05-18  |
|21             |2024-05-19  |
|22             |2024-05-20  |
|23             |2024-05-21  |
|24             |2024-05-22  |
|25             |2024-05-23  |
|26             |2024-05-24  |
|27             |2024-05-25  |
|28             |2024-05-26  |
|29

这个就是把两个：mission和informatique表的时间放一起，因为可以看出两个表的时间是一样的

In [111]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# 合并 DATE_ACHAT 和 DATE_MISSION 两个日期表
sdf_all_dates = sdf_dimension_date_achat.select("DATE_ACHAT").withColumnRenamed("DATE_ACHAT", "DATE") \
    .union(sdf_dimension_date_mission.select("DATE_MISSION").withColumnRenamed("DATE_MISSION", "DATE"))

# 删除重复项并按日期排序
sdf_dimension_date = sdf_all_dates.distinct() \
    .filter(f.col("DATE").isNotNull()) \
    .orderBy("DATE")

# 为每个日期分配唯一的 ID
window_spec = Window.orderBy("DATE").partitionBy(f.lit(1))
sdf_dimension_date = sdf_dimension_date.withColumn(
    "ID_DATE", f.row_number().over(window_spec)
).select("ID_DATE", "DATE")

# 显示总数和数据
date_count = sdf_dimension_date.count()
print(f"\n🚀 总共有 {date_count} 种唯一的 DATE（日期）。")
sdf_dimension_date.show(date_count, truncate=False)


🚀 总共有 200 种唯一的 DATE（日期）。


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/anaconda3/envs/pyspark_env/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/envs/pyspark_env/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/envs/pyspark_env/lib/python3.12/socket.py", line 720, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Dimension Transport

In [None]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# 城市列表和对应的 DataFrame
cities = {
    "paris": sdf_mission_paris,
    "shanghai": sdf_mission_shanghai,
    "berlin": sdf_mission_berlin,
    "london": sdf_mission_london,
    "losangeles": sdf_mission_losangeles,
    "newyork": sdf_mission_newyork
}

# 合并所有城市的 TRANSPORT 数据
sdf_all_transport = None

for df in cities.values():
    if sdf_all_transport is None:
        sdf_all_transport = df.select("TRANSPORT")
    else:
        sdf_all_transport = sdf_all_transport.union(df.select("TRANSPORT"))

# 删除重复项，过滤空值，并按字母顺序排序
sdf_dimension_transport = sdf_all_transport.distinct() \
    .filter((f.col("TRANSPORT").isNotNull()) & (f.trim(f.col("TRANSPORT")) != "")) \
    .orderBy("TRANSPORT")

# 为每种 TRANSPORT 添加唯一的 ID
window_spec = Window.orderBy("TRANSPORT").partitionBy(f.lit(1))
sdf_dimension_transport = sdf_dimension_transport.withColumn(
    "ID_TRANSPORT", f.row_number().over(window_spec)
).select("ID_TRANSPORT", "TRANSPORT")

# 显示总数和数据
transport_count = sdf_dimension_transport.count()
print(f"\n🚀 总共有 {transport_count} 种唯一的 TRANSPORT（运输方式）。")
sdf_dimension_transport.show(transport_count, truncate=False)


🚀 总共有 4 种唯一的 TRANSPORT（运输方式）。
+------------+--------------------+
|ID_TRANSPORT|TRANSPORT           |
+------------+--------------------+
|1           |Avion               |
|2           |Taxi                |
|3           |Train               |
|4           |Transports en commun|
+------------+--------------------+



Dimension Mission

In [None]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# 城市列表和对应的 DataFrame
cities = {
    "paris": sdf_mission_paris,
    "shanghai": sdf_mission_shanghai,
    "berlin": sdf_mission_berlin,
    "london": sdf_mission_london,
    "losangeles": sdf_mission_losangeles,
    "newyork": sdf_mission_newyork
}

# 合并所有城市的 TYPE_MISSION 数据
sdf_all_type_mission = None

for df in cities.values():
    if sdf_all_type_mission is None:
        sdf_all_type_mission = df.select("TYPE_MISSION")
    else:
        sdf_all_type_mission = sdf_all_type_mission.union(df.select("TYPE_MISSION"))

# 删除重复项，过滤空值，并按字母顺序排序
sdf_dimension_mission = sdf_all_type_mission.distinct() \
    .filter((f.col("TYPE_MISSION").isNotNull()) & (f.trim(f.col("TYPE_MISSION")) != "")) \
    .orderBy("TYPE_MISSION")

# 为每种 TYPE_MISSION 添加唯一的 ID
window_spec = Window.orderBy("TYPE_MISSION").partitionBy(f.lit(1))
sdf_dimension_mission = sdf_dimension_mission.withColumn(
    "ID_TYPE_MISSION", f.row_number().over(window_spec)
).select("ID_TYPE_MISSION", "TYPE_MISSION")

# 显示总数和数据
type_mission_count = sdf_dimension_mission.count()
print(f"\n🚀 总共有 {type_mission_count} 种唯一的 TYPE_MISSION（任务类型）。")
sdf_dimension_mission.show(type_mission_count, truncate=False)


🚀 总共有 5 种唯一的 TYPE_MISSION（任务类型）。
+---------------+-------------------+
|ID_TYPE_MISSION|TYPE_MISSION       |
+---------------+-------------------+
|1              |Business Meeting   |
|2              |Conference         |
|3              |Development        |
|4              |Team Meeting       |
|5              |Vocational Training|
+---------------+-------------------+



Dimension Location

In [None]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# 城市列表和对应的 DataFrame
cities = {
    "paris": sdf_mission_paris,
    "shanghai": sdf_mission_shanghai,
    "berlin": sdf_mission_berlin,
    "london": sdf_mission_london,
    "losangeles": sdf_mission_losangeles,
    "newyork": sdf_mission_newyork
}

# 合并所有城市的 VILLE 和 PAYS 数据（出发地和目的地）
sdf_all_location = None

for df in cities.values():
    # 提取 VILLE 和 PAYS 列（包括出发地和目的地）
    df_ville_pays = df.select("VILLE_DEPART", "PAYS_DEPART").withColumnRenamed("VILLE_DEPART", "VILLE").withColumnRenamed("PAYS_DEPART", "PAYS") \
        .union(df.select("VILLE_DESTINATION", "PAYS_DESTINATION").withColumnRenamed("VILLE_DESTINATION", "VILLE").withColumnRenamed("PAYS_DESTINATION", "PAYS"))
    
    if sdf_all_location is None:
        sdf_all_location = df_ville_pays
    else:
        sdf_all_location = sdf_all_location.union(df_ville_pays)

# 删除重复项，过滤空值，并按字母顺序排序
sdf_dimension_location = sdf_all_location.distinct() \
    .filter((f.col("VILLE").isNotNull()) & (f.trim(f.col("VILLE")) != "")) \
    .filter((f.col("PAYS").isNotNull()) & (f.trim(f.col("PAYS")) != "")) \
    .orderBy("PAYS", "VILLE")

# 为每个唯一的 VILLE 和 PAYS 组合分配唯一的 ID
window_spec = Window.orderBy("PAYS", "VILLE").partitionBy(f.lit(1))
sdf_dimension_location = sdf_dimension_location.withColumn(
    "ID_LOCATION", f.row_number().over(window_spec)
).select("ID_LOCATION", "VILLE", "PAYS")

# 显示总数和数据
location_count = sdf_dimension_location.count()
print(f"\n🚀 总共有 {location_count} 种唯一的 LOCATION（城市和国家）。")
sdf_dimension_location.show(location_count, truncate=False)

                                                                                


🚀 总共有 33 种唯一的 LOCATION（城市和国家）。




+-----------+--------------+-----------+
|ID_LOCATION|VILLE         |PAYS       |
+-----------+--------------+-----------+
|1          |Alger         |Algeria    |
|2          |Berlin        |Allemagne  |
|3          |Buenos Aires  |Argentina  |
|4          |Melbourne     |Australia  |
|5          |Sidney        |Australia  |
|6          |Rio de Janeiro|Brazil     |
|7          |Sao Paulo     |Brazil     |
|8          |Montreal      |Canada     |
|9          |Vancouver     |Canada     |
|10         |Pekin         |China      |
|11         |Shanghai      |China      |
|12         |Bogota        |Colombia   |
|13         |Dubaï         |Emirats    |
|14         |London        |England    |
|15         |Helsinki      |Finlande   |
|16         |Bordeaux      |France     |
|17         |Compiègne     |France     |
|18         |Lille         |France     |
|19         |Marseille     |France     |
|20         |Paris         |France     |
|21         |Osaka         |Japan      |
|22         |Tok

                                                                                

FAIT Mission

In [None]:
from pyspark.sql import functions as f

# 城市列表和对应的 DataFrame
cities = [
    "paris", "shanghai", "berlin", "london", "losangeles", "newyork"
]

# 初始化 fait_mission
fait_mission = None

for city in cities:
    # 动态加载每个城市的 DataFrame
    sdf_person = globals().get(f"sdf_person_{city}")
    sdf_mission = globals().get(f"sdf_mission_{city}")

    if sdf_person is None:
        print(f"❌ {city} 数据表未找到。")
        continue

    # 获取 ID_SITE
    site_row = sdf_dimension_site.filter(f.col("SITE") == city.lower()).select("ID_SITE").first()

    if site_row is None:
        print(f"❌ {city} 在 sdf_dimension_site 中未找到。")
        continue

    site_id = site_row[0]

    # 保留所有人员
    city_fait = sdf_person.withColumn("ID_SITE", f.lit(site_id))

    # 左连接任务表（如果存在）
    if sdf_mission is not None:
        sdf_mission = sdf_mission.withColumn("DATE_MISSION", f.to_date(f.col("DATE_MISSION")))
        city_fait = city_fait.join(sdf_mission, "ID_PERSONNEL", "left") \
            .join(sdf_dimension_date, sdf_mission["DATE_MISSION"] == sdf_dimension_date["DATE"], "left") \
            .join(sdf_dimension_mission, sdf_mission["TYPE_MISSION"] == sdf_dimension_mission["TYPE_MISSION"], "left") \
            .join(sdf_dimension_transport, sdf_mission["TRANSPORT"] == sdf_dimension_transport["TRANSPORT"], "left") \
            .join(sdf_dimension_location.alias("depart"), (sdf_mission["VILLE_DEPART"] == f.col("depart.VILLE")) & (sdf_mission["PAYS_DEPART"] == f.col("depart.PAYS")), "left") \
            .join(sdf_dimension_location.alias("dest"), (sdf_mission["VILLE_DESTINATION"] == f.col("dest.VILLE")) & (sdf_mission["PAYS_DESTINATION"] == f.col("dest.PAYS")), "left")

    # 选择最终列，任务和设备数据可能为空
    city_fait = city_fait.select(
        f.lit(None).alias("ID_FAIT_MISSION"),
        f.lit(site_id).alias("ID_SITE"),
        sdf_person["ID_PERSONNEL"],
        sdf_dimension_date["ID_DATE"],
        f.col("depart.ID_LOCATION").alias("ID_DEPART"),
        f.col("dest.ID_LOCATION").alias("ID_DESTINATION"),
        sdf_dimension_mission["ID_TYPE_MISSION"].alias("ID_MISSION"),
        sdf_dimension_transport["ID_TRANSPORT"],
        sdf_mission["ALLER_RETOUR"]
    )

    # 合并每个城市的任务数据
    if fait_mission is None:
        fait_mission = city_fait
    else:
        fait_mission = fait_mission.unionByName(city_fait)

# 为 ID_FAIT_MISSION 添加唯一自增 ID（按城市分区）
from pyspark.sql.window import Window
fait_mission = fait_mission.withColumn(
    "ID_FAIT_MISSION",
    f.row_number().over(Window.partitionBy("ID_SITE").orderBy("ID_PERSONNEL"))
)

fait_mission.show(20, truncate=False)


                                                                                

+---------------+-------+---------------------+-------+---------+--------------+----------+------------+------------+
|ID_FAIT_MISSION|ID_SITE|ID_PERSONNEL         |ID_DATE|ID_DEPART|ID_DESTINATION|ID_MISSION|ID_TRANSPORT|ALLER_RETOUR|
+---------------+-------+---------------------+-------+---------+--------------+----------+------------+------------+
|1              |1      |KeyPers_Paris_1230000|NULL   |NULL     |NULL          |NULL      |NULL        |NULL        |
|2              |1      |KeyPers_Paris_1230001|NULL   |NULL     |NULL          |NULL      |NULL        |NULL        |
|3              |1      |KeyPers_Paris_1230002|NULL   |NULL     |NULL          |NULL      |NULL        |NULL        |
|4              |1      |KeyPers_Paris_1230003|NULL   |NULL     |NULL          |NULL      |NULL        |NULL        |
|5              |1      |KeyPers_Paris_1230004|85     |20       |16            |3         |1           |oui         |
|6              |1      |KeyPers_Paris_1230004|82     |2

查看每个非NULL的有多少项，因为每个人都可能

In [None]:
# 统计每个城市中非 NULL 的任务项数量
for city in cities:
    site_row = sdf_dimension_site.filter(f.col("SITE") == city.lower()).select("ID_SITE").first()
    if site_row is None:
        continue
    
    site_id = site_row[0]
    print(f"\n🚀 {city.capitalize()} (ID_SITE = {site_id}) - 非 NULL 项数量：")
    
    non_null_count = fait_mission.filter(
        (f.col("ID_SITE") == site_id) & 
        (f.col("ID_DATE").isNotNull() | 
         f.col("ID_DEPART").isNotNull() | 
         f.col("ID_DESTINATION").isNotNull() | 
         f.col("ID_MISSION").isNotNull() | 
         f.col("ID_TRANSPORT").isNotNull() | 
         f.col("ALLER_RETOUR").isNotNull())
    ).count()
    
    print(f"✅ 非 NULL 任务数量：{non_null_count}")


🚀 Paris (ID_SITE = 1) - 非 NULL 项数量：


                                                                                

✅ 非 NULL 任务数量：4518

🚀 Shanghai (ID_SITE = 2) - 非 NULL 项数量：


                                                                                

✅ 非 NULL 任务数量：2392

🚀 Berlin (ID_SITE = 3) - 非 NULL 项数量：


                                                                                

✅ 非 NULL 任务数量：4642

🚀 London (ID_SITE = 4) - 非 NULL 项数量：


                                                                                

✅ 非 NULL 任务数量：4239

🚀 Losangeles (ID_SITE = 5) - 非 NULL 项数量：


                                                                                

✅ 非 NULL 任务数量：4442

🚀 Newyork (ID_SITE = 6) - 非 NULL 项数量：


                                                                                

✅ 非 NULL 任务数量：3950


FAIT Materiel

In [None]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# 城市列表和对应的 DataFrame
cities = [
    "paris", "shanghai", "berlin", "london", "losangeles", "newyork"
]

# 初始化 fait_materiel
fait_materiel = None

for city in cities:
    # 动态加载每个城市的 DataFrame
    sdf_person = globals().get(f"sdf_person_{city}")
    sdf_informatique = globals().get(f"sdf_informatique_{city}")

    if sdf_person is None:
        print(f"❌ {city} 数据表未找到。")
        continue

    # 获取 ID_SITE
    site_row = sdf_dimension_site.filter(f.col("SITE") == city.lower()).select("ID_SITE").first()

    if site_row is None:
        print(f"❌ {city} 在 sdf_dimension_site 中未找到。")
        continue

    site_id = site_row[0]

    # 保留所有人员
    city_fait = sdf_person.withColumn("ID_SITE", f.lit(site_id))

    # 左连接设备表（如果存在）
    if sdf_informatique is not None:
        sdf_informatique = sdf_informatique.withColumn("DATE_ACHAT", f.to_date(f.col("DATE_ACHAT")))

        city_fait = city_fait.join(sdf_informatique, "ID_PERSONNEL", "left")

    # 选择最终列，设备数据可能为空
    city_fait = city_fait.select(
        f.lit(None).alias("ID_FAIT_MATERIEL"),
        f.lit(site_id).alias("ID_SITE"),
        sdf_person["ID_PERSONNEL"],
        f.col("DATE_ACHAT").alias("ID_DATE"),
        f.col("TYPE").alias("ID_TYPE"),
        f.col("MODELE").alias("ID_MODELE")
    )

    # 合并每个城市的设备数据
    if fait_materiel is None:
        fait_materiel = city_fait
    else:
        fait_materiel = fait_materiel.unionByName(city_fait)

# 为 ID_FAIT_MATERIEL 添加唯一自增 ID（按城市和 ID_PERSONNEL 排序）
fait_materiel = fait_materiel.withColumn(
    "ID_FAIT_MATERIEL",
    f.row_number().over(Window.partitionBy("ID_SITE").orderBy("ID_PERSONNEL"))
)

fait_materiel.show(20, truncate=False)


+----------------+-------+---------------------+----------+------------------+--------------------+
|ID_FAIT_MATERIEL|ID_SITE|ID_PERSONNEL         |ID_DATE   |ID_TYPE           |ID_MODELE           |
+----------------+-------+---------------------+----------+------------------+--------------------+
|1               |1      |KeyPers_Paris_1230000|NULL      |NULL              |NULL                |
|2               |1      |KeyPers_Paris_1230001|NULL      |NULL              |NULL                |
|3               |1      |KeyPers_Paris_1230002|2024-05-18|souris            |modèle par défaut   |
|4               |1      |KeyPers_Paris_1230003|NULL      |NULL              |NULL                |
|5               |1      |KeyPers_Paris_1230004|2024-10-17|PC fixe sans ecran|                    |
|6               |1      |KeyPers_Paris_1230005|NULL      |NULL              |NULL                |
|7               |1      |KeyPers_Paris_1230006|NULL      |NULL              |NULL                |


In [None]:
# 筛选出 ID_DATE、ID_TYPE、ID_MODELE 任意一个不是 NULL 的行
non_null_rows = fait_materiel.filter(
    (f.col("ID_DATE").isNotNull()) |
    (f.col("ID_TYPE").isNotNull()) |
    (f.col("ID_MODELE").isNotNull())
)

# 显示这些非 NULL 的行
non_null_rows.show(truncate=False)

+----------------+-------+---------------------+----------+------------------+---------------------------+
|ID_FAIT_MATERIEL|ID_SITE|ID_PERSONNEL         |ID_DATE   |ID_TYPE           |ID_MODELE                  |
+----------------+-------+---------------------+----------+------------------+---------------------------+
|3               |1      |KeyPers_Paris_1230002|2024-05-18|souris            |modèle par défaut          |
|5               |1      |KeyPers_Paris_1230004|2024-10-17|PC fixe sans ecran|                           |
|8               |1      |KeyPers_Paris_1230007|2024-09-12|                  |Latitude 7xxx              |
|9               |1      |KeyPers_Paris_1230007|2024-09-10|PC portable       |Moyenne 14/15pouces        |
|10              |1      |KeyPers_Paris_1230008|2024-10-24|PC fixe sans ecran|                           |
|11              |1      |KeyPers_Paris_1230009|2024-08-12|                  |modèle par défaut          |
|13              |1      |KeyPers_Par

In [None]:
# 统计 ID_DATE、ID_TYPE、ID_MODELE 任意一个不是 NULL 的行数
non_null_count = fait_materiel.filter(
    (f.col("ID_DATE").isNotNull()) |
    (f.col("ID_TYPE").isNotNull()) |
    (f.col("ID_MODELE").isNotNull())
).count()

print(f"\n🚀 共有 {non_null_count} 行，其中 ID_DATE、ID_TYPE 或 ID_MODELE 至少有一个非 NULL。")


🚀 共有 8204 行，其中 ID_DATE、ID_TYPE 或 ID_MODELE 至少有一个非 NULL。


## 在上面我们知道在这个点子设备的表中，我们存在很多的有的是模型没有，有的设备type没有的情况，所谓我们需要使用机器学习的方法来替代这些的空白的值 ##

In [None]:
import pyspark.pandas as ps
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB


psdf_train = ps.concat([
    psdf_informatique_berlin, 
    psdf_informatique_paris, 
    psdf_informatique_london, 
    psdf_informatique_losangeles, 
    psdf_informatique_newyork, 
    psdf_informatique_shanghai
], axis=0)


psdf_train = psdf_train.dropna(subset=["TYPE", "MODELE"])


X_type_train = psdf_train["MODELE"].to_pandas()
y_type_train = psdf_train["TYPE"].to_pandas()

X_model_train = psdf_train["TYPE"].to_pandas()
y_model_train = psdf_train["MODELE"].to_pandas()


type_vectorizer = CountVectorizer().fit(X_type_train)
model_vectorizer = CountVectorizer().fit(X_model_train)

type_classifier = MultinomialNB().fit(type_vectorizer.transform(X_type_train), y_type_train)
model_classifier = MultinomialNB().fit(model_vectorizer.transform(X_model_train), y_model_train)






In [None]:
df = sdf_informatique_paris.toPandas()
typeindextopredict=df[(df['TYPE'].isnull()) | (df['TYPE'].str.strip() == "")].index
typetopredict=df.loc[typeindextopredict,'MODELE']

modelindextopredict=df[(df['MODELE'].isnull()) | (df['MODELE'].str.strip() == "")].index
modeltopredict=df.loc[modelindextopredict,'TYPE']

if typetopredict.empty:
    print("NO NULL TYPE")
else:
    
    print(f"PREDICTING {len(typetopredict)} TYPE...")
    predicted_types = type_classifier.predict(type_vectorizer.transform(typetopredict))
    df.loc[typeindextopredict, 'TYPE'] = predicted_types
    

if modeltopredict.empty:
    print("NO NULL MODEL")
else:
    print(f"PREDICTING {len(modeltopredict)} MODEL...")
    predicted_models = model_classifier.predict(model_vectorizer.transform(modeltopredict))
    df.loc[modelindextopredict, 'MODELE'] = predicted_models

sdf_informatique_paris = spark.createDataFrame(df)
sdf_informatique_paris.show(20)






NO NULL TYPE
NO NULL MODEL
+--------------------+--------------------+-------------+----------------+-------------------+------------------+--------------------+
|     ID_MATERIELINFO|        ID_PERSONNEL|NOM_PERSONNEL|PRENOM_PERSONNEL|         DATE_ACHAT|              TYPE|              MODELE|
+--------------------+--------------------+-------------+----------------+-------------------+------------------+--------------------+
|Paris_MATERIEL_IN...|KeyPers_Paris_123...|     Name3515|    FistName3515|2024-06-25 11:37:24|             Ecran|   modèle par défaut|
|Paris_MATERIEL_IN...|KeyPers_Paris_123...|     Name2092|    FistName2092|2024-06-25 10:48:59|  Vidéo projecteur|   modèle par défaut|
|Paris_MATERIEL_IN...|KeyPers_Paris_123...|     Name1450|    FistName1450|2024-06-25 10:52:46|PC fixe sans ecran|Elite (Tower, SFF...|
|Paris_MATERIEL_IN...|KeyPers_Paris_123...|     Name4268|    FistName4268|2024-06-25 11:55:34|        imprimante|  Laser A3 (40-99kg)|
|Paris_MATERIEL_IN...|KeyPer


🔍 处理 ID_MATERIELINFO 表：
NO NULL TYPE
NO NULL MODEL

🔍 处理 ID_MATERIELINFO 表：
PREDICTING 126 TYPE...
PREDICTING 53 MODEL...

🔍 处理 ID_MATERIELINFO 表：
PREDICTING 296 TYPE...
PREDICTING 109 MODEL...

🔍 处理 ID_MATERIELINFO 表：
PREDICTING 274 TYPE...
PREDICTING 120 MODEL...

🔍 处理 ID_MATERIELINFO 表：
PREDICTING 293 TYPE...
PREDICTING 115 MODEL...

🔍 处理 ID_MATERIELINFO 表：
PREDICTING 322 TYPE...
PREDICTING 116 MODEL...


KeyError: 'ID'

**GEOPY表示距离**

In [None]:
from geopy import distance
from geopy.geocoders import Nominatim
geolocator = Nominatim(user_agent="NF26_UTC",timeout=10)
location_p = geolocator.geocode("Paris, France")
print((location_p.latitude, location_p.longitude))
location_l = geolocator.geocode("Los Angeles, USA")
print((location_l.latitude, location_l.longitude))
location_s = geolocator.geocode("Shanghai, China")
print((location_s.latitude, location_s.longitude))
location_n = geolocator.geocode("New York, USA")
print((location_n.latitude, location_n.longitude))
location_b = geolocator.geocode("Berlin, Allemagne")
print((location_b.latitude, location_b.longitude))
location_london = geolocator.geocode("London, England")
print((location_london.latitude, location_london.longitude))



(48.8588897, 2.320041)
(34.0536909, -118.242766)
(31.2312707, 121.4700152)
(40.7127281, -74.0060152)
(52.510885, 13.3989367)
(51.5074456, -0.1277653)


In [None]:
def get_lat_lon(city, country,):
    try:
        location = geolocator.geocode(f"{city}, {country}")
        if location:
            return location.latitude, location.longitude
        else:
            return None, None
    except Exception as e:
        print(f"Error retrieving coordinates for {city}, {country}: {e}")
        return None, None
sdf_lat_lon_paris = sdf_mission_paris.select('VILLE_DEPART','PAYS_DEPART','VILLE_DESTINATION','PAYS_DESTINATION','TRANSPORT','ALLER_RETOUR')
psdf_lat_lon_paris = sdf_lat_lon_paris.toPandas()
psdf_lat_lon_paris.head(3)
sdf0=sdf_mission_paris.select('VILLE_DEPART','PAYS_DEPART').distinct()
psdf0 = sdf0.toPandas()
results_depart=psdf0.apply(lambda row:pd.Series(get_lat_lon(row['VILLE_DEPART'], row['PAYS_DEPART'])), axis=1)
print(results_depart)



           0           1
0  34.053691 -118.242766
1  48.858890    2.320041
2  31.231271  121.470015
3  40.712728  -74.006015
4  51.507446   -0.127765
5  52.510885   13.398937


In [None]:
psdf0[['latitude', 'longitude']] = results_depart
psdf0.head(3)

Unnamed: 0,VILLE_DEPART,PAYS_DEPART,latitude,longitude
0,Los Angeles,USA,34.053691,-118.242766
1,Paris,France,48.85889,2.320041
2,Shanghai,China,31.231271,121.470015


In [None]:
print(sdf_mission_paris.select('VILLE_DESTINATION', 'PAYS_DESTINATION').distinct().count())
sdf1 = sdf_mission_paris.select('VILLE_DESTINATION', 'PAYS_DESTINATION').distinct()
psdf1 = sdf1.toPandas()
psdf1.head(3)



32


Unnamed: 0,VILLE_DESTINATION,PAYS_DESTINATION
0,Dubaï,Emirats
1,Los Angeles,USA
2,Wellington,New Zealand


In [None]:
#####运算时见较长
results_destination=psdf1.apply(lambda row:pd.Series(get_lat_lon(row['VILLE_DESTINATION'], row['PAYS_DESTINATION'])), axis=1)
print(results_destination)


            0           1
0   25.264723   55.292415
1   34.053691 -118.242766
2  -41.288795  174.777211
3   23.658512 -102.007710
4   60.167488   24.942747
5   31.231271  121.470015
6  -36.852095  174.763180
7    4.653411  -74.083655
8   49.260872 -123.113952
9   50.636565    3.063528
10  34.021845   -6.840893
11  59.325117   18.071093
12  49.417950    2.826317
13  51.507446   -0.127765
14  36.772933    3.058845
15  35.676860  139.763895
16 -23.550651  -46.633382
17  45.503182  -73.569806
18  40.190632  116.412144
19  38.895037  -77.036543
20 -22.911014  -43.209373
21 -12.062107  -77.036526
22  43.296174    5.369953
23  36.800207   10.185776
24  52.510885   13.398937
25  40.712728  -74.006015
26  44.841225   -0.580036
27 -34.608370  -58.444058
28  59.913330   10.738970
29  34.693757  135.501454
30 -37.814245  144.963173
31 -33.869844  151.208285


In [None]:
psdf1[['latitude', 'longitude']] = results_destination
psdf1.head(3)

Unnamed: 0,VILLE_DESTINATION,PAYS_DESTINATION,latitude,longitude
0,Dubaï,Emirats,25.264723,55.292415
1,Los Angeles,USA,34.053691,-118.242766
2,Wellington,New Zealand,-41.288795,174.777211


In [None]:
merged_depart=psdf_lat_lon_paris.merge(psdf0, on=['VILLE_DEPART','PAYS_DEPART'], how='left')
merged_depart=merged_depart[['VILLE_DEPART','PAYS_DEPART','latitude','longitude']]
psdf_lat_lon_paris[['latitude_depart','longitude_depart']]=merged_depart[['latitude','longitude']]
print(psdf_lat_lon_paris.shape[0])
psdf_lat_lon_paris.head(3)


4518


Unnamed: 0,VILLE_DEPART,PAYS_DEPART,VILLE_DESTINATION,PAYS_DESTINATION,TRANSPORT,ALLER_RETOUR,latitude_depart,longitude_depart
0,Paris,France,Wellington,New Zealand,Avion,oui,48.85889,2.320041
1,Paris,France,Sidney,Australia,Avion,oui,48.85889,2.320041
2,Paris,France,Auckland,New Zealand,Avion,oui,48.85889,2.320041


In [None]:
merged_destination=psdf_lat_lon_paris.merge(psdf1, on=['VILLE_DESTINATION','PAYS_DESTINATION'], how='left')
merged_destination=merged_destination[['VILLE_DESTINATION','PAYS_DESTINATION','latitude','longitude']]
psdf_lat_lon_paris[['latitude_destination','longitude_destination']]=merged_destination[['latitude','longitude']]
print(psdf_lat_lon_paris.shape[0])
psdf_lat_lon_paris.head(3)

4518


Unnamed: 0,VILLE_DEPART,PAYS_DEPART,VILLE_DESTINATION,PAYS_DESTINATION,TRANSPORT,ALLER_RETOUR,latitude_depart,longitude_depart,latitude_destination,longitude_destination
0,Paris,France,Wellington,New Zealand,Avion,oui,48.85889,2.320041,-41.288795,174.777211
1,Paris,France,Sidney,Australia,Avion,oui,48.85889,2.320041,-33.869844,151.208285
2,Paris,France,Auckland,New Zealand,Avion,oui,48.85889,2.320041,-36.852095,174.76318


In [None]:
psdf_lat_lon_paris['each_distance']=psdf_lat_lon_paris.apply(lambda row: distance.distance((row['latitude_depart'], row['longitude_depart']), (row['latitude_destination'], row['longitude_destination'])).km, axis=1)
psdf_lat_lon_paris.head(3)
psdf_lat_lon_paris['ALLER_RETOUR']=psdf_lat_lon_paris['ALLER_RETOUR'].replace({'oui':2,'non':1})
psdf_lat_lon_paris.head(3)

  psdf_lat_lon_paris['ALLER_RETOUR']=psdf_lat_lon_paris['ALLER_RETOUR'].replace({'oui':2,'non':1})


Unnamed: 0,VILLE_DEPART,PAYS_DEPART,VILLE_DESTINATION,PAYS_DESTINATION,TRANSPORT,ALLER_RETOUR,latitude_depart,longitude_depart,latitude_destination,longitude_destination,each_distance
0,Paris,France,Wellington,New Zealand,Avion,2,48.85889,2.320041,-41.288795,174.777211,18980.624227
1,Paris,France,Sidney,Australia,Avion,2,48.85889,2.320041,-33.869844,151.208285,16959.632942
2,Paris,France,Auckland,New Zealand,Avion,2,48.85889,2.320041,-36.852095,174.76318,18538.879977


In [None]:
psdf_lat_lon_paris['total_distance']=psdf_lat_lon_paris['each_distance']*psdf_lat_lon_paris['ALLER_RETOUR']
psdf_lat_lon_paris.head(20)

Unnamed: 0,VILLE_DEPART,PAYS_DEPART,VILLE_DESTINATION,PAYS_DESTINATION,TRANSPORT,ALLER_RETOUR,latitude_depart,longitude_depart,latitude_destination,longitude_destination,each_distance,total_distance
0,Paris,France,Wellington,New Zealand,Avion,2,48.85889,2.320041,-41.288795,174.777211,18980.624227,37961.248453
1,Paris,France,Sidney,Australia,Avion,2,48.85889,2.320041,-33.869844,151.208285,16959.632942,33919.265885
2,Paris,France,Auckland,New Zealand,Avion,2,48.85889,2.320041,-36.852095,174.76318,18538.879977,37077.759955
3,Paris,France,Oslo,Norvège,Avion,1,48.85889,2.320041,59.91333,10.73897,1344.066054,1344.066054
4,Paris,France,Lima,Peru,Avion,2,48.85889,2.320041,-12.062107,-77.036526,10245.031213,20490.062426
5,Paris,France,Rabat,Maroc,Avion,2,48.85889,2.320041,34.021845,-6.840893,1813.840043,3627.680086
6,Paris,France,Lille,France,Avion,2,48.85889,2.320041,50.636565,3.063528,204.849593,409.699187
7,Paris,France,Buenos Aires,Argentina,Avion,2,48.85889,2.320041,-34.60837,-58.444058,11027.372368,22054.744737
8,Paris,France,Rabat,Maroc,Avion,2,48.85889,2.320041,34.021845,-6.840893,1813.840043,3627.680086
9,Paris,France,Sao Paulo,Brazil,Avion,2,48.85889,2.320041,-23.550651,-46.633382,9376.93048,18753.860961


In [None]:
psdf_final_mission_paris=psdf_lat_lon_paris
print(psdf_final_mission_paris.head(3))
psdf_final_mission_paris['VILLE_DEPART'].unique()

  VILLE_DEPART PAYS_DEPART VILLE_DESTINATION PAYS_DESTINATION TRANSPORT  \
0        Paris      France        Wellington      New Zealand     Avion   
1        Paris      France            Sidney        Australia     Avion   
2        Paris      France          Auckland      New Zealand     Avion   

   ALLER_RETOUR  latitude_depart  longitude_depart  latitude_destination  \
0             2         48.85889          2.320041            -41.288795   
1             2         48.85889          2.320041            -33.869844   
2             2         48.85889          2.320041            -36.852095   

   longitude_destination  each_distance  total_distance  
0             174.777211   18980.624227    37961.248453  
1             151.208285   16959.632942    33919.265885  
2             174.763180   18538.879977    37077.759955  


array(['Paris', 'Shanghai', 'Los Angeles', 'New-York', 'London', 'Berlin'],
      dtype=object)

In [None]:
print(psdf_final_mission_paris[psdf_final_mission_paris['TRANSPORT'] == 'Taxi'].shape[0])
psdf_final_mission_paris[psdf_final_mission_paris['TRANSPORT'] == 'Taxi'].head()




60


Unnamed: 0,VILLE_DEPART,PAYS_DEPART,VILLE_DESTINATION,PAYS_DESTINATION,TRANSPORT,ALLER_RETOUR,latitude_depart,longitude_depart,latitude_destination,longitude_destination,each_distance,total_distance
62,Paris,France,Compiègne,France,Taxi,2,48.85889,2.320041,49.41795,2.826317,72.321067,144.642134
196,Paris,France,Marseille,France,Taxi,2,48.85889,2.320041,43.296174,5.369953,661.647736,1323.295472
210,Paris,France,Lille,France,Taxi,2,48.85889,2.320041,50.636565,3.063528,204.849593,409.699187
244,Paris,France,Compiègne,France,Taxi,2,48.85889,2.320041,49.41795,2.826317,72.321067,144.642134
252,Paris,France,Bordeaux,France,Taxi,2,48.85889,2.320041,44.841225,-0.580036,498.30864,996.61728


In [None]:
#psdf_final_mission_paris.to_spark()

In [None]:
psdf_final_mission_paris['TRANSPORT'].unique()

array(['Avion', 'Taxi', 'Train', 'Transports en commun'], dtype=object)

**确定不同的出行工具的系数**

**Generic calculation: calculation of emissions from each trip and then aggregation of emissions from all trips over the year
Emissions of a trip = distance travelled * emission factor of the travel mode (per km travelled or per passenger-km)
Calculation of the distance of a trip:
Calculation of the distance as the crow flies between the city of departure and the city of destination from geonames
To these distances as the crow flies, we apply the following multiplying factors: 1.3 for car travel, 1.2 for train and suburban train, 1.7 for metro, 1.5 for bus and tram. 
For taxi journeys, the return road distance is systematically taken into account and corresponds to the outward distance 
For air travel, 95 km is added to the distance as the crow flies**

**

⸻

通用计算方法：

每次出行的排放量先被计算出来，然后将全年所有出行的排放量进行汇总。

一次出行的排放量 = 行驶距离 × 出行方式的排放因子（按每公里或每人公里计算）

⸻

出行距离的计算方式：
	•	从 GeoNames 获取出发城市和目的地城市之间的“鸟飞直线距离”；
	•	然后对该直线距离应用以下乘数因子，以更贴近真实陆路距离：
	•	汽车出行：乘以 1.3
	•	火车和市郊列车：乘以 1.2
	•	地铁：乘以 1.7
	•	公交车和有轨电车：乘以 1.5
	•	对于出租车出行，系统性地将往返距离计入排放计算，即视为往返行驶与去程相同的距离；
	•	对于飞机出行，在直线距离基础上额外加上 95 公里。

⸻

**

In [None]:

def ajuster_distance(row):
    base = row['each_distance']
    mode = row['TRANSPORT']  
    
    if mode == 'Avion':
        return base + 95
    elif mode == 'Taxi':
        return base * 1.2
    elif mode == 'Train':
        return base * 1.2 
    elif mode == 'Transports en commun':
        return base * 1.5
    
 
psdf_final_mission_paris['each_distance'] = psdf_final_mission_paris.apply(ajuster_distance, axis=1)
psdf_final_mission_paris.head(3)

Unnamed: 0,VILLE_DEPART,PAYS_DEPART,VILLE_DESTINATION,PAYS_DESTINATION,TRANSPORT,ALLER_RETOUR,latitude_depart,longitude_depart,latitude_destination,longitude_destination,each_distance,total_distance
0,Paris,France,Wellington,New Zealand,Avion,2,48.85889,2.320041,-41.288795,174.777211,19075.624227,37961.248453
1,Paris,France,Sidney,Australia,Avion,2,48.85889,2.320041,-33.869844,151.208285,17054.632942,33919.265885
2,Paris,France,Auckland,New Zealand,Avion,2,48.85889,2.320041,-36.852095,174.76318,18633.879977,37077.759955


In [None]:
psdf_final_mission_paris['total_distance']=psdf_final_mission_paris['each_distance']*psdf_final_mission_paris['ALLER_RETOUR']
psdf_final_mission_paris.head(3)



Unnamed: 0,VILLE_DEPART,PAYS_DEPART,VILLE_DESTINATION,PAYS_DESTINATION,TRANSPORT,ALLER_RETOUR,latitude_depart,longitude_depart,latitude_destination,longitude_destination,each_distance,total_distance
0,Paris,France,Wellington,New Zealand,Avion,2,48.85889,2.320041,-41.288795,174.777211,19075.624227,38151.248453
1,Paris,France,Sidney,Australia,Avion,2,48.85889,2.320041,-33.869844,151.208285,17054.632942,34109.265885
2,Paris,France,Auckland,New Zealand,Avion,2,48.85889,2.320041,-36.852095,174.76318,18633.879977,37267.759955


In [None]:
psdf_discion_file = pd.read_csv('/Users/ordi_de_lvga/Documents/nf26_td/projet/GES1point5_transports_factors_20250408.tsv', sep='\t')
psdf_discion_file

Unnamed: 0,category,subcategory,subsubcategory,unit,name,year,co2,ch4,n2o,other,total,uncertainty,ef.unit
0,Transport,Plane,Short haul (< 1000 km),km,"Short haul plane, with contrails",2022,0,0,0,0,0.2586,0.37,kg CO2e/km
1,Transport,Plane,Medium haul (< 1001 - 3500km),km,"Medium haul plane, with contrails",2022,0,0,0,0,0.1875,0.37,kg CO2e/km
2,Transport,Plane,Long haul (> 3500 km),km,"Long haul plane, with contrails",2022,0,0,0,0,0.152,0.37,kg CO2e/km
3,Transport,Railway,TGV > 200 km,km,TGV,2021,0,0,0,0,0.0033,0.2,kg CO2e/km
4,Transport,Railway,Train < 200 km,km,Not available,2019,0,0,0,0,0.018,0.6,kg CO2e/km
5,Transport,Railway,International train,km,Not available,2019,0,0,0,0,0.037,0.6,kg CO2e/km
6,Transport,Railway,Mixed train France and international,km,Not available,2019,0,0,0,0,0.016,0.6,kg CO2e/km
7,Transport,Railway,rer,km,Paris suburban express railway including RER,2021,0,0,0,0,0.0094,0.2,kg CO2e/km
8,Transport,Railway,"Tramway =< 250,000 residents",km,"Subway, tramway, trolleybus, other cities - Ag...",2018,0,0,0,0,0.005,0.6,kg CO2e/km
9,Transport,Railway,"Tramway > 250,000 residents",km,"Subway, tramway, trolleybus, other cities - Ag...",2018,0,0,0,0,0.0033,0.6,kg CO2e/km


In [None]:
def calculer_emission(row):
    distance0= row['each_distance']
    distance1 = row['total_distance']
    mode = row['TRANSPORT']
    if mode =='Avion':
        if distance0 <1000:
            return 0.2586*distance1
        if distance0 >=1000 and distance0 <3500:
            return 0.1875*distance1
        if distance0 >=3500:
            return 0.1520*distance1
    if mode == 'Train':
        if distance0 <200:
            return 0.0033*distance1
        if distance0 >=200:
            return 0.018*distance1
    if mode == 'Taxi':
        return distance1 * 2 * 0.215
    if mode == 'Transports en commun':
        return 0.129*distance1

    

In [None]:
#psdf_final_mission_paris.to_spark() 

1：巴黎站点计算机工程师的人数

In [None]:
# 确保 ID_SITE 为 Paris (ID_SITE = 1) 并且职务为 Computer Engineer
paris_computer_engineers = fait_mission.filter(f.col("ID_SITE") == 1) \
    .join(sdf_dimension_personnel, "ID_PERSONNEL") \
    .filter(f.col("FONCTION_PERSONNEL") == "Computer Engineer") \
    .select("ID_PERSONNEL").distinct()

# 统计 Computer Engineer 的数量
engineer_count = paris_computer_engineers.count()
print(f"\nParis 站点共有 {engineer_count} 名 Computer Engineer。")


Paris 站点共有 1831 名 Computer Engineer。


2，伦敦站点有多少名数据工程师

In [None]:
# 确保 ID_SITE 为 London (ID_SITE = 4) 并且职务为 Data Engineer
london_data_engineers = fait_mission.filter(f.col("ID_SITE") == 4) \
    .join(sdf_dimension_personnel, "ID_PERSONNEL") \
    .filter(f.col("FONCTION_PERSONNEL") == "Data Engineer") \
    .select("ID_PERSONNEL").distinct()

# 统计 Data Engineer 的数量
data_engineer_count = london_data_engineers.count()
print(f"\n London 站点共有 {data_engineer_count} 名 Data Engineer。")


 London 站点共有 1568 名 Data Engineer。


3，整个站点有多少名Business Executive

In [None]:
# 统计所有站点的 Business Executive 人数
business_executives = fait_mission.join(sdf_dimension_personnel, "ID_PERSONNEL") \
    .filter(f.col("FONCTION_PERSONNEL") == "Business Executive") \
    .select("ID_PERSONNEL").distinct()

# 统计 Business Executive 的数量
executive_count = business_executives.count()
print(f"\n 所有站点共有 {executive_count} 名 Business Executive。")


 所有站点共有 3955 名 Business Executive。


4

In [None]:
from pyspark.sql import functions as f

# 筛选出 2024年5月至10月购买的 PC portable
pc_portable_count = fait_materiel.filter(
    (f.col("ID_TYPE") == "PC portable") & 
    (f.col("ID_DATE").between("2024-05-01", "2024-10-31"))
).count()

print(f"\n 从2024年5月至10月，组织总共购买了 {pc_portable_count} 台 PC portable。")


 从2024年5月至10月，组织总共购买了 1905 台 PC portable。


5

In [None]:
from pyspark.sql import functions as f

# 假设每台 PC portable 产生 200 kg CO₂
carbon_per_pc = 200  # kg CO₂

# 筛选出 2024年5月至10月购买的 PC portable
pc_portable_count = fait_materiel.filter(
    (f.col("ID_TYPE") == "PC portable") & 
    (f.col("ID_DATE").between("2024-05-01", "2024-10-31"))
).count()

# 计算碳排放总量
total_carbon_impact = (pc_portable_count * carbon_per_pc)/1000
print(f"\n🚀 从2024年5月至10月，PC portable 的总碳排放量为 {total_carbon_impact} T CO₂。")


🚀 从2024年5月至10月，PC portable 的总碳排放量为 381.0 T CO₂。
