In [0]:
import logging
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
def get_logger(name):
    """Return the logger called ``name``, set to INFO level.

    A single ``StreamHandler`` using the format
    ``'%(asctime)s %(levelname)s %(message)s'`` is attached only when the logger
    has no handlers yet, so calling this repeatedly (e.g. re-running a cell)
    does not stack duplicate handlers.

    Args:
        name: Logger name passed to ``logging.getLogger``.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    log = logging.getLogger(name)
    log.setLevel(logging.INFO)
    if log.handlers:
        # Already configured on a previous call; just return it.
        return log
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    )
    log.addHandler(stream_handler)
    return log

In [0]:


def process_and_save_crm_cust_info():
    """Build ``dwh.silver.crm_cust_info`` from ``dwh.bronze.crm_cust_info``.

    Transformations:
      * keep one row per ``cst_id``: the one with the latest ``cst_create_date``;
        rows with a NULL ``cst_id`` are dropped;
      * trim ``cst_firstname`` / ``cst_lastname``;
      * expand ``cst_gndr`` codes F/M to 'Female'/'Male', anything else -> 'n/a';
      * expand ``cst_marital_status`` codes S/M to 'Single'/'Married',
        anything else -> 'n/a'.

    The target table is overwritten. Any exception is logged and re-raised.
    """
    log = get_logger("crm_cust_info_logger")
    try:
        log.info("Reading source table dwh.bronze.crm_cust_info")
        bronze_customers = spark.read.table('dwh.bronze.crm_cust_info')

        log.info("Transforming data")
        # Newest record per customer gets rank 1.
        newest_first = Window.partitionBy('cst_id').orderBy(col('cst_create_date').desc())
        gender_code = upper(trim(col('cst_gndr')))
        marital_code = upper(trim(col('cst_marital_status')))

        silver_customers = (
            bronze_customers
            .withColumn('dup_rank', row_number().over(newest_first))
            .withColumn('cst_firstname', trim(col('cst_firstname')))
            .withColumn('cst_lastname', trim(col('cst_lastname')))
            .withColumn(
                'cst_gndr',
                when(gender_code == 'F', 'Female')
                .when(gender_code == 'M', 'Male')
                .otherwise('n/a'),
            )
            .withColumn(
                'cst_marital_status',
                when(marital_code == 'S', 'Single')
                .when(marital_code == 'M', 'Married')
                .otherwise('n/a'),
            )
            .filter((col('dup_rank') == 1) & col('cst_id').isNotNull())
            .drop('dup_rank')
        )

        log.info("Writing to target table dwh.silver.crm_cust_info")
        silver_customers.write.mode('overwrite').saveAsTable('dwh.silver.crm_cust_info')
        log.info("Process completed successfully")
    except Exception as e:
        log.error(f"Error processing CRM customer info: {e}")
        raise


In [0]:
def process_and_save_crm_prd_info():
    """Build ``dwh.silver.crm_prd_info`` from ``dwh.bronze.crm_prd_info``.

    Transformations:
      * ``cat_id``: first 5 characters of the raw ``prd_key`` with '-' -> '_';
      * ``prd_key``: raw key from character 7 onward (prefix and separator dropped);
      * ``prd_cost``: NULL replaced by 0;
      * ``prd_line``: code M/R/T/S expanded to Mountain/Road/Touring/Other Sales,
        anything else -> 'n/a';
      * ``prd_nm``: trimmed;
      * ``prd_end_dt``: one day before the next ``prd_start_dt`` of the same
        (stripped) ``prd_key``; NULL for the most recent version.

    The target table is overwritten. Any exception is logged and re-raised.
    """
    logger = get_logger("crm_prd_info_logger")

    try:
        logger.info("Reading source table dwh.bronze.crm_prd_info")
        df_crm_prd_info = spark.read.table('dwh.bronze.crm_prd_info')
        logger.info("Transforming data")

        prd_line_code = upper(trim(col('prd_line')))
        # Versions of a product ordered by start date, so lead() is the next start.
        by_start_date = Window.partitionBy('prd_key').orderBy(col('prd_start_dt'))

        df_crm_prd_info = (
            df_crm_prd_info
            # cat_id must be derived before prd_key is overwritten below.
            .withColumn('cat_id', regexp_replace(substring(col('prd_key'), 1, 5), '-', '_'))
            # Column.substr accepts Column positions on every PySpark version;
            # functions.substring only accepts ints for pos/len before 4.0.
            .withColumn('prd_key', col('prd_key').substr(lit(7), length(col('prd_key'))))
            .withColumn('prd_cost', coalesce(col('prd_cost'), lit(0)))
            .withColumn(
                'prd_line',
                when(prd_line_code == 'M', 'Mountain')
                .when(prd_line_code == 'R', 'Road')
                .when(prd_line_code == 'T', 'Touring')
                .when(prd_line_code == 'S', 'Other Sales')
                .otherwise('n/a'),
            )
            .withColumn('prd_nm', trim(col('prd_nm')))
            # Explicit day arithmetic instead of relying on `date - 1` coercion.
            # NOTE(review): assumes prd_start_dt is a DATE (or castable to one).
            .withColumn(
                'prd_end_dt',
                date_sub(lead(col('prd_start_dt'), 1).over(by_start_date), 1),
            )
        )

        logger.info("Writing to target table dwh.silver.crm_prd_info")
        df_crm_prd_info.write.mode('overwrite').saveAsTable('dwh.silver.crm_prd_info')
        logger.info("Process completed successfully")
    except Exception as e:
        logger.error(f"Error processing CRM product info: {e}")
        raise

In [0]:

def _yyyymmdd_to_date(column_name):
    """Return a DATE expression parsing ``column_name`` as ``yyyyMMdd``.

    Yields NULL when the value is 0, NULL, or does not have exactly 8
    characters; otherwise it is cast to string and parsed with
    ``to_date(..., 'yyyyMMdd')``.
    """
    raw = col(column_name)
    return (
        when((raw == 0) | (length(raw) != 8), None)
        .otherwise(to_date(raw.cast('string'), 'yyyyMMdd'))
    )


def process_and_save_crm_sales_details():
    """Build ``dwh.silver.crm_sales_details`` from ``dwh.bronze.crm_sales_details``.

    Transformations:
      * ``sls_order_dt`` / ``sls_ship_dt`` / ``sls_due_dt``: yyyyMMdd values
        parsed to DATE (0 or malformed length -> NULL);
      * ``sls_sales``: replaced by ``sls_quantity * |sls_price|`` when it is
        NULL or <= 0, or when it disagrees with that product and the product
        is positive;
      * ``sls_price``: when NULL or <= 0, derived as (new) ``sls_sales`` /
        ``sls_quantity`` (NULL if quantity is 0); cast to int in all cases.

    The target table is overwritten. Any exception is logged and re-raised.
    """
    logger = get_logger("crm_sales_details_logger")
    try:
        logger.info("Reading source table dwh.bronze.crm_sales_details")
        df_crm_sales_details = spark.read.table('dwh.bronze.crm_sales_details')

        logger.info("Transforming data")
        # `abs` here is pyspark.sql.functions.abs (via the star import).
        derived_sales = col('sls_quantity') * abs(col('sls_price'))
        sales = col('sls_sales')
        quantity_or_null = when(col('sls_quantity') == 0, None).otherwise(col('sls_quantity'))

        df_crm_sales_details = (
            df_crm_sales_details
            .withColumn('sls_order_dt', _yyyymmdd_to_date('sls_order_dt'))
            .withColumn('sls_ship_dt', _yyyymmdd_to_date('sls_ship_dt'))
            .withColumn('sls_due_dt', _yyyymmdd_to_date('sls_due_dt'))
            .withColumn(
                'sls_sales',
                when(
                    sales.isNull()
                    | (sales <= 0)
                    # Only trust the recomputed value on a mismatch when it is
                    # itself positive; a zero/negative price or quantity must
                    # not overwrite a valid positive sales amount.
                    | ((sales != derived_sales) & (derived_sales > 0)),
                    derived_sales,
                ).otherwise(sales),
            )
            # Uses the sls_sales produced by the previous step.
            .withColumn(
                'sls_price',
                when(
                    col('sls_price').isNull() | (col('sls_price') <= 0),
                    (col('sls_sales') / quantity_or_null).cast('int'),
                ).otherwise(col('sls_price').cast('int')),
            )
        )

        logger.info("Writing to target table dwh.silver.crm_sales_details")
        df_crm_sales_details.write.mode('overwrite').saveAsTable('dwh.silver.crm_sales_details')
        logger.info("Process completed successfully")
    except Exception as e:
        logger.error(f"Error processing CRM sales details: {e}")
        raise

In [0]:
def process_and_save_erp_cust_az12():
    """Build ``dwh.silver.erp_cust_az12`` from ``dwh.bronze.erp_cust_az12``.

    Transformations:
      * ``cid``: a leading 'NAS' prefix is stripped;
      * ``bdate``: dates after today's date are set to NULL;
      * ``gen``: F/FEMALE -> 'Female', M/MALE -> 'Male' (case-insensitive,
        trimmed), anything else (including NULL) -> 'n/a'.

    The target table is overwritten. Any exception is logged and re-raised.
    """
    logger = get_logger("erp_cust_az12_logger")

    try:
        logger.info("Reading source table dwh.bronze.erp_cust_az12")
        df_erp_cust_az12 = spark.read.table('dwh.bronze.erp_cust_az12')
        logger.info("Transforming data")

        cid = col('cid')
        gender_code = upper(trim(col('gen')))

        df_erp_cust_az12 = (
            df_erp_cust_az12
            # Column.substr accepts a Column length on every PySpark version;
            # functions.substring only accepts ints for pos/len before 4.0.
            .withColumn(
                'cid',
                when(cid.like('NAS%'), cid.substr(lit(4), length(cid))).otherwise(cid),
            )
            .withColumn(
                'bdate',
                when(col('bdate') > current_date(), None).otherwise(col('bdate')),
            )
            .withColumn(
                'gen',
                when(gender_code.isin('F', 'FEMALE'), 'Female')
                .when(gender_code.isin('M', 'MALE'), 'Male')
                .otherwise('n/a'),
            )
        )

        logger.info("Writing to target table dwh.silver.erp_cust_az12")
        df_erp_cust_az12.write.mode('overwrite').saveAsTable('dwh.silver.erp_cust_az12')
        logger.info("Process completed successfully")
    except Exception as e:
        logger.error(f"Error processing ERP customer az12: {e}")
        raise

In [0]:
def process_and_save_erp_loc_a101():
    """Build ``dwh.silver.erp_loc_a101`` from ``dwh.bronze.erp_loc_a101``.

    Transformations:
      * ``cid``: all '-' characters removed;
      * ``cntry`` (trimmed): 'DE' -> 'Germany', 'US'/'USA' -> 'United States',
        empty or NULL -> 'n/a', any other value kept as its trimmed form.

    The target table is overwritten. Any exception is logged and re-raised.
    """
    log = get_logger("erp_loc_a101_logger")

    try:
        log.info("Reading source table dwh.bronze.erp_loc_a101")
        bronze_locations = spark.read.table('dwh.bronze.erp_loc_a101')
        log.info("Transforming data")

        country = trim(col('cntry'))
        country_missing = (country == '') | col('cntry').isNull()

        silver_locations = (
            bronze_locations
            .withColumn('cid', regexp_replace(col('cid'), '-', ''))
            .withColumn(
                'cntry',
                when(country == 'DE', 'Germany')
                .when(country.isin(['US', 'USA']), 'United States')
                .when(country_missing, 'n/a')
                .otherwise(country),
            )
        )

        log.info("Writing to target table dwh.silver.erp_loc_a101")
        silver_locations.write.mode('overwrite').saveAsTable('dwh.silver.erp_loc_a101')
        log.info("Process completed successfully")
    except Exception as e:
        log.error(f"Error processing ERP location a101: {e}")
        raise

In [0]:
def process_and_save_erp_px_cat_g1v2():
    """Copy ``dwh.bronze.erp_px_cat_g1v2`` unchanged into ``dwh.silver.erp_px_cat_g1v2``.

    No transformation is applied; the target table is overwritten.
    Any exception is logged and re-raised.
    """
    log = get_logger("erp_px_cat_g1v2_logger")

    try:
        log.info("Reading source table dwh.bronze.erp_px_cat_g1v2")
        categories = spark.read.table('dwh.bronze.erp_px_cat_g1v2')

        log.info("Writing to target table dwh.silver.erp_px_cat_g1v2")
        (
            categories.write
            .mode('overwrite')
            .saveAsTable('dwh.silver.erp_px_cat_g1v2')
        )
        log.info("Process completed successfully")
    except Exception as e:
        log.error(f"Error processing ERP px cat g1v2: {e}")
        raise

In [0]:

# Rebuild dwh.silver.crm_cust_info (overwrite) from dwh.bronze.crm_cust_info.
process_and_save_crm_cust_info()

2025-07-29 16:24:55,457 INFO Reading source table dwh.bronze.crm_cust_info
2025-07-29 16:24:56,118 INFO Transforming data
2025-07-29 16:24:56,231 INFO Writing to target table dwh.silver.crm_cust_info
2025-07-29 16:24:58,313 INFO Process completed successfully


In [0]:
# Rebuild dwh.silver.crm_prd_info (overwrite) from dwh.bronze.crm_prd_info.
process_and_save_crm_prd_info()

2025-07-29 16:24:58,453 INFO Reading source table dwh.bronze.crm_prd_info
2025-07-29 16:24:58,567 INFO Transforming data
2025-07-29 16:24:58,663 INFO Writing to target table dwh.silver.crm_prd_info
2025-07-29 16:25:00,316 INFO Process completed successfully


In [0]:
# Rebuild dwh.silver.crm_sales_details (overwrite) from dwh.bronze.crm_sales_details.
process_and_save_crm_sales_details()

2025-07-29 16:25:00,458 INFO Reading source table dwh.bronze.crm_sales_details
2025-07-29 16:25:00,539 INFO Transforming data
2025-07-29 16:25:00,678 INFO Writing to target table dwh.silver.crm_sales_details
2025-07-29 16:25:02,561 INFO Process completed successfully


In [0]:
# Rebuild dwh.silver.erp_cust_az12 (overwrite) from dwh.bronze.erp_cust_az12.
process_and_save_erp_cust_az12()

2025-07-29 16:25:02,755 INFO Reading source table dwh.bronze.erp_cust_az12
2025-07-29 16:25:02,863 INFO Transforming data
2025-07-29 16:25:02,920 INFO Writing to target table dwh.silver.erp_cust_az12
2025-07-29 16:25:04,365 INFO Process completed successfully


In [0]:
# Rebuild dwh.silver.erp_loc_a101 (overwrite) from dwh.bronze.erp_loc_a101.
process_and_save_erp_loc_a101()

2025-07-29 16:25:04,557 INFO Reading source table dwh.bronze.erp_loc_a101
2025-07-29 16:25:04,655 INFO Transforming data
2025-07-29 16:25:04,700 INFO Writing to target table dwh.silver.erp_loc_a101
2025-07-29 16:25:05,874 INFO Process completed successfully


In [0]:
# Copy dwh.bronze.erp_px_cat_g1v2 unchanged into dwh.silver.erp_px_cat_g1v2 (overwrite).
process_and_save_erp_px_cat_g1v2()

2025-07-29 16:25:06,066 INFO Reading source table dwh.bronze.erp_px_cat_g1v2
2025-07-29 16:25:06,210 INFO Writing to target table dwh.silver.erp_px_cat_g1v2
2025-07-29 16:25:07,452 INFO Process completed successfully
