# In [8]:
import os
import sys

# Make the sibling ``my_utils`` directory importable. The local modules
# imported below (util_minio, extract, transform, load) presumably live there.
# NOTE: '..' is resolved against the current working directory (os.path.abspath
# uses the CWD), not against this file's location.
module_path = os.path.join(os.path.abspath('..'), 'my_utils')
if module_path not in sys.path:
    sys.path.append(module_path)

from datetime import datetime
from util_minio import MinioHandler
from extract import Extract
from transform import Transform
from load import Load

class ETLProcess:
    """Run one extract -> transform -> load pass, staging the result as Parquet.

    The collaborators are duck-typed. ``extract``, ``transform`` and ``load``
    must each expose an ``execute`` method. ``transform`` must also expose
    ``transform.minio_handler.storage_options``, which is used when writing
    the staged Parquet data to object storage.
    """

    def __init__(self, extract, transform, load):
        self.extract = extract
        self.transform = transform
        self.load = load

    def run(self, transform_columns, table_name: str, *, bucket_name: str = "tiki") -> None:
        """Execute the pipeline once for ``table_name``.

        Args:
            transform_columns: Forwarded unchanged to ``transform.execute``.
            table_name: Target table. It also names the curated folder the
                Parquet data is staged in, and is passed to ``load.execute``.
            bucket_name: Object-storage bucket holding the curated data.

        The data is staged at
        ``s3://<bucket_name>/curated/<table_name>/ingestion_date=<today>``,
        where ``<today>`` is the local date from ``datetime.now()``. That same
        path is then handed to ``load.execute``.
        """
        extracted_file = self.extract.execute()
        # Presumably a pandas DataFrame: it must support ``to_parquet`` with
        # ``storage_options`` / ``partition_cols`` -- TODO confirm.
        transformed_data = self.transform.execute(extracted_file, transform_columns)

        ingestion_date = datetime.now().date()
        parquet_path = f's3://{bucket_name}/curated/{table_name}/ingestion_date={ingestion_date}'
        # NOTE(review): with ``partition_cols``, pandas treats ``parquet_path`` as
        # the *root* of a partitioned dataset. Files therefore land under
        # ``.../ingestion_date=<today>/ingestion_date=<value>/``, so the
        # partition directory is nested twice. This is left unchanged because
        # ``load.execute`` reads ``parquet_path``; confirm the intended layout
        # before changing it.
        transformed_data.to_parquet(
            parquet_path,
            storage_options=self.transform.minio_handler.storage_options,
            partition_cols=['ingestion_date'],
        )
        self.load.execute(parquet_path, table_name)

# Example usage:
if __name__ == "__main__":
    # Initialize MinIO handler and other components
    minio_handler = MinioHandler()
    api_url = 'https://api.tiki.vn/seller-store/v2/collections/116532/products'
    params = {'limit': 100, 'cursor': 40}
    headers = {'x-source': 'local', 'Host': 'api.tiki.vn'}
    bucket_name = "tiki"
    # Connection settings and credentials can be overridden through environment
    # variables, so real secrets need not be committed to source. The fallbacks
    # are the local-development values.
    # NOTE(review): the fallback credentials are still in source; consider
    # dropping them once the environment variables are configured everywhere.
    db_url = os.environ.get(
        'DW_TIKI_DB_URL',
        'postgresql://my_user:my_password@localhost:35432/dw_tiki',
    )
    storage_options = {
        'endpoint_url': os.environ.get('MINIO_ENDPOINT_URL', 'http://localhost:9000'),
        'key': os.environ.get('MINIO_ACCESS_KEY', 'minioadmin'),
        'secret': os.environ.get('MINIO_SECRET_KEY', '12345678'),
    }
    # Columns forwarded to Transform.execute by ETLProcess.run.
    transform_columns = ["tiki_pid", "name", "brand_name", "origin", 'ingestion_date', 'ingestion_dt_unix']
    table_name = 'dim_product'

    # Initialize classes
    extract = Extract(api_url, params, headers, minio_handler, bucket_name)
    transform = Transform(minio_handler, bucket_name)
    load = Load(db_url, storage_options)

    # Build the pipeline. The run itself is intentionally disabled here;
    # uncomment the next line to execute the full ETL.
    etl = ETLProcess(extract, transform, load)
    # etl.run(transform_columns, table_name)
