In [1]:
# Cell 1: Imports and Configuration (Secure Loading)

import sqlite3
import pandas as pd
import os
from sqlalchemy import create_engine, text
from snowflake.sqlalchemy import URL
from sqlalchemy.exc import SQLAlchemyError
from dotenv import load_dotenv

# --- CONFIGURATION ---
load_dotenv() # Load variables from .env file

# A. Source Info
DB_PATH = os.getenv('DB_PATH')
print(f"Database Path: {DB_PATH}")
# B. Snowflake Credentials (Retrieve all required variables)
# ... (Retrieve all SNOWFLAKE_* variables) ...
SNOWFLAKE_USER = os.getenv('SNOWFLAKE_USER')
SNOWFLAKE_PASSWORD = os.getenv('SNOWFLAKE_PASSWORD')
SNOWFLAKE_ACCOUNT = os.getenv('SNOWFLAKE_ACCOUNT')
SNOWFLAKE_WAREHOUSE = os.getenv('SNOWFLAKE_WAREHOUSE')
SNOWFLAKE_DATABASE = os.getenv('SNOWFLAKE_DATABASE')
TARGET_SCHEMA = os.getenv('SNOWFLAKE_SCHEMA') 

print("Configuration loaded securely and ready for use.")

Database Path: /Users/mory_jr/Library/DBeaverData/workspace6/.metadata/sample-database-sqlite-1/Chinook.db
Configuration loaded securely and ready for use.


In [2]:
# Pipeline.ipynb - Cell 2: Data Extraction from Local SQLite

if not DB_PATH:
    raise ValueError("Error: 'DB_PATH' environment variable is not loaded. Please run Cell 1.")

TABLE_QUERIES = {
    'customers_df': "SELECT * FROM customers;",
    'inventory_df': "SELECT * FROM inventory;",
    'sales_df': "SELECT * FROM sales;"
}

dataframes = {}
conn = None

try:
    conn = sqlite3.connect(DB_PATH)
    print(f"Connection to SQLite established at: {DB_PATH}")

    for df_name, query in TABLE_QUERIES.items():
        dataframes[df_name] = pd.read_sql_query(query, conn)

    # Assign DataFrames for easy access
    customers_df = dataframes['customers_df']
    inventory_df = dataframes['inventory_df']
    sales_df = dataframes['sales_df']

except sqlite3.Error as e:
    print(f"FATAL: SQLite error during data extraction: {e}")
finally:
    if conn:
        conn.close()
        print("SQLite connection closed.")

Connection to SQLite established at: /Users/mory_jr/Library/DBeaverData/workspace6/.metadata/sample-database-sqlite-1/Chinook.db
SQLite connection closed.


In [3]:
# Cell 3
import pandas as pd

# --- 1. DATA CLEANING AND VALIDATION (Inventory) ---
# Filter out intentional errors (negative stock, invalid price/size)
inventory_df_clean = inventory_df[inventory_df['stock_quantity'] >= 0].copy()
inventory_df_clean = inventory_df_clean[inventory_df_clean['unit_price'] > 0]
inventory_df_clean = inventory_df_clean[inventory_df_clean['bottle_size_l'].isin([0.375, 0.5, 0.75, 1.0, 1.5])]
print(f"Inventory: {len(inventory_df) - len(inventory_df_clean)} rows removed due to quality issues.")


# --- 2. JOINING TABLES (Ensuring Referential Integrity) ---
# A. Join Sales with CLEAN Inventory
# Suffixes pour renommer 'unit_price' en 'unit_price_sale' (de sales_df) et 'unit_price_base' (de inventory_df_clean)
sales_inventory_join = sales_df.merge(
    inventory_df_clean[['product_id', 'product_name', 'category', 'unit_price']],
    on='product_id',
    how='inner',
    suffixes=('_sale', '_base')
)

# B. Join with Customers
fact_sales = sales_inventory_join.merge(
    customers_df[['customer_id', 'city', 'channel']],
    on='customer_id',
    how='inner'
)
print(f"Data joined. Final sales lines: {len(fact_sales)}")


# --- 3. FEATURE ENGINEERING & METRIC CALCULATION ---

# Calculer les métriques en utilisant la colonne 'unit_price_sale' sécurisée
fact_sales['total_price'] = fact_sales['unit_price_sale'] * fact_sales['quantity']
fact_sales['discount_amount'] = fact_sales['total_price'] * (fact_sales['discount'] / 100)
fact_sales['net_revenue'] = fact_sales['total_price'] - fact_sales['discount_amount']

# Calcul de la marge brute
fact_sales['cost_of_goods'] = fact_sales['unit_price_base'] * fact_sales['quantity']
fact_sales['gross_margin'] = fact_sales['net_revenue'] - fact_sales['cost_of_goods']

# Transformation des colonnes de date et temps
fact_sales['TXN_DATE'] = pd.to_datetime(fact_sales['sold_at']).dt.date
fact_sales['TXN_TIMESTAMP'] = pd.to_datetime(fact_sales['sold_at'])


# --- 4. FINAL DATA SELECTION AND RENAMING (CORRECTED) ---
# Utiliser 'order_id' à la place de 'order_line_id'
# Retirer 'sales_channel_sale' (qui est la version suffixée de sales_channel), car 'sales_channel' n'existe pas dans le sales_df initial
final_fact_sales_df = fact_sales[[
    'order_id',          # CORRECTION : Utiliser order_id (la colonne existante)
    'TXN_DATE', 
    'TXN_TIMESTAMP', 
    'product_id', 
    'customer_id',      
    'net_revenue', 
    'gross_margin', 
    'city', 
    'category', 
    'channel'            # Le canal d'acquisition du client est conservé
    # 'SALES_CHANNEL' n'est plus sélectionné ici car il manque dans sales_df
]].rename(columns={
    'order_id': 'ORDER_ID',          # Renommage de la clé pour la FACT
    'product_id': 'PRODUCT_ID',
    'customer_id': 'CUSTOMER_ID',
    'net_revenue': 'NET_REVENUE',
    'gross_margin': 'GROSS_MARGIN',
    'city': 'CUSTOMER_CITY',
    'category': 'PRODUCT_CATEGORY',
    'channel': 'ACQUISITION_CHANNEL', 
    # 'SALES_CHANNEL': 'SALES_CHANNEL' (supprimé)
})

print("Transformation complète. DataFrame 'final_fact_sales_df' est prêt pour le chargement (MERGE).")

Inventory: 3 rows removed due to quality issues.
Data joined. Final sales lines: 943
Transformation complète. DataFrame 'final_fact_sales_df' est prêt pour le chargement (MERGE).


In [4]:
# EXÉCUTER SEULEMENT POUR DEBUG
print(sales_df.columns)

Index(['order_id', 'product_id', 'customer_id', 'quantity', 'unit_price',
       'discount', 'sold_at'],
      dtype='object')


In [5]:
# --- DEBUG: Vérifier les doublons avant le chargement ---

# Identifier les lignes où la combinaison de ORDER_ID et PRODUCT_ID est dupliquée
duplicate_keys = final_fact_sales_df.duplicated(subset=['ORDER_ID', 'PRODUCT_ID'], keep=False)

# Afficher toutes les lignes qui partagent une clé (ORDER_ID, PRODUCT_ID)
if duplicate_keys.any():
    print("🛑 ATTENTION: Doublons trouvés dans le DataFrame avant le chargement !")
    print("-----------------------------------------------------------------")
    # Trier pour voir les doublons côte à côte
    print(final_fact_sales_df[duplicate_keys].sort_values(by=['ORDER_ID', 'PRODUCT_ID']))
else:
    print("✅ Aucune clé (ORDER_ID, PRODUCT_ID) dupliquée n'a été trouvée. Le problème est ailleurs.")

🛑 ATTENTION: Doublons trouvés dans le DataFrame avant le chargement !
-----------------------------------------------------------------
     ORDER_ID    TXN_DATE       TXN_TIMESTAMP  PRODUCT_ID  CUSTOMER_ID  \
1           2  2025-09-01 2025-09-01 05:45:12        1030           19   
568         2  2025-09-07 2025-09-07 19:37:20        1030           77   
3           4  2025-09-01 2025-09-01 15:31:34        1004           98   
570         4  2025-09-07 2025-09-07 10:10:32        1004           59   
376         5  2025-09-05 2025-09-05 15:04:49        1033           87   
..        ...         ...                 ...         ...          ...   
836        97  2025-09-09 2025-09-09 19:03:02        1004           39   
270        98  2025-09-03 2025-09-03 06:30:00        1004          102   
936        98  2025-09-10 2025-09-10 19:35:17        1004           41   
369       105  2025-09-04 2025-09-04 06:10:19        1043           74   
561       105  2025-09-06 2025-09-06 13:32:59     

In [6]:
# Pipeline.ipynb - Cell 4: Data Loading to Snowflake using MERGE Strategy (CORRIGÉE ET FIABILISÉE)

from sqlalchemy.exc import SQLAlchemyError
import pandas.io.sql as psql

if final_fact_sales_df.empty:
    print("🛑 Loading aborted: The final_fact_sales_df is empty.")
else:
    # --- 1. BUILD CONNECTION AND ENGINE ---
    try:
        SNOWFLAKE_URL = URL(
            account=SNOWFLAKE_ACCOUNT,
            user=SNOWFLAKE_USER,
            password=SNOWFLAKE_PASSWORD,
            database=SNOWFLAKE_DATABASE,
            warehouse=SNOWFLAKE_WAREHOUSE
        )
        snowflake_engine = create_engine(SNOWFLAKE_URL)
        print("Snowflake Engine created.")
    except Exception as e:
        print(f"🛑 FATAL: Failed to create Snowflake Engine: {e}")
        exit()

    # --- 2. SETUP DATA AND TARGETS ---
    FACT_TABLE_NAME = 'FACT_SALES'
    STAGING_TABLE_NAME = 'STG_FACT_SALES'
    
    # S'assurer que les colonnes du DataFrame sont en majuscules
    final_fact_sales_df.columns = [col.upper() for col in final_fact_sales_df.columns]
    
    # 🎯 CORRECTION FINALE : DÉDOUBLONNAGE AVANT CHARGEMENT
    # On garantit l'unicité de la clé (ORDER_ID, PRODUCT_ID) en ne gardant que la dernière entrée
    # basée sur le timestamp de la transaction.
    print(f"Taille initiale du DataFrame: {len(final_fact_sales_df)} lignes.")
    final_fact_sales_df.sort_values('TXN_TIMESTAMP', ascending=True, inplace=True)
    final_fact_sales_df.drop_duplicates(subset=['ORDER_ID', 'PRODUCT_ID'], keep='last', inplace=True)
    print(f"Taille du DataFrame après dédoublonnage: {len(final_fact_sales_df)} lignes.")

    dataframes_to_replace = {
        'DIM_CUSTOMERS': customers_df,       
        'DIM_INVENTORY': inventory_df_clean 
    }

    # --- 3. MERGE LOGIC EXECUTION ---
    with snowflake_engine.connect() as connection:
        transaction = None 
        try:
            transaction = connection.begin() 
            
            # A. PRÉPARATION du Schéma
            connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {SNOWFLAKE_DATABASE}.{TARGET_SCHEMA};"))
            connection.execute(text(f"USE SCHEMA {SNOWFLAKE_DATABASE}.{TARGET_SCHEMA};"))
            print(f"Using schema: {TARGET_SCHEMA}")

            # B. STRATÉGIE MERGE pour FACT_SALES
            print(f"\n1. Preparing STAGING table: {TARGET_SCHEMA}.{STAGING_TABLE_NAME}")
            
            connection.execute(text(f"DROP TABLE IF EXISTS {TARGET_SCHEMA}.{STAGING_TABLE_NAME};"))
            print(f"   -> Dropped existing staging table (if any).")

            create_staging_table_sql = psql.get_schema(final_fact_sales_df, STAGING_TABLE_NAME, con=connection)
            connection.execute(text(create_staging_table_sql))
            print(f"   -> Created new empty staging table.")

            final_fact_sales_df.to_sql(
                name=STAGING_TABLE_NAME,
                con=connection,
                schema=TARGET_SCHEMA,      
                if_exists='append', 
                index=False,               
                chunksize=16000            
            )
            print(f"   -> Loaded {len(final_fact_sales_df)} rows into staging table.")

            # ÉTAPE 2: EXÉCUTION du MERGE (La requête est déjà correcte)
            print(f"\n2. Executing MERGE from staging to final table: {TARGET_SCHEMA}.{FACT_TABLE_NAME}")
            
            connection.execute(text(f"CREATE TABLE IF NOT EXISTS {TARGET_SCHEMA}.{FACT_TABLE_NAME} LIKE {TARGET_SCHEMA}.{STAGING_TABLE_NAME};"))

            merge_query = f"""
            MERGE INTO {TARGET_SCHEMA}.{FACT_TABLE_NAME} AS target
            USING {TARGET_SCHEMA}.{STAGING_TABLE_NAME} AS staging
            ON target.ORDER_ID = staging.ORDER_ID AND target.PRODUCT_ID = staging.PRODUCT_ID
            
            WHEN MATCHED THEN
                UPDATE SET 
                    target.NET_REVENUE = staging.NET_REVENUE,
                    target.GROSS_MARGIN = staging.GROSS_MARGIN,
                    target.TXN_TIMESTAMP = staging.TXN_TIMESTAMP 
            
            WHEN NOT MATCHED THEN
                INSERT (ORDER_ID, TXN_DATE, TXN_TIMESTAMP, PRODUCT_ID, CUSTOMER_ID, NET_REVENUE, 
                        GROSS_MARGIN, CUSTOMER_CITY, PRODUCT_CATEGORY, ACQUISITION_CHANNEL)
                VALUES (staging.ORDER_ID, staging.TXN_DATE, staging.TXN_TIMESTAMP, staging.PRODUCT_ID, 
                        staging.CUSTOMER_ID, staging.NET_REVENUE, staging.GROSS_MARGIN, staging.CUSTOMER_CITY, 
                        staging.PRODUCT_CATEGORY, staging.ACQUISITION_CHANNEL);
            """
            
            connection.execute(text(merge_query))
            print("   -> MERGE executed successfully.")

            # Le reste du code...
            connection.execute(text(f"DROP TABLE {TARGET_SCHEMA}.{STAGING_TABLE_NAME};"))
            print(f"\n3. Staging table {STAGING_TABLE_NAME} dropped.")

            print("\n4. Loading Dimension Tables (Strategy: Drop and Create)")
            for table_name, df in dataframes_to_replace.items():
                df.columns = [col.upper() for col in df.columns]

                # ÉTAPE 1: Supprimer manuellement la table pour éviter les erreurs de "reflection"
                print(f"   -> Preparing to load {table_name}. Dropping if it exists...")
                connection.execute(text(f"DROP TABLE IF EXISTS {TARGET_SCHEMA}.{table_name};"))

                # ÉTAPE 2: Charger les données. Pandas va maintenant créer la table sans erreur.
                df.to_sql(
                    name=table_name,
                    con=connection,
                    schema=TARGET_SCHEMA,      
                    if_exists='fail', # Comportement par défaut : échoue si la table existe déjà.
                                      # C'est parfait car nous venons de la supprimer.
                    index=False,               
                    chunksize=16000            
                )
                print(f"   ✅ Table {table_name} loaded successfully.")

            
            transaction.commit()
            print("\n🎉 ETL Batch Pipeline (avec MERGE) Complété avec Succès !")

        except SQLAlchemyError as e:
            print(f"\n🛑 FATAL: SQLAlchemy/Snowflake error during loading: {e}")
            if transaction:
                transaction.rollback()
                print("   -> Transaction has been rolled back.")
        
        except Exception as e:
            print(f"\n🛑 FATAL: An unexpected error occurred: {e}")
            if transaction:
                transaction.rollback()
                print("   -> Transaction has been rolled back.")

Snowflake Engine created.
Taille initiale du DataFrame: 943 lignes.
Taille du DataFrame après dédoublonnage: 867 lignes.
Using schema: SALES_SCHEMA

1. Preparing STAGING table: SALES_SCHEMA.STG_FACT_SALES
   -> Dropped existing staging table (if any).
   -> Created new empty staging table.


  final_fact_sales_df.to_sql(


   -> Loaded 867 rows into staging table.

2. Executing MERGE from staging to final table: SALES_SCHEMA.FACT_SALES
   -> MERGE executed successfully.

3. Staging table STG_FACT_SALES dropped.

4. Loading Dimension Tables (Strategy: Drop and Create)
   -> Preparing to load DIM_CUSTOMERS. Dropping if it exists...


  df.to_sql(


   ✅ Table DIM_CUSTOMERS loaded successfully.
   -> Preparing to load DIM_INVENTORY. Dropping if it exists...


  df.to_sql(


   ✅ Table DIM_INVENTORY loaded successfully.

🎉 ETL Batch Pipeline (avec MERGE) Complété avec Succès !


In [7]:
# Pipeline.ipynb - Cell 5: Data Loading to Snowflake

# Imports (Assuming create_engine, URL, OperationalError, text, os are from Cell 1)
from sqlalchemy.exc import SQLAlchemyError

# --- 1. RETRIEVE CONFIGURATION ---
# The target schema name (e.g., SALES_SCHEMA) should be set in your .env file
# and retrieved in Cell 1 as SNOWFLAKE_SCHEMA
SNOWFLAKE_USER = os.getenv('SNOWFLAKE_USER')
SNOWFLAKE_PASSWORD = os.getenv('SNOWFLAKE_PASSWORD')
SNOWFLAKE_ACCOUNT = os.getenv('SNOWFLAKE_ACCOUNT')
SNOWFLAKE_WAREHOUSE = os.getenv('SNOWFLAKE_WAREHOUSE')
SNOWFLAKE_DATABASE = os.getenv('SNOWFLAKE_DATABASE')
TARGET_SCHEMA = os.getenv('SNOWFLAKE_SCHEMA') 

# --- 2. BUILD THE CONNECTION URL ---
try:
    # Use the URL object from snowflake.sqlalchemy for a robust connection string
    SNOWFLAKE_URL = URL(
        account=SNOWFLAKE_ACCOUNT,
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
        database=SNOWFLAKE_DATABASE,
        warehouse=SNOWFLAKE_WAREHOUSE
    )

    # --- 3. CREATE THE SQLALCHEMY ENGINE ---
    snowflake_engine = create_engine(SNOWFLAKE_URL)
    
    print("Snowflake Engine created successfully.")

except Exception as e:
    print(f"\n🛑 FATAL: Failed to create Snowflake Engine. Check credentials and network.")
    print(f"Error details: {e}")
    exit()

# --- 4. DATA LOADING PROCESS ---
if not final_fact_sales_df.empty:
    
    # Define DataFrames and their target table names
    dataframes_to_load = {
        'FACT_SALES': final_fact_sales_df,
        'DIM_CUSTOMERS': customers_df,       # Load the source customers as a dimension
        'DIM_INVENTORY': inventory_df_clean  # Load the cleaned inventory as a dimension
    }
    
    with snowflake_engine.connect() as connection:
        try:
            # A. SCHEMA CREATION (DDL)
            print(f"\nEnsuring target schema '{TARGET_SCHEMA}' exists in database '{SNOWFLAKE_DATABASE}'...")
            # Execute DDL to create schema if it doesn't exist.
            connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {SNOWFLAKE_DATABASE}.{TARGET_SCHEMA};"))
            print("Schema check/creation complete.")

            # B. Loop through all DataFrames and load them
            for table_name, df in dataframes_to_load.items():
                print(f"Loading DataFrame into Snowflake table: {TARGET_SCHEMA}.{table_name} ({len(df)} rows)")

                # Ensure columns are uppercase (best practice for Snowflake)
                df.columns = [col.upper() for col in df.columns]

                # Use to_sql() to perform the bulk insert
                df.to_sql(
                    table_name,
                    con=connection,
                    schema=TARGET_SCHEMA,      # Targets the dedicated schema
                    if_exists='append',       # Append to the table (Batch append)
                    index=False,               # Do not include the Pandas index
                    chunksize=16000            # Optimize chunk size for bulk loading
                )
                print(f"✅ Table {TARGET_SCHEMA}.{table_name} loaded successfully.")
            
            connection.commit() # Commit all table creations and loads
            print("\n🎉 ETL Batch Pipeline Completed Successfully! Data is live on Snowflake.")

        except SQLAlchemyError as e:
            # Rollback transaction on any SQL error
            connection.rollback()
            print(f"\n🛑 FATAL: SQLAlchemy/Snowflake error during loading: {e}")
        
        except Exception as e:
            print(f"\n🛑 FATAL: An unexpected error occurred: {e}")
else:
    print("🛑 Loading aborted: The final_fact_sales_df is empty.")

# The engine dispose happens implicitly when the process finishes or when the notebook stops.

Snowflake Engine created successfully.

Ensuring target schema 'SALES_SCHEMA' exists in database 'ETL_PROJECT_DB'...
Schema check/creation complete.
Loading DataFrame into Snowflake table: SALES_SCHEMA.FACT_SALES (867 rows)


  df.to_sql(


✅ Table SALES_SCHEMA.FACT_SALES loaded successfully.
Loading DataFrame into Snowflake table: SALES_SCHEMA.DIM_CUSTOMERS (121 rows)


  df.to_sql(


✅ Table SALES_SCHEMA.DIM_CUSTOMERS loaded successfully.
Loading DataFrame into Snowflake table: SALES_SCHEMA.DIM_INVENTORY (47 rows)


  df.to_sql(


✅ Table SALES_SCHEMA.DIM_INVENTORY loaded successfully.

🎉 ETL Batch Pipeline Completed Successfully! Data is live on Snowflake.
