1. Import required libraries

In [0]:
from pyspark.sql.functions import to_timestamp, concat, col, lit, to_date, current_timestamp, explode
from delta.tables import *

In [0]:
# The bronze folder to ingest is passed in through the "destination" widget,
# as a path relative to the data-lake mount. Its default value is "".
DATALAKE_MOUNT_ROOT = "/mnt/datalake_mount/"

dbutils.widgets.text("destination", "")
file_destination = dbutils.widgets.get("destination")

# Guard against a blank widget value. The reader below uses recursiveFileLookup,
# so an empty destination would recursively read the ENTIRE mount (including the
# silver Delta table and its _delta_log JSON files) as if it were patient data.
# Surrounding whitespace and slashes are trimmed so "bronze/patient",
# "/bronze/patient/" and " bronze/patient " all resolve to the same path.
_relative_destination = file_destination.strip().strip("/")
if not _relative_destination:
    raise ValueError(
        "Widget 'destination' must name a folder under "
        f"{DATALAKE_MOUNT_ROOT} (got {file_destination!r})"
    )

absolute_path = DATALAKE_MOUNT_ROOT + _relative_destination

2. Import the patient schema from the schema notebook using the `%run` magic command

3. Include the `schema` defined in the `../pipeline/schema/patient` notebook

In [0]:
%run "../pipeline/schema/patient"

4. Read data from the bronze layer

In [0]:
# List the DBFS mount points; every path this notebook reads or writes lives
# under /mnt/datalake_mount, so that mount must appear here.
mounts = dbutils.fs.mounts()
display(mounts)

mountPoint,source,encryptionType
/databricks-datasets,databricks-datasets,
/mnt/datalake_mount,abfss://datalake@datalakebigdataywgug1c0m.dfs.core.windows.net,
/Volumes,UnityCatalogVolumes,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/databricks-results,databricks-results,
/databricks/mlflow-registry,databricks/mlflow-registry,
/Volume,DbfsReserved,
/volumes,DbfsReserved,
/,DatabricksRoot,
/volume,DbfsReserved,


In [0]:
# Load every JSON document under the widget-selected bronze folder (sub-folders
# included) with the shared patient schema, then keep one row per patient id.
# Delta MERGE rejects several source rows matching the same target row, so the
# upsert further down needs unique ids.
# NOTE(review): which duplicate survives is arbitrary; confirm that is acceptable.
df = (
    spark.read
    .schema(schema)
    .option("recursiveFileLookup", "true")
    .json(absolute_path, multiLine=True)
    .dropDuplicates(["id"])
)

In [0]:
# Preview up to 20 de-duplicated bronze rows, truncating long cell values.
df.show(n=20, truncate=True)

+------+------+--------+----------+------------+--------------------+--------------------+--------------------+
|active|gender|      id| birthDate|resourceType|             address|                name|             telecom|
+------+------+--------+----------+------------+--------------------+--------------------+--------------------+
|  true|  male|example1|1974-12-25|     Patient|[{PleasantVille, ...|[{official, Chalm...|[{home, null, nul...|
+------+------+--------+----------+------------+--------------------+--------------------+--------------------+



In [0]:
# Shape the bronze rows for the silver table:
#   * parse birthDate into a DATE column named birth_date (pattern yyyy-MM-dd),
#   * stamp every row with the time this batch was processed (ingestion_date),
#   * rename resourceType to snake_case resource_type,
#   * drop the original birthDate column.
# NOTE(review): FHIR birth dates may be partial (e.g. "1974" or "1974-12"); such
# values will not match yyyy-MM-dd — confirm how they should be handled.
df_converted_date = (
    df
    .withColumn("birth_date", to_date(col("birthDate"), "yyyy-MM-dd"))
    .withColumn("ingestion_date", current_timestamp())
    .withColumnRenamed("resourceType", "resource_type")
    .drop(col("birthDate"))
)


In [0]:
# Inspect the first transformed row (a Row object, or None if the batch is empty).
df_converted_date.first()

Out[32]: Row(active=True, gender='male', id='example1', resource_type='Patient', address=[Row(city='PleasantVille', district='Rainbow', postal=None, state='Vic', type='both', use='home', line=['534 Erewhon St'])], name=[Row(use='official', family='Chalmers', given=['Peter', 'James'], period=None), Row(use='usual', family=None, given=['Jim'], period=None), Row(use='maiden', family='Windsor', given=['Peter', 'James'], period=Row(start=None, end='2002'))], telecom=[Row(use='home', value=None, system=None, rank=None, period=None), Row(use='work', value='(03) 5555 6473', system='phone', rank=1, period=None), Row(use='mobile', value='(03) 3410 5613', system='phone', rank=2, period=None), Row(use='old', value='(03) 5555 8834', system='phone', rank=None, period=Row(start=None, end='2014'))], birth_date=datetime.date(1974, 12, 25), ingestion_date=datetime.datetime(2023, 8, 8, 8, 59, 59, 861000))

In [0]:
#%sql
#DROP TABLE IF EXISTS patients

In [0]:
#spark.sql("show tables").show()

In [0]:
# Create the silver Delta table only if it does not exist yet. It is stored at
# the given path and registered as `patients`. Its columns mirror the
# transformed DataFrame, plus an `update_date` audit column; the MERGE cell
# fills update_date only for rows that already existed.
patient_delta_table = (
    DeltaTable.createIfNotExists(spark)
    .tableName("patients")
    .location("/mnt/datalake_mount/silver/patient-table")
    .addColumns(df_converted_date.schema)
    .addColumn("update_date", "TIMESTAMP")
    .execute()
)

In [0]:
# Upsert the transformed batch into the silver table, keyed on `id`.
# `patient_delta_table` is the handle returned by the createIfNotExists cell.
#   * Existing patients get every payload column overwritten and update_date stamped.
#   * New patients are inserted with ingestion_date stamped; update_date stays NULL.
# NOTE(review): the matched branch has no condition, so update_date is refreshed
# for every matching id even when the incoming row is unchanged.
payload_columns = [
    "id", "active", "gender", "resource_type",
    "address", "name", "telecom", "birth_date",
]
copy_from_updates = {column: f"updates.{column}" for column in payload_columns}

(
    patient_delta_table.alias("patients")
    .merge(df_converted_date.alias("updates"), "patients.id = updates.id")
    .whenMatchedUpdate(set={**copy_from_updates, "update_date": current_timestamp()})
    .whenNotMatchedInsert(values={**copy_from_updates, "ingestion_date": current_timestamp()})
    .execute()
)

In [0]:
%sql
-- Inspect the merged silver `patients` table.
SELECT * FROM patients

active,gender,id,resource_type,address,name,telecom,birth_date,ingestion_date,update_date
True,male,example1,Patient,"List(List(PleasantVille, Rainbow, null, Vic, both, home, List(534 Erewhon St)))","List(List(official, Chalmers, List(Peter, James), null), List(usual, null, List(Jim), null), List(maiden, Windsor, List(Peter, James), List(null, 2002)))","List(List(home, null, null, null, null), List(work, (03) 5555 6473, phone, 1, null), List(mobile, (03) 3410 5613, phone, 2, null), List(old, (03) 5555 8834, phone, null, List(null, 2014)))",1974-12-25,2023-08-06T19:31:46.069+0000,2023-08-08T09:00:08.869+0000
True,male,example,Patient,"List(List(PleasantVille, Rainbow, null, Vic, both, home, List(534 Erewhon St)))","List(List(official, Chalmers, List(Peter, James), null), List(usual, null, List(Jim), null), List(maiden, Windsor, List(Peter, James), List(null, 2002)))","List(List(home, null, null, null, null), List(work, (03) 5555 6473, phone, 1, null), List(mobile, (03) 3410 5613, phone, 2, null), List(old, (03) 5555 8834, phone, null, List(null, 2014)))",1974-12-25,2023-08-06T19:09:41.830+0000,2023-08-06T19:56:02.809+0000
