In [2]:
!pip install sqlalchemy psycopg2-binary python-dotenv kaggle pytest tabulate

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Collecting python-dotenv
  Downloading python_dotenv-1.1.0-py3-none-any.whl.metadata (24 kB)
Collecting kaggle
  Downloading kaggle-1.7.4.2-py3-none-any.whl.metadata (16 kB)
Collecting python-slugify (from kaggle)
  Downloading python_slugify-8.0.4-py2.py3-none-any.whl.metadata (8.5 kB)
Collecting text-unidecode (from kaggle)
  Downloading text_unidecode-1.3-py2.py3-none-any.whl.metadata (2.4 kB)
Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m40.0 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading python_dotenv-1.1.0-py3-none-any.whl (20 kB)
Downloading kaggle-1.7.4.2-py3-none-any.whl (173 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m173.2/173.2 kB[0m [31m40.5 MB/s[0m eta 

In [3]:
!python setup_kaggle_token.py

--- Bắt đầu thiết lập Kaggle Credentials ---
2025-04-07 18:03:17,053 - INFO - Tìm thấy file nguồn: /home/jovyan/work/kaggle.json
2025-04-07 18:03:17,054 - INFO - Đã tạo (hoặc tồn tại) thư mục: /home/jovyan/.config/kaggle
2025-04-07 18:03:17,054 - INFO - Đã copy file vào: /home/jovyan/.config/kaggle/kaggle.json
2025-04-07 18:03:17,054 - INFO - Đã thiết lập quyền truy cập (600) cho: /home/jovyan/.config/kaggle/kaggle.json

--- Thông tin Credentials (đọc từ file đã copy) ---
Kaggle Username: thangalbert
Đã đọc thành công key.
----------------------------------------
2025-04-07 18:03:17,055 - INFO - Credentials đã được thiết lập tại ~/.config/kaggle/kaggle.json.
2025-04-07 18:03:17,055 - INFO - Bây giờ bạn có thể sử dụng thư viện 'kaggle' (có thể cần restart kernel).


In [4]:
import kaggle
try:
    kaggle.api.authenticate()
    print("\nKaggle API authentication successful using the copied token file!")
except Exception as e:
    print(f"\nKaggle API authentication failed: {e}")


Kaggle API authentication successful using the copied token file!


In [48]:
import kaggle
import zipfile

import os
import pandas as pd
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import logging
from pathlib import Path
import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

DATASET_ID = 'olistbr/brazilian-ecommerce' # Định danh của dataset trên Kaggle


current_dir = Path.cwd()
print(f"Thư mục làm việc hiện tại (notebook): {current_dir}")

BASE_DIR = current_dir.parent
print(f"Thư mục gốc dự án (ước tính): {BASE_DIR}")

DATA_DIR = BASE_DIR / 'data'
ETL_DIR = BASE_DIR / 'etl'

print(f"Đường dẫn thư mục Data: {DATA_DIR}")

print(f"Thư mục Data tồn tại: {DATA_DIR.exists()}")
print(f"Thư mục ETL tồn tại: {ETL_DIR.exists()}")



DB_USER = os.getenv('POSTGRES_USER')
DB_PASSWORD = os.getenv('POSTGRES_PASSWORD')
DB_HOST = 'postgres' 
DB_PORT = '5432'
DB_NAME = os.getenv('POSTGRES_DB')
DATABASE_URI = f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
print(f"URI DATABASE:  {DATABASE_URI}")

Thư mục làm việc hiện tại (notebook): /home/jovyan/work
Thư mục gốc dự án (ước tính): /home/jovyan
Đường dẫn thư mục Data: /home/jovyan/data
Thư mục Data tồn tại: True
Thư mục ETL tồn tại: True
URI DATABASE:  postgresql+psycopg2://ThangData:password@postgres:5432/olist_dwh


### DOWNLOAD DATA

In [6]:
EXPECTED_FILE_EXAMPLE = DATA_DIR / 'olist_orders_dataset.csv'

def check_if_extracted(target_dir, example_file):
    return example_file.exists()

def download_and_extract_kaggle_dataset(dataset_id, download_path):
    """
    Tải dataset từ Kaggle và giải nén vào thư mục chỉ định.
    Sử dụng biến môi trường KAGGLE_USERNAME và KAGGLE_KEY để xác thực.
    """
    # Tạo thư mục download nếu chưa tồn tại
    download_path.mkdir(parents=True, exist_ok=True)

    # Kiểm tra xem dữ liệu đã được giải nén chưa để tránh tải lại
    if check_if_extracted(download_path, EXPECTED_FILE_EXAMPLE):
        logging.info(f"Dữ liệu có vẻ đã tồn tại trong thư mục: {download_path}. Bỏ qua tải về.")
        return True

    logging.info("Bắt đầu quá trình xác thực với Kaggle API...")
    try:
        kaggle.api.authenticate()
        logging.info("Xác thực Kaggle API thành công.")
    except Exception as e:
        logging.error(f"Lỗi xác thực Kaggle API: {e}")
        logging.error("Hãy đảm bảo bạn đã cài đặt KAGGLE_USERNAME và KAGGLE_KEY trong file .env hoặc đặt file kaggle.json đúng vị trí.")
        return False

    logging.info(f"Bắt đầu tải dataset: {dataset_id} vào thư mục: {download_path}")
    zip_file_path = None # Khởi tạo để dùng trong finally
    try:
        # Tải dataset (thường là 1 file zip)
        kaggle.api.dataset_download_files(dataset_id, path=download_path, unzip=False) # Tải file zip về trước

        # Tìm file zip vừa tải về (tên file thường là tên dataset.zip)
        zip_filename = f"{dataset_id.split('/')[-1]}.zip"
        zip_file_path = download_path / zip_filename

        if not zip_file_path.exists():
             # Đôi khi tên file zip có thể khác, thử tìm file .zip duy nhất
             zip_files = list(download_path.glob('*.zip'))
             if len(zip_files) == 1:
                  zip_file_path = zip_files[0]
             else:
                  raise FileNotFoundError(f"Không tìm thấy file zip dự kiến '{zip_filename}' hoặc file zip duy nhất trong {download_path}")

        logging.info(f"Dataset đã được tải về thành công: {zip_file_path}")

        logging.info(f"Bắt đầu giải nén file: {zip_file_path}")
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(download_path)
        logging.info(f"Giải nén hoàn tất vào thư mục: {download_path}")

        return True # Trả về True nếu thành công

    except Exception as e:
        logging.error(f"Đã xảy ra lỗi trong quá trình tải hoặc giải nén: {e}")
        return False # Trả về False nếu có lỗi

    finally:
        if zip_file_path and zip_file_path.exists():
            try:
                os.remove(zip_file_path)
                logging.info(f"Đã xóa file zip: {zip_file_path}")
            except OSError as e:
                logging.error(f"Không thể xóa file zip {zip_file_path}: {e}")


# --- Hàm Main để chạy script ---
if __name__ == "__main__":
    logging.info("--- Bắt đầu Script Tải Dữ Liệu Olist ---")

    load_dotenv(dotenv_path=BASE_DIR / '.env')

    # Gọi hàm tải và giải nén
    success = download_and_extract_kaggle_dataset(DATASET_ID, DATA_DIR)

    if success:
        logging.info("--- Script Tải Dữ Liệu Hoàn Thành Thành Công ---")
    else:
        logging.error("--- Script Tải Dữ Liệu Gặp Lỗi ---")

2025-04-07 18:03:27,364 - INFO - --- Bắt đầu Script Tải Dữ Liệu Olist ---
2025-04-07 18:03:27,365 - INFO - Bắt đầu quá trình xác thực với Kaggle API...
2025-04-07 18:03:27,366 - INFO - Xác thực Kaggle API thành công.
2025-04-07 18:03:27,366 - INFO - Bắt đầu tải dataset: olistbr/brazilian-ecommerce vào thư mục: /home/jovyan/data


Dataset URL: https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce


2025-04-07 18:03:28,059 - INFO - Dataset đã được tải về thành công: /home/jovyan/data/brazilian-ecommerce.zip
2025-04-07 18:03:28,060 - INFO - Bắt đầu giải nén file: /home/jovyan/data/brazilian-ecommerce.zip
2025-04-07 18:03:28,823 - INFO - Giải nén hoàn tất vào thư mục: /home/jovyan/data
2025-04-07 18:03:28,835 - INFO - Đã xóa file zip: /home/jovyan/data/brazilian-ecommerce.zip
2025-04-07 18:03:28,836 - INFO - --- Script Tải Dữ Liệu Hoàn Thành Thành Công ---


## ETL

In [49]:
try:
    engine = create_engine(DATABASE_URI)
    # Thử kết nối và chạy một câu lệnh đơn giản
    with engine.connect() as connection:
        result = connection.execute(text("SELECT table_name FROM information_schema.tables WHERE table_schema = 'dwh';"))
        print("Kết nối thành công! Các bảng trong schema 'dwh':")
        for row in result:
            print(f"- {row[0]}")
except Exception as e:
    print(f"Lỗi kết nối database: {e}")


try:
    with engine.connect() as connection:
        df_customers = pd.read_sql("SELECT * FROM dwh.dim_date LIMIT 10", connection)
        display(df_customers) # display() hiển thị DataFrame đẹp hơn trong Jupyter
except Exception as e:
    print(f"Lỗi đọc dữ liệu: {e}")



Kết nối thành công! Các bảng trong schema 'dwh':
- dim_customer
- dim_seller
- fact_order_delivery
- dim_date


Unnamed: 0,date_key,full_date,day_of_week,day_name,day_of_month,day_of_year,week_of_year,month_name,month_number,quarter,year,is_weekend,is_weekday
0,20160101,2016-01-01,5,Friday,1,1,53,January,1,1,2016,False,True
1,20160102,2016-01-02,6,Saturday,2,2,53,January,1,1,2016,True,False
2,20160103,2016-01-03,7,Sunday,3,3,53,January,1,1,2016,True,False
3,20160104,2016-01-04,1,Monday,4,4,1,January,1,1,2016,False,True
4,20160105,2016-01-05,2,Tuesday,5,5,1,January,1,1,2016,False,True
5,20160106,2016-01-06,3,Wednesday,6,6,1,January,1,1,2016,False,True
6,20160107,2016-01-07,4,Thursday,7,7,1,January,1,1,2016,False,True
7,20160108,2016-01-08,5,Friday,8,8,1,January,1,1,2016,False,True
8,20160109,2016-01-09,6,Saturday,9,9,1,January,1,1,2016,True,False
9,20160110,2016-01-10,7,Sunday,10,10,1,January,1,1,2016,True,False


In [8]:
# Danh sách các file CSV và bảng staging tương ứng
CSV_FILES = {
    'olist_orders_dataset.csv': 'staging.stg_orders',
    'olist_order_items_dataset.csv': 'staging.stg_order_items',
    'olist_customers_dataset.csv': 'staging.stg_customers',
    'olist_sellers_dataset.csv': 'staging.stg_sellers',
    'olist_geolocation_dataset.csv': 'staging.stg_geolocation',
}

In [10]:
def extract_load_to_staging(csv_files_map, data_dir, db_engine):
    """
    Extract dữ liệu từ các file CSV và load vào bảng staging tương ứng.
    Xóa dữ liệu cũ trong staging trước khi load.
    """
    logging.info("Bắt đầu quá trình Extract và Load vào Staging...")
    with db_engine.connect() as connection:
        for csv_file, table_name in csv_files_map.items():
            start_time = time.time()
            file_path = data_dir / csv_file
            if not file_path.exists():
                logging.warning(f"File không tồn tại: {file_path}, bỏ qua.")
                continue

            try:
                logging.info(f"Đọc file: {csv_file}")

                df = pd.read_csv(file_path, dtype=str)
                df['_load_timestamp'] = pd.Timestamp.now() # Thêm metadata thời gian load

                logging.info(f"Load dữ liệu vào bảng: {table_name}")
                # Xóa dữ liệu cũ trong bảng staging
                connection.execute(text(f"TRUNCATE TABLE {table_name};"))
                # Load dữ liệu mới
                df.to_sql(
                    name=table_name.split('.')[1], # Chỉ lấy tên bảng
                    con=connection,
                    schema=table_name.split('.')[0], # Chỉ lấy tên schema
                    if_exists='append', # Vì đã truncate nên dùng append
                    index=False,
                    chunksize=10000 # Load theo chunk để tiết kiệm bộ nhớ
                )
                connection.commit() # Commit sau mỗi bảng staging
                end_time = time.time()
                logging.info(f"Hoàn thành load {table_name} trong {end_time - start_time:.2f} giây.")

            except Exception as e:
                logging.error(f"Lỗi khi xử lý file {csv_file} hoặc load vào {table_name}: {e}")
                connection.rollback()

    logging.info("Hoàn thành Extract và Load vào Staging.")

In [20]:
def transform_and_load_dimensions(db_engine):
    """
    Transform dữ liệu từ staging và load vào các bảng Dimension
    (dim_customer, dim_seller)
    """
    logging.info("Bắt đầu quá trình Transform và Load Dimensions (snake_case)...")
    with db_engine.connect() as connection:
        # Bắt đầu transaction
        with connection.begin(): # Sử dụng transaction cho toàn bộ quá trình load dimension
            try:
                # --- 1. Chuẩn hóa Geolocation ---
                logging.info("Chuẩn hóa dữ liệu Geolocation...")
                df_geo = pd.read_sql("SELECT * FROM staging.stg_geolocation", connection)
                df_geo['geolocation_city'] = df_geo['geolocation_city'].str.lower().str.strip()
                df_geo['geolocation_state'] = df_geo['geolocation_state'].str.upper().str.strip()
                # Tạo mapping zip_prefix -> city, state (lấy bản ghi đầu tiên cho mỗi prefix)
                # Loại bỏ các prefix trùng lặp, giữ lại bản ghi đầu tiên
                geo_map = df_geo.drop_duplicates(subset=['geolocation_zip_code_prefix'], keep='first')
                # Tạo index bằng zip_code_prefix để merge dễ dàng
                geo_map = geo_map.set_index('geolocation_zip_code_prefix')[['geolocation_city', 'geolocation_state']]
                logging.info(f"Tạo mapping cho {len(geo_map)} zip code prefixes.")

                # --- 2. Load dim_customer ---
                logging.info("Load dữ liệu vào dwh.dim_customer...")
                start_time = time.time()
                df_cust_staging = pd.read_sql("SELECT * FROM staging.stg_customers", connection)

                # Merge với geo_map để lấy city/state chuẩn hóa
                # Đảm bảo kiểu dữ liệu cột join là string
                df_cust_staging['customer_zip_code_prefix'] = df_cust_staging['customer_zip_code_prefix'].astype(str)
                df_merged_cust = pd.merge(
                    df_cust_staging,
                    geo_map,
                    left_on='customer_zip_code_prefix',
                    right_index=True,
                    how='left'
                )
                
                df_dim_cust = df_merged_cust[[
                    'customer_id',
                    'customer_unique_id',
                    'customer_zip_code_prefix',
                    'geolocation_city',  
                    'geolocation_state'
                ]].copy() 
                
                df_dim_cust = df_dim_cust.rename(columns={
                    'geolocation_city': 'customer_city',
                    'geolocation_state': 'customer_state'
                })

                # Xử lý NULL sau merge và chuẩn hóa thêm nếu cần
                df_dim_cust['customer_city'] = df_dim_cust['customer_city'].fillna('Unknown')
                df_dim_cust['customer_state'] = df_dim_cust['customer_state'].fillna('NA')
                # Optional: Thêm state_name, region dựa trên state (cần có mapping riêng)
                # df_dim_cust['customer_state_name'] = df_dim_cust['customer_state'].map(state_mapping_dict)
                # df_dim_cust['customer_region'] = df_dim_cust['customer_state'].map(region_mapping_dict)

                dim_customer_cols = [
                    'customer_id', 'customer_unique_id', 'customer_zip_code_prefix',
                    'customer_city', 'customer_state' #, 'customer_state_name', 'customer_region'
                ]
                df_dim_cust = df_dim_cust[dim_customer_cols]

                # Xử lý SCD Type 2 (Phiên bản đơn giản - Chỉ load bản ghi mới nhất/duy nhất)
                # Lấy bản ghi cuối cùng cho mỗi customer_id nếu có trùng lặp trong staging
                df_dim_cust = df_dim_cust.drop_duplicates(subset=['customer_id'], keep='last')

                # Thêm các cột SCD (snake_case)
                df_dim_cust['effective_start_date'] = pd.Timestamp.now()
                df_dim_cust['effective_end_date'] = pd.NaT # NULL trong DB
                df_dim_cust['is_current'] = True

                # Xóa dữ liệu cũ trong DimCustomer (cho lần load đầu hoặc full load)
                logging.info("Truncating dwh.dim_customer...")
                connection.execute(text("TRUNCATE TABLE dwh.dim_customer CASCADE;")) # CASCADE để xóa FK refs
                # Load vào dwh.dim_customer
                logging.info(f"Loading {len(df_dim_cust)} rows into dwh.dim_customer...")
                df_dim_cust.to_sql(
                    name='dim_customer', 
                    con=connection,
                    schema='dwh',
                    if_exists='append', # Đã truncate nên dùng append
                    index=False,
                    chunksize=10000
                )
                end_time = time.time()
                logging.info(f"Hoàn thành load dim_customer trong {end_time - start_time:.2f} giây.")

                
                # ------------ 3. LOAD DIM_SELLER ------------------
                logging.info("Load dữ liệu vào dwh.dim_seller...")
                start_time = time.time()
                df_seller_staging = pd.read_sql("SELECT * FROM staging.stg_sellers", connection)

                # Merge với geo_map
                df_seller_staging['seller_zip_code_prefix'] = df_seller_staging['seller_zip_code_prefix'].astype(str)
                df_merged_seller = pd.merge(
                    df_seller_staging,
                    geo_map,
                    left_on='seller_zip_code_prefix',
                    right_index=True,
                    how='left'
                )
                
                df_dim_seller = df_merged_seller[[
                    'seller_id',
                    'seller_zip_code_prefix',
                    'geolocation_city', 
                    'geolocation_state' 
                ]].copy() 
                
                df_dim_seller = df_dim_seller.rename(columns={
                    'geolocation_city': 'seller_city',
                    'geolocation_state': 'seller_state'
                })
                df_dim_seller['seller_city'] = df_dim_seller['seller_city'].fillna('Unknown')
                df_dim_seller['seller_state'] = df_dim_seller['seller_state'].fillna('NA')
                # Optional: Thêm state_name, region

                # Chọn cột (snake_case)
                dim_seller_cols = [
                    'seller_id', 'seller_zip_code_prefix', 'seller_city', 'seller_state'
                    # , 'seller_state_name', 'seller_region'
                ]
                df_dim_seller = df_dim_seller[dim_seller_cols]

                # Xử lý SCD Type 2 (Đơn giản)
                df_dim_seller = df_dim_seller.drop_duplicates(subset=['seller_id'], keep='last')
                df_dim_seller['effective_start_date'] = pd.Timestamp.now()
                df_dim_seller['effective_end_date'] = pd.NaT
                df_dim_seller['is_current'] = True

                # Xóa dữ liệu cũ (cho lần load đầu)
                logging.info("Truncating dwh.dim_seller...")
                connection.execute(text("TRUNCATE TABLE dwh.dim_seller CASCADE;"))
                # Load vào dwh.dim_seller
                logging.info(f"Loading {len(df_dim_seller)} rows into dwh.dim_seller...")
                df_dim_seller.to_sql(
                    name='dim_seller', # Tên bảng snake_case
                    con=connection,
                    schema='dwh',
                    if_exists='append',
                    index=False,
                    chunksize=1000
                )
                end_time = time.time()
                logging.info(f"Hoàn thành load dim_seller trong {end_time - start_time:.2f} giây.")

            except Exception as e:
                logging.error(f"Lỗi trong quá trình Transform và Load Dimensions: {e}")
                # Transaction sẽ tự rollback khi thoát khỏi 'with connection.begin()' nếu có lỗi
                raise e # Ném lại lỗi để dừng ETL

    logging.info("Hoàn thành Transform và Load Dimensions.")

In [31]:
def transform_and_load_fact(db_engine):
    """
    Transform dữ liệu từ staging, lookup keys từ Dimensions,
    và load vào fact_order_delivery
    """
    logging.info("Bắt đầu quá trình Transform và Load Fact Table...")
    with db_engine.connect() as connection:
        with connection.begin(): # Sử dụng transaction
            try:
                # --- 1. Đọc dữ liệu cần thiết ---
                logging.info("Đọc dữ liệu từ staging và dimensions...")
                df_orders = pd.read_sql("SELECT * FROM staging.stg_orders", connection)
                df_items = pd.read_sql("SELECT * FROM staging.stg_order_items", connection)
                df_dim_date = pd.read_sql('SELECT date_key, full_date FROM dwh.dim_date', connection, parse_dates=['full_date'])
                df_dim_cust = pd.read_sql('SELECT customer_key, customer_id FROM dwh.dim_customer WHERE is_current = TRUE', connection)
                df_dim_seller = pd.read_sql('SELECT seller_key, seller_id FROM dwh.dim_seller WHERE is_current = TRUE', connection)


                # --- 2. Xử lý và Tổng hợp Order Items ---
                logging.info("Tổng hợp dữ liệu Order Items...")
                df_items['price'] = pd.to_numeric(df_items['price'], errors='coerce').fillna(0)
                df_items['freight_value'] = pd.to_numeric(df_items['freight_value'], errors='coerce').fillna(0)
                df_items_agg = df_items.groupby('order_id').agg(
                    item_count=('order_item_id', 'count'),
                    total_freight_value=('freight_value', 'sum'),
                    total_price=('price', 'sum'),
                    seller_id=('seller_id', 'first')
                ).reset_index()

                # --- 3. Kết hợp Orders và Items Aggregated ---
                logging.info("Kết hợp Orders và Items Aggregated...")
                df_fact = pd.merge(df_orders, df_items_agg, on='order_id', how='inner')

                # --- 4. Chuyển đổi kiểu dữ liệu Ngày tháng trong Orders ---
                logging.info("Chuyển đổi kiểu dữ liệu ngày tháng...")
                date_cols_ts = [
                    'order_purchase_timestamp', 'order_approved_at',
                    'order_delivered_carrier_date', 'order_delivered_customer_date',
                    'order_estimated_delivery_date'
                ]
                for col in date_cols_ts:
                    df_fact[col] = pd.to_datetime(df_fact[col], errors='coerce')

                date_cols_date = {
                    'order_purchase_timestamp': 'purchase_date',
                    'order_approved_at': 'approved_date',
                    'order_delivered_carrier_date': 'delivered_carrier_date',
                    'order_delivered_customer_date': 'delivered_customer_date',
                    'order_estimated_delivery_date': 'estimated_delivery_date'
                }
                for ts_col, date_col in date_cols_date.items():
                     df_fact[date_col] = df_fact[ts_col].dt.date

                # --- 5. Tính toán các Measures ---
                logging.info("Tính toán các Measures...")
                df_fact['delivery_time_days'] = (pd.to_datetime(df_fact['delivered_customer_date'], errors='coerce') - pd.to_datetime(df_fact['approved_date'], errors='coerce')).dt.days
                df_fact['estimated_delivery_time_days'] = (pd.to_datetime(df_fact['estimated_delivery_date'], errors='coerce') - pd.to_datetime(df_fact['approved_date'], errors='coerce')).dt.days
                df_fact['delivery_time_difference_days'] = (pd.to_datetime(df_fact['delivered_customer_date'], errors='coerce') - pd.to_datetime(df_fact['estimated_delivery_date'], errors='coerce')).dt.days
                df_fact['time_to_approve_hours'] = (df_fact['order_approved_at'] - df_fact['order_purchase_timestamp']) / pd.Timedelta(hours=1)
                df_fact['seller_processing_hours'] = (df_fact['order_delivered_carrier_date'] - df_fact['order_approved_at']) / pd.Timedelta(hours=1)
                df_fact['carrier_shipping_hours'] = (df_fact['order_delivered_customer_date'] - df_fact['order_delivered_carrier_date']) / pd.Timedelta(hours=1)
                hour_cols = ['time_to_approve_hours', 'seller_processing_hours', 'carrier_shipping_hours']
                for col in hour_cols:
                    df_fact[col] = df_fact[col].round(2)
                df_fact['is_late_delivery_flag'] = (df_fact['delivery_time_difference_days'] > 0) & (df_fact['delivered_customer_date'].notna())
                df_fact['is_late_delivery_flag'] = df_fact['is_late_delivery_flag'].fillna(False).astype(bool)

                # ----  CHECK các giá trị ÂM -------
                logging.info("Setting negative time measures to None...")
                time_measure_cols = [
                    'delivery_time_days', 'estimated_delivery_time_days', 'delivery_time_difference_days',
                    'time_to_approve_hours', 'seller_processing_hours', 'carrier_shipping_hours'
                ]
                for col in time_measure_cols:
                    if col in df_fact.columns:
                        df_fact.loc[df_fact[col] < 0, col] = None
                    else:
                         logging.warning(f"Column {col} not found for negative check.")

                # --- 6. Lookup Dimension Keys ---
                logging.info("Lookup Dimension Keys...")
                date_lookup_cols = {
                    'purchase_date': 'purchase_date_key',
                    'approved_date': 'approved_date_key',
                    'delivered_carrier_date': 'delivered_carrier_date_key',
                    'delivered_customer_date': 'delivered_customer_date_key',
                    'estimated_delivery_date': 'estimated_delivery_date_key'
                }
                # Chuyển cột date trong fact sang datetime để join
                for date_col_fact in date_lookup_cols.keys():
                     df_fact[date_col_fact] = pd.to_datetime(df_fact[date_col_fact], errors='coerce')

                # Join với DimDate cho từng cột ngày
                for date_col_fact, date_key_col in date_lookup_cols.items():
                    temp_dim_date = df_dim_date.rename(columns={'date_key': date_key_col})
                    df_fact = pd.merge(
                        df_fact,
                        temp_dim_date[['full_date', date_key_col]],
                        left_on=date_col_fact,
                        right_on='full_date',
                        how='left'
                    )
                    df_fact = df_fact.drop(columns=['full_date']) # Bỏ cột full_date sau mỗi lần merge

                # Join với DimCustomer
                df_fact = pd.merge(
                    df_fact,
                    df_dim_cust[['customer_key', 'customer_id']],
                    on='customer_id',
                    how='left'
                )

                # Join với DimSeller
                df_fact = pd.merge(
                    df_fact,
                    df_dim_seller[['seller_key', 'seller_id']],
                    on='seller_id',
                    how='left'
                )

                logging.info("Handling failed lookups and preparing key data types...")
                date_key_cols_list = list(date_lookup_cols.values())
                dim_key_cols_list = ['customer_key', 'seller_key']
                all_key_cols = date_key_cols_list + dim_key_cols_list

                for col in all_key_cols:
                    if col not in df_fact.columns:
                        logging.warning(f"Key column '{col}' missing after merges. Adding as pd.NA.")
                        df_fact[col] = pd.NA # Sử dụng pd.NA là tốt nhất cho nullable int
                    else:
                        # QUAN TRỌNG: KHÔNG fillna(-1) một cách mù quáng nữa.
                        # Chỉ fillna(-1) cho customer/seller keys NẾU bạn đã tạo dòng Unknown=-1 trong Dim.
                        # Nếu không, hãy để NaN/NA để nó thành NULL trong DB.
                        if col in dim_key_cols_list:
                            # Tạm thời vẫn fill -1 cho dimension keys nếu bạn muốn (cần có dòng -1 trong dim)
                            # Hoặc comment dòng fillna này để nó thành NULL
                            df_fact[col] = df_fact[col].fillna(-1)
                            pass # Giữ logic cũ cho dim keys (cẩn thận nếu không có dòng -1)
                        # else: # col là date_key_col
                             # KHÔNG LÀM GÌ CẢ với fillna cho date keys, để NaN/NA tự nhiên

                    # Chuyển đổi sang kiểu số nullable để to_sql xử lý NaN/NA thành NULL
                    # Sử dụng float trước để xử lý các kiểu dữ liệu không đồng nhất có thể có
                    df_fact[col] = pd.to_numeric(df_fact[col], errors='coerce')
                    # Chuyển sang Int64 của Pandas để biểu diễn integer nullable
                    # Điều này giúp to_sql hiểu rõ hơn ý định gửi NULL
                    df_fact[col] = df_fact[col].astype('Int64') # 'Int64' (chữ I viết hoa) là nullable integer type

                # --- 7. Chuẩn bị dữ liệu cuối cùng cho Fact ---
                logging.info("Chuẩn bị dữ liệu cuối cùng cho fact_order_delivery...")
                df_fact = df_fact.rename(columns={'order_id': 'order_id', 'order_status': 'order_status'})
                df_fact['order_count'] = 1
                df_fact['dw_load_timestamp'] = pd.Timestamp.now()
                final_fact_columns = [
                    'order_id', 'purchase_date_key', 'approved_date_key', 'delivered_carrier_date_key',
                    'delivered_customer_date_key', 'estimated_delivery_date_key', 'customer_key', 'seller_key',
                    'order_status', 'delivery_time_days', 'estimated_delivery_time_days', 'delivery_time_difference_days',
                    'is_late_delivery_flag', 'time_to_approve_hours', 'seller_processing_hours', 'carrier_shipping_hours',
                    'item_count', 'total_freight_value', 'total_price', 'order_count', 'dw_load_timestamp'
                ]
                missing_cols = [col for col in final_fact_columns if col not in df_fact.columns]
                if missing_cols:
                    logging.error(f"Thiếu các cột trong Fact DataFrame: {missing_cols}")
                    raise ValueError(f"Missing columns required for fact table: {missing_cols}")
                df_fact_final = df_fact[final_fact_columns]


                # --- 8. Load dữ liệu vào Fact Table ---
                logging.info(f"Load {len(df_fact_final)} dòng vào dwh.fact_order_delivery...")
                start_time = time.time()
                logging.info("Truncating dwh.fact_order_delivery...")
                connection.execute(text("TRUNCATE TABLE dwh.fact_order_delivery;"))
                df_fact_final.to_sql(
                    name='fact_order_delivery',
                    con=connection,
                    schema='dwh',
                    if_exists='append',
                    index=False,
                    chunksize=10000,
                    # method='multi' # Có thể thử method='multi' nếu mặc định chậm
                )
                end_time = time.time()
                logging.info(f"Hoàn thành load fact_order_delivery trong {end_time - start_time:.2f} giây.")

            except Exception as e:
                logging.error(f"Lỗi trong quá trình Transform và Load Fact Table: {e}")
                raise e

    logging.info("Hoàn thành Transform và Load Fact Table.")


In [22]:
if __name__ == "__main__":
    logging.info("=== BẮT ĐẦU QUÁ TRÌNH ETL ===")
    total_start_time = time.time()

    extract_load_to_staging(CSV_FILES, DATA_DIR, engine)



2025-04-07 18:37:15,645 - INFO - === BẮT ĐẦU QUÁ TRÌNH ETL ===
2025-04-07 18:37:15,646 - INFO - Bắt đầu quá trình Extract và Load vào Staging...
2025-04-07 18:37:15,647 - INFO - Đọc file: olist_orders_dataset.csv
2025-04-07 18:37:15,895 - INFO - Load dữ liệu vào bảng: staging.stg_orders
2025-04-07 18:37:20,903 - INFO - Hoàn thành load staging.stg_orders trong 5.26 giây.
2025-04-07 18:37:20,903 - INFO - Đọc file: olist_order_items_dataset.csv
2025-04-07 18:37:21,065 - INFO - Load dữ liệu vào bảng: staging.stg_order_items
2025-04-07 18:37:26,548 - INFO - Hoàn thành load staging.stg_order_items trong 5.64 giây.
2025-04-07 18:37:26,549 - INFO - Đọc file: olist_customers_dataset.csv
2025-04-07 18:37:26,672 - INFO - Load dữ liệu vào bảng: staging.stg_customers
2025-04-07 18:37:30,233 - INFO - Hoàn thành load staging.stg_customers trong 3.68 giây.
2025-04-07 18:37:30,234 - INFO - Đọc file: olist_sellers_dataset.csv
2025-04-07 18:37:30,245 - INFO - Load dữ liệu vào bảng: staging.stg_sellers
20

In [23]:
transform_and_load_dimensions(engine)

2025-04-07 18:38:05,417 - INFO - Bắt đầu quá trình Transform và Load Dimensions (snake_case)...
2025-04-07 18:38:05,418 - INFO - Chuẩn hóa dữ liệu Geolocation...
2025-04-07 18:38:09,421 - INFO - Tạo mapping cho 19015 zip code prefixes.
2025-04-07 18:38:09,421 - INFO - Load dữ liệu vào dwh.dim_customer...
2025-04-07 18:38:09,960 - INFO - Truncating dwh.dim_customer...
2025-04-07 18:38:09,975 - INFO - Loading 99441 rows into dwh.dim_customer...
2025-04-07 18:38:16,983 - INFO - Hoàn thành load dim_customer trong 7.56 giây.
2025-04-07 18:38:16,984 - INFO - Load dữ liệu vào dwh.dim_seller...
2025-04-07 18:38:17,008 - INFO - Truncating dwh.dim_seller...
2025-04-07 18:38:17,032 - INFO - Loading 3095 rows into dwh.dim_seller...
2025-04-07 18:38:17,171 - INFO - Hoàn thành load dim_seller trong 0.19 giây.
2025-04-07 18:38:17,176 - INFO - Hoàn thành Transform và Load Dimensions.


In [32]:
transform_and_load_fact(engine)

2025-04-07 19:03:28,771 - INFO - Bắt đầu quá trình Transform và Load Fact Table...
2025-04-07 19:03:28,772 - INFO - Đọc dữ liệu từ staging và dimensions...
2025-04-07 19:03:30,054 - INFO - Tổng hợp dữ liệu Order Items...
2025-04-07 19:03:30,198 - INFO - Kết hợp Orders và Items Aggregated...
2025-04-07 19:03:30,286 - INFO - Chuyển đổi kiểu dữ liệu ngày tháng...
2025-04-07 19:03:30,456 - INFO - Tính toán các Measures...
2025-04-07 19:03:30,515 - INFO - Setting negative time measures to None...
2025-04-07 19:03:30,518 - INFO - Lookup Dimension Keys...
2025-04-07 19:03:30,798 - INFO - Handling failed lookups and preparing key data types...
2025-04-07 19:03:30,823 - INFO - Chuẩn bị dữ liệu cuối cùng cho fact_order_delivery...
2025-04-07 19:03:30,838 - INFO - Load 98666 dòng vào dwh.fact_order_delivery...
2025-04-07 19:03:30,839 - INFO - Truncating dwh.fact_order_delivery...
2025-04-07 19:03:43,824 - INFO - Hoàn thành load fact_order_delivery trong 12.98 giây.
2025-04-07 19:03:43,837 - INFO 

In [None]:
total_end_time = time.time()
logging.info(f"=== KẾT THÚC QUÁ TRÌNH ETL TRONG {total_end_time - total_start_time:.2f} GIÂY ===")

## TEST

In [40]:
!pytest tests/test_unit_etl.py

platform linux -- Python 3.11.6, pytest-8.3.5, pluggy-1.5.0
rootdir: /home/jovyan/work
plugins: anyio-4.0.0
collected 5 items                                                              [0m

tests/test_unit_etl.py [32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                             [100%][0m



In [54]:
!pytest tests/conftest.py

platform linux -- Python 3.11.6, pytest-8.3.5, pluggy-1.5.0
rootdir: /home/jovyan/work
plugins: anyio-4.0.0
collected 0 items                                                              [0m

