# Nhanh Bills - Local Testing

Notebook này cho phép chạy Nhanh Bills pipeline từ local, không cần gọi Cloud Run.

**⚠️ QUAN TRỌNG - Đồng bộ với CloudRun:**
- Notebook này import trực tiếp từ code chính (`src/features/nhanh/bills/`)
- Khi code trên CloudRun thay đổi, notebook tự động sử dụng code mới
- Không cần copy/paste code - chỉ cần chạy lại notebook sau khi code thay đổi

**Flow mới (Flatten trong Python, Load trực tiếp vào Fact Tables):**
1. Extract: Lấy dữ liệu bills từ Nhanh API (raw nested structures)
2. Flatten + Load: Flatten nested structures trong Python → Upload GCS (backup) → Load trực tiếp vào Fact Tables (`nhanhVN.fact_sales_bills_v3_0`, `nhanhVN.fact_sales_bills_product_v3_0`)
3. ✅ Không cần Transform step nữa - flatten đã được làm trong loader

**Sử dụng khi:**
- Test pipeline trước khi deploy
- Debug issues
- Chạy sync thủ công cho date range cụ thể
- Test với credentials local


In [None]:
import os
import sys
from datetime import datetime, timedelta
from dotenv import load_dotenv

# Add project root to path
project_root = os.path.abspath(os.path.join(os.getcwd(), '..', '..', '..', '..'))
sys.path.insert(0, project_root)

# Load environment variables
load_dotenv()

print(f"Project root: {project_root}")
print(f"Python path: {sys.path[0]}")


In [None]:
# Import các module từ code chính
# Lưu ý: Notebook import trực tiếp từ src/features/nhanh/bills/ nên luôn đồng bộ với code CloudRun
from src.features.nhanh.bills.components.extractor import BillExtractor
from src.features.nhanh.bills.components.transformer import BillTransformer
from src.features.nhanh.bills.components.loader import BillLoader
from src.features.nhanh.bills.pipeline import BillPipeline
from src.config import settings
import logging

# Cấu hình logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

print("✅ Đã import tất cả modules thành công")


## Cấu hình

Load cấu hình từ environment variables và credential files.

**Lưu ý về Credentials:**
- Nhanh API credentials được lấy từ Secret Manager hoặc environment variables
- BigQuery credentials: Sử dụng Application Default Credentials (ADC) hoặc service account key
- GCS bucket: Cấu hình trong `BRONZE_BUCKET` environment variable


In [None]:
# ⚠️ QUAN TRỌNG: Reload modules sau khi sửa code
# Chạy cell này trước khi chạy pipeline để đảm bảo code mới nhất được sử dụng
import importlib
import src.features.nhanh.bills.components.extractor
import src.features.nhanh.bills.components.transformer
import src.features.nhanh.bills.components.loader
import src.features.nhanh.bills.pipeline

# Reload modules
importlib.reload(src.features.nhanh.bills.components.extractor)
importlib.reload(src.features.nhanh.bills.components.transformer)
importlib.reload(src.features.nhanh.bills.components.loader)
importlib.reload(src.features.nhanh.bills.pipeline)

# Re-import sau khi reload
from src.features.nhanh.bills.components.extractor import BillExtractor
from src.features.nhanh.bills.components.transformer import BillTransformer
from src.features.nhanh.bills.components.loader import BillLoader
from src.features.nhanh.bills.pipeline import BillPipeline

# Re-initialize components với code mới
extractor = BillExtractor()
transformer = BillTransformer()
loader = BillLoader()
pipeline = BillPipeline()

print("✅ Đã reload modules và khởi tạo lại components thành công")


In [None]:
# Kiểm tra cấu hình
print(f"GCP Project: {settings.gcp_project}")
print(f"GCP Region: {settings.gcp_region}")
print(f"Bronze Bucket: {settings.bronze_bucket}")
print(f"Bronze Dataset: {settings.bronze_dataset}")
print(f"Target Dataset: {settings.target_dataset}")
print(f"Nhanh API Base URL: {settings.nhanh_api_base_url}")

# Kiểm tra credentials (nếu có trong env)
nhanh_creds_available = (
    os.getenv("NHANH_APP_ID") or 
    os.getenv("NHANH_BUSINESS_ID") or 
    os.getenv("NHANH_ACCESS_TOKEN")
)
print(f"\nNhanh Credentials từ env: {'✅ Có' if nhanh_creds_available else '⚠️ Không có (sẽ dùng Secret Manager)'}")


## Khởi tạo Components

Khởi tạo extractor, transformer, loader và pipeline.


In [None]:
# Khởi tạo các components
extractor = BillExtractor()
transformer = BillTransformer()
loader = BillLoader()

# Khởi tạo pipeline
pipeline = BillPipeline()

print("✅ Đã khởi tạo tất cả components thành công")


## Chạy Pipeline

Chạy pipeline với các options khác nhau.


### Option 1: Extract + Load (Trực tiếp vào Fact Tables)

Chạy Extract và Load cho date range cụ thể. 
- Data sẽ được flatten trong Python (loader)
- Upload lên GCS để backup
- Load trực tiếp vào Fact Tables: `nhanhVN.fact_sales_bills_v3_0` và `nhanhVN.fact_sales_bills_product_v3_0`


In [None]:
# Extract + Load cho 30 ngày gần nhất
from_date = datetime.now() - timedelta(days=30)
to_date = datetime.now()

print("=" * 60)
print(f"Đang chạy Extract + Load Pipeline")
print(f"Date range: {from_date.strftime('%Y-%m-%d')} đến {to_date.strftime('%Y-%m-%d')}")
print("=" * 60)

result = pipeline.run_extract_load(
    from_date=from_date,
    to_date=to_date,
    process_by_day=True
)

print("\n" + "=" * 60)
print("Kết quả Pipeline:")
print("=" * 60)
for key, value in result.items():
    print(f"{key}: {value}")
print("=" * 60)


### Option 2: Transform (DEPRECATED - Không cần nữa)

⚠️ **DEPRECATED**: Transform step không còn cần thiết vì flatten đã được làm trong loader.
Data được load trực tiếp vào Fact Tables trong Option 1.

Nếu vẫn muốn test transform (ví dụ: từ bronze raw tables), có thể uncomment cell bên dưới.


In [None]:
# # ⚠️ DEPRECATED: Transform không còn cần thiết vì flatten đã được làm trong loader
# # Chỉ dùng nếu muốn test transform từ bronze raw tables
# print("=" * 60)
# print("Đang chạy Transform Pipeline (Raw → Clean)")
# print("⚠️ Lưu ý: Transform step đã được tích hợp vào loader, không cần chạy riêng")
# print("=" * 60)

# # result = pipeline.run_transform()

# # print("\n" + "=" * 60)
# # print("Kết quả Transform:")
# # print("=" * 60)
# # for key, value in result.items():
# #     print(f"{key}: {value}")
# # print("=" * 60)


### Option 3: Full Pipeline (Extract + Load)

Chạy toàn bộ pipeline từ đầu đến cuối.
- Extract: Lấy data từ Nhanh API
- Load: Flatten và load trực tiếp vào Fact Tables
- ⚠️ Lưu ý: `run_full_pipeline()` vẫn có transform step (backward compatibility), nhưng thực tế không cần vì flatten đã được làm trong loader


In [None]:
# Full pipeline cho 30 ngày gần nhất
from_date = datetime.now() - timedelta(days=30)
to_date = datetime.now()

print("=" * 60)
print(f"Đang chạy Full Pipeline")
print(f"Date range: {from_date.strftime('%Y-%m-%d')} đến {to_date.strftime('%Y-%m-%d')}")
print("=" * 60)

result = pipeline.run_full_pipeline(
    from_date=from_date,
    to_date=to_date
)

print("\n" + "=" * 60)
print("Kết quả Full Pipeline:")
print("=" * 60)
for key, value in result.items():
    if isinstance(value, dict):
        print(f"\n{key}:")
        for sub_key, sub_value in value.items():
            print(f"  {sub_key}: {sub_value}")
    else:
        print(f"{key}: {value}")
print("=" * 60)


### Option 4: Chạy với Date Range Tùy Chỉnh

Chạy pipeline với ngày cụ thể.


In [None]:
# # Tùy chọn: Chạy với date range tùy chỉnh
# custom_from_date = datetime(2025, 1, 1)
# custom_to_date = datetime(2025, 1, 15)

# print("=" * 60)
# print(f"Đang chạy Pipeline cho date range: {custom_from_date.strftime('%Y-%m-%d')} đến {custom_to_date.strftime('%Y-%m-%d')}")
# print("=" * 60)

# result = pipeline.run_extract_load(
#     from_date=custom_from_date,
#     to_date=custom_to_date,
#     process_by_day=True
# )

# print("\n" + "=" * 60)
# print("Kết quả Pipeline:")
# print("=" * 60)
# for key, value in result.items():
#     print(f"{key}: {value}")
# print("=" * 60)


## Test Từng Component Riêng Lẻ

Test từng component riêng lẻ để debug.


### Test Extractor

Test extractor để lấy dữ liệu từ Nhanh API.


In [None]:
# # Test Extractor - Extract bills và products
# test_from_date = datetime.now() - timedelta(days=7)
# test_to_date = datetime.now()

# print(f"Đang test Extractor cho date range: {test_from_date.strftime('%Y-%m-%d')} đến {test_to_date.strftime('%Y-%m-%d')}")

# bills, products = extractor.extract_with_products(
#     from_date=test_from_date,
#     to_date=test_to_date,
#     process_by_day=True
# )

# print(f"\nKết quả extract:")
# print(f"  - Bills: {len(bills)} records")
# print(f"  - Products: {len(products)} records")
# if bills:
#     print(f"  - Sample bill columns: {list(bills[0].keys())[:10]}")
# if products:
#     print(f"  - Sample product columns: {list(products[0].keys())[:10]}")


### Test Loader

Test loader để flatten data, upload lên GCS (backup) và load vào Fact Tables.
- Loader sẽ tự động flatten nested structures (customer, payment, vat)
- Upload flattened data lên GCS
- Load trực tiếp vào Fact Tables: `nhanhVN.fact_sales_bills_v3_0` và `nhanhVN.fact_sales_bills_product_v3_0`


In [None]:
# # Test Loader - Flatten, upload GCS và load vào Fact Tables
# # Lưu ý: Cần có dữ liệu từ extractor trước (raw nested structures)
# from datetime import date

# test_date = date.today()
# # Sample raw bill với nested structures (giống data từ API)
# test_bills = [{
#     "id": 12345,
#     "depotId": 1,
#     "date": "2025-01-15",
#     "type": 2,
#     "mode": 2,
#     "customer": {"id": 100, "name": "Test Customer", "mobile": "0123456789", "address": "Test Address"},
#     "sale": {"id": 50, "name": "Test Sale"},
#     "created": {"id": 10, "name": "test@example.com"},
#     "payment": {
#         "amount": 1000000,
#         "customerAmount": 1000000,
#         "discount": 0,
#         "points": 0,
#         "cash": {"amount": 1000000},
#         "transfer": {"amount": 0, "accountId": None},
#         "credit": {"amount": 0}
#     },
#     "description": "Test bill"
# }]
# # Sample raw product với nested vat
# test_products = [{
#     "bill_id": 12345,
#     "id": 999,
#     "code": "PROD001",
#     "barcode": "123456789",
#     "name": "Test Product",
#     "quantity": 1.0,
#     "price": 1000000.0,
#     "discount": 0.0,
#     "vat": {"percent": 10, "amount": 100000.0},
#     "amount": 1100000.0
# }]

# print(f"Đang test Loader cho ngày: {test_date}")
# print("⚠️ Loader sẽ tự động flatten nested structures và load vào Fact Tables")

# # Load bills (sẽ flatten và load vào fact_sales_bills_v3_0)
# if test_bills:
#     bills_path = loader.load_bills(data=test_bills, partition_date=test_date)
#     print(f"✅ Bills flattened, uploaded to GCS: {bills_path}")
#     print(f"✅ Bills loaded to: {loader.bills_table_id}")

# # Load products (sẽ flatten và load vào fact_sales_bills_product_v3_0)
# if test_products:
#     products_path = loader.load_bill_products(data=test_products, partition_date=test_date)
#     print(f"✅ Products flattened, uploaded to GCS: {products_path}")
#     print(f"✅ Products loaded to: {loader.products_table_id}")

# # Setup External Tables (optional, backward compatibility)
# # print("\nSetting up External Tables (optional)...")
# # external_tables = loader.setup_external_tables()
# # print(f"External Tables: {external_tables}")


### Test Transformer (DEPRECATED)

⚠️ **DEPRECATED**: Transformer không còn cần thiết vì flatten đã được tích hợp vào loader.
Data được flatten và load trực tiếp vào Fact Tables trong loader.


In [None]:
# # ⚠️ DEPRECATED: Transformer không còn cần thiết
# # Flatten đã được tích hợp vào loader, data được load trực tiếp vào Fact Tables
# # print("Đang test Transformer...")
# # print("⚠️ DEPRECATED: Transformer không còn cần thiết vì flatten đã được làm trong loader")
# # print("⚠️ Lưu ý: Cần có dữ liệu trong External Tables trước (nếu muốn test từ bronze raw)")

# # result = transformer.transform_flatten()

# # print(f"\nKết quả Transform:")
# # for key, value in result.items():
# #     print(f"  - {key}: {value}")


## Verify Data trong Fact Tables

Sau khi chạy pipeline, verify data đã được load vào Fact Tables.


In [None]:
# Verify data trong Fact Tables
from google.cloud import bigquery
from datetime import date

bq_client = bigquery.Client(project=settings.gcp_project)

# Check bills table
bills_table_id = f"{settings.gcp_project}.{settings.target_dataset}.fact_sales_bills_v3_0"
bills_query = f"""
SELECT 
    COUNT(*) as total_bills,
    COUNT(DISTINCT date) as distinct_dates,
    MIN(date) as min_date,
    MAX(date) as max_date,
    COUNT(DISTINCT customer_id) as distinct_customers
FROM `{bills_table_id}`
WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
"""

print("=" * 60)
print("Bills Table Statistics (last 30 days):")
print("=" * 60)
try:
    bills_job = bq_client.query(bills_query)
    bills_results = bills_job.result()
    for row in bills_results:
        print(f"Total Bills: {row.total_bills}")
        print(f"Distinct Dates: {row.distinct_dates}")
        print(f"Date Range: {row.min_date} to {row.max_date}")
        print(f"Distinct Customers: {row.distinct_customers}")
except Exception as e:
    print(f"⚠️ Error querying bills table: {e}")

print("\n" + "=" * 60)
print("Products Table Statistics (last 30 days):")
print("=" * 60)

# Check products table
products_table_id = f"{settings.gcp_project}.{settings.target_dataset}.fact_sales_bills_product_v3_0"
products_query = f"""
SELECT 
    COUNT(*) as total_products,
    COUNT(DISTINCT DATE(extraction_timestamp)) as distinct_dates,
    MIN(DATE(extraction_timestamp)) as min_date,
    MAX(DATE(extraction_timestamp)) as max_date,
    COUNT(DISTINCT bill_id) as distinct_bills,
    COUNT(DISTINCT product_id) as distinct_products
FROM `{products_table_id}`
WHERE DATE(extraction_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
"""

try:
    products_job = bq_client.query(products_query)
    products_results = products_job.result()
    for row in products_results:
        print(f"Total Products: {row.total_products}")
        print(f"Distinct Dates: {row.distinct_dates}")
        print(f"Date Range: {row.min_date} to {row.max_date}")
        print(f"Distinct Bills: {row.distinct_bills}")
        print(f"Distinct Products: {row.distinct_products}")
except Exception as e:
    print(f"⚠️ Error querying products table: {e}")

print("=" * 60)
