In [0]:
import pyspark.sql.functions as F

In [0]:

# Build the silver patients table from the bronze layer: one row per
# patient_id with a non-null integer age.
patients = spark.table("bronze_patients")
silver_patients = (
    patients
    # Cast before filtering so that any value the cast turns into NULL
    # (non-ANSI mode) is removed together with the originally missing ages.
    .withColumn("age", F.col("age").cast("int"))
    .filter(F.col("age").isNotNull())
    # Deduplicate last: dropDuplicates keeps an arbitrary row per patient_id,
    # so running it before the filter could keep a NULL-age duplicate and
    # lose the patient even though a usable row existed.
    .dropDuplicates(["patient_id"])
)

silver_patients.write.mode("overwrite").saveAsTable("silver_patients")

In [0]:
# Silver prescriptions: discard rows without a start date, then add the
# number of days from start_date to end_date as "active_days".
prescriptions = spark.table("bronze_prescriptions")
silver_prescriptions = (
    prescriptions
    .where(F.col("start_date").isNotNull())
    .withColumn(
        "active_days",
        F.datediff(F.col("end_date"), F.col("start_date")),
    )
)
(
    silver_prescriptions.write
    .mode("overwrite")
    .saveAsTable("silver_prescriptions")
)

In [0]:
# Build one binary label per patient: 1 when any adverse event occurred
# within the follow-up window after the start of any of the patient's
# prescriptions, otherwise 0.
events = spark.table("bronze_events")

adverse_events = events.filter(
    F.col("event_type").isin("fall", "fracture", "dizziness")
)

# Number of days after a prescription start during which an event counts.
EVENT_WINDOW_DAYS = 28

labels = (
    adverse_events
    .join(silver_prescriptions, "patient_id")
    .withColumn(
        "days_to_event",
        F.datediff("event_date", "start_date")
    )
    .withColumn(
        "label",
        # Lower bound 0: an event dated before the prescription started gives
        # a negative days_to_event and must not be counted as a positive.
        # NOTE(review): assumes pre-prescription events are out of scope.
        F.when(
            F.col("days_to_event").between(0, EVENT_WINDOW_DAYS), 1
        ).otherwise(0)
    )
    # The join yields one row per (event, prescription) pair. Aggregate with
    # max so the patient is positive if ANY pair qualifies; dropDuplicates
    # would keep an arbitrary pair and make the label non-deterministic.
    .groupBy("patient_id")
    .agg(F.max("label").alias("label"))
)

labels.write.mode("overwrite").saveAsTable("silver_labels")

In [0]:
# Preview the rows stored in the silver_labels table.
silver_labels_preview = spark.table("silver_labels")
silver_labels_preview.display()

In [0]:
# Assemble the feature table: every silver patient, left-joined to their
# prescriptions and to their adverse-event label.
# The inputs are read back from the persisted silver tables rather than the
# in-memory DataFrames: those are lazy, so reusing them would re-execute
# their lineage and could produce rows that differ from what was saved.
silver_features = (
    spark.table("silver_patients")
    .join(spark.table("silver_prescriptions"), "patient_id", "left")
    .join(spark.table("silver_labels"), "patient_id", "left")
    # Patients with no row in silver_labels get label 0.
    .fillna({"label": 0})
)
silver_features.write.mode("overwrite").saveAsTable("silver_features")

In [0]:
# Preview the rows stored in the silver_features table.
silver_features_preview = spark.table("silver_features")
silver_features_preview.display()