In [0]:
# Databricks notebook source
from pyspark.sql import SparkSession, functions as f

# Bronze-layer parquet locations of the two hospital patient extracts.
# NOTE(review): "Hosptial1" looks like a misspelling of "Hospital1". The path is kept
# exactly as written because it has to match the real folder name — confirm in storage
# before changing it.
file_path_hospital1 = "abfss://bronze@emrgen2.dfs.core.windows.net/Hosptial1/patients"
file_path_hospital10 = "abfss://bronze@emrgen2.dfs.core.windows.net/hospital10/patients"

# Read each extract into its own DataFrame.
# `spark` is not defined in this cell — presumably the session injected by the
# Databricks runtime; verify when running elsewhere.
df_hospital1 = spark.read.parquet(file_path_hospital1)
df_hospital10 = spark.read.parquet(file_path_hospital10)

# Register both DataFrames as temp views so the %sql cells can query them by name.
df_hospital1.createOrReplaceTempView("patients_hospital1")
df_hospital10.createOrReplaceTempView("patients_hospital10")



In [0]:
%sql
-- cdm_patients: stacks the two hospital patient views under one shared set of
-- column names and prepends a derived Patient_Key.
--
-- Patient_Key = SRC_PatientID || '-' || datasource.
-- NOTE(review): Spark's CONCAT yields NULL when any argument is NULL, so a row with
-- a NULL SRC_PatientID or datasource gets a NULL Patient_Key — confirm downstream
-- consumers are prepared for that.
CREATE OR REPLACE TEMP VIEW cdm_patients AS
SELECT CONCAT(SRC_PatientID, '-', datasource) AS Patient_Key, *
FROM (
    -- Hospital 1: only the patient id needs renaming.
    SELECT
        PatientID AS SRC_PatientID,
        FirstName,
        LastName,
        MiddleName,
        SSN,
        PhoneNumber,
        Gender,
        DOB,
        Address,
        ModifiedDate,
        datasource
    FROM patients_hospital1

    UNION ALL

    -- Hospital 10: id, name and modified-date columns are renamed to the shared
    -- names. UNION ALL matches columns by position, so the order here must mirror
    -- the first SELECT exactly.
    SELECT
        ID AS SRC_PatientID,
        F_Name AS FirstName,
        L_Name AS LastName,
        M_Name AS MiddleName,
        SSN,
        PhoneNumber,
        Gender,
        DOB,
        Address,
        Updated_Date AS ModifiedDate,
        datasource
    FROM patients_hospital10
)


In [0]:
%sql
-- Inspect the unified patient view produced by the previous cell.
SELECT *
FROM cdm_patients


In [0]:
%sql
-- quality_checks: cdm_patients with ModifiedDate renamed to SRC_ModifiedDate and an
-- is_quarantined flag added.
--
-- A row is flagged when its source patient id, date of birth or first name is
-- missing, or when the first name is the literal text 'null' (any letter case).
CREATE OR REPLACE TEMP VIEW quality_checks AS
SELECT
    Patient_Key,
    SRC_PatientID,
    FirstName,
    LastName,
    MiddleName,
    SSN,
    PhoneNumber,
    Gender,
    DOB,
    Address,
    ModifiedDate AS SRC_ModifiedDate,
    datasource,
    -- Never evaluates to NULL: the only nullable term, lower(firstname) = 'null',
    -- is NULL only when firstname is NULL, and then `firstname IS NULL` is TRUE.
    (
        SRC_PatientID IS NULL
        OR dob IS NULL
        OR firstname IS NULL
        OR lower(firstname) = 'null'
    ) AS is_quarantined
FROM cdm_patients


In [0]:
%sql
-- Inspect the quality-check output, quarantined rows first.
SELECT *
FROM quality_checks
ORDER BY is_quarantined DESC


In [0]:
%sql
-- Delta table that receives the quality-checked patients. Created only if it does
-- not already exist, so re-running this cell leaves an existing table untouched.
CREATE TABLE IF NOT EXISTS silver.patients (
    -- Columns carried over from the quality_checks view
    Patient_Key       STRING,
    SRC_PatientID     STRING,
    FirstName         STRING,
    LastName          STRING,
    MiddleName        STRING,
    SSN               STRING,
    PhoneNumber       STRING,
    Gender            STRING,
    DOB               DATE,
    Address           STRING,
    SRC_ModifiedDate  TIMESTAMP,
    datasource        STRING,
    is_quarantined    BOOLEAN,
    -- Load bookkeeping columns, populated by the MERGE statements
    inserted_date     TIMESTAMP,
    modified_date     TIMESTAMP,
    is_current        BOOLEAN
)
USING DELTA;


In [0]:
%sql
-- Step 1: Mark existing records as historical (is_current = false) for patients whose
-- incoming attributes differ from their current row, and insert patients that have
-- no current row at all.
--
-- A changed patient is only expired here; no replacement row is written in this
-- statement, so a follow-up insert is needed to add the new current version.
--
-- Change detection uses the null-safe equality operator (<=>). With plain <>, a
-- comparison involving NULL evaluates to NULL rather than TRUE, so a column going
-- NULL -> value or value -> NULL would never be detected and the stale row would
-- stay current.
--
-- NOTE(review): source rows with a NULL Patient_Key can never satisfy the ON clause,
-- so they fall into WHEN NOT MATCHED and are inserted again on every run — confirm
-- whether such rows should be filtered out upstream.
MERGE INTO silver.patients AS target
USING quality_checks AS source
ON target.Patient_Key = source.Patient_Key
AND target.is_current = true
WHEN MATCHED
AND NOT (
    -- TRUE only when every tracked attribute is identical, NULLs included
    target.SRC_PatientID <=> source.SRC_PatientID AND
    target.FirstName <=> source.FirstName AND
    target.LastName <=> source.LastName AND
    target.MiddleName <=> source.MiddleName AND
    target.SSN <=> source.SSN AND
    target.PhoneNumber <=> source.PhoneNumber AND
    target.Gender <=> source.Gender AND
    target.DOB <=> source.DOB AND
    target.Address <=> source.Address AND
    target.SRC_ModifiedDate <=> source.SRC_ModifiedDate AND
    target.datasource <=> source.datasource AND
    target.is_quarantined <=> source.is_quarantined
)
THEN UPDATE SET
    target.is_current = false,
    target.modified_date = current_timestamp()

-- No current row exists for this Patient_Key: insert it as the current version.
WHEN NOT MATCHED
THEN INSERT (
    Patient_Key,
    SRC_PatientID,
    FirstName,
    LastName,
    MiddleName,
    SSN,
    PhoneNumber,
    Gender,
    DOB,
    Address,
    SRC_ModifiedDate,
    datasource,
    is_quarantined,
    inserted_date,
    modified_date,
    is_current
)
VALUES (
    source.Patient_Key,
    source.SRC_PatientID,
    source.FirstName,
    source.LastName,
    source.MiddleName,
    source.SSN,
    source.PhoneNumber,
    source.Gender,
    source.DOB,
    source.Address,
    source.SRC_ModifiedDate,
    source.datasource,
    source.is_quarantined,
    current_timestamp(), -- Set inserted_date to current timestamp
    current_timestamp(), -- Set modified_date to current timestamp
    true -- Mark as current
);


In [0]:
%sql
-- Step 2: Insert new and updated records into the Delta table, marking them as current.
-- Only source rows without a matching current (is_current = true) target row are
-- inserted; rows that already have a current version are left untouched because
-- this statement has no WHEN MATCHED clause.
MERGE INTO silver.patients AS target
USING quality_checks AS source
    ON  target.Patient_Key = source.Patient_Key
    AND target.is_current = true
WHEN NOT MATCHED THEN
    INSERT (
        Patient_Key, SRC_PatientID,
        FirstName, LastName, MiddleName,
        SSN, PhoneNumber, Gender, DOB, Address,
        SRC_ModifiedDate, datasource, is_quarantined,
        inserted_date, modified_date, is_current
    )
    VALUES (
        source.Patient_Key, source.SRC_PatientID,
        source.FirstName, source.LastName, source.MiddleName,
        source.SSN, source.PhoneNumber, source.Gender, source.DOB, source.Address,
        source.SRC_ModifiedDate, source.datasource, source.is_quarantined,
        current_timestamp(),  -- inserted_date: load time of this version
        current_timestamp(),  -- modified_date: starts equal to inserted_date
        true                  -- is_current: this row is the current version
    );
