### SAS Access Token Configuration

This initial code configures access to the data in your Azure Data Lake container. By defining the **storage account name**, **container name**, and **SAS token**, and then setting the Spark configuration, you let your Spark session connect securely to the Data Lake. This setup is a preliminary step before the data exploration and analysis tasks that follow.

In [1]:
# Base Parameters

storage_account_name    = "dcadlsgen2"
container_name          = "data"
sas_token               = "sp=rwl&st=2024-10-30T20:59:32Z&se=2024-11-06T05:59:32Z&skoid=76091599-8241-44cb-aa8e-18d7e8b9cf37&sktid=304616a7-7248-4620-8cee-5562f30ee9ff&skt=2024-10-30T20:59:32Z&ske=2024-11-06T05:59:32Z&sks=b&skv=2022-11-02&spr=https&sv=2022-11-02&sr=c&sig=LfEQAtCwCbYsxM8L%2F%2BrVJaMQRI4%2FLnAWEO%2FRn%2FhbjQE%3D"

# Set Spark configuration to use the SAS token

spark.conf.set(f"fs.azure.sas.{container_name}.{storage_account_name}.dfs.core.windows.net", sas_token)

# Define the base path for the Data Lake container

base_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"
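
The configuration key and the base path both follow a fixed pattern, so they can be factored into small helpers. The sketch below is a hypothetical refactoring, not part of the original notebook: the function names and the `ADLS_SAS_TOKEN` environment variable are assumptions, and reading the token from the environment is just one way to keep it out of the notebook source.

```python
import os


def sas_config_key(container: str, account: str) -> str:
    """Build the Spark config key that holds a container-scoped SAS token."""
    return f"fs.azure.sas.{container}.{account}.dfs.core.windows.net"


def abfss_base_path(container: str, account: str) -> str:
    """Build the abfss:// root path for a container in an ADLS Gen2 account."""
    return f"abfss://{container}@{account}.dfs.core.windows.net/"


# ADLS_SAS_TOKEN is a hypothetical variable name; an empty string is used if it is unset.
sas_token = os.environ.get("ADLS_SAS_TOKEN", "")

config_key = sas_config_key("data", "dcadlsgen2")
base_path = abfss_base_path("data", "dcadlsgen2")

print(config_key)
print(base_path)
# In the notebook you would then call: spark.conf.set(config_key, sas_token)
```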

### Creating Dataframes
In this section, we load patient, doctor, appointment, and medical record data from CSV files into Spark DataFrames, reading the header row and inferring the schema from the data. The section also introduces the concept of lazy evaluation in Spark, where transformations are not executed until an action is called.

In [2]:
# Create a Patients, Doctors, Appointments and MedicalRecords Dataframes

# Define file paths for each dataset

patients_file_path      =    f"{base_path}ehr/bronz/*/Patients.csv"
doctors_file_path       =    f"{base_path}ehr/bronz/*/Doctors.csv"
appointments_file_path  =    f"{base_path}ehr/bronz/*/Appointments.csv"
medrecords_file_path    =    f"{base_path}ehr/bronz/*/MedicalRecords.csv"

# Load the CSV files into Spark DataFrames with headers and inferred schema

df_patients             =   spark.read.csv(patients_file_path, header=True, inferSchema=True)
df_doctors              =   spark.read.csv(doctors_file_path, header=True, inferSchema=True)
df_appointments         =   spark.read.csv(appointments_file_path, header=True, inferSchema=True)
df_medrecords           =   spark.read.csv(medrecords_file_path, header=True, inferSchema=True)
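
The `*` in each path stands for a single folder level, so one read picks up the same file name from every folder directly under `ehr/bronz/`. The plain-Python sketch below imitates that matching with `pathlib`. The folder names `TX`, `GA`, and `NY` are assumptions for illustration (the real layout of the container is not shown here), and in the notebook it is Spark's own glob handling that applies.

```python
from pathlib import PurePosixPath

pattern = "ehr/bronz/*/Patients.csv"

# Hypothetical object paths inside the container.
candidates = [
    "ehr/bronz/TX/Patients.csv",
    "ehr/bronz/GA/Patients.csv",
    "ehr/bronz/NY/Doctors.csv",
    "ehr/bronz/NY/2024/Patients.csv",
]

# PurePosixPath.match treats * as a wildcard within a single path component.
matched = [p for p in candidates if PurePosixPath(p).match(pattern)]
print(matched)
```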

### Lazy evaluation and action 
```count()``` is an example of an action: a command that triggers the execution of the transformations defined so far. Here it counts the rows in each DataFrame, which forces Spark to actually read the files. The preceding lines are preparatory: they set up the configuration and describe the reads, but the full read is deferred until an action is called. (With ```inferSchema=True```, Spark does scan the files once up front to work out the column types.)

I like to think of the lazily evaluated steps as **“preparatory commands”**: the setup that happens before an action is executed.

In [3]:


# Count the number of records in each DataFrame and print the results

print(f"Number of records in Patients DataFrame: {df_patients.count():,}")
print(f"Number of records in Doctors DataFrame: {df_doctors.count():,}")
print(f"Number of records in Appointments DataFrame: {df_appointments.count():,}")
print(f"Number of records in Medical Records DataFrame: {df_medrecords.count():,}")
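
The deferred behavior can be mimicked in plain Python with generators. This is only an analogy, not how Spark is implemented: the names `read_rows` and `only_tx` and the sample rows are made up for illustration. Building the pipeline does no work, and consuming it (the "action") runs every step.

```python
log = []


def read_rows():
    """Pretend data source; records when it is actually consumed."""
    log.append("read")
    yield from [
        {"id": 1, "state": "TX"},
        {"id": 2, "state": "GA"},
        {"id": 3, "state": "TX"},
    ]


def only_tx(rows):
    """Lazy 'transformation': yields matching rows on demand."""
    for row in rows:
        log.append(f"filter {row['id']}")
        if row["state"] == "TX":
            yield row


# Building the pipeline runs nothing yet.
pipeline = only_tx(read_rows())
steps_before_action = list(log)

# The 'action': consuming the pipeline forces every step to run.
row_count = sum(1 for _ in pipeline)

print(steps_before_action)
print(row_count)
print(log)
```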


### Using the ```show()``` Function
The show() function is another action command in Spark. It displays the top rows of a DataFrame, providing a quick way to inspect the data. Using show() with a specified number of rows helps you verify that the data has been loaded correctly and gives you a snapshot of the DataFrame’s contents without overwhelming you with too much information. For example:

In [4]:
df_patients.show(5)

### ```display()``` function
**The display function is a tool used to render a DataFrame** in a tabular format, showing rows and columns similar to a spreadsheet. **display** is **not** an official method of the DataFrame API; it is a function provided by specific interactive environments, such as Spark notebooks. To limit the number of rows displayed, call the limit() method on the DataFrame before passing it to display().

In [5]:
# Limit the number of rows to 5 and display the Patients DataFrame

display(df_patients.limit(5))

The ```display(df_patients, summary=True)``` command in a Spark notebook is used to display a summary of the df_patients DataFrame. When you set summary=True, it provides a concise overview of the DataFrame, including statistics such as the count, mean, standard deviation, minimum, and maximum values for each column. This is particularly useful for quickly understanding the distribution and characteristics of your data without displaying all the rows.

In [6]:
display(df_patients,summary=True)

#### Select Columns

The **select** method in the DataFrame API is a **transformation**. It creates a new DataFrame by selecting specific columns or expressions from an existing DataFrame.

In [7]:
df_patients_vw = df_patients.select("PatientID", "FirstName", "LastName", "Gender")
df_patients_vw.show(5)

### Transform data with Apache Spark in Azure Synapse Analytics




In [8]:
df_patients_ga_vw = df_patients.filter(df_patients.Source=="GA")
display(df_patients_ga_vw,summary=True)

In [9]:
from pyspark.sql.functions import col, current_date, datediff, floor

# Add a new column 'Age' to the DataFrame
df_patients_w_age_vw = df_patients.withColumn(
    "Age",
    floor(datediff(current_date(), col("DateOfBirth")) / 365.25)
)

# Display the updated DataFrame with the new 'Age' column
display(df_patients_w_age_vw.limit(5))
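
Dividing the day difference by 365.25 is an approximation of age in years. The plain-Python sketch below reproduces the same arithmetic next to an exact calendar-based calculation, so the difference is visible. The function names and the fixed as-of date are assumptions chosen for illustration; they are not part of the notebook.

```python
from datetime import date
from math import floor


def approx_age(date_of_birth: date, today: date) -> int:
    """Same arithmetic as floor(datediff(today, dob) / 365.25)."""
    return floor((today - date_of_birth).days / 365.25)


def calendar_age(date_of_birth: date, today: date) -> int:
    """Exact age: subtract one year if this year's birthday has not happened yet."""
    had_birthday = (today.month, today.day) >= (date_of_birth.month, date_of_birth.day)
    return today.year - date_of_birth.year - (0 if had_birthday else 1)


as_of = date(2024, 11, 2)  # fixed date so the result is reproducible
typical = (approx_age(date(1954, 2, 28), as_of), calendar_age(date(1954, 2, 28), as_of))
print(typical)

# On a first birthday only 365 days have passed, so the approximation lags by one.
edge = (approx_age(date(2001, 1, 1), date(2002, 1, 1)),
        calendar_age(date(2001, 1, 1), date(2002, 1, 1)))
print(edge)
```

For reporting by age band the approximation is usually close enough; where the exact birthday matters, a calendar-based comparison is the safer choice.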


In [10]:
# Register a session-scoped temporary view (not a metastore table) for the patients with the calculated age
df_patients_w_age_vw.createOrReplaceTempView("t_patients")

#### Using Spark SQL to query data

This code snippet demonstrates how to use Spark SQL to filter and display records of patients older than 60 years.

In [11]:
# Filtering Elderly Patients and Displaying Results

df_patients_elderly = spark.sql("""
    SELECT PatientID, FirstName, LastName, Gender, Age, Source
    FROM t_patients
    WHERE Age > 60
""")
display(df_patients_elderly, summary=True)

#### Write the data from the temporary view to a permanent table

In [13]:
# Write the data from the temporary view to a permanent table
spark.sql("""
    CREATE TABLE IF NOT EXISTS EhrLakeDB.DimPatients_stg
    AS SELECT * FROM t_patients
""")

In [14]:
# Drop the staging table created above
spark.sql("""DROP TABLE IF EXISTS EhrLakeDB.DimPatients_stg""")


This code snippet uses a SQL magic command to query a sample of patient records from the `t_patients` temporary view.

In [15]:
%%sql
Select * from t_patients LIMIT 3

In [16]:
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

# Convert AppointmentDate to date format
df_appointments = df_appointments.withColumn("AppointmentDate", F.to_date(F.col("AppointmentDate")))

# Truncate AppointmentDate to the first day of each month before aggregation
df_appointments = df_appointments.withColumn("Month", F.date_trunc("month", F.col("AppointmentDate")))

# Filter, group by Month, and count appointments for TX
df_appointment_TX = df_appointments.filter(df_appointments.Source == "TX") \
    .groupBy("Month") \
    .agg(F.count("AppointmentID").alias("TotalAppointments")) \
    .toPandas()

# Filter, group by Month, and count appointments for GA
df_appointment_GA = df_appointments.filter(df_appointments.Source == "GA") \
    .groupBy("Month") \
    .agg(F.count("AppointmentID").alias("TotalAppointments")) \
    .toPandas()

# Filter, group by Month, and count appointments for NY
df_appointment_NY = df_appointments.filter(df_appointments.Source == "NY") \
    .groupBy("Month") \
    .agg(F.count("AppointmentID").alias("TotalAppointments")) \
    .toPandas()

# Convert Month to datetime and ensure it's sorted
df_appointment_TX['Month'] = pd.to_datetime(df_appointment_TX['Month'])
df_appointment_GA['Month'] = pd.to_datetime(df_appointment_GA['Month'])
df_appointment_NY['Month'] = pd.to_datetime(df_appointment_NY['Month'])

# Sort the data
df_appointment_TX = df_appointment_TX.sort_values(by="Month", ascending=True)
df_appointment_GA = df_appointment_GA.sort_values(by="Month", ascending=True)
df_appointment_NY = df_appointment_NY.sort_values(by="Month", ascending=True)

# Define the date range for filtering
start_date = pd.to_datetime('2024-01-01')
end_date = pd.to_datetime('2024-10-30')

# Filter the DataFrames for the specified date range
df_appointment_TX = df_appointment_TX[(df_appointment_TX['Month'] >= start_date) & (df_appointment_TX['Month'] <= end_date)]
df_appointment_GA = df_appointment_GA[(df_appointment_GA['Month'] >= start_date) & (df_appointment_GA['Month'] <= end_date)]
df_appointment_NY = df_appointment_NY[(df_appointment_NY['Month'] >= start_date) & (df_appointment_NY['Month'] <= end_date)]

# Create a simple line plot with different colors for each state
plt.figure(figsize=(10, 5))

plt.plot(df_appointment_TX['Month'], df_appointment_TX['TotalAppointments'], marker='o', color='blue', label='TX')
plt.plot(df_appointment_GA['Month'], df_appointment_GA['TotalAppointments'], marker='o', color='green', label='GA')
plt.plot(df_appointment_NY['Month'], df_appointment_NY['TotalAppointments'], marker='o', color='red', label='NY')

plt.title('Appointments from January to October 2024')
plt.xlabel('Month')
plt.ylabel('# Appointments')
plt.xticks(rotation=45)
plt.legend()
plt.tight_layout()
plt.show()
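
The aggregation in this cell boils down to two steps: truncate each date to the first day of its month, then count rows per month for each state. The plain-Python sketch below shows the same logic on a few made-up rows (the sample data and the `month_start` helper are assumptions for illustration). It also counts per `(Source, Month)` in one pass, which in Spark would correspond to grouping by both columns instead of repeating the filter for each state.

```python
from collections import Counter
from datetime import date

# Hypothetical appointment rows: (AppointmentDate, Source).
appointments = [
    (date(2024, 1, 5), "TX"),
    (date(2024, 1, 20), "TX"),
    (date(2024, 2, 3), "TX"),
    (date(2024, 1, 9), "GA"),
]


def month_start(d: date) -> date:
    """Truncate a date to the first day of its month."""
    return d.replace(day=1)


# Count appointments per (Source, Month), then sort TX rows into plotting order.
counts = Counter((source, month_start(d)) for d, source in appointments)
monthly_tx = sorted((month, n) for (source, month), n in counts.items() if source == "TX")
print(monthly_tx)
```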


In [17]:
df_appointments.show(10)

In [19]:
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

# Convert AppointmentDate to date format
df_appointments = df_appointments.withColumn("AppointmentDate", F.to_date(F.col("AppointmentDate")))

# Truncate AppointmentDate to the first day of each month before aggregation
df_appointments = df_appointments.withColumn("Month", F.date_trunc("month", F.col("AppointmentDate")))

# Filter, group by Month, and count appointments for TX
df_appointment_TX = df_appointments.filter(df_appointments.Source == "TX") \
    .groupBy("Month") \
    .agg(F.count("AppointmentID").alias("TotalAppointments")) \
    .toPandas()

# Filter, group by Month, and count appointments for GA
df_appointment_GA = df_appointments.filter(df_appointments.Source == "GA") \
    .groupBy("Month") \
    .agg(F.count("AppointmentID").alias("TotalAppointments")) \
    .toPandas()

# Filter, group by Month, and count appointments for NY
df_appointment_NY = df_appointments.filter(df_appointments.Source == "NY") \
    .groupBy("Month") \
    .agg(F.count("AppointmentID").alias("TotalAppointments")) \
    .toPandas()

# Convert Month to datetime and ensure it's sorted
df_appointment_TX['Month'] = pd.to_datetime(df_appointment_TX['Month'])
df_appointment_GA['Month'] = pd.to_datetime(df_appointment_GA['Month'])
df_appointment_NY['Month'] = pd.to_datetime(df_appointment_NY['Month'])

# Sort the data
df_appointment_TX = df_appointment_TX.sort_values(by="Month", ascending=True)
df_appointment_GA = df_appointment_GA.sort_values(by="Month", ascending=True)
df_appointment_NY = df_appointment_NY.sort_values(by="Month", ascending=True)

# Define the date range for filtering
start_date = pd.to_datetime('2023-01-11')
end_date = pd.to_datetime('2024-10-30')

# Filter the DataFrames for the specified date range
df_appointment_TX = df_appointment_TX[(df_appointment_TX['Month'] >= start_date) & (df_appointment_TX['Month'] <= end_date)]
df_appointment_GA = df_appointment_GA[(df_appointment_GA['Month'] >= start_date) & (df_appointment_GA['Month'] <= end_date)]
df_appointment_NY = df_appointment_NY[(df_appointment_NY['Month'] >= start_date) & (df_appointment_NY['Month'] <= end_date)]

# Filter for appointments with Reason = "Headache Treatment"
df_health_screening = df_appointments.filter(df_appointments.Reason == "Headache Treatment") \
    .groupBy("Month") \
    .agg(F.count("AppointmentID").alias("AppointmentReasons")) \
    .toPandas()

# Convert Month to datetime and ensure it's sorted
df_health_screening['Month'] = pd.to_datetime(df_health_screening['Month'])
df_health_screening = df_health_screening.sort_values(by="Month", ascending=True)

# Create a combined plot with line plots and a bar chart
fig, ax1 = plt.subplots(figsize=(10, 5))

# Line plots for TX, GA, and NY
ax1.plot(df_appointment_TX['Month'], df_appointment_TX['TotalAppointments'], marker='o', color='blue', label='TX')
ax1.plot(df_appointment_GA['Month'], df_appointment_GA['TotalAppointments'], marker='o', color='green', label='GA')
ax1.plot(df_appointment_NY['Month'], df_appointment_NY['TotalAppointments'], marker='o', color='red', label='NY')

# Labels and title for the line plots
ax1.set_xlabel('Month')
ax1.set_ylabel('TotalAppointments')
ax1.set_title('Appointments and Headache Treatments Over Time')
ax1.legend(loc='upper left')
ax1.tick_params(axis='x', rotation=45)

# Bar chart for Headache Treatment appointments (secondary y-axis)
ax2 = ax1.twinx()
ax2.bar(df_health_screening['Month'], df_health_screening['AppointmentReasons'], color='orange', alpha=0.3, width=15, label='Headache Treatment')

# Label and legend for the bar chart
ax2.set_ylabel('Headache Treatment Appointments')
ax2.legend(loc='upper right')
plt.tight_layout()

plt.show()


In [20]:
# Save transformed data with Source column added to the raw file

df_patients.write.mode("overwrite").parquet(f"{base_path}ehr/silver/refined/Patients")
df_doctors.write.mode("overwrite").parquet(f"{base_path}ehr/silver/refined/Doctors")

#### Save DataFrames in Parquet format with partitioning
Next, partition the data by Year and Month, using AppointmentDate for df_appointments and recordDate for df_medrecords.

In [21]:
from pyspark.sql import functions as F

# Add Year and Month columns for partitioning
df_appointments = df_appointments.withColumn("Year", F.year(F.col("AppointmentDate"))) \
                                 .withColumn("Month", F.month(F.col("AppointmentDate")))

df_medrecords = df_medrecords.withColumn("Year", F.year(F.col("recordDate"))) \
                             .withColumn("Month", F.month(F.col("recordDate")))

# Write the data to parquet files with partitioning by Year and Month
df_appointments.write.mode("overwrite").partitionBy("Year", "Month").parquet(f"{base_path}ehr/silver/refined/Appointments")
df_medrecords.write.mode("overwrite").partitionBy("Year", "Month").parquet(f"{base_path}ehr/silver/refined/MedicalRecords")
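
With `partitionBy("Year", "Month")`, Spark writes one folder per distinct combination of the partition columns, named in the `column=value` style. The sketch below models only that folder naming in plain Python; `partition_dir` is a hypothetical helper, and the data file names inside each folder are generated by Spark.

```python
from datetime import date


def partition_dir(d: date) -> str:
    """Relative folder for a row when partitioning by Year and Month."""
    return f"Year={d.year}/Month={d.month}"


first = partition_dir(date(2024, 10, 30))
second = partition_dir(date(2023, 1, 11))
print(first)
print(second)
```

A query that filters on Year and Month can then skip the folders that do not match, which is the main reason to partition by columns that are commonly used in filters.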


### Create Delta Lake tables
Delta Lake is built on tables, which provide a relational storage abstraction over files in a data lake.

In [52]:
delta_path = f"{base_path}ehr/silver/delta/"

df_patients.write.mode("overwrite").format("delta").save(f"{delta_path}Patients")
df_doctors.write.mode("overwrite").format("delta").save(f"{delta_path}Doctors")

Now partition the data by Year and Month, using AppointmentDate for df_appointments and recordDate for df_medrecords, and save it in Delta format.

In [23]:
# Write the data to delta Format with partitioning by Year and Month

df_appointments.write.mode("overwrite").partitionBy("Year", "Month").format("delta").save(f"{delta_path}Appointments")
df_medrecords.write.mode("overwrite").partitionBy("Year", "Month").format("delta").save(f"{delta_path}MedicalRecords")

In [53]:
# Import necessary libraries:

from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit

# Load the Patients Delta table

delta_table = DeltaTable.forPath(spark, f"{delta_path}Patients")

# Perform the update

delta_table.update(
    condition = (col("PatientID") == 4)&(col("Source") == "TX"),
    set = {
        "FirstName": lit("Michel"),
        "LastName": lit("Agah"),
        "PhoneNumber": lit("1234567891")
    }
)


In [54]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit


# Load the Delta table
delta_table = DeltaTable.forPath(spark, f"{delta_path}Patients")

# Update the record where DateOfBirth is 1954-02-28, LastName is "Travis", and Source is "TX"
delta_table.update(
    condition = (col("DateOfBirth") == "1954-02-28") & (col("LastName") == "Travis") & (col("Source") == "TX"),
    set = {
        "PhoneNumber": lit("1234567892")
    }
)


View the data as it was before the first change to the first name, last name, and phone number.

In [55]:
df_before_first_change = spark.read.format("delta").option("versionAsOf", 1).load(f"{delta_path}Patients")
df_before_first_change.filter(df_before_first_change.PatientID==10).show()

In [57]:
df_before_first_change = spark.read.format("delta").option("timestampAsOf", "2024-11-02 03:18:37.278").load(f"{delta_path}Patients")
df_before_first_change.filter(df_before_first_change.PatientID==10).show()


View the data as it was before the second change.

In [58]:
df_before_first_change = spark.read.format("delta").option("versionAsOf", 2).load(f"{delta_path}Patients")
df_before_first_change.filter(df_before_first_change.PatientID==10).show()

In [62]:
df_before_first_change = spark.read.format("delta").option("timestampAsOf", "2024-11-02 03:19:37.278").load(f"{delta_path}Patients")
df_before_first_change.filter((df_before_first_change.LastName=="Agah")&(df_before_first_change.Source=="TX")).show()
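
In Delta Lake every committed write gets the next version number, starting at 0, so `versionAsOf` N returns the table as it was after commit N. The toy class below illustrates only that numbering. It is not how Delta stores data (Delta keeps a transaction log over data files rather than full copies), and the class name and sample row are made up. Note that an overwrite on an existing path also adds a version, so the numbers in your own table history may differ from this simple case.

```python
import copy


class ToyVersionedTable:
    """Keeps a full snapshot per commit; version numbers start at 0."""

    def __init__(self, rows):
        self._snapshots = [copy.deepcopy(rows)]  # version 0 = initial write

    def update(self, predicate, changes):
        """Apply changes to matching rows and record the result as a new version."""
        new_rows = copy.deepcopy(self._snapshots[-1])
        for row in new_rows:
            if predicate(row):
                row.update(changes)
        self._snapshots.append(new_rows)

    def version_as_of(self, version):
        """Return a copy of the rows as they were after the given commit."""
        return copy.deepcopy(self._snapshots[version])


# Hypothetical row; the first update creates version 1.
table = ToyVersionedTable([{"PatientID": 4, "PhoneNumber": "000"}])
table.update(lambda r: r["PatientID"] == 4, {"PhoneNumber": "1234567891"})

before_update = table.version_as_of(0)
after_update = table.version_as_of(1)
print(before_update)
print(after_update)
```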


How to create a new record in your Delta table


In [63]:
delta_table = DeltaTable.forPath(spark, f"{delta_path}Patients")
delta_table.toDF().printSchema()

In [66]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, to_date
from pyspark.sql.types import IntegerType

# Create a DataFrame with the new patient data

columns = ["PatientID","FirstName", "LastName", "DateOfBirth","Gender","PhoneNumber","Email","Source"]
new_patient_data = [(99001,"Christelle", "Williams","1985-11-29","F","240.321.6352","christelle.williams@example.com","ON")]

new_patient_df = spark.createDataFrame(new_patient_data, columns)
new_patient_df = new_patient_df.withColumn("PatientID", new_patient_df["PatientID"].cast(IntegerType()))
new_patient_df = new_patient_df.withColumn("DateOfBirth", to_date(new_patient_df["DateOfBirth"], "yyyy-MM-dd"))

# Append the new data to the Delta table
new_patient_df.write.format("delta").mode("append").save(f"{delta_path}Patients")


In [67]:
# Query the Delta table

df = spark.read.format("delta").load(f"{delta_path}Patients")
df.filter(df.PatientID==99001).show()


#### Create an external catalog table
The data is stored at the location you specify. When the table is dropped, only the metadata is deleted; the data files remain at delta_catalog_path.

In [68]:
delta_catalog_path = f"{base_path}ehr/silver/catalog/"

df.write.format("delta").option("path",f"{delta_catalog_path}Patients").saveAsTable("ECT_DimPatients")


#### Create a managed catalog table
The data is stored in a location managed by Spark. When the table is dropped, both the metadata and the data files are deleted.

You can use ```spark.sql()``` or write SQL directly with the ```%%sql``` magic command.

In [69]:
spark.sql("CREATE TABLE MCT_DimPatients (id INT, FirstName STRING,LastName STRING)")

In [70]:
%%sql

CREATE TABLE MCT_FactAppointments
(
    AppointmentID INT NOT NULL,
    PatientID INT NOT NULL,
    DoctorID INT NOT NULL,
    AppointmentDate TIMESTAMP NOT NULL,
    Reason STRING,
    Source STRING
)
USING DELTA

Dropping the tables

In [71]:
spark.sql("DROP TABLE MCT_DimPatients")
spark.sql("DROP TABLE MCT_FactAppointments")



In [61]:
from pyspark.sql import SparkSession
import time


# Create a simple DataFrame
df = spark.range(1)

# Write a loop to keep the cluster alive for 30 minutes
for _ in range(30):
    df.show()  # Perform a simple action to keep the cluster active
    time.sleep(60)  # Sleep for 1 minute

# Stop the Spark session
spark.stop()

