In [None]:
import datetime
import logging
import os
from io import StringIO, BytesIO

import pyspark
from minio import Minio
from minio.error import S3Error
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, unix_timestamp, hour, to_date, row_number, trim
from pyspark.sql.window import Window

In [None]:

# --- Connection settings -----------------------------------------------------
HIVE_URI = 'thrift://hive-metastore:9083'
# NOTE(security): credentials do not belong in a notebook. They are now read
# from the environment; the literals remain only as a fallback so existing
# runs keep working. Rotate these keys and drop the fallbacks once the
# environment variables are provisioned.
MINIO_ACCESS_KEY = os.environ.get('MINIO_ACCESS_KEY', 'FrmF5fXO0bxpBepjVUSX')
MINIO_SECRET_KEY = os.environ.get('MINIO_SECRET_KEY', '5RgPTIToiUPa16HAWnRv3KcsE7y21Oo3RPA3QXTb')
MINIO_ENDPOINT = "minio:9000"   # host:port, no scheme (the Minio client wants it this way)
MINIO_BUCKET = "logs"           # bucket that receives the per-table log files


## Start Spark Session
# Iceberg is registered as the default catalog (`spark_catalog`), backed by the
# Hive metastore at HIVE_URI, with table data stored in MinIO through s3a.
# getOrCreate() returns an already-running session if there is one; in that
# case launch-time settings such as spark.jars.packages may not be re-applied.
spark = (
    SparkSession.builder
    .appName("IcebergAsDefaultCatalog")
    .config('spark.jars.packages',
            'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,'
            'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,'
            'software.amazon.awssdk:bundle:2.17.178,'
            'software.amazon.awssdk:url-connection-client:2.17.178')
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    # Single source of truth: use the constants instead of repeating literals.
    .config("spark.sql.catalog.spark_catalog.uri", HIVE_URI)
    .config("spark.sql.catalog.spark_catalog.warehouse", "s3a://warehouse/")
    .config("spark.hadoop.fs.s3a.endpoint", f"http://{MINIO_ENDPOINT}")
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.hive.metastore.sasl.enabled", "false")
    .getOrCreate()
)

print("Spark Running")


In [None]:
# Session-level Iceberg switch for timestamp-without-time-zone handling.
# NOTE(review): presumably required because some bronze/silver columns are
# zone-less timestamps - confirm it is still needed for the Iceberg runtime in use.
spark.conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", "true")

In [None]:
# In-memory log buffer; its contents are uploaded to MinIO after each table load
log_buffer = StringIO()

# Handler that writes formatted records into the buffer
log_handler = logging.StreamHandler(log_buffer)
log_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))

logger = logging.getLogger("SilverLoader")
logger.setLevel(logging.INFO)

# logging.getLogger() hands back the same logger object on every call, so
# re-running this cell used to stack an extra pair of handlers each time:
# every message was echoed repeatedly and stale handlers kept writing to
# buffers that were no longer uploaded. Start from a clean slate instead.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)

logger.addHandler(log_handler)
# Optional: also echo to the notebook output (StreamHandler defaults to sys.stderr)
logger.addHandler(logging.StreamHandler())

In [None]:
# Upload logs from buffer to MinIO
def flush_logs_to_minio(object_name):
    """Upload the buffered log text to MinIO and reset the buffer.

    The whole content of ``log_buffer`` is stored as a ``text/plain`` object
    named ``object_name`` in ``MINIO_BUCKET`` (the bucket is created when it
    does not exist yet). After a successful upload the buffer is emptied.

    This is a best-effort helper that the loaders call from ``finally``
    blocks, so it must never raise: a failure here would otherwise replace
    the loader's own exception. Any error is logged and swallowed, and the
    buffer is left untouched so its content is included in the next upload.

    Args:
        object_name: Object key inside the bucket, e.g.
            ``"silver_logs/crm_cust_info.log"``.
    """
    try:
        client = Minio(
            MINIO_ENDPOINT,
            access_key=MINIO_ACCESS_KEY,
            secret_key=MINIO_SECRET_KEY,
            secure=False  # Set to True if using HTTPS
        )
        if not client.bucket_exists(MINIO_BUCKET):
            client.make_bucket(MINIO_BUCKET)

        payload = log_buffer.getvalue().encode("utf-8")
        client.put_object(
            MINIO_BUCKET,
            object_name,
            data=BytesIO(payload),
            length=len(payload),
            content_type='text/plain'
        )
        # Logged after the upload, so this line is echoed to the notebook but
        # is discarded from the buffer by the reset just below.
        logger.info(f"Uploaded log to MinIO at {MINIO_BUCKET}/{object_name}")

        # Clear the buffer after upload
        log_buffer.truncate(0)
        log_buffer.seek(0)

    except Exception as e:
        # Deliberately broader than S3Error: the client can also fail with
        # non-S3 exceptions (e.g. transport-level errors when the endpoint is
        # unreachable), and those must not escape from a caller's `finally`.
        logger.error(f"Failed to upload log: {e}")

In [None]:
def load_silver_crm_cust_info():
    """Rebuild ``silver.crm_cust_info`` from ``bronze.crm_cust_info``.

    Steps:
      1. Keep one row per non-null ``cst_id``: the row with the most recent
         ``cst_create_date``.
      2. Trim the name columns and expand the marital-status and gender codes
         into readable labels (``'n/a'`` for anything unrecognised).
      3. Overwrite the silver Iceberg table.

    The buffered log is uploaded to MinIO whether or not the load succeeds.

    Raises:
        ValueError: if any step fails. The message carries the original error
            text and the original exception is chained as ``__cause__``.
    """
    try:
        bronze_df = spark.table("spark_catalog.bronze.crm_cust_info")
        logger.info("Loaded table crm_cust_info from bronze layer")

        # Rank each customer's rows newest-first, then keep only rank 1.
        newest_first = Window.partitionBy("cst_id").orderBy(bronze_df["cst_create_date"].desc())
        ranked_df = bronze_df.withColumn("row_num", row_number().over(newest_first))
        is_latest_valid_row = (col("row_num") == 1) & (col("cst_id").isNotNull())
        ranked_cust_info_df = ranked_df.filter(is_latest_valid_row).drop("row_num")
        ranked_cust_info_df.createOrReplaceTempView("ranked_cust_info_df")
        logger.info("Filtered latest records for each customer")

        # The codes are trimmed before comparison so that padded values such
        # as 'M ' are still recognised - the same UPPER(TRIM(...)) convention
        # the other loaders in this notebook apply to their code columns.
        silver_cust_info = spark.sql("""SELECT cst_id,
                                        cst_key,
                                        TRIM(cst_firstname) AS cst_firstname,
                                        TRIM(cst_lastname) AS cst_lastname,
                                        CASE 
                                            WHEN UPPER(TRIM(cst_marital_status)) = 'S' THEN 'Single'
                                            WHEN UPPER(TRIM(cst_marital_status)) = 'M' THEN 'Married'
                                            ELSE 'n/a'
                                        END cst_marital_status,
                                        CASE 
                                            WHEN UPPER(TRIM(cst_gndr)) = 'F' THEN 'Female'
                                            WHEN UPPER(TRIM(cst_gndr)) = 'M' THEN 'Male'
                                            ELSE 'n/a'
                                        END cst_gndr,
                                        cst_create_date,
                                        CURRENT_TIMESTAMP() AS dwh_create_date
                                        FROM ranked_cust_info_df
                                    """)
        logger.info("Transformed data for silver layer")

        # insertInto resolves columns by position, so the SELECT order above
        # has to follow the column order of the target table.
        silver_cust_info.write.format("iceberg") \
            .mode("overwrite") \
            .insertInto("spark_catalog.silver.crm_cust_info")

        logger.info("Data written to silver layer table crm_cust_info")

    except Exception as e:
        failure_msg = f"Failed to load table crm_cust_info: {e}"
        logger.error(failure_msg)
        # Same exception type as before, but now with a message and the
        # original error chained so the root cause stays in the traceback.
        raise ValueError(failure_msg) from e

    finally:
        flush_logs_to_minio("silver_logs/crm_cust_info.log")

In [None]:
# Run the bronze -> silver load for crm_cust_info.
load_silver_crm_cust_info()

In [None]:
def load_silver_crm_prd_info():
    """Rebuild ``silver.crm_prd_info`` from ``bronze.crm_prd_info``.

    Transformations applied in the SELECT:
      * ``cat_id``: first five characters of ``prd_key`` with ``-`` replaced
        by ``_``.
      * ``prd_key``: the remainder of the original key from position 7 on.
      * ``prd_cost``: NULL replaced by 0.
      * ``prd_line``: one-letter code expanded to a label (``'n/a'`` when
        unrecognised).
      * ``prd_end_dt_new``: the next ``prd_start_dt`` of the same source
        ``prd_key`` minus 1 (NULL for the last version of a product).

    The silver Iceberg table is overwritten, and the buffered log is uploaded
    to MinIO whether or not the load succeeds.

    Raises:
        ValueError: if any step fails. The message carries the original error
            text and the original exception is chained as ``__cause__``.
    """
    try:
        bronze_prd_df = spark.table("spark_catalog.bronze.crm_prd_info")
        # The temp view keeps its historical name; the SQL below refers to it.
        bronze_prd_df.createOrReplaceTempView("silver_prd_df")
        logger.info("Loaded table crm_prd_info from bronze layer")

        # NOTE(review): `LEAD(...) - 1` presumably relies on prd_start_dt being
        # a DATE, so that "- 1" means "one day earlier" - confirm the column type.
        crm_silver_prd_info_df = spark.sql("""SELECT prd_id,
                    REPLACE(SUBSTR(prd_key, 1, 5),'-', '_') AS cat_id,
                    SUBSTR(prd_key, 7, LENGTH(prd_key)) AS prd_key,
                    prd_nm,
                    COALESCE(prd_cost, 0) AS prd_cost,
                    CASE UPPER(TRIM(prd_line))
                        WHEN 'M' THEN 'Mountain'
                        WHEN 'R' THEN 'Road'
                        WHEN 'S' THEN 'Other Sales'
                        ELSE 'n/a'
                    END 
                    AS prd_line,
                    prd_start_dt,
                    LEAD(prd_start_dt) OVER (PARTITION BY prd_key ORDER BY prd_start_dt) - 1 AS prd_end_dt_new,
                    CURRENT_TIMESTAMP() AS dwh_create_date
                FROM silver_prd_df
                """)

        logger.info("created result dataset for silver crm_prd_info")

        # insertInto resolves columns by position, so the SELECT order above
        # has to follow the column order of the target table.
        crm_silver_prd_info_df.write.format("iceberg") \
            .mode("overwrite") \
            .insertInto("spark_catalog.silver.crm_prd_info")

        logger.info("Loaded table silver.crm_prd_info")

    except Exception as e:
        failure_msg = f"Failed to load table crm_prd_info: {e}"
        logger.error(failure_msg)
        # Same exception type as before, but now with a message and the
        # original error chained so the root cause stays in the traceback.
        raise ValueError(failure_msg) from e

    finally:
        flush_logs_to_minio("silver_logs/crm_prd_info.log")

In [None]:
# Run the bronze -> silver load for crm_prd_info.
load_silver_crm_prd_info()

In [None]:
def load_crm_sales_details():
    """Rebuild ``silver.crm_sales_details`` from ``bronze.crm_sales_details``.

    Transformations applied in the SELECT:
      * ``sls_order_dt`` / ``sls_ship_dt`` / ``sls_due_dt``: values that are 0
        or not exactly 8 characters long become NULL; the rest are parsed as
        ``yyyyMMdd`` dates.
      * ``sls_sales``: recomputed as ``sls_quantity * ABS(sls_price)`` when it
        is NULL, not positive, or differs from that product.
      * ``sls_price``: derived as ``sls_sales / sls_quantity`` (NULL when the
        quantity is 0) when it is NULL or not positive.

    The silver Iceberg table is overwritten, and the buffered log is uploaded
    to MinIO whether or not the load succeeds.

    Raises:
        ValueError: if any step fails. The message carries the original error
            text and the original exception is chained as ``__cause__``.
    """
    try:
        bronze_sales_df = spark.table("spark_catalog.bronze.crm_sales_details")
        # The temp view keeps its historical name; the SQL below refers to it.
        bronze_sales_df.createOrReplaceTempView("silver_sales_df")
        # The log lines now name the table rather than the temp view, in line
        # with the messages emitted by the other loaders.
        logger.info("Loaded table crm_sales_details from bronze layer")

        # No trailing ';' in the statement: spark.sql() takes a single
        # statement and the terminator adds nothing.
        sales_df = spark.sql("""SELECT sls_ord_num,
                        sls_prd_key,
                        sls_cust_id,
                        CASE WHEN sls_order_dt = 0 OR LENGTH(sls_order_dt) != 8
                             THEN NULL
                             ELSE CAST(TO_DATE(CAST(sls_order_dt AS STRING), 'yyyyMMdd') AS DATE) 
                        END AS sls_order_dt,
                        CASE WHEN sls_ship_dt = 0 OR LENGTH(sls_ship_dt) != 8
                             THEN NULL
                             ELSE CAST(TO_DATE(CAST(sls_ship_dt AS STRING), 'yyyyMMdd') AS DATE) 
                        END AS sls_ship_dt,
                        CASE WHEN sls_due_dt = 0 OR LENGTH(sls_due_dt) != 8
                             THEN NULL
                             ELSE CAST(TO_DATE(CAST(sls_due_dt AS STRING), 'yyyyMMdd') AS DATE) 
                        END AS sls_due_dt,
                        CASE 
                            WHEN sls_sales IS NULL OR sls_sales <= 0 OR sls_sales != sls_quantity * ABS(sls_price)
                             THEN sls_quantity * ABS(sls_price)
                             ELSE sls_sales
                            END AS sls_sales,
                        sls_quantity,
                        CASE 
                            WHEN sls_price IS NULL OR sls_price <= 0 
                             THEN sls_sales / NULLIF(sls_quantity, 0)
                             ELSE sls_price
                        END AS sls_price,
                        CURRENT_TIMESTAMP() AS dwh_create_date
                    FROM silver_sales_df
                  """)

        logger.info("created result dataset for silver crm_sales_details")

        # insertInto resolves columns by position, so the SELECT order above
        # has to follow the column order of the target table.
        sales_df.write.format("iceberg") \
            .mode("overwrite") \
            .insertInto("spark_catalog.silver.crm_sales_details")

        logger.info("Loaded table silver.crm_sales_details")

    except Exception as e:
        failure_msg = f"Failed to load table crm_sales_details: {e}"
        logger.error(failure_msg)
        # Same exception type as before, but now with a message and the
        # original error chained so the root cause stays in the traceback.
        raise ValueError(failure_msg) from e

    finally:
        flush_logs_to_minio("silver_logs/crm_sales_details.log")

In [None]:
# Run the bronze -> silver load for crm_sales_details.
load_crm_sales_details()

In [None]:
def load_erp_cust_az12():
    """Rebuild ``silver.erp_cust_az12`` from ``bronze.erp_cust_az12``.

    Transformations applied in the SELECT:
      * ``cst_id``: ``cid`` with a leading ``NAS`` prefix removed.
      * ``bdate``: dates later than the current date become NULL.
      * ``gen``: ``F``/``FEMALE`` and ``M``/``MALE`` (any case, trimmed) are
        mapped to ``Female`` / ``Male``; everything else becomes ``'n/a'``.

    The silver Iceberg table is overwritten, and the buffered log is uploaded
    to MinIO whether or not the load succeeds.

    Raises:
        ValueError: if any step fails. The message carries the original error
            text and the original exception is chained as ``__cause__``.
    """
    try:
        bronze_cust_df = spark.table("spark_catalog.bronze.erp_cust_az12")
        bronze_cust_df.createOrReplaceTempView("bz_cust_df")
        logger.info("Loaded table erp_cust_az12 from bronze layer")

        silver_cust_df = spark.sql("""SELECT 
                             CASE 
                                WHEN cid like 'NAS%' THEN SUBSTR(cid, 4,LENGTH(cid))
                                ELSE cid  
                             END AS cst_id,
                             CASE 
                                WHEN bdate > CURRENT_DATE() THEN NULL
                                ELSE bdate
                             END AS bdate,
                             CASE 
                                WHEN UPPER(TRIM(gen)) IN ('F', 'FEMALE') THEN 'Female'
                                WHEN UPPER(TRIM(gen)) IN ('M', 'MALE') THEN 'Male'
                                ELSE 'n/a'
                             END as gen,
                             CURRENT_TIMESTAMP() AS dwh_create_date
                            FROM bz_cust_df
                  """)

        logger.info("created result dataset for silver erp_cust_az12")

        # insertInto resolves columns by position, so the SELECT order above
        # has to follow the column order of the target table.
        silver_cust_df.write.format("iceberg") \
            .mode("overwrite") \
            .insertInto("spark_catalog.silver.erp_cust_az12")

        logger.info("Loaded table silver.erp_cust_az12")

    except Exception as e:
        failure_msg = f"Failed to load table erp_cust_az12: {e}"
        logger.error(failure_msg)
        # Same exception type as before, but now with a message and the
        # original error chained so the root cause stays in the traceback.
        raise ValueError(failure_msg) from e

    finally:
        flush_logs_to_minio("silver_logs/erp_cust_az12.log")

In [None]:
# Run the bronze -> silver load for erp_cust_az12.
load_erp_cust_az12()

In [24]:
def load_erp_cust_az12():
    
    try:
        df = spark.table("spark_catalog.bronze.erp_loc_a101")
        df.createOrReplaceTempView("erp_loc_a101")
        logger.info("Loaded table erp_loc_a101 from bronze layer")

        sales_df = spark.sql("""SELECT 
                             REPLACE(cid, '-','') as cid,
                             CASE 
                                WHEN TRIM(cntry) = 'DE' THEN 'Germany'
                                WHEN TRIM(cntry) IN ('US', 'USA') THEN 'United States'
                                WHEN TRIM(cntry) = '' OR TRIM(cntry) IS NULL THEN 'n/a'
                                ELSE TRIM(cntry)
                             END AS cntry,
                             CURRENT_TIMESTAMP() AS dwh_create_date
                            FROM erp_loc_a101
                  """)
        
        logger.info("created result dataset for silver erp_loc_a101")

        # sales_df.show(5)
        
        sales_df.write.format("iceberg") \
            .mode("overwrite") \
            .insertInto(f"spark_catalog.silver.erp_loc_a101")
        
        logger.info("Loaded table silver.erp_loc_a101")
        

    except Exception as e:
        logger.error(f"Failed to load table erp_loc_a101: {e}")
        raise ValueError
    
    finally:
        flush_logs_to_minio(f"silver_logs/erp_loc_a101.log")

In [25]:
load_erp_cust_az12()

Loaded table erp_loc_a101 from bronze layer


created result dataset for silver erp_loc_a101
Loaded table silver.erp_loc_a101
Uploaded log to MinIO at logs/silver_logs/erp_loc_a101.log


In [26]:
def load_erp_px_cat_g1v2():
    """Rebuild ``silver.erp_px_cat_g1v2`` from ``bronze.erp_px_cat_g1v2``.

    The columns ``id``, ``cat``, ``subcat`` and ``maintenance`` are copied
    unchanged; only the ``dwh_create_date`` load timestamp is added.

    The silver Iceberg table is overwritten, and the buffered log is uploaded
    to MinIO whether or not the load succeeds.

    Raises:
        ValueError: if any step fails. The message carries the original error
            text and the original exception is chained as ``__cause__``.
    """
    try:
        bronze_cat_df = spark.table("spark_catalog.bronze.erp_px_cat_g1v2")
        bronze_cat_df.createOrReplaceTempView("erp_px_cat_g1v2")
        logger.info("Loaded table erp_px_cat_g1v2 from bronze layer")

        silver_cat_df = spark.sql("""SELECT 
                             id,
                             cat,
                             subcat,
                             maintenance,
                             CURRENT_TIMESTAMP() AS dwh_create_date
                            FROM erp_px_cat_g1v2
                  """)

        logger.info("created result dataset for silver erp_px_cat_g1v2")

        # insertInto resolves columns by position, so the SELECT order above
        # has to follow the column order of the target table.
        silver_cat_df.write.format("iceberg") \
            .mode("overwrite") \
            .insertInto("spark_catalog.silver.erp_px_cat_g1v2")

        logger.info("Loaded table silver.erp_px_cat_g1v2")

    except Exception as e:
        failure_msg = f"Failed to load table erp_px_cat_g1v2: {e}"
        logger.error(failure_msg)
        # Same exception type as before, but now with a message and the
        # original error chained so the root cause stays in the traceback.
        raise ValueError(failure_msg) from e

    finally:
        flush_logs_to_minio("silver_logs/erp_px_cat_g1v2.log")

In [27]:
# Run the bronze -> silver load for erp_px_cat_g1v2.
load_erp_px_cat_g1v2()

Loaded table erp_px_cat_g1v2 from bronze layer
created result dataset for silver erp_px_cat_g1v2
Loaded table silver.erp_px_cat_g1v2
Uploaded log to MinIO at logs/silver_logs/erp_px_cat_g1v2.log
