In [17]:
import apache_beam as beam
from apache_beam.io import ReadFromCsv
from datetime import datetime
import pandas as pd
import os
from apache_beam.options.pipeline_options import PipelineOptions

In [None]:
# Input and output directories (relative paths)
INPUT_DIR = "../raw_data"
OUTPUT_DIR = "../final_data"

# Every date column configured below shares this strptime-style layout.
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

# File name -> {column name: expected format} for the datetime conversion
# performed in transform_csv.
DATE_COLUMNS = {
    "olist_order_items_dataset.csv": dict.fromkeys(
        ("shipping_limit_date",),
        _TIMESTAMP_FORMAT,
    ),
    "olist_order_reviews_dataset.csv": dict.fromkeys(
        (
            "review_creation_date",
            "review_answer_timestamp",
        ),
        _TIMESTAMP_FORMAT,
    ),
    "olist_orders_dataset.csv": dict.fromkeys(
        (
            "order_purchase_timestamp",
            "order_approved_at",
            "order_delivered_carrier_date",
            "order_delivered_customer_date",
            "order_estimated_delivery_date",
        ),
        _TIMESTAMP_FORMAT,
    ),
}

# File name -> column passed to drop_duplicates in transform_csv.
DROP_DUPLICATE = {
    "olist_geolocation_dataset.csv": "geolocation_zip_code_prefix",
    "olist_order_reviews_dataset.csv": "review_id",
}



# Transformation function
def transform_csv(file_path: str) -> str:
    """Clean one CSV file and write the result into ``OUTPUT_DIR``.

    The file is loaded with pandas. Columns configured for this file name in
    ``DATE_COLUMNS`` (and actually present in the data) are parsed as
    datetimes using their configured format. If ``DROP_DUPLICATE`` names a
    column for this file, rows duplicated on that column are dropped. The
    result is written to ``OUTPUT_DIR`` under the same base name, without the
    index column.

    Args:
        file_path: Path of the CSV file to process.

    Returns:
        Path of the CSV file that was written.
    """
    filename = os.path.basename(file_path)

    # Load the CSV file with pandas
    df = pd.read_csv(file_path)

    # Convert the date columns configured for this file, if any
    for col, fmt in DATE_COLUMNS.get(filename, {}).items():
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], format=fmt)

    # A missing entry and an explicit None both mean "no de-duplication"
    dedup_column = DROP_DUPLICATE.get(filename)
    if dedup_column is not None:
        df = df.drop_duplicates(subset=dedup_column)

    # to_csv does not create missing directories, so make sure the target
    # directory exists before writing.
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Save the transformed file
    output_path = os.path.join(OUTPUT_DIR, filename)
    df.to_csv(output_path, index=False)
    return output_path


# Apache Beam pipeline: one element per CSV file found in INPUT_DIR, each
# handed to transform_csv. Leaving the `with` block runs the pipeline.
with beam.Pipeline() as pipeline:
    csv_paths = [
        os.path.join(INPUT_DIR, entry)
        for entry in os.listdir(INPUT_DIR)
        if entry.endswith(".csv")
    ]
    file_paths = pipeline | "Lister fichiers CSV" >> beam.Create(csv_paths)
    file_paths | "Transformer et Sauvegarder" >> beam.Map(transform_csv)

print("Traitement terminé 🎉")

Traitement terminé 🎉
