### DDL table dim_cpt_code

In [1]:

%%sql
CREATE TABLE IF NOT EXISTS Gold_LH.dim_cpt_code
(
    -- Descriptive CPT-code attributes; all stored as strings.
    cpt_codes                   STRING,
    procedure_code_category     STRING,
    procedure_code_descriptions STRING,
    code_status                 STRING,
    -- Load timestamp; the cpt_code load cell fills it with current_timestamp().
    refreshed_at                TIMESTAMP
);

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 2, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

##### Querying tables

In [1]:
%%sql
SELECT *
FROM Silver_LH.cptcodes  -- five-row sample of the Silver CPT-code table
LIMIT 5;

StatementMeta(, 9e9be1c1-af1e-49e7-990b-4302618540e2, 2, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 8 fields>

##### Querying tables

In [10]:
# Show the first five rows of the Gold CPT-code dimension.
display(
    spark.read
    .table('Gold_LH.dim_cpt_code')
    .head(5)
)

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, feefad11-3277-43e8-b1f4-123130fae79f)

#### writing cpt_code table to Gold lakehouse

In [15]:
from pyspark.sql.functions import col, current_timestamp, lit

# Build the Gold CPT-code dimension from the Silver table.
df_gold_cptcodes = (
    spark.read.table('Silver_LH.cptcodes')
    # Filter BEFORE projecting: is_quarantined / is_current are not carried into Gold,
    # so they must be tested while they are still part of the DataFrame.
    .filter((col('is_quarantined') == 'false') & (col('is_current') == 'true'))
    .select(
        col('cpt_codes'),
        col('procedure_code_category'),
        col('procedure_code_descriptions'),
        col('code_status'),
        # current_timestamp() is already a Column; wrapping it in lit() is unnecessary.
        current_timestamp().alias('refreshed_at'),
    )
)

# NOTE(review): 'append' adds the full filtered set on every run, so re-running this cell
# duplicates rows unless the table is emptied elsewhere first. The other dimension loads
# in this notebook use 'overwrite' -- confirm which behaviour is intended here.
df_gold_cptcodes.write.mode('append').saveAsTable('Gold_LH.dim_cpt_code')

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 17, Finished, Available, Finished)

#### writing department table to Gold lakehouse

In [42]:
from pyspark.sql.functions import col, current_timestamp, lit, split

# Build the Gold department dimension from the Silver table.
df_gold_department = (
    spark.read.table('Silver_LH.departments')
    .select(
        # Column names follow the dim_department DDL (Dept_Id, SRC_Dept_Id, Name, datasource).
        col('Dept_id').alias('Dept_Id'),
        # Source-system department id: the part of Dept_Id before the first '_'.
        split(col('Dept_Id'), '_')[0].alias('SRC_Dept_Id'),
        col('Name'),
        # Renamed to 'datasource' to agree with the DDL and with the provider/fact loads.
        col('data_source').alias('datasource'),
    )
    # De-duplicate AFTER projecting: rows that differ only in columns not kept here
    # would otherwise survive as duplicate dimension rows.
    .distinct()
)

# overwriteSchema lets the overwrite replace a previously written schema
# (e.g. one that still has the 'data_source' column name).
df_gold_department.write.mode('overwrite').option("overwriteSchema", "true").saveAsTable('Gold_LH.dim_department')

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 44, Finished, Available, Finished)

##### Querying tables

In [43]:
%%sql
SELECT *
FROM Gold_LH.dim_department  -- five-row sample of the Gold department dimension
LIMIT 5;

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 45, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 4 fields>

### DDL table dim_patient

In [44]:
%%sql
CREATE TABLE IF NOT EXISTS Gold_LH.dim_patient (
    -- Identifiers
    patient_key   STRING,
    src_patientid STRING,
    -- Name parts
    firstname     STRING,
    lastname      STRING,
    middlename    STRING,
    -- Personal / contact details (ssn and phonenumber are kept as strings)
    ssn           STRING,
    phonenumber   STRING,
    gender        STRING,
    dob           DATE,
    address       STRING,
    -- Source-system tag
    datasource    STRING
);

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 46, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

##### Querying tables

In [45]:
%%sql
SELECT *
FROM Silver_LH.patients  -- five-row sample of the Silver patients table
LIMIT 5;

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 47, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 16 fields>

#### writing patient table to Gold lakehouse

In [56]:
from pyspark.sql.functions import col, current_timestamp, lit

# Read the Silver patients table
df_gold_patient = spark.read.table('Silver_LH.patients')

# Keep only current, NON-quarantined rows. The previous predicate kept rows where
# is_quarantined was 'true', i.e. it loaded exactly the rows that the cpt_code and
# fact_transactions loads exclude.
# The filter runs before the projection because is_current / is_quarantined are not
# among the columns carried into Gold.
df_gold_patients = (
    df_gold_patient
    .filter((col('is_current') == 'true') & (col('is_quarantined') == 'false'))
    .select(
        col('patient_key'),
        col('SRC_PatientID'),
        col('FirstName'),
        col('LastName'),
        col('MiddleName'),
        col('SSN'),
        col('PhoneNumber'),
        col('Gender'),
        col('DOB'),
        col('Address'),
        col('datasource'),
    )
)

# Full refresh of the Gold patient dimension.
df_gold_patients.write.mode('overwrite').saveAsTable('Gold_LH.dim_patient')

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 58, Finished, Available, Finished)

##### Querying tables

In [57]:
%%sql
SELECT *
FROM Gold_LH.dim_patient  -- five-row sample of the Gold patient dimension
LIMIT 5;

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 59, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 11 fields>

### DDL table dim_department

In [11]:
%%sql
CREATE TABLE IF NOT EXISTS Gold_LH.dim_department (
    Dept_Id     STRING,
    -- The department load cell derives this as the part of Dept_Id before the first '_'.
    SRC_Dept_Id STRING,
    Name        STRING,
    datasource  STRING
);

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 13, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

##### Querying tables

In [69]:
%%sql
SELECT *
FROM Gold_LH.dim_provider  -- five-row sample of the Gold provider dimension
LIMIT 5;

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 71, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 6 fields>

#### writing providers table to Gold lakehouse

In [68]:
# `concat` is used below, so it is imported here explicitly; without it this cell only
# runs when the name happens to exist in the kernel already.
from pyspark.sql.functions import col, concat, current_timestamp, lit

# Read the Silver providers table and project the Gold provider dimension columns.
df_gold_providers = spark.read.table('Silver_LH.providers')
df_gold_providers = df_gold_providers.select(
    col('ProviderID'),
    col('FirstName'),
    col('LastName'),
    # Department key in the '<DeptID>_<data_source>' form, the same shape the fact load
    # builds for FK_DeptID.
    concat(col('DeptID'), lit('_'), col('data_source')).alias('DeptID'),
    col('NPI'),
    col('data_source').alias('datasource'),
)

# Full refresh of the Gold provider dimension.
df_gold_providers.write.mode('overwrite').saveAsTable('Gold_LH.dim_provider')

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 70, Finished, Available, Finished)

### DDL table dim_provider

In [58]:
%%sql
CREATE TABLE IF NOT EXISTS Gold_LH.dim_provider (
    ProviderID STRING,
    FirstName  STRING,
    LastName   STRING,
    -- The provider load cell writes this as '<DeptID>_<data_source>'.
    DeptID     STRING,
    NPI        LONG,
    datasource STRING
);

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 60, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

### DDL table fact_transactions

In [70]:
%%sql
CREATE TABLE IF NOT EXISTS Gold_LH.fact_transactions (
    -- Transaction identifiers
    TransactionID     STRING,
    SRC_TransactionID STRING,
    EncounterID       STRING,
    -- Foreign keys built by the fact load cell
    FK_PatientID      STRING,
    FK_ProviderID     STRING,
    FK_DeptID         STRING,
    -- Clinical coding and visit details
    ICDCode           STRING,
    ProcedureCode     STRING,
    VisitType         STRING,
    ServiceDate       DATE,
    PaidDate          DATE,
    -- Amounts
    Amount            DOUBLE,
    PaidAmount        DOUBLE,
    AmountType        STRING,
    ClaimID           STRING,
    -- Lineage
    datasource        STRING,
    refreshed_at      TIMESTAMP
)

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 72, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

##### Querying tables

In [75]:
%%sql
SELECT *
FROM Gold_LH.fact_transactions  -- five-row sample of the Gold fact table
LIMIT 5

StatementMeta(, 41bfe7f0-82b4-41b0-b0a6-4ec6e8016db3, 77, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 17 fields>

#### truncating data from table in spark

In [3]:
# Remove every row from the fact table (WHERE 1=1 matches all rows); the table itself is kept.
# The returned DataFrame reports num_affected_rows.
spark.sql(
    "DELETE FROM Gold_LH.fact_transactions WHERE 1=1"
)

StatementMeta(, a340de59-d9ed-431a-b26b-30a3e6fc4d01, 5, Finished, Available, Finished)

DataFrame[num_affected_rows: bigint]

#### writing data to fact table in Gold lakehouse

In [4]:
%%sql
INSERT INTO Gold_LH.fact_transactions
-- No column list is given, so the select-list order below mirrors the
-- fact_transactions DDL column order; the aliases are written to match it.
SELECT
    t.TransactionID,
    t.SRC_TransactionID,
    t.EncounterID,
    -- Patient key: '<PatientID>-<datasource>'
    concat(t.PatientID, '-', t.datasource)                     AS FK_PatientID,
    -- Provider key: 'H1-' prefix for source 'hos-a', 'H2-' for every other source
    concat(CASE WHEN t.datasource = 'hos-a' THEN 'H1-' ELSE 'H2-' END,
           t.providerID)                                       AS FK_ProviderID,
    -- Department key: '<DeptID>_<datasource>'
    concat(t.DeptID, '_', t.datasource)                        AS FK_DeptID,
    t.ICDCode,
    t.ProcedureCode,
    t.VisitType,
    t.ServiceDate,
    t.PaidDate,
    t.Amount,
    t.PaidAmount,
    t.AmountType,
    t.ClaimID,
    t.datasource,
    current_timestamp()                                        AS refreshed_at
FROM Silver_LH.transactions AS t
-- Only current, non-quarantined Silver rows are loaded.
WHERE t.is_current = true
  AND t.is_quarantined = false

StatementMeta(, a340de59-d9ed-431a-b26b-30a3e6fc4d01, 6, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>