In [0]:
# List the files under /FileStore/tables in DBFS; the CSV loaded in Step-1 is read from this directory.
dbutils.fs.ls('/FileStore/tables')

Out[3]: [FileInfo(path='dbfs:/FileStore/tables/BigMart_Sales.csv', name='BigMart_Sales.csv', size=869537, modificationTime=1746875253000),
 FileInfo(path='dbfs:/FileStore/tables/Online_Retail__2_.csv', name='Online_Retail__2_.csv', size=46128694, modificationTime=1747572572000),
 FileInfo(path='dbfs:/FileStore/tables/Online_Retail__2_.xlsx', name='Online_Retail__2_.xlsx', size=23715344, modificationTime=1747571418000),
 FileInfo(path='dbfs:/FileStore/tables/drivers.json', name='drivers.json', size=180812, modificationTime=1746883266000),
 FileInfo(path='dbfs:/FileStore/tables/health_data.csv', name='health_data.csv', size=4226, modificationTime=1747643556000)]

### Step-1: Load the dataset

In [0]:
# Read the health CSV from DBFS, treating the first row as column names.
# No schema is supplied or inferred here, so the columns are loaded as strings.
df = (
    spark.read
    .format('csv')
    .option('header', True)
    .load('/FileStore/tables/health_data.csv')
)
df.display()

PatientID,Name,Age,Gender,VisitDate,BloodPressure,GlucoseLevel,BMI
1,Danielle,58,M,2024-05-27,111/93,105,22.5
2,Judith,26,M,2024-08-21,153/93,139,19.9
3,Jeffrey,45,M,2025-02-28,111/72,97,22.3
4,Curtis,56,M,2025-03-01,145/76,153,30.1
5,Patricia,44,M,2024-06-20,138/88,105,31.9
6,Brittany,18,M,2024-05-30,154/83,113,23.1
7,Anthony,31,F,2024-08-10,116/72,118,20.1
8,Jesse,72,F,2025-01-06,148/78,75,30.5
9,Anthony,52,M,2024-08-03,134/72,140,23.3
10,Jennifer,58,F,2025-02-14,146/76,160,19.6


### Step-2: Split 'BloodPressure' column into 'SystolicBP' and 'DiastolicBP'

In [0]:
# The format is 120/80, we split on '/' and convert to integers

from pyspark.sql.functions import split, col

# Build the split expression once: element 0 is the systolic reading, element 1 the diastolic.
bp_parts = split(col('BloodPressure'), '/')

df = (
    df
    .withColumn('SystolicBP', bp_parts[0].cast('int'))
    .withColumn('DiastolicBP', bp_parts[1].cast('int'))
    # The combined 'BloodPressure' string is redundant once both parts are extracted
    .drop('BloodPressure')
)
df.display()

PatientID,Name,Age,Gender,VisitDate,GlucoseLevel,BMI,SystolicBP,DiastolicBP
1,Danielle,58,M,2024-05-27,105,22.5,111,93
2,Judith,26,M,2024-08-21,139,19.9,153,93
3,Jeffrey,45,M,2025-02-28,97,22.3,111,72
4,Curtis,56,M,2025-03-01,153,30.1,145,76
5,Patricia,44,M,2024-06-20,105,31.9,138,88
6,Brittany,18,M,2024-05-30,113,23.1,154,83
7,Anthony,31,F,2024-08-10,118,20.1,116,72
8,Jesse,72,F,2025-01-06,75,30.5,148,78
9,Anthony,52,M,2024-08-03,140,23.3,134,72
10,Jennifer,58,F,2025-02-14,160,19.6,146,76


### Step-3: Convert VisitDate from string to date type

In [0]:
from pyspark.sql.functions import to_date

# Parse the 'yyyy-MM-dd' strings in VisitDate into a date column, replacing the string column.
# The source column is referenced by its exact name so the lookup also resolves when
# spark.sql.caseSensitive is enabled (a differently-cased name only works with the
# default case-insensitive resolution).
df = df.withColumn('VisitDate', to_date(col('VisitDate'), 'yyyy-MM-dd'))
df.display()

PatientID,Name,Age,Gender,VisitDate,GlucoseLevel,BMI,SystolicBP,DiastolicBP
1,Danielle,58,M,2024-05-27,105,22.5,111,93
2,Judith,26,M,2024-08-21,139,19.9,153,93
3,Jeffrey,45,M,2025-02-28,97,22.3,111,72
4,Curtis,56,M,2025-03-01,153,30.1,145,76
5,Patricia,44,M,2024-06-20,105,31.9,138,88
6,Brittany,18,M,2024-05-30,113,23.1,154,83
7,Anthony,31,F,2024-08-10,118,20.1,116,72
8,Jesse,72,F,2025-01-06,75,30.5,148,78
9,Anthony,52,M,2024-08-03,140,23.3,134,72
10,Jennifer,58,F,2025-02-14,160,19.6,146,76


### Step-4: Create the Date dimension table

In [0]:
# Extract useful components (year, month, day, weekday) and generate a DateID

from pyspark.sql.functions import date_format, year, month, dayofmonth, dayofweek
# One row per distinct visit date; every attribute below is derived from VisitDate.
date_dim = (
    df.select('VisitDate')
    .distinct()
    .select(
        'VisitDate',
        # Surrogate key 'DateID' as an integer in YYYYMMDD form
        date_format('VisitDate', 'yyyyMMdd').cast('int').alias('DateID'),
        year('VisitDate').alias('Year'),
        month('VisitDate').alias('Month'),
        dayofmonth('VisitDate').alias('Day'),
        # Spark's dayofweek numbers the days 1 (Sunday) through 7 (Saturday)
        dayofweek('VisitDate').alias('Weekday'),
    )
)

### Step-5: Create the patient dimension table

In [0]:
# Patient dimension: one row per unique combination of the patient attributes below.
patient_cols = ['PatientID', 'Name', 'Age', 'Gender']

patient_dim = df.select(*patient_cols).dropDuplicates()
patient_dim.display()

PatientID,Name,Age,Gender
12,Shane,73,M
67,Michelle,34,M
41,Veronica,30,M
52,Dawn,21,M
84,Tracy,64,F
10,Jennifer,58,F
60,Katelyn,24,M
58,Wendy,43,M
25,Maria,25,F
2,Judith,26,M


### Step-6: Create the fact table  

In [0]:
# Join back to the date dimension to pick up the DateID foreign key.
# The left join keeps every visit row even when its VisitDate has no match in date_dim.
fact = df.join(date_dim, on='VisitDate', how='left')

# Select the keys and measures for the fact table.
# GlucoseLevel and BMI come straight from the header-only CSV read and have not been
# cast anywhere upstream, so they are still strings; cast them to numeric types so that
# comparisons, ordering and aggregations on the fact table behave numerically.
fact_health = fact.select(
    'PatientID',
    'DateID',
    'SystolicBP',
    'DiastolicBP',
    # assumes whole-number glucose readings, as in the displayed sample rows;
    # switch to 'double' if the source can contain fractional values
    fact['GlucoseLevel'].cast('int').alias('GlucoseLevel'),
    fact['BMI'].cast('double').alias('BMI'),
)
fact_health.display()

PatientID,DateID,SystolicBP,DiastolicBP,GlucoseLevel,BMI
1,20240527,111,93,105,22.5
2,20240821,153,93,139,19.9
3,20250228,111,72,97,22.3
4,20250301,145,76,153,30.1
5,20240620,138,88,105,31.9
6,20240530,154,83,113,23.1
7,20240810,116,72,118,20.1
8,20250106,148,78,75,30.5
9,20240803,134,72,140,23.3
10,20250214,146,76,160,19.6


### Step-7: Save all dimension and fact tables in Delta format

In [0]:
# Persist the tables in Delta format for downstream querying and reporting.
# mode('overwrite') replaces whatever data already exists at the target path.

# save patient dimension table
patient_dim.write.format('delta').mode('overwrite').save('/delta/patient_dim')

# save date dimension table
date_dim.write.format('delta').mode('overwrite').save('/delta/date_dim')

# save fact table
fact_health.write.format('delta').mode('overwrite').save('/delta/fact_health')