In [0]:

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# ---- Bronze inputs ----------------------------------------------------------
# Locations of the three bronze Delta tables consumed by the silver steps below.
bronze_path_sales_delta = '/Volumes/workspace/hls_demo_2/raw_data/sales_data_bronze_delta'
bronze_path_inventory_delta = '/Volumes/workspace/hls_demo_2/raw_data/inventory_data_bronze_delta'
bronze_path_clinical_trial_delta = '/Volumes/workspace/hls_demo_2/raw_data/clinical_trial_data_bronze_delta'

# Open each bronze table as a DataFrame, in the same order as the paths above.
sales_df, inventory_df, clinical_trial_df = (
    spark.read.format('delta').load(table_path)
    for table_path in (
        bronze_path_sales_delta,
        bronze_path_inventory_delta,
        bronze_path_clinical_trial_delta,
    )
)

# ---- Silver layer: sales ----------------------------------------------------
# Cleanse: drop rows missing any critical sales column.
sales_df_cleaned = sales_df.dropna(subset=['product_code', 'product_name', 'sales_qty', 'sales_amount'])

# Aggregate total quantity and amount per product, region and order date, then
# enrich with year/month for time-based analysis.
# order_date is normalized to a DATE *before* the groupBy. Converting it only
# after aggregating (as before) grouped on the raw value. Raw values that
# differed but fell on the same calendar day therefore produced duplicate
# (product_name, region, order_date) rows.
# Done in one chain so sales_df_cleaned itself keeps its raw order_date.
sales_silver_df = (
    sales_df_cleaned
    .withColumn('order_date', F.to_date('order_date', 'yyyy-MM-dd'))
    .groupBy('product_name', 'region', 'order_date')
    .agg(
        F.sum('sales_qty').alias('total_sales_qty'),
        F.sum('sales_amount').alias('total_sales_amount'),
    )
    .withColumn('year', F.year('order_date'))
    .withColumn('month', F.month('order_date'))
)

# Show the Silver Layer (Sales Data)
sales_silver_df.show(5)

# Persist. order_date is already a DATE here, so no second to_date is needed.
sales_silver_path_delta = '/Volumes/workspace/hls_demo_2/refined_data/sales_data_silver_delta'
sales_silver_df.write.format('delta').mode('overwrite').save(sales_silver_path_delta)

# ---- Silver layer: inventory ------------------------------------------------
# Cleanse: drop rows missing product_code, stock_qty or reorder_point.
inventory_df_cleaned = inventory_df.dropna(subset=['product_code', 'stock_qty', 'reorder_point'])

# Aggregate total stock and reorder point per product and location (warehouse).
# NOTE(review): summing reorder_point assumes per-row reorder points are
# additive within a (product_code, location) group; confirm against the data.
inventory_silver_df = inventory_df_cleaned.groupBy('product_code', 'location').agg(
    F.sum('stock_qty').alias('total_stock_qty'),
    F.sum('reorder_point').alias('total_reorder_point'),
)

# Enrichment: stock-to-reorder ratio.
# The zero denominator is guarded explicitly. With ANSI SQL mode enabled,
# Spark raises DIVIDE_BY_ZERO on x / 0 instead of returning NULL, which would
# abort the whole write. The guarded form yields NULL for a zero reorder point
# in either mode.
inventory_silver_df = inventory_silver_df.withColumn(
    'stock_to_reorder_ratio',
    F.when(
        F.col('total_reorder_point') != 0,
        F.col('total_stock_qty') / F.col('total_reorder_point'),
    ),
)

# Show the Silver Layer (Inventory Data)
inventory_silver_df.show(5)

inventory_silver_path_delta = '/Volumes/workspace/hls_demo_2/refined_data/inventory_data_silver_delta'
inventory_silver_df.write.format('delta').mode('overwrite').save(inventory_silver_path_delta)

# ----

# Silver Layer Transformations for Clinical Trial Data
# (F is already imported at the top of the file.)

# Trial duration per row, in (possibly fractional) days.
# Both endpoints are normalized to TIMESTAMP before taking epoch seconds.
# - Casting a DATE or a date string directly to long yields NULL, or is
#   rejected under ANSI mode.
# - A NULL duration would make the dropna below silently discard the row.
# - For TIMESTAMP columns the result is unchanged:
#   (end - start) seconds / 86400.
# NOTE(review): DATE values become midnight in the session time zone. A span
# crossing a DST transition in that zone may therefore not be a whole number
# of days.
clinical_trial_df = clinical_trial_df.withColumn(
    'trial_duration',
    (F.to_timestamp('end_date').cast('long') - F.to_timestamp('start_date').cast('long'))
    / (24 * 3600),  # seconds -> days
)

# Cleanse: drop rows missing trial_id, drug_name, success_rate or a computable duration.
clinical_trial_df_cleaned = clinical_trial_df.dropna(subset=['trial_id', 'drug_name', 'success_rate', 'trial_duration'])

# Aggregate average success rate and average trial duration per drug.
clinical_trial_silver_df = clinical_trial_df_cleaned.groupBy('drug_name').agg(
    F.avg('success_rate').alias('avg_success_rate'),
    F.avg('trial_duration').alias('avg_trial_duration_days'),
)

clinical_trial_silver_df.show(5)

clinical_trial_silver_path_delta = '/Volumes/workspace/hls_demo_2/refined_data/clinical_trial_data_silver_delta'
clinical_trial_silver_df.write.format('delta').mode('overwrite').save(clinical_trial_silver_path_delta)


