In [0]:
# Load the raw appointments table into a DataFrame and preview it.
raw_appointment = spark.table("workspace.teamb.raw_appointments")
display(raw_appointment)

In [0]:
from pyspark.sql.functions import expr


# Target Spark SQL type for each raw column that gets cast.
column_types = {
    "cmgID": "int",
    "nookalDbID": "string",
    "ID": "int",
    "patientID": "int",
    "date": "date",
    "startTime": "timestamp",
    "endTime": "timestamp",
    "locationID": "int",
    "type": "string",
    "typeID": "int",
    "practitionerID": "int",
    "emailReminderSent": "boolean",
    "arrived": "boolean",
    "DNA": "boolean",
    "cancelled": "boolean",
    "cancellationDate": "date",
    "notes": "string",
    "dateModified": "date",
    "dateCreated": "date",
    "invoiceGenerated": "boolean",
    "source_name": "string",
    "source_row_count": "int",
    "dtmCreated": "timestamp",
    "dtmUpdated": "timestamp",
    "dtmProcessed": "timestamp",
}

# try_cast yields NULL instead of raising when a value cannot be converted.
# All casts go through one withColumns() projection: chaining a separate
# withColumn() per column stacks one projection per call and bloats the plan.
raw_appointment = raw_appointment.withColumns(
    {
        name: expr(f"try_cast({name} as {sql_type})")
        for name, sql_type in column_types.items()
    }
)

raw_appointment.display()


In [0]:
# Rows missing either of these identifiers are discarded.
required_ids = ["cmgID", "ID"]
# Only one row is kept per combination of these columns.
dedupe_keys = ["nookalDbID", "ID", "patientID"]

raw_appointment = (
    raw_appointment
    .dropna(subset=required_ids)
    .dropDuplicates(dedupe_keys)
)
raw_appointment.display()

In [0]:

# This is something I want to do so that we can flag invalid datatype values rather than nulling them out.
# %load_ext autoreload
# %autoreload 2
# Enables autoreload; learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload; run %autoreload 0

# from pyspark.sql.functions import col
# import sys
# sys.path.append("/Workspace/Shared/databricks_getting_started_ab/prod/files/src")

# from udf.datatype_utils import *


# # Define the column -> UDF mapping
# column_validators = {
#     "cmgID": isIntUDF,
#     "nookalDbID": isIntUDF,
#     "ID": isIntUDF,
#     "patientID": isIntUDF,
#     "date": isDateUDF,
#     "startTime": isTimestampUDF,
#     "endTime": isTimestampUDF,
#     "locationID": isIntUDF,
#     "type": isIntUDF,
#     "typeID": isIntUDF,
#     "practitionerID": isIntUDF,
#     "emailReminderSent": isBoolUDF,
#     "arrived": isBoolUDF,
#     "DNA": isBoolUDF,
#     "cancelled": isBoolUDF,
#     "cancellationDate": isDateUDF,
#     "dateModified": isDateUDF,
#     "dateCreated": isDateUDF,
#     "invoiceGenerated": isBoolUDF,
#     "source_row_count": isIntUDF,
#     "dtmCreated": isTimestampUDF,
#     "dtmUpdated": isTimestampUDF,
#     "dtmProcessed": isTimestampUDF
# }

# # Apply filter for all columns with validators
# for col_name, validator in column_validators.items():
#     if validator is not None:
#         raw_appointment = raw_appointment.filter(validator(col(col_name)))
# raw_appointment.display()

# #Mapping of column name to final desired data type
# cast_map = {
#     "cmgID": "int",
#     "nookalDbID": "string",  
#     "ID": "int",
#     "patientID": "int",
#     "date": "date",
#     "startTime": "timestamp",
#     "endTime": "timestamp",
#     "locationID": "int",
#     "type": "int",
#     "typeID": "int",
#     "practitionerID": "int",
#     "emailReminderSent": "boolean",
#     "arrived": "boolean",
#     "DNA": "boolean",
#     "cancelled": "boolean",
#     "cancellationDate": "date",
#     "dateModified": "date",
#     "dateCreated": "date",
#     "invoiceGenerated": "boolean",
#     "source_row_count": "int",
#     "dtmCreated": "timestamp",
#     "dtmUpdated": "timestamp",
#     "dtmProcessed": "timestamp"
#     # notes and source_name left uncasted (assumed string or free text)
# }

# # Apply casting to all specified columns
# for col_name, data_type in cast_map.items():
#     raw_appointment = raw_appointment.withColumn(col_name, col(col_name).cast(data_type))


In [0]:
from pyspark.sql.functions import col, current_timestamp, lit

# Build the CDM projection: cmgID is exposed as sourceCmgID, the source audit
# timestamps get a "Source" suffix, and dtmCreated is stamped with the load time.
passthrough_cols = [
    "nookalDbID",
    "ID",
    "patientID",
    "date",
    "startTime",
    "endTime",
    "locationID",
    "type",
    "typeID",
    "practitionerID",
    "emailReminderSent",
    "arrived",
    "DNA",
    "cancelled",
    "cancellationDate",
    "notes",
    "dateModified",
    "dateCreated",
    "invoiceGenerated",
]

# Raw audit column -> name it carries in the CDM table.
source_audit_renames = {
    "dtmCreated": "dtmCreatedSource",
    "dtmUpdated": "dtmUpdatedSource",
    "dtmProcessed": "dtmProcessedSource",
}

cdm_appointment = raw_appointment.select(
    col("cmgID").alias("sourceCmgID"),
    *passthrough_cols,
    *[col(raw_name).alias(cdm_name) for raw_name, cdm_name in source_audit_renames.items()],
    current_timestamp().alias("dtmCreated"),
)
display(cdm_appointment)

In [0]:
from delta.tables import DeltaTable

# Upsert the prepared CDM rows into the Delta target table.
table_name = "teamb.cdm_appointment"

# Columns that identify a single appointment: the upstream de-duplication keys
# plus the source CMG id. Matching on sourceCmgID alone pairs every appointment
# sharing a CMG id, so several source rows hit one target row and MERGE either
# fails on the ambiguous match or overwrites unrelated appointments.
merge_keys = ["sourceCmgID", "nookalDbID", "ID", "patientID"]

# `<=>` is null-safe equality. nookalDbID and patientID are not null-filtered
# upstream, and a plain `=` would never match a NULL key, re-inserting the row
# on every run.
merge_condition = " AND ".join(
    f"target.{key} <=> source.{key}" for key in merge_keys
)

# appointmentID is not a column of cdm_appointment, so it is left out of the map.
column_map = {name: f"source.{name}" for name in cdm_appointment.columns}

# NOTE(review): the update branch also overwrites dtmCreated with the current
# load time. Confirm whether the original creation time should be kept instead.
try:
    cdm_table = DeltaTable.forName(spark, table_name)

    (
        cdm_table.alias("target")
        .merge(cdm_appointment.alias("source"), merge_condition)
        .whenMatchedUpdate(set=column_map)
        .whenNotMatchedInsert(values=column_map)
        .execute()
    )

    print("Merge completed successfully.")

except Exception as e:
    print(f"An error occurred during the merge operation: {e}")
    # Re-raise so a failed merge fails the notebook/job run instead of being
    # reported as a success with nothing written.
    raise


In [0]:
%sql
-- Inspect the contents of the CDM appointment table.
SELECT * FROM teamb.cdm_appointment
