In [0]:
%sql
-- Create the clinicaltrial_silver database that the cells below write their tables to.
-- IF NOT EXISTS makes this cell safe to re-run.
CREATE DATABASE IF NOT EXISTS clinicaltrial_silver;

Patients table

In [0]:
# Build clinicaltrial_silver.patients from clinicaltrial_bronze.patients:
# normalise sex/status/ethnicity, clamp age into [18, 100], keep the most
# recently ingested row per patient_id, and replace the silver table.
from pyspark.sql.functions import col, lit, row_number, to_date, upper, when
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window

df_patients_bronze = spark.table("clinicaltrial_bronze.patients")

# Rank each patient's rows newest-first by ingestion time; rank 1 is kept below.
window_spec = Window.partitionBy("patient_id").orderBy(col("ingestion_timestamp").desc())

df_patients_silver = (
    df_patients_bronze
    .withColumn("enrollment_date", to_date("enrollment_date"))
    # Upper-case first so the MALE/FEMALE comparisons are case-insensitive.
    .withColumn("sex", upper(col("sex")))
    .withColumn(
        "sex",
        when(col("sex") == "MALE", "M")
        .when(col("sex") == "FEMALE", "F")
        .when(col("sex").isNull(), "U")
        .otherwise(col("sex")),  # any other code is passed through unchanged
    )
    # Clamp out-of-range ages to the boundaries instead of discarding the row.
    .withColumn(
        "age",
        when(col("age") < 18, lit(18))
        .when(col("age") > 100, lit(100))
        .otherwise(col("age")),
    )
    # Missing ages are imputed with the lower bound.
    .withColumn("age", when(col("age").isNull(), lit(18)).otherwise(col("age")))
    .withColumn("age", col("age").cast(IntegerType()))
    .withColumn(
        "ethnicity",
        when(col("ethnicity").isNull(), lit("Unknown")).otherwise(col("ethnicity")),
    )
    .withColumn("status", upper(col("status")))
    # After clamping and imputation this only removes rows whose age is still
    # null after the integer cast.
    # NOTE(review): presumably that means non-numeric bronze values - confirm
    # against the bronze schema.
    .filter((col("age") >= 18) & (col("age") <= 100))
    .withColumn("rn", row_number().over(window_spec))
    .filter(col("rn") == 1)
    # Drop the ranking helper and the bronze lineage columns.
    .drop("rn", "source_file", "ingestion_timestamp")
)

# Replace data and schema in a single atomic Delta commit. The previous
# DROP TABLE ran before the (lazy) transform was evaluated, so any failure
# left the silver table missing and discarded its history; overwriteSchema
# avoids the schema-mismatch error without dropping anything.
(
    df_patients_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("clinicaltrial_silver.patients")
)


Visits table

In [0]:

# Build clinicaltrial_silver.visits from clinicaltrial_bronze.visits:
# keep only the scheduled visit types and the earliest-dated row per
# (patient_id, visit_type), then replace the silver table.
from pyspark.sql.functions import col, row_number, to_date
from pyspark.sql.window import Window

valid_visit_types = ["Baseline", "Week_4", "Week_8", "Week_12", "Week_16"]

# Rank rows within each (patient_id, visit_type) by ascending visit_date.
earliest_visit_window = Window.partitionBy("patient_id", "visit_type").orderBy(col("visit_date"))

df_visits_silver = (
    spark.table("clinicaltrial_bronze.visits")
    .withColumn("visit_date", to_date("visit_date"))
    .filter(col("visit_type").isin(valid_visit_types))
    .withColumn("rn", row_number().over(earliest_visit_window))
    # rn == 1 leaves exactly one row per (patient_id, visit_type), so no further
    # de-duplication on (patient_id, visit_type, visit_date) is needed.
    .filter(col("rn") == 1)
    # Drop the ranking helper as well as the bronze lineage columns; previously
    # "rn" leaked into the silver table as a constant column.
    .drop("rn", "source_file", "ingestion_timestamp")
)

# Replace data and schema in a single atomic Delta commit instead of dropping
# the table first, which left it missing if anything failed before the write.
(
    df_visits_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("clinicaltrial_silver.visits")
)

Labs table

In [0]:
# Build clinicaltrial_silver.labs from clinicaltrial_bronze.labs:
# parse dates, upper-case the test name, keep only rows with a numeric
# lab_value, flag values outside [normal_low, normal_high], and replace
# the silver table.
from pyspark.sql.functions import col, expr, to_date, upper, when

# True when the measured value falls outside the reference range.
out_of_range = (col("lab_value") < col("normal_low")) | (col("lab_value") > col("normal_high"))

df_labs_silver = (
    spark.table("clinicaltrial_bronze.labs")
    .withColumn("visit_date", to_date("visit_date"))
    .withColumn("lab_test", upper(col("lab_test")))
    # try_cast yields NULL for values that cannot be parsed as a double rather
    # than raising; those rows are removed by the filter below.
    .withColumn("lab_value", expr("try_cast(lab_value as double)"))
    .filter(col("lab_value").isNotNull())
    # If a bound is NULL the comparison against it is NULL, which does not
    # satisfy the when() condition, so such rows are flagged 0 unless the
    # other bound is violated.
    .withColumn("abnormal_flag", when(out_of_range, 1).otherwise(0))
    .drop("source_file", "ingestion_timestamp")
)

# Replace data and schema in a single atomic Delta commit instead of dropping
# the table first, which left it missing if anything failed before the write.
(
    df_labs_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("clinicaltrial_silver.labs")
)


Adverse Events table

In [0]:
# Build clinicaltrial_silver.adverse_events from clinicaltrial_bronze.adverse_events:
# parse dates, keep severities 1-5, reject events that end before they start,
# title-case the event term, and replace the silver table.
from pyspark.sql.functions import col, initcap, to_date

# An event is date-consistent when it has no end date or ends on/after its
# start. A bare `end >= start` evaluates to NULL when ae_end_date is NULL, and
# filter() discards NULL, so events without an end date were silently dropped.
# NOTE(review): presumably a NULL end date means the event is still ongoing;
# end dates that to_date could not parse would also be NULL and are kept too.
dates_consistent = col("ae_end_date").isNull() | (col("ae_end_date") >= col("ae_start_date"))

df_ae_silver = (
    spark.table("clinicaltrial_bronze.adverse_events")
    .withColumn("ae_start_date", to_date("ae_start_date"))
    .withColumn("ae_end_date", to_date("ae_end_date"))
    # Rows with a NULL severity are dropped here as well (NULL comparison).
    .filter((col("severity") >= 1) & (col("severity") <= 5))
    .filter(dates_consistent)
    .withColumn("ae_term", initcap(col("ae_term")))
    .drop("source_file", "ingestion_timestamp")
)

# Replace data and schema in a single atomic Delta commit instead of dropping
# the table first, which left it missing if anything failed before the write.
(
    df_ae_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("clinicaltrial_silver.adverse_events")
)

Drug Dosing table

In [0]:
# Build clinicaltrial_silver.drug_dosing from clinicaltrial_bronze.drug_dosing:
# parse dates, keep rows with a non-negative numeric dose, upper-case the
# status, derive exposed_flag, and replace the silver table.
from pyspark.sql.functions import col, expr, to_date, upper, when

df_dosing_silver = (
    spark.table("clinicaltrial_bronze.drug_dosing")
    .withColumn("dose_date", to_date("dose_date"))
    # try_cast (as used for lab_value in the labs cell) yields NULL for values
    # that cannot be parsed, where a plain cast raises under ANSI mode; the
    # filter below then removes those rows.
    .withColumn("dose_mg", expr("try_cast(dose_mg as double)"))
    # Removes negative doses and, because NULL >= 0 is NULL, rows with no dose.
    # NOTE(review): confirm that dosing rows without a dose_mg should be dropped.
    .filter(col("dose_mg") >= 0)
    .withColumn("dose_status", upper(col("dose_status")))
    # 1 only when the normalised status is exactly "TAKEN"; NULL or any other
    # status gives 0.
    .withColumn("exposed_flag", when(col("dose_status") == "TAKEN", 1).otherwise(0))
    .drop("source_file", "ingestion_timestamp")
)

# Replace data and schema in a single atomic Delta commit instead of dropping
# the table first, which left it missing if anything failed before the write.
(
    df_dosing_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("clinicaltrial_silver.drug_dosing")
)

Outcomes table

In [0]:
# Build clinicaltrial_silver.outcomes from clinicaltrial_bronze.outcomes:
# parse dates, keep recognised best_response codes, reject rows whose
# progression precedes the response, and replace the silver table.

# Imported here so the cell does not depend on names leaked from earlier cells.
from pyspark.sql.functions import col, to_date

valid_responses = ["PD", "SD", "PR", "CR"]

# Only reject rows where both dates are present and out of order. A bare
# `progression_date >= response_date` evaluates to NULL when either date is
# NULL, and filter() discards NULL, so every row missing either date was
# silently dropped.
# NOTE(review): presumably a NULL date means "no progression" or "no response";
# dates that to_date could not parse would also be NULL and are kept too.
dates_consistent = (
    col("progression_date").isNull()
    | col("response_date").isNull()
    | (col("progression_date") >= col("response_date"))
)

df_outcomes_silver = (
    spark.table("clinicaltrial_bronze.outcomes")
    .withColumn("response_date", to_date("response_date"))
    .withColumn("progression_date", to_date("progression_date"))
    .withColumn("death_date", to_date("death_date"))
    # isin() is case-sensitive and drops NULL best_response values.
    .filter(col("best_response").isin(valid_responses))
    .filter(dates_consistent)
    .drop("source_file", "ingestion_timestamp")
)

# Replace data and schema in a single atomic Delta commit instead of dropping
# the table first, which left it missing if anything failed before the write.
(
    df_outcomes_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("clinicaltrial_silver.outcomes")
)