In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, lower, regexp_replace, current_timestamp, count, when
import re
import os

# --- CẤU HÌNH ---
BRONZE_BUCKET = "s3a://datalake/bronze"
# Bạn có thể tạo bucket mới tên 'silver' trên MinIO hoặc dùng prefix
SILVER_BUCKET = "s3a://datalake/silver" 


def get_spark_session():
    return SparkSession.builder \
        .appName("ELT_BronzeToSilver") \
        .master("spark://spark-master:7077") \
        .config("spark.hadoop.fs.s3a.access.key", os.environ.get("MINIO_ACCESS_KEY", "minio_access_key")) \
        .config("spark.hadoop.fs.s3a.secret.key", os.environ.get("MINIO_SECRET_KEY", "minio_secret_key")) \
        .config("spark.hadoop.fs.s3a.endpoint", os.environ.get("S3_ENDPOINT", "http://minio:9000")) \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
        .config("spark.hadoop.fs.s3a.fast.upload", "true")\
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
        .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")\
        .getOrCreate()

In [2]:
spark = get_spark_session()

In [3]:
# In ra các cấu hình quan trọng liên quan đến S3/MinIO
print("--- CẤU HÌNH S3A HIỆN TẠI ---")

keys_to_check = [
    "spark.hadoop.fs.s3a.endpoint",
    "spark.hadoop.fs.s3a.access.key",
    "spark.hadoop.fs.s3a.secret.key",
    "spark.hadoop.fs.s3a.path.style.access",
    "spark.hadoop.fs.s3a.connection.ssl.enabled",
    "spark.hadoop.fs.s3a.impl",
    "spark.hadoop.fs.s3a.aws.credentials.provider"
]

for key in keys_to_check:
    # Dùng tham số thứ 2 là giá trị mặc định nếu không tìm thấy key
    val = spark.conf.get(key, "KHÔNG TÌM THẤY")
    print(f"{key}: {val}")

--- CẤU HÌNH S3A HIỆN TẠI ---
spark.hadoop.fs.s3a.endpoint: http://minio:9000
spark.hadoop.fs.s3a.access.key: minio_access_key
spark.hadoop.fs.s3a.secret.key: minio_secret_key
spark.hadoop.fs.s3a.path.style.access: true
spark.hadoop.fs.s3a.connection.ssl.enabled: false
spark.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider


In [4]:


print("--- BIẾN MÔI TRƯỜNG (OS ENV) ---")
env_vars = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "MINIO_ACCESS_KEY", "MINIO_SECRET_KEY"]

for var in env_vars:
    print(f"{var}: {os.environ.get(var, 'Không thiết lập')}")

--- BIẾN MÔI TRƯỜNG (OS ENV) ---
AWS_ACCESS_KEY_ID:  minio_access_key
AWS_SECRET_ACCESS_KEY:  minio_secret_key
MINIO_ACCESS_KEY: minio_access_key
MINIO_SECRET_KEY: minio_secret_key


In [5]:
tables = [
        "IMM_FILE", # Header phiếu kho
        "IMN_FILE", # Detail Phiếu kho
        "IMA_FILE", # Danh mục vật tư, hàng hóa
        "SMD_FILE", # Quy đổi theo mã hàng
        "SMC_FILE", # Quy đổi đơn vị chung
    ]
report_name = "AIMR324"
# --- CẤU HÌNH ---
BRONZE_BUCKET = "s3a://datalake/bronze"
# Bạn có thể tạo bucket mới tên 'silver' trên MinIO hoặc dùng prefix
SILVER_BUCKET = "s3a://datalake/silver" 

In [16]:
def process_bronze_to_silver(spark, table_name, report_name,unique_keys = "col"):
    """
    Logix xử lý chính: Read Bronze -> Transform -> Merge Silver
    """
    bronze_path = f"{BRONZE_BUCKET}/{report_name}/{table_name.lower()}"
    silver_path = f"{SILVER_BUCKET}/{report_name}/{table_name.lower()}"
    
    print(f"\n>>> Đang xử lý bảng: {table_name}")
    
    # 1. Đọc Bronze (Delta)
    try:
        df_bronze = spark.read.format("delta").load(bronze_path)
        silver_bronze = spark.read.format("delta").load(silver_path)
        return df_bronze, silver_bronze
    except Exception as e:
        print(f"error: {e}")
        print(f"!!! Không tìm thấy bảng Bronze tại: {bronze_path}")
        return

In [17]:
df_bronze, silver_bronze = process_bronze_to_silver(spark, tables[1], report_name)



>>> Đang xử lý bảng: IMN_FILE


In [19]:
silver_bronze.printSchema()


root
 |-- imn01: string (nullable = true)
 |-- imn02: decimal(5,0) (nullable = true)
 |-- imn03: string (nullable = true)
 |-- imn04: string (nullable = true)
 |-- imn05: string (nullable = true)
 |-- imn06: string (nullable = true)
 |-- imn09: string (nullable = true)
 |-- imn10: decimal(15,3) (nullable = true)
 |-- imn15: string (nullable = true)
 |-- imn16: string (nullable = true)
 |-- imn17: string (nullable = true)
 |-- imn20: string (nullable = true)
 |-- imn21: decimal(20,8) (nullable = true)
 |-- imn22: decimal(15,3) (nullable = true)
 |-- imn27: string (nullable = true)
 |-- imn28: string (nullable = true)
 |-- imn29: string (nullable = true)
 |-- imnud01: string (nullable = true)
 |-- imnud07: decimal(15,3) (nullable = true)
 |-- imnud08: decimal(15,3) (nullable = true)
 |-- imnud09: decimal(15,3) (nullable = true)
 |-- imnplant: string (nullable = true)
 |-- imnlegal: string (nullable = true)
 |-- imn0912: decimal(20,6) (nullable = true)
 |-- imn0913: decimal(20,6) (nullabl

In [18]:
df_bronze.printSchema()

root
 |-- IMN01: string (nullable = true)
 |-- IMN02: decimal(5,0) (nullable = true)
 |-- IMN03: string (nullable = true)
 |-- IMN041: string (nullable = true)
 |-- IMN04: string (nullable = true)
 |-- IMN05: string (nullable = true)
 |-- IMN06: string (nullable = true)
 |-- IMN07: string (nullable = true)
 |-- IMN08: string (nullable = true)
 |-- IMN09: string (nullable = true)
 |-- IMN091: decimal(20,6) (nullable = true)
 |-- IMN092: decimal(20,6) (nullable = true)
 |-- IMN10: decimal(15,3) (nullable = true)
 |-- IMN11: decimal(15,3) (nullable = true)
 |-- IMN12: string (nullable = true)
 |-- IMN13: string (nullable = true)
 |-- IMN14: timestamp (nullable = true)
 |-- IMN151: string (nullable = true)
 |-- IMN15: string (nullable = true)
 |-- IMN16: string (nullable = true)
 |-- IMN17: string (nullable = true)
 |-- IMN18: string (nullable = true)
 |-- IMN19: string (nullable = true)
 |-- IMN20: string (nullable = true)
 |-- IMN201: string (nullable = true)
 |-- IMN202: string (nullabl

In [20]:
counts = df_bronze.select([count(c).alias(c) for c in df_bronze.columns]).first().asDict()

In [21]:
counts

{'IMN01': 381662,
 'IMN02': 381662,
 'IMN03': 381662,
 'IMN041': 0,
 'IMN04': 381662,
 'IMN05': 381661,
 'IMN06': 381637,
 'IMN07': 0,
 'IMN08': 0,
 'IMN09': 381662,
 'IMN091': 0,
 'IMN092': 0,
 'IMN10': 381662,
 'IMN11': 0,
 'IMN12': 0,
 'IMN13': 0,
 'IMN14': 0,
 'IMN151': 0,
 'IMN15': 381662,
 'IMN16': 381662,
 'IMN17': 381662,
 'IMN18': 0,
 'IMN19': 0,
 'IMN20': 381662,
 'IMN201': 0,
 'IMN202': 0,
 'IMN21': 381584,
 'IMN22': 381661,
 'IMN23': 0,
 'IMN24': 0,
 'IMN25': 0,
 'IMN26': 0,
 'IMN27': 356123,
 'IMN28': 344421,
 'IMN30': 0,
 'IMN31': 0,
 'IMN32': 0,
 'IMN33': 0,
 'IMN34': 0,
 'IMN35': 0,
 'IMN40': 0,
 'IMN41': 0,
 'IMN42': 0,
 'IMN43': 0,
 'IMN44': 0,
 'IMN45': 0,
 'IMN51': 0,
 'IMN52': 0,
 'IMN29': 381586,
 'IMN9301': 0,
 'IMN9302': 0,
 'IMNUD01': 2656,
 'IMNUD02': 0,
 'IMNUD03': 0,
 'IMNUD04': 0,
 'IMNUD05': 0,
 'IMNUD06': 0,
 'IMNUD07': 381213,
 'IMNUD08': 13077,
 'IMNUD09': 79587,
 'IMNUD10': 0,
 'IMNUD11': 0,
 'IMNUD12': 0,
 'IMNUD13': 0,
 'IMNUD14': 0,
 'IMNUD15': 0,
 

In [None]:
cols_to_drop = [c for c, val in counts.items() if val == 0]

In [None]:
spark.stop()