In [0]:
import os

import numpy as np
from pyspark.sql import Window
from pyspark.sql.functions import col, current_date, lower, row_number, to_date, to_timestamp, when, year
from scipy.stats import zscore

In [0]:
# List the raw CSV files available in the staging mount.
staging_listing = dbutils.fs.ls("/mnt/staging/staging_layer")
staging_listing

[FileInfo(path='dbfs:/mnt/staging/staging_layer/clinic.csv', name='clinic.csv', size=999, modificationTime=1732262809000),
 FileInfo(path='dbfs:/mnt/staging/staging_layer/patient.csv', name='patient.csv', size=5993, modificationTime=1732262782000),
 FileInfo(path='dbfs:/mnt/staging/staging_layer/physician.csv', name='physician.csv', size=976, modificationTime=1732262797000),
 FileInfo(path='dbfs:/mnt/staging/staging_layer/session.csv', name='session.csv', size=2392, modificationTime=1732262847000)]

In [0]:
# List what a previous run already wrote to the cleaned-data mount.
cleaned_listing = dbutils.fs.ls("/mnt/cleaning/cleaned_data")
cleaned_listing

[FileInfo(path='dbfs:/mnt/cleaning/cleaned_data/clinic.parquet/', name='clinic.parquet/', size=0, modificationTime=1732091443000),
 FileInfo(path='dbfs:/mnt/cleaning/cleaned_data/patient.parquet/', name='patient.parquet/', size=0, modificationTime=1732091442000),
 FileInfo(path='dbfs:/mnt/cleaning/cleaned_data/physician.parquet/', name='physician.parquet/', size=0, modificationTime=1732091442000),
 FileInfo(path='dbfs:/mnt/cleaning/cleaned_data/session.parquet/', name='session.parquet/', size=0, modificationTime=1732091444000)]

In [0]:
input_path = "/mnt/staging/staging_layer/"

# Every staged .csv file, by bare file name (e.g. "patient.csv").
csv_files = [entry.name for entry in dbutils.fs.ls(input_path) if entry.name.endswith(".csv")]

# Read each CSV (header row + inferred types) into a DataFrame keyed by its file stem.
data_frames = {
  os.path.splitext(file_name)[0]: spark.read.csv(
    os.path.join(input_path, file_name), header="true", inferSchema="true"
  )
  for file_name in csv_files
}

In [0]:
# Bind one named variable per source table.
patient_df, physician_df, clinic_df, session_df = (
  data_frames[table] for table in ("patient", "physician", "clinic", "session")
)

# Data Transformation

In [0]:
# Check the column header format.
# DataFrame.show() prints the table itself and returns None, so wrapping it in
# display() only handed None to display(); call show() directly instead.
patient_df.show(2)
physician_df.show(2)
clinic_df.show(2)
session_df.show(2)

+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
| Id|          First_Name|           Last_Name| age|Year_Of_Birth|             Address|        Phone_Number|Gender|
+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
|  1|a8cfcd74832004951...|fd53ef835b1548557...|  30|   1993-01-01|15550b5880db605d3...|25e77e5a54d1dfcd6...|  Male|
|  2|4f23798d92708359b...|9f542590100424c92...|NULL|   1998-01-01|31924a83ca67c160b...|7f0314e4b8beaa73c...|Female|
+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
only showing top 2 rows

+---+----------+---------+----------+
| Id|First_Name|Last_Name|NPI_Number|
+---+----------+---------+----------+
|  1| Dr. Alice|    Brown|1234567890|
|  2| Dr. James|    White|9876543217|
+---+----------+---------+----------+
only showing top 2 rows

+---+----------------+----

### Column Standardization

In [0]:
def column_standardize(df):
  """Return ``df`` with every column name lower-cased.

  All columns are renamed in a single ``toDF`` projection rather than one
  ``withColumnRenamed`` call per column, which stacks a new projection onto
  the query plan on every loop iteration.

  Args:
    df: A Spark DataFrame (anything exposing ``columns`` and ``toDF``).

  Returns:
    A new DataFrame with the same data and lower-cased column names, in the
    original column order.
  """
  return df.toDF(*[name.lower() for name in df.columns])

In [0]:
# Lower-case the headers of every source table.
patient_df, physician_df, clinic_df, session_df = [
  column_standardize(frame) for frame in (patient_df, physician_df, clinic_df, session_df)
]

In [0]:
# Re-check the headers after standardization. show() prints the table and
# returns None, so it is called directly rather than wrapped in display().
patient_df.show(2)
physician_df.show(2)
clinic_df.show(2)
session_df.show(2)

+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
| id|          first_name|           last_name| age|year_of_birth|             address|        phone_number|gender|
+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
|  1|a8cfcd74832004951...|fd53ef835b1548557...|  30|   1993-01-01|15550b5880db605d3...|25e77e5a54d1dfcd6...|  Male|
|  2|4f23798d92708359b...|9f542590100424c92...|NULL|   1998-01-01|31924a83ca67c160b...|7f0314e4b8beaa73c...|Female|
+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
only showing top 2 rows

+---+----------+---------+----------+
| id|first_name|last_name|npi_number|
+---+----------+---------+----------+
|  1| Dr. Alice|    Brown|1234567890|
|  2| Dr. James|    White|9876543217|
+---+----------+---------+----------+
only showing top 2 rows

+---+----------------+----

### Datatype check & Conversion

In [0]:
# Peek at the first two patient rows.
patient_df.show(n=2)

+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
| id|          first_name|           last_name| age|year_of_birth|             address|        phone_number|gender|
+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
|  1|a8cfcd74832004951...|fd53ef835b1548557...|  30|   1993-01-01|15550b5880db605d3...|25e77e5a54d1dfcd6...|  Male|
|  2|4f23798d92708359b...|9f542590100424c92...|NULL|   1998-01-01|31924a83ca67c160b...|7f0314e4b8beaa73c...|Female|
+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
only showing top 2 rows



In [0]:
# Print the inferred patient schema to see which columns need type conversion.
patient_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- year_of_birth: date (nullable = true)
 |-- address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- gender: string (nullable = true)



In [0]:
# Peek at the first two physician rows.
physician_df.show(n=2)

+---+----------+---------+----------+
| id|first_name|last_name|npi_number|
+---+----------+---------+----------+
|  1| Dr. Alice|    Brown|1234567890|
|  2| Dr. James|    White|9876543217|
+---+----------+---------+----------+
only showing top 2 rows



In [0]:
# Print the inferred physician schema to verify the column types.
physician_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- npi_number: long (nullable = true)



In [0]:
# Peek at the first two clinic rows.
clinic_df.show(n=2)

+---+----------------+--------------+
| id|     clinic_name|       address|
+---+----------------+--------------+
|  1| Downtown Health|123 Health Ave|
|  2|Riverside Clinic|  456 River Rd|
+---+----------------+--------------+
only showing top 2 rows



In [0]:
# Print the inferred clinic schema to verify the column types.
clinic_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- clinic_name: string (nullable = true)
 |-- address: string (nullable = true)



In [0]:
# Peek at the first two session rows (raw date/time strings).
session_df.show(n=2)

+---+----------+------------+---------+------------+------------------+----------------+---------+
| id|patient_id|physician_id|clinic_id|scheduled_on|session_started_at|session_ended_at|   status|
+---+----------+------------+---------+------------+------------------+----------------+---------+
|  1|         1|           1|        1|  05-10-2023|  05-10-2023 09:00|05-10-2023 09:30|Confirmed|
|  2|         2|           2|        2|  05-10-2023|              NULL|            NULL|Cancelled|
+---+----------+------------+---------+------------+------------------+----------------+---------+
only showing top 2 rows



In [0]:
# Print the inferred session schema; the date/time columns come through as
# strings and are converted in the next cell.
session_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- patient_id: integer (nullable = true)
 |-- physician_id: integer (nullable = true)
 |-- clinic_id: integer (nullable = true)
 |-- scheduled_on: string (nullable = true)
 |-- session_started_at: string (nullable = true)
 |-- session_ended_at: string (nullable = true)
 |-- status: string (nullable = true)



In [0]:
# Parse the day-first (dd-MM-yyyy) date strings into DATE / TIMESTAMP columns.
session_df = (
  session_df
  .withColumn("scheduled_on", to_date(col("scheduled_on"), "dd-MM-yyyy"))
  .withColumn("session_started_at", to_timestamp(col("session_started_at"), "dd-MM-yyyy HH:mm"))
  .withColumn("session_ended_at", to_timestamp(col("session_ended_at"), "dd-MM-yyyy HH:mm"))
)

In [0]:
# Confirm the parsed dates/timestamps on the first two sessions.
session_df.show(n=2)

+---+----------+------------+---------+------------+-------------------+-------------------+---------+
| id|patient_id|physician_id|clinic_id|scheduled_on| session_started_at|   session_ended_at|   status|
+---+----------+------------+---------+------------+-------------------+-------------------+---------+
|  1|         1|           1|        1|  2023-10-05|2023-10-05 09:00:00|2023-10-05 09:30:00|Confirmed|
|  2|         2|           2|        2|  2023-10-05|               NULL|               NULL|Cancelled|
+---+----------+------------+---------+------------+-------------------+-------------------+---------+
only showing top 2 rows



In [0]:
# Re-print the session schema to confirm the date/timestamp conversions.
session_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- patient_id: integer (nullable = true)
 |-- physician_id: integer (nullable = true)
 |-- clinic_id: integer (nullable = true)
 |-- scheduled_on: date (nullable = true)
 |-- session_started_at: timestamp (nullable = true)
 |-- session_ended_at: timestamp (nullable = true)
 |-- status: string (nullable = true)



### Handling Missing Values

In [0]:
# Inspect a few patient rows; note the NULL ages.
patient_df.show(n=3)

+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
| id|          first_name|           last_name| age|year_of_birth|             address|        phone_number|gender|
+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
|  1|a8cfcd74832004951...|fd53ef835b1548557...|  30|   1993-01-01|15550b5880db605d3...|25e77e5a54d1dfcd6...|  Male|
|  2|4f23798d92708359b...|9f542590100424c92...|NULL|   1998-01-01|31924a83ca67c160b...|7f0314e4b8beaa73c...|Female|
|  3|3bc51062973c458d5...|3013b18f4387bbe12...|  40|   1983-01-01|fd0a8c28f44d206e4...|cc9911cda34343c33...|Female|
+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
only showing top 3 rows



In [0]:
# Reduce the date-typed birth column to just its calendar year.
patient_df = patient_df.withColumn("year_of_birth", year("year_of_birth"))


In [0]:
# Verify year_of_birth now holds integer years.
patient_df.show(n=10)

+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
| id|          first_name|           last_name| age|year_of_birth|             address|        phone_number|gender|
+---+--------------------+--------------------+----+-------------+--------------------+--------------------+------+
|  1|a8cfcd74832004951...|fd53ef835b1548557...|  30|         1993|15550b5880db605d3...|25e77e5a54d1dfcd6...|  Male|
|  2|4f23798d92708359b...|9f542590100424c92...|NULL|         1998|31924a83ca67c160b...|7f0314e4b8beaa73c...|Female|
|  3|3bc51062973c458d5...|3013b18f4387bbe12...|  40|         1983|fd0a8c28f44d206e4...|cc9911cda34343c33...|Female|
|  4|d7cda0ca2c8586e51...|c2793b826cc96e7fe...|  22|         2001|0f8df68718227e4fa...|10f7448bdfabade89...|  Male|
|  5|048a2730d09826f3e...|6e8d8801f51aafe84...|  35|         1988|9b82829bd57dba3c6...|15590de3098bbbf08...|Female|
|  6|f089eaef57aba315b...|18f285592896ec8ea...|NULL|         1992|a5de8f

In [0]:
# Impute missing ages from year_of_birth.
# Ages must be measured against the year the recorded ages refer to, not the
# run date: every patient shown above with a recorded age satisfies
# age + year_of_birth == 2023, and the sessions are dated 2023.
# The previous year(current_date()) - year_of_birth - 1 only gives that when
# run in 2024, and shifts imputed ages by one for every later year.
AGE_REFERENCE_YEAR = 2023
patient_df = patient_df.withColumn(
  "age",
  when(col("age").isNull(), AGE_REFERENCE_YEAR - col("year_of_birth")).otherwise(col("age")),
)
patient_df.show()

+---+--------------------+--------------------+---+-------------+--------------------+--------------------+------+
| id|          first_name|           last_name|age|year_of_birth|             address|        phone_number|gender|
+---+--------------------+--------------------+---+-------------+--------------------+--------------------+------+
|  1|a8cfcd74832004951...|fd53ef835b1548557...| 30|         1993|15550b5880db605d3...|25e77e5a54d1dfcd6...|  Male|
|  2|4f23798d92708359b...|9f542590100424c92...| 25|         1998|31924a83ca67c160b...|7f0314e4b8beaa73c...|Female|
|  3|3bc51062973c458d5...|3013b18f4387bbe12...| 40|         1983|fd0a8c28f44d206e4...|cc9911cda34343c33...|Female|
|  4|d7cda0ca2c8586e51...|c2793b826cc96e7fe...| 22|         2001|0f8df68718227e4fa...|10f7448bdfabade89...|  Male|
|  5|048a2730d09826f3e...|6e8d8801f51aafe84...| 35|         1988|9b82829bd57dba3c6...|15590de3098bbbf08...|Female|
|  6|f089eaef57aba315b...|18f285592896ec8ea...| 31|         1992|a5de8f4b1ea4c94

In [0]:
# Scan the physician table for missing values (default 20-row preview).
physician_df.show(n=20)

+---+------------+---------+----------+
| id|  first_name|last_name|npi_number|
+---+------------+---------+----------+
|  1|   Dr. Alice|    Brown|1234567890|
|  2|   Dr. James|    White|9876543217|
|  3| Dr. Charlie|    Green|1122334455|
|  4|   Dr. Emily| Thompson|2233445566|
|  5|   Dr. Emily|  Johnson|3344556677|
|  6|   Dr. David|    Smith|4455667788|
|  7|   Dr. Megan|      Lee|5566778899|
|  8| Dr. Michael|   Wilson|6677889900|
|  9|    Dr. Mark|Hernandez|7788990011|
| 10|   Dr. Sarah|   Taylor|8899001122|
| 11|   Dr. James|   Carter|9900112233|
| 12| Dr. Jessica| Anderson|1011121314|
| 13|   Dr. Laura|   Thomas|1213141516|
| 14|    Dr. John|   Miller|1314151617|
| 15|Dr. Patricia|   Wilson|1415161718|
| 16|   Dr. David|   Garcia|1516171819|
| 17|   Dr. Kevin|    Brown|1617181920|
| 18|  Dr. Angela| Martinez|1718192021|
| 19|  Dr. Ashley|    Davis|1819202122|
| 20|   Dr. Brian|    Clark|1920212223|
+---+------------+---------+----------+
only showing top 20 rows



In [0]:
# Scan the clinic table for missing values (default 20-row preview).
clinic_df.show(n=20)

+---+--------------------+-----------------+
| id|         clinic_name|          address|
+---+--------------------+-----------------+
|  1|     Downtown Health|   123 Health Ave|
|  2|    Riverside Clinic|     456 River Rd|
|  3|     Wellness Center|789 Wellness Blvd|
|  4|  City Medical Group|      101 Main St|
|  5|Lakeside Family C...|  202 Lakeview Dr|
|  6| Green Valley Clinic|303 Greenway Blvd|
|  7|  Health Plus Clinic|    404 Health St|
|  8|Eastside Medical ...|      505 East St|
|  9|Northpoint Health...|    606 North Ave|
| 10|    Sunnydale Clinic|     707 Sunny St|
| 11|Family Health Center|   808 Family Ave|
| 12| Central City Clinic| 909 Central Blvd|
| 13| New Horizons Clinic|  1010 Horizon Dr|
| 14|Clearview Medical...|1111 Clearview St|
| 15|Healthy Living Cl...|1212 Wellness Way|
| 16|    Starlight Health|     1313 Star Rd|
| 17|   Silver Oak Clinic|   1414 Silver St|
| 18|Pine Grove Medica...|     1515 Pine Rd|
| 19|   Summit Healthcare|  1616 Summit Ave|
| 20|Harmo

In [0]:
# Scan the session table; cancelled sessions have NULL start/end times.
session_df.show(n=20)

+---+----------+------------+---------+------------+-------------------+-------------------+---------+
| id|patient_id|physician_id|clinic_id|scheduled_on| session_started_at|   session_ended_at|   status|
+---+----------+------------+---------+------------+-------------------+-------------------+---------+
|  1|         1|           1|        1|  2023-10-05|2023-10-05 09:00:00|2023-10-05 09:30:00|Confirmed|
|  2|         2|           2|        2|  2023-10-05|               NULL|               NULL|Cancelled|
|  3|         3|           1|        1|  2023-10-05|2023-10-05 11:00:00|2023-10-05 11:30:00|Confirmed|
|  4|         4|           3|        3|  2023-10-05|2023-10-05 12:00:00|2023-10-05 12:30:00|Confirmed|
|  5|         5|           2|        2|  2023-10-05|2023-10-05 13:00:00|2023-10-05 13:45:00|Confirmed|
|  6|         6|           2|        1|  2023-10-05|               NULL|               NULL|Cancelled|
|  7|         7|           1|        3|  2023-10-05|2023-10-05 15:00:00|2

### Outlier Detection

In [0]:
# Review patient ages before running outlier detection.
patient_df.show(n=20)

+---+--------------------+--------------------+---+-------------+--------------------+--------------------+------+
| id|          first_name|           last_name|age|year_of_birth|             address|        phone_number|gender|
+---+--------------------+--------------------+---+-------------+--------------------+--------------------+------+
|  1|a8cfcd74832004951...|fd53ef835b1548557...| 30|         1993|15550b5880db605d3...|25e77e5a54d1dfcd6...|  Male|
|  2|4f23798d92708359b...|9f542590100424c92...| 25|         1998|31924a83ca67c160b...|7f0314e4b8beaa73c...|Female|
|  3|3bc51062973c458d5...|3013b18f4387bbe12...| 40|         1983|fd0a8c28f44d206e4...|cc9911cda34343c33...|Female|
|  4|d7cda0ca2c8586e51...|c2793b826cc96e7fe...| 22|         2001|0f8df68718227e4fa...|10f7448bdfabade89...|  Male|
|  5|048a2730d09826f3e...|6e8d8801f51aafe84...| 35|         1988|9b82829bd57dba3c6...|15590de3098bbbf08...|Female|
|  6|f089eaef57aba315b...|18f285592896ec8ea...| 31|         1992|a5de8f4b1ea4c94

In [0]:
# Flag ages more than Z_THRESHOLD standard deviations from the mean.
# nan_policy="omit" computes the mean/std over non-missing ages only. With the
# default ("propagate"), a single NULL age (e.g. a patient with no
# year_of_birth) turns every z-score into NaN, and no outlier is ever flagged.
Z_THRESHOLD = 3
ages = patient_df.select("age").toPandas()
z_score = zscore(ages, nan_policy="omit")
outliers = np.where(np.abs(z_score) > Z_THRESHOLD)
outliers

(array([], dtype=int64), array([], dtype=int64))

### Duplicate Check & Removal

In [0]:
# Peek at patients before checking for duplicates.
patient_df.show(n=2)

+---+--------------------+--------------------+---+-------------+--------------------+--------------------+------+
| id|          first_name|           last_name|age|year_of_birth|             address|        phone_number|gender|
+---+--------------------+--------------------+---+-------------+--------------------+--------------------+------+
|  1|a8cfcd74832004951...|fd53ef835b1548557...| 30|         1993|15550b5880db605d3...|25e77e5a54d1dfcd6...|  Male|
|  2|4f23798d92708359b...|9f542590100424c92...| 25|         1998|31924a83ca67c160b...|7f0314e4b8beaa73c...|Female|
+---+--------------------+--------------------+---+-------------+--------------------+--------------------+------+
only showing top 2 rows



In [0]:
# Row count with vs. without exact duplicates, ignoring the surrogate id.
patient_attrs = patient_df.select("first_name", "last_name", "age", "year_of_birth", "address", "phone_number", "gender")
display(patient_attrs.count())
display(patient_attrs.dropDuplicates().count())

21

20

In [0]:
# Drop patients that are exact duplicates on every attribute except id.
# dropDuplicates() keeps an arbitrary row from each group, so the surviving id
# (which sessions reference via patient_id) could change between runs. Rank
# each group by id and keep the lowest one so the cleaned output is deterministic.
# NOTE(review): sessions pointing at a dropped duplicate id are not remapped
# to the surviving id — confirm whether session_df needs that.
patient_dedup_cols = ["first_name", "last_name", "age", "year_of_birth", "address", "phone_number", "gender"]
lowest_id_first = Window.partitionBy(*patient_dedup_cols).orderBy(col("id"))
patient_df = (
  patient_df
  .withColumn("_dup_rank", row_number().over(lowest_id_first))
  .filter(col("_dup_rank") == 1)
  .drop("_dup_rank")
)

In [0]:
# Peek at physicians before checking for duplicates.
physician_df.show(n=2)

+---+----------+---------+----------+
| id|first_name|last_name|npi_number|
+---+----------+---------+----------+
|  1| Dr. Alice|    Brown|1234567890|
|  2| Dr. James|    White|9876543217|
+---+----------+---------+----------+
only showing top 2 rows



In [0]:
# Row count with vs. without exact duplicates, ignoring the surrogate id.
physician_attrs = physician_df.select("first_name", "last_name", "npi_number")
display(physician_attrs.count())
display(physician_attrs.dropDuplicates().count())

30

30

In [0]:
# Peek at clinics before checking for duplicates.
clinic_df.show(n=2)

+---+----------------+--------------+
| id|     clinic_name|       address|
+---+----------------+--------------+
|  1| Downtown Health|123 Health Ave|
|  2|Riverside Clinic|  456 River Rd|
+---+----------------+--------------+
only showing top 2 rows



In [0]:
# Row count with vs. without exact duplicates, ignoring the surrogate id.
clinic_attrs = clinic_df.select("clinic_name", "address")
display(clinic_attrs.count())
display(clinic_attrs.dropDuplicates().count())

25

25

In [0]:
# Peek at sessions before checking for duplicates.
session_df.show(n=2)

+---+----------+------------+---------+------------+-------------------+-------------------+---------+
| id|patient_id|physician_id|clinic_id|scheduled_on| session_started_at|   session_ended_at|   status|
+---+----------+------------+---------+------------+-------------------+-------------------+---------+
|  1|         1|           1|        1|  2023-10-05|2023-10-05 09:00:00|2023-10-05 09:30:00|Confirmed|
|  2|         2|           2|        2|  2023-10-05|               NULL|               NULL|Cancelled|
+---+----------+------------+---------+------------+-------------------+-------------------+---------+
only showing top 2 rows



In [0]:
# Row count with vs. without exact duplicates, ignoring the surrogate id.
session_attrs = session_df.select("patient_id", "physician_id", "clinic_id", "scheduled_on", "session_started_at", "session_ended_at", "status")
display(session_attrs.count())
display(session_attrs.dropDuplicates().count())

40

40

### Calculating Session Duration (minutes)

In [0]:
# Peek at session start/end timestamps before computing durations.
session_df.show(n=2)

+---+----------+------------+---------+------------+-------------------+-------------------+---------+
| id|patient_id|physician_id|clinic_id|scheduled_on| session_started_at|   session_ended_at|   status|
+---+----------+------------+---------+------------+-------------------+-------------------+---------+
|  1|         1|           1|        1|  2023-10-05|2023-10-05 09:00:00|2023-10-05 09:30:00|Confirmed|
|  2|         2|           2|        2|  2023-10-05|               NULL|               NULL|Cancelled|
+---+----------+------------+---------+------------+-------------------+-------------------+---------+
only showing top 2 rows



In [0]:
# Duration in whole minutes: the epoch-second difference divided by 60, then
# truncated by the int cast. Sessions without start/end timestamps (the
# cancelled ones above) end up with NULL.
started_secs = col("session_started_at").cast("long")
ended_secs = col("session_ended_at").cast("long")
session_df = (
  session_df
  .withColumn("session_duration_mins", ((ended_secs - started_secs) / 60).cast("int"))
  .select("id", "patient_id", "physician_id", "clinic_id", "scheduled_on", "status", "session_duration_mins")
)
session_df.show(n=5)

+---+----------+------------+---------+------------+---------+---------------------+
| id|patient_id|physician_id|clinic_id|scheduled_on|   status|session_duration_mins|
+---+----------+------------+---------+------------+---------+---------------------+
|  1|         1|           1|        1|  2023-10-05|Confirmed|                   30|
|  2|         2|           2|        2|  2023-10-05|Cancelled|                 NULL|
|  3|         3|           1|        1|  2023-10-05|Confirmed|                   30|
|  4|         4|           3|        3|  2023-10-05|Confirmed|                   30|
|  5|         5|           2|        2|  2023-10-05|Confirmed|                   45|
+---+----------+------------+---------+------------+---------+---------------------+
only showing top 5 rows



### Save the cleaned files

In [0]:
# Persist every cleaned table as Parquet, overwriting any previous run's output.
dataframes = dict(
  patient=patient_df,
  physician=physician_df,
  clinic=clinic_df,
  session=session_df,
)

for table_name, cleaned_df in dataframes.items():
  (
    cleaned_df.write
    .mode("overwrite")
    .format("parquet")
    .save(f"/mnt/cleaning/cleaned_data/{table_name}.parquet")
  )

---