In [0]:
%run "../01_setup/01_config"

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
%sql
-- Silver-layer DDL for the patient history table and its quarantine table.
-- Both statements use IF NOT EXISTS, so they are no-ops once the tables exist.

-- Patient history table: later cells in this notebook close the current row
-- (is_current = false, effective_end_ts set) and insert a new current row when a
-- patient's hash_diff changes.
-- NOTE(review): ssn is declared as a plain STRING here; confirm that masking or
-- encryption of this column is handled elsewhere.
CREATE TABLE IF NOT EXISTS healthcare_rcm_databricks.silver.patients_scd2 (
  patient_key STRING,
  src_patient_id STRING,
  first_name STRING,
  last_name STRING,
  middle_name STRING,
  ssn STRING,
  phone_number STRING,
  gender STRING,
  dob DATE,
  address STRING,
  src_updated_date TIMESTAMP,
  data_source STRING,

  -- Change-tracking columns: hash_diff is a SHA-256 over the tracked attributes;
  -- effective_start_ts is taken from src_updated_date at insert time;
  -- effective_end_ts stays NULL while the row is the current version.
  hash_diff STRING,
  effective_start_ts TIMESTAMP,
  effective_end_ts TIMESTAMP,
  is_current BOOLEAN,

  -- Audit columns, filled with current_timestamp() by the insert/merge cells.
  silver_inserted_ts TIMESTAMP,
  silver_modified_ts TIMESTAMP
)
USING DELTA;

-- Quarantine table: receives rows appended by the quarantine and late-arriving
-- cells of this notebook, together with the reason and time of rejection.
CREATE TABLE IF NOT EXISTS healthcare_rcm_databricks.silver.quarantine_patients (
  patient_key STRING,
  src_patient_id STRING,
  first_name STRING,
  last_name STRING,
  middle_name STRING,
  ssn STRING,
  phone_number STRING,
  gender STRING,
  dob DATE,
  address STRING,
  src_updated_date TIMESTAMP,
  data_source STRING,
  is_quarantined BOOLEAN,
  quarantine_reason STRING,
  quarantine_ts TIMESTAMP,
  ingest_ts TIMESTAMP,
  source_file STRING
)
USING DELTA;


In [0]:
# Fully-qualified names of the bronze staging tables, one per source hospital.
hosa_path, hosb_path = (
    "healthcare_rcm_databricks.stg_bronze.patients_hosa_raw",
    "healthcare_rcm_databricks.stg_bronze.patients_hosb_raw",
)

### Extract data from bronze staging layer

In [0]:
# Read both hospitals' raw patient tables from the bronze staging schema.
df_hosa, df_hosb = (spark.table(table_name) for table_name in (hosa_path, hosb_path))

In [0]:
# Hospital A: rename source columns to the shared silver names, then keep only
# the columns used downstream (rescued_data is renamed but not selected).
df_hosa = (
    df_hosa
    .withColumnsRenamed(
        {
            "PatientID": "src_patient_id",
            "FirstName": "first_name",
            "LastName": "last_name",
            "MiddleName": "middle_name",
            "SSN": "ssn",
            "PhoneNumber": "phone_number",
            "Gender": "gender",
            "DOB": "dob",
            "Address": "address",
            "ModifiedDate": "src_updated_date",
            "_rescued_data": "rescued_data",
            "_ingest_ts": "ingest_ts",
            "_source_file": "source_file",
        }
    )
    .select(
        "src_patient_id",
        "first_name",
        "last_name",
        "middle_name",
        "ssn",
        "phone_number",
        "gender",
        "dob",
        "address",
        "src_updated_date",
        "data_source",
        "ingest_ts",
        "source_file",
    )
)

In [0]:
# Hospital B: rename its source columns to the shared silver names, then keep
# only the columns used downstream (rescued_data is renamed but not selected).
df_hosb = (
    df_hosb
    .withColumnsRenamed(
        {
            "ID": "src_patient_id",
            "F_Name": "first_name",
            "L_Name": "last_name",
            "M_Name": "middle_name",
            "SSN": "ssn",
            "PhoneNumber": "phone_number",
            "Gender": "gender",
            "DOB": "dob",
            "Address": "address",
            "Updated_Date": "src_updated_date",
            "_rescued_data": "rescued_data",
            "_ingest_ts": "ingest_ts",
            "_source_file": "source_file",
        }
    )
    .select(
        "src_patient_id",
        "first_name",
        "last_name",
        "middle_name",
        "ssn",
        "phone_number",
        "gender",
        "dob",
        "address",
        "src_updated_date",
        "data_source",
        "ingest_ts",
        "source_file",
    )
)

### Merge data from both hospitals

In [0]:
# Stack both hospitals by column name, then cast the date/timestamp columns
# to their silver types.
df_merged = (
    df_hosa
    .unionByName(df_hosb)
    .withColumns(
        {
            "dob": col("dob").cast("date"),
            "src_updated_date": col("src_updated_date").cast("timestamp"),
        }
    )
)


In [0]:
#df_merged.display()

### Transformations

In [0]:
# Business key: "<src_patient_id>-<data_source>". concat() returns NULL if
# either input is NULL, so rows missing either part get a NULL patient_key.
patient_key_expr = concat(col("src_patient_id"), lit("-"), col("data_source"))
df_merged = df_merged.withColumn("patient_key", patient_key_expr)

In [0]:
# Normalise free-text columns: collapse whitespace runs to a single space,
# trim the ends, then title-case the value.
cols = ['first_name', 'last_name', 'middle_name', 'gender']

df_merged = df_merged.withColumns(
    {
        name: initcap(trim(regexp_replace(col(name), r"\s+", " ")))
        for name in cols
    }
)

In [0]:
# A row is quarantined when it has no patient id or no first name.
# isNull() never yields NULL, so the predicate is already a strict TRUE/FALSE flag.
missing_required = col("src_patient_id").isNull() | col("first_name").isNull()
df_merged = df_merged.withColumn("is_quarantined", missing_required)

In [0]:
# Fix the column set and order carried through dedup, quarantine and merge.
final_columns = [
    "patient_key",
    "src_patient_id",
    "first_name",
    "last_name",
    "middle_name",
    "ssn",
    "phone_number",
    "gender",
    "dob",
    "address",
    "src_updated_date",
    "data_source",
    "is_quarantined",
    "ingest_ts",
    "source_file",
]
final_df = df_merged.select(*final_columns)

In [0]:
#final_df.display()

### Deduplication

In [0]:
# Rank each patient's rows newest-first: the latest source update wins and
# ties are broken by the most recent ingest.
spec = Window.partitionBy('patient_key').orderBy(col('src_updated_date').desc_nulls_last(), col('ingest_ts').desc())

# patient_key is built with concat(), so it is NULL whenever src_patient_id is
# NULL. Ranking those rows would put every key-less record into one window
# partition and keep only a single one, discarding the rest before they can be
# quarantined. Deduplicate only rows that have a key and pass the others through.
has_key = col('patient_key').isNotNull()

deduped_df = (
    final_df.filter(has_key)
    .withColumn('rn', row_number().over(spec))
    .filter(col('rn') == 1)
    .drop('rn')
)

stg_df = deduped_df.unionByName(final_df.filter(~has_key))

### Separate clean and quarantined records

In [0]:
# Split the staged rows on the quarantine flag (always TRUE or FALSE, never NULL).
quarantine_flag = col("is_quarantined")

clean_df = stg_df.where(~quarantine_flag)
quarantine_df = stg_df.where(quarantine_flag)

In [0]:
# Stamp the rejected rows with a reason and timestamp, then append them to the
# silver quarantine table.
quarantine_out = (
    quarantine_df
    .withColumn("quarantine_reason", lit("Missing Patient ID or First Name"))
    .withColumn("is_quarantined", lit(True))
    .withColumn("quarantine_ts", current_timestamp())
)

(
    quarantine_out.write
    .format("delta")
    .mode("append")
    .saveAsTable("healthcare_rcm_databricks.silver.quarantine_patients")
)


### Late-arriving data

In [0]:
# Start timestamp of each patient's current version in the silver table.
current_df = (
    spark.table("healthcare_rcm_databricks.silver.patients_scd2")
    .filter(col('is_current') == True)
    .select('patient_key', 'effective_start_ts')
)

# Left join: effective_start_ts stays NULL for patients with no current version.
current_clean_df = clean_df.join(current_df, on="patient_key", how='left')

has_current = col('effective_start_ts').isNotNull()
src_ts = col('src_updated_date')

# Accept brand-new patients and updates that are not older than the current version.
good_df = current_clean_df.filter(~has_current | (src_ts >= col('effective_start_ts')))\
    .drop('effective_start_ts')

# Updates stamped earlier than the version already stored.
late_df = current_clean_df.filter(has_current & (src_ts < col('effective_start_ts')))\
    .drop('effective_start_ts')

# Updates for an existing patient that carry no source timestamp. Both
# comparisons above evaluate to NULL for these rows, so without this branch they
# would match neither filter and be dropped silently.
undated_df = current_clean_df.filter(has_current & src_ts.isNull())\
    .drop('effective_start_ts')

rejected_df = late_df.withColumn("quarantine_reason", lit("late_arriving_update"))\
    .unionByName(undated_df.withColumn("quarantine_reason", lit("missing_src_updated_date")))

rejected_df\
 .withColumn("is_quarantined", lit(True))\
 .withColumn("quarantine_ts", current_timestamp())\
 .write.mode("append").format("delta")\
 .saveAsTable("healthcare_rcm_databricks.silver.quarantine_patients")


### MERGE logic

In [0]:
# Attributes whose change should produce a new version of the patient row.
tracked_cols = [
    "src_patient_id", "first_name", "last_name", "middle_name", "ssn",
    "phone_number", "gender", "dob", "address", "data_source"
]

# concat_ws() skips NULL inputs, so a value moving between two adjacent nullable
# columns (e.g. last_name -> middle_name) would hash identically and the change
# would go undetected. Replace NULL with a sentinel so every column keeps its
# position in the hashed string.
# NOTE: rows already stored with a hash computed without the sentinel, and with
# a NULL in any tracked column, will compare as changed on the first run after
# this change, producing one extra version unless hash_diff is backfilled.
_HASH_NULL_TOKEN = "__NULL__"

hash_inputs = [
    coalesce(col(c).cast("string"), lit(_HASH_NULL_TOKEN)) for c in tracked_cols
]

good_df2 = good_df.withColumn(
    "hash_diff",
    sha2(concat_ws("||", *hash_inputs), 256)
)

# Expose the staged rows to the %sql cells that follow.
good_df2.createOrReplaceTempView("patients_stg")

In [0]:
%sql
-- Step 1 of 2: close the current version of every patient whose tracked
-- attributes changed. Each staged row is matched to the target's current row
-- for the same patient_key; rows with an identical hash_diff are left untouched.
MERGE INTO healthcare_rcm_databricks.silver.patients_scd2 AS tgt
USING patients_stg AS src
ON tgt.patient_key = src.patient_key
AND tgt.is_current = true

-- Only the closing half happens here: the row is flagged as no longer current
-- and its end timestamp is set to the incoming row's source update time.
-- The replacement row is added by the separate INSERT statement that follows.
WHEN MATCHED
  AND tgt.hash_diff <> src.hash_diff
THEN UPDATE SET
  tgt.is_current = false,
  tgt.effective_end_ts = src.src_updated_date,
  tgt.silver_modified_ts = current_timestamp();


In [0]:
%sql
-- Step 2 of 2: insert a new current version for every staged patient that has
-- no current row in the target. This covers brand-new patients and patients
-- whose previous current row was just closed by the preceding MERGE.
INSERT INTO healthcare_rcm_databricks.silver.patients_scd2 (
  patient_key, src_patient_id, first_name, last_name, middle_name, ssn,
  phone_number, gender, dob, address,
  src_updated_date, data_source,
  hash_diff,
  effective_start_ts, effective_end_ts, is_current,
  silver_inserted_ts, silver_modified_ts
)
SELECT
  src.patient_key, src.src_patient_id, src.first_name, src.last_name, src.middle_name, src.ssn,
  src.phone_number, src.gender, src.dob, src.address,
  src.src_updated_date, src.data_source,
  src.hash_diff,
  -- The new version starts at the source update time and is open-ended.
  src.src_updated_date AS effective_start_ts,
  CAST(NULL AS TIMESTAMP) AS effective_end_ts,
  true AS is_current,
  current_timestamp() AS silver_inserted_ts,
  current_timestamp() AS silver_modified_ts
FROM patients_stg src
-- Anti-join against the target's current rows.
LEFT JOIN healthcare_rcm_databricks.silver.patients_scd2 tgt
  ON tgt.patient_key = src.patient_key
 AND tgt.is_current = true
WHERE
  tgt.patient_key IS NULL
  -- NOTE(review): once the MERGE has run, changed patients no longer have a
  -- current row and already satisfy the IS NULL branch. The branch below only
  -- matches if a current row with a different hash still exists (for example if
  -- this cell runs without the MERGE), in which case a second current row is added.
  OR tgt.hash_diff <> src.hash_diff;
