# ETL Pipeline - Data Analysis Notebook

Notebook này dùng để phân tích dữ liệu từ ETL pipeline


In [1]:
# Core dependencies: standard library first, then third-party.
import sys
from pathlib import Path

import pandas as pd

# Plotting packages are optional. Each HAS_* flag records whether the
# corresponding import succeeded; a hint is printed when it did not.
try:
    import matplotlib.pyplot as plt
except ImportError:
    HAS_MATPLOTLIB = False
    print("⚠️ matplotlib not installed. Install with: pip install matplotlib")
else:
    HAS_MATPLOTLIB = True

try:
    import seaborn as sns
except ImportError:
    HAS_SEABORN = False
    print("⚠️ seaborn not installed. Install with: pip install seaborn")
else:
    HAS_SEABORN = True

# Put the parent of the current working directory at the front of sys.path.
# NOTE(review): assumes the notebook is run from a directory directly under
# the project root, so that the `src` package can be imported - confirm.
project_root = Path().resolve().parent
sys.path.insert(0, str(project_root))

# Import ETL modules
from src.extract import extract_csv, extract_json, extract_json_nested
from src.dashboard import Dashboard
from src.utils.database import db_manager


## 1. Load Data


In [2]:
# Read the three sample inputs through the project's extract helpers.
orders_df = extract_csv("../data/sample/orders.csv")
customers_df = extract_json("../data/sample/customers.json")
products_df = extract_json_nested(
    "../data/sample/products.json",
    record_path="products",
)

# Report the row count of each extracted frame, one line per dataset.
print(
    f"Orders: {len(orders_df)} rows",
    f"Customers: {len(customers_df)} rows",
    f"Products: {len(products_df)} rows",
    sep="\n",
)


06:22:14 - INFO - Extracting data from CSV: ../data/sample/orders.csv
06:22:14 - INFO - Successfully extracted 10 rows from ../data/sample/orders.csv
06:22:14 - INFO - Extracting data from JSON: ../data/sample/customers.json
06:22:14 - INFO - Successfully extracted 5 rows from ../data/sample/customers.json
06:22:14 - INFO - Extracting nested JSON from: ../data/sample/products.json
06:22:14 - INFO - Successfully extracted 6 rows from nested JSON: ../data/sample/products.json
Orders: 10 rows
Customers: 5 rows
Products: 6 rows


### 1.1 Load Processed Data from Data Lake

Load data từ data lake (parquet files) đã được pipeline xử lý


In [3]:
# Load the most recently modified parquet file from the processed-data folder
# and show a short preview of it.
import glob  # NOTE(review): unused in this cell; kept in case other cells rely on it
from pathlib import Path

# Directory with the pipeline's processed output, relative to the notebook.
processed_dir = Path("../data/processed")
parquet_files = list(processed_dir.glob("*.parquet"))

if parquet_files:
    # "Latest" means the file with the newest modification time on disk.
    latest_file = max(parquet_files, key=lambda path: path.stat().st_mtime)
    print(f"Loading: {latest_file.name}")

    # Read the parquet file and summarise its size and schema.
    processed_df = pd.read_parquet(latest_file)
    print(f"\nLoaded {len(processed_df)} rows")
    print(f"Columns: {processed_df.columns.tolist()}")

    # A bare expression nested inside an `if` block is not the cell's final
    # top-level expression, so Jupyter would not render it. Call display()
    # (available as a builtin inside IPython/Jupyter kernels) explicitly.
    display(processed_df.head())
else:
    # processed_df is left undefined in this branch; later cells that use it
    # need the pipeline to have produced at least one parquet file.
    print("No processed parquet files found. Run pipeline first!")


Loading: sales_transformed_20260109_062053.parquet

Loaded 10 rows
Columns: ['order_id', 'customer_id', 'product_id', 'order_date', 'quantity', 'unit_price', 'discount', 'customer_name', 'email', 'city', 'country', 'product_product_name', 'product_category', 'product_brand', 'product_price', 'total_amount', 'discount_amount', 'final_amount']


## 2. Query from Database


In [4]:
# Query aggregate sales figures from the warehouse through the Dashboard helper.
# NOTE(review): the recorded output of this cell shows 0 orders and a
# total_revenue of None for every customer - presumably the warehouse sales
# data had not been loaded when it ran. Confirm the pipeline's load step has
# completed before relying on these numbers.
dashboard = Dashboard()

# Overall sales summary. display() renders the result with Jupyter's rich
# representation instead of the line-wrapped plain text that print() gives
# for wide frames.
sales_summary = dashboard.get_sales_summary()
print("Sales Summary:")
display(sales_summary)

# Top customers (at most 10 requested).
top_customers = dashboard.get_top_customers(limit=10)
print("\nTop Customers:")
display(top_customers)


06:22:14 - INFO - Database connection established: postgresql://***@postgres:5432/etl_demo
Sales Summary:
   total_orders  total_quantity  total_revenue  avg_order_value  \
0             0               0            0.0              0.0   

   total_discount  
0             0.0  

Top Customers:
   customer_id customer_name         city   country  order_count total_revenue
0          105   Hoàng Văn E  Hồ Chí Minh  Việt Nam            0          None
1          101  Nguyễn Văn A       Hà Nội  Việt Nam            0          None
2          102    Trần Thị B  Hồ Chí Minh  Việt Nam            0          None
3          104    Phạm Thị D       Hà Nội  Việt Nam            0          None
4          103      Lê Văn C      Đà Nẵng  Việt Nam            0          None
