# **Transformation des Données :**

### **1. Lecture des Fichiers CSV**
- **Objectif** : Lire tous les fichiers CSV du dossier `data/raw` et les combiner en un seul DataFrame.
- **Indices** :
  - Utilisez une librairie comme **Pandas**, **DuckDB**, ou **Spark** pour lire les fichiers.
  - Cherchez une méthode permettant de lire plusieurs fichiers CSV avec un motif commun (ex: `visiteurs_*.csv`).
  - Assurez-vous que les colonnes sont correctement alignées lors de la fusion.
- **Validation** :
  - Affichez le nombre de lignes et colonnes pour vérifier que tout a été chargé.
  - Vérifiez les premières lignes avec `.head()`.

In [1]:
#importation 
import os
import pandas as pd
from pathlib import Path
from typing import List

In [2]:
raw_data_path = "C:/Users/Asus/OneDrive/Desktop/Auto_formation/data_upskilling/projet-end-to-end/Data-pipeline/data/raw"

In [3]:
def get_visitor_csv_files(raw_data_path: str) -> List[Path]:
    """
    Retrieve all visitor CSV files from the specified directory and verify their existence.
    
    Args:
        raw_data_path: Path to the directory containing the raw CSV files
        
    Returns:
        List[Path]: List of Path objects pointing to the CSV files
        
    Raises:
        FileNotFoundError: If directory doesn't exist or no matching CSV files found
    """
    raw_dir = Path(raw_data_path)
    
    # Verify directory exists
    if not raw_dir.is_dir():
        raise FileNotFoundError(f"Directory not found: {raw_dir}")
    
    # Find all visitor CSV files
    csv_files = list(raw_dir.glob("visiteurs_*.csv"))
    
    # Verify at least one CSV file exists
    if not csv_files:
        raise FileNotFoundError(f"No CSV files matching pattern 'visiteurs_*.csv' found in {raw_dir}")
    
    return csv_files

In [4]:
try:
    csv_files = get_visitor_csv_files(raw_data_path)
    print(f"Found {len(csv_files)} CSV files:")
    for file in csv_files:
        print(f"- {file.name}")
except FileNotFoundError as e:
    print(f"Error: {e}")

Found 5 CSV files:
- visiteurs_2025-01.csv
- visiteurs_2025-02.csv
- visiteurs_2025-03.csv
- visiteurs_2025-04.csv
- visiteurs_2025-05.csv


In [5]:
# Read and concatenate all CSV files
dfs = []
for file in csv_files:
    try:
        df = pd.read_csv(file)
        dfs.append(df)
        print(f"Loaded data from {file.name}")
    except Exception as e:
         print(f"Error loading {file.name}: {str(e)}")

combined_df = pd.concat(dfs, ignore_index=True)

Loaded data from visiteurs_2025-01.csv
Loaded data from visiteurs_2025-02.csv
Loaded data from visiteurs_2025-03.csv
Loaded data from visiteurs_2025-04.csv
Loaded data from visiteurs_2025-05.csv


In [6]:
combined_df.head(20)

Unnamed: 0,date,heure,id_du_capteur,id_du_magasin,nombre_visiteurs,unite
0,2025-01-01,12:00:00,0.0,Lille,47.0,visiteurs
1,2025-01-01,12:00:00,1.0,Lille,476.0,visiteurs
2,2025-01-01,12:00:00,2.0,Lille,95.0,visiteurs
3,2025-01-01,12:00:00,3.0,Lille,238.0,visiteurs
4,2025-01-01,12:00:00,4.0,Lille,143.0,visiteurs
5,2025-01-01,12:00:00,5.0,Lille,47.0,visiteurs
6,2025-01-01,12:00:00,6.0,Lille,2288.0,visiteurs
7,2025-01-01,12:00:00,7.0,Lille,1430.0,visiteurs
8,2025-01-01,12:00:00,0.0,Paris,3344.0,visiteurs
9,2025-01-01,12:00:00,1.0,Paris,557.0,visiteurs


In [7]:
print("\nCombined Visitor Data:")
combined_df


Combined Visitor Data:


Unnamed: 0,date,heure,id_du_capteur,id_du_magasin,nombre_visiteurs,unite
0,2025-01-01,12:00:00,0.0,Lille,47.0,visiteurs
1,2025-01-01,12:00:00,1.0,Lille,476.0,visiteurs
2,2025-01-01,12:00:00,2.0,Lille,95.0,visiteurs
3,2025-01-01,12:00:00,3.0,Lille,238.0,visiteurs
4,2025-01-01,12:00:00,4.0,Lille,143.0,visiteurs
...,...,...,...,...,...,...
5903,2025-05-26,12:00:00,3.0,Marseille,817.0,visiteurs
5904,2025-05-26,12:00:00,4.0,Marseille,17.0,visiteurs
5905,2025-05-26,12:00:00,5.0,Marseille,17.0,visiteurs
5906,2025-05-26,12:00:00,6.0,Marseille,34.0,visiteurs


In [8]:
# Show basic info about the data
print("\nData Info:")
combined_df.info()


Data Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5908 entries, 0 to 5907
Data columns (total 6 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   date              5908 non-null   object 
 1   heure             5908 non-null   object 
 2   id_du_capteur     5879 non-null   float64
 3   id_du_magasin     5908 non-null   object 
 4   nombre_visiteurs  5908 non-null   float64
 5   unite             5908 non-null   object 
dtypes: float64(2), object(4)
memory usage: 277.1+ KB


In [9]:
# Show summary statistics
print("\nSummary Statistics:")
combined_df.describe()


Summary Statistics:


Unnamed: 0,id_du_capteur,nombre_visiteurs
count,5879.0,5908.0
mean,10.103929,6718.038253
std,80.851263,78864.295944
min,0.0,-1.0
25%,2.0,21.0
50%,4.0,91.0
75%,6.0,468.0
max,999.0,999999.0


In [10]:
df_daily  = combined_df.groupby(['date', 'id_du_magasin'])['nombre_visiteurs'].sum()
df_daily  = df_daily .reset_index()
df_daily = df_daily.rename(columns={'nombre_visiteurs': 'total_journalier'})
df_daily

Unnamed: 0,date,id_du_magasin,total_journalier
0,2025-01-01,Lille,4764.0
1,2025-01-01,Lyon,8064.0
2,2025-01-01,Marseille,2160.0
3,2025-01-01,Paris,11143.0
4,2025-01-01,Toulouse,3370.0
...,...,...,...
725,2025-05-26,Lille,3011.0
726,2025-05-26,Lyon,6011.0
727,2025-05-26,Marseille,1701.0
728,2025-05-26,Paris,8020.0


In [11]:
#  ajouter une agrégation avant reset_index()
df1 = df_daily.groupby('id_du_magasin').agg({'total_journalier': 'sum'}).reset_index()
df1

Unnamed: 0,id_du_magasin,total_journalier
0,Lille,10398789.0
1,Lyon,6775150.0
2,Marseille,8227014.0
3,Paris,9024038.0
4,Toulouse,5265179.0


In [13]:
print("Total global:", df_daily ['total_journalier'].sum())

Total global: 39690170.0


In [15]:
print("Vérification pour un jour spécifique :")
print(combined_df[combined_df['date'] == '2023-05-01']['nombre_visiteurs'].sum())

Vérification pour un jour spécifique :
0.0


---
### **3. Nettoyage des Données (Unités et Valeurs Manquantes)**
- **Objectif** : Supprimer les lignes avec des unités incorrectes ou des valeurs nulles.
- **Indices** :
  - Filtrez les lignes où `unite` correspond à une valeur attendue (ex: `"visiteurs"`).
  - Supprimez les lignes où `nombre_visiteurs` est `null` ou `NaN` (avec `.dropna()`).
  - **Attention** : Ne supprimez pas encore les valeurs aberrantes (trop élevées ou trop basses).
- **Validation** :
  - Affichez les valeurs uniques de `unite` pour confirmer le filtrage.
  - Vérifiez le nombre de lignes avant/après nettoyage.

In [17]:
null_in_column = combined_df['id_du_capteur'].isnull().sum()
print(f"Valeurs manquantes : {null_in_column}")

Valeurs manquantes : 29


In [25]:
# Liste des unités valides (à adapter selon vos besoins)
unites_valides = ["visiteurs", "visiteur"]  # Exemple : formats possibles

# Filtrage
df_clean = combined_df[combined_df['unite'].isin(unites_valides)]

In [26]:
# Suppression des lignes où 'nombre_visiteurs' est NaN
df_clean = df_clean.dropna(subset=['nombre_visiteurs'])

In [27]:
# Afficher les unités restantes pour validation
print("Unités après filtrage :", df_clean['unite'].unique())

# Comparaison du nombre de lignes
print(f"Lignes avant nettoyage : {len(combined_df)}")
print(f"Lignes après nettoyage : {len(df_clean)}")

Unités après filtrage : ['visiteurs']
Lignes avant nettoyage : 5908
Lignes après nettoyage : 5840


In [28]:
print("Valeurs manquantes après nettoyage :", df_clean['nombre_visiteurs'].isna().sum())  # Doit retourner 0

Valeurs manquantes après nettoyage : 0


---
### **4. Window Function (Moyenne Mobile sur 4 Semaines)**
- **Objectif** : Calculer une moyenne mobile basée sur les 4 mêmes jours de la semaine précédents.
- **Indices** :
  - Ajoutez une colonne `day_of_week` pour identifier le jour de la semaine (ex: `lundi=0`).
  - Utilisez une **fonction de fenêtrage** (`window`) avec :
    - Partition par `day_of_week`, `id_du_magasin`, et `id_du_capteur`.
    - Tri par `date`.
    - Une fenêtre glissante de 4 périodes.
  - Calculez la moyenne (`mean`) sur cette fenêtre.
- **Validation** :
  - Vérifiez que la moyenne est calculée uniquement sur les jours correspondants (ex: que des jeudis).

In [22]:
df_clean['date'] = pd.to_datetime(df_clean['date']) 
# Convertir la colonne 'date' en type datetime

df_clean

Unnamed: 0,date,heure,id_du_capteur,id_du_magasin,nombre_visiteurs,unite
0,2025-01-01,12:00:00,0.0,Lille,47.0,visiteurs
1,2025-01-01,12:00:00,1.0,Lille,476.0,visiteurs
2,2025-01-01,12:00:00,2.0,Lille,95.0,visiteurs
3,2025-01-01,12:00:00,3.0,Lille,238.0,visiteurs
4,2025-01-01,12:00:00,4.0,Lille,143.0,visiteurs
...,...,...,...,...,...,...
5903,2025-05-26,12:00:00,3.0,Marseille,817.0,visiteurs
5904,2025-05-26,12:00:00,4.0,Marseille,17.0,visiteurs
5905,2025-05-26,12:00:00,5.0,Marseille,17.0,visiteurs
5906,2025-05-26,12:00:00,6.0,Marseille,34.0,visiteurs


In [23]:
df_nat = df_clean[pd.isna(df_clean['date'])]
df_nat

Unnamed: 0,date,heure,id_du_capteur,id_du_magasin,nombre_visiteurs,unite


In [24]:
df_clean['day_of_week'] = df_clean['date'].dt.dayofweek
df_clean

Unnamed: 0,date,heure,id_du_capteur,id_du_magasin,nombre_visiteurs,unite,day_of_week
0,2025-01-01,12:00:00,0.0,Lille,47.0,visiteurs,2
1,2025-01-01,12:00:00,1.0,Lille,476.0,visiteurs,2
2,2025-01-01,12:00:00,2.0,Lille,95.0,visiteurs,2
3,2025-01-01,12:00:00,3.0,Lille,238.0,visiteurs,2
4,2025-01-01,12:00:00,4.0,Lille,143.0,visiteurs,2
...,...,...,...,...,...,...,...
5903,2025-05-26,12:00:00,3.0,Marseille,817.0,visiteurs,0
5904,2025-05-26,12:00:00,4.0,Marseille,17.0,visiteurs,0
5905,2025-05-26,12:00:00,5.0,Marseille,17.0,visiteurs,0
5906,2025-05-26,12:00:00,6.0,Marseille,34.0,visiteurs,0


---
### **5. Calcul du Pourcentage d'Écart (pct_change)**
- **Objectif** : Mesurer l'écart entre la valeur actuelle et la moyenne mobile.
- **Indices** :
  - Utilisez la formule :  
    `pct_change = (valeur_actuelle - moyenne_mobile) / moyenne_mobile * 100`
  - Stockez le résultat dans une nouvelle colonne `pct_change`.
  - Gérez les cas où `moyenne_mobile` est nulle (évitez les divisions par zéro).
- **Validation** :
  - Vérifiez que les pourcentages sont cohérents (ex: entre -100% et +200%).

In [29]:
# Calcul de la moyenne mobile sur 4 semaines pour chaque jour de la semaine, magasin et capteur
df_clean['moyenne_mobile'] = df_clean.groupby(
    ['day_of_week', 'id_du_magasin', 'id_du_capteur']
)['nombre_visiteurs'].transform(
    lambda x: x.rolling(window=4, min_periods=1).mean()
)

# Calcul du pourcentage d'écart
df_clean['pct_change'] = (
    (df_clean['nombre_visiteurs'] - df_clean['moyenne_mobile']) /
    df_clean['moyenne_mobile'].replace(0, 1)  # Évite la division par zéro
) * 100

# Validation des résultats
print("\nValidation des pourcentages d'écart:")
print(f"- Valeur min: {df_clean['pct_change'].min():.2f}%")
print(f"- Valeur max: {df_clean['pct_change'].max():.2f}%")
print(f"- Valeur moyenne: {df_clean['pct_change'].mean():.2f}%")

# Affichage d'un échantillon pour vérification
print("\nExemple de données avec pourcentage d'écart:")
display(df_clean.sample(5))

KeyError: 'day_of_week'

---
### **6. Stockage au Format Parquet**
- **Objectif** : Sauvegarder le DataFrame nettoyé et transformé en format Parquet.
- **Indices** :
  - Utilisez une méthode comme `.to_parquet()` (Pandas) ou l'équivalent dans votre outil.
  - Choisissez un dossier de sortie (ex: `data/filtered`).
  - **Bonus** : Explorez les avantages de Parquet (compression, colonnes, compatibilité).
- **Validation** :
  - Relisez le fichier Parquet pour vérifier que les données sont intactes.

---
## **Bonnes Pratiques Supplémentaires**
- **Tests Unitaires** : Vérifiez chaque étape avec des assertions (ex: `assert df.isna().sum() == 0`).
- **Logging** : Ajoutez des logs pour suivre l'exécution (`print` ou `logging`).
- **Documentation** : Commentez votre code pour expliquer les choix critiques.

---
## **Résumé des Étapes**
1. **Charger** les CSV → DataFrame unique.
2. **Agréger** → Trafic journalier.
3. **Nettoyer** → Unités et valeurs manquantes.
4. **Window Function** → Moyenne mobile.
5. **% d'écart** → Colonne `pct_change`.
6. **Sauvegarder** → Format Parquet.

