In [0]:
# Configure Spark to reach the ADLS Gen2 account `covid2025storage` through
# OAuth 2.0 client-credentials (service principal) authentication.
#
# Each pair below is (Spark/Hadoop configuration key, value). The client secret
# is fetched from the Databricks secret scope rather than written in the
# notebook, and the pairs are iterated inline so the secret is never bound to a
# long-lived notebook variable.
for conf_key, conf_value in (
    # Service principal client secret, read from the "myscope" secret scope.
    ("fs.azure.account.oauth2.client.secret.covid2025storage.dfs.core.windows.net",
     dbutils.secrets.get(scope="myscope", key="sp-secret")),
    # Authentication type for the storage account.
    ("fs.azure.account.auth.type.covid2025storage.dfs.core.windows.net",
     "OAuth"),
    # Token provider class implementing the client-credentials flow.
    ("fs.azure.account.oauth.provider.type.covid2025storage.dfs.core.windows.net",
     "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
    # Client (application) ID of the Azure AD app registration.
    ("fs.azure.account.oauth2.client.id.covid2025storage.dfs.core.windows.net",
     "af9d0047-d4cd-4d32-81a0-3e2ebeff1343"),
    # Tenant-specific token endpoint used to obtain access tokens.
    ("fs.azure.account.oauth2.client.endpoint.covid2025storage.dfs.core.windows.net",
     "https://login.microsoftonline.com/fdae273f-8a49-469c-b9bc-444ce8f28607/oauth2/token"),
):
    spark.conf.set(conf_key, conf_value)

In [0]:
# Read the silver-layer CSV from ADLS Gen2 into a Spark DataFrame.
# The first row is used as the header and column types are inferred from the data.
silver_csv_path = "abfss://silver@covid2025storage.dfs.core.windows.net/countrywise.csv"

df_silver = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load(silver_csv_path)
)


In [0]:
# Preview the silver data: first 20 rows, long values truncated (the defaults of show()).
df_silver.show(n=20, truncate=True)

In [0]:
# NOTE: `round` below is pyspark.sql.functions.round; importing it by name
# shadows Python's built-in round() for the rest of the notebook session.
from pyspark.sql.functions import col, round, when


def _safe_ratio(numerator, denominator):
    """Return `numerator / denominator` as a Column, NULL when the denominator is 0.

    Args:
        numerator: Name of the numerator column.
        denominator: Name of the denominator column.

    Returns:
        A Column holding the quotient, or NULL where the denominator is 0 or NULL.

    With ANSI mode off, Spark already yields NULL for a division by zero, so
    the result is unchanged there. With ANSI mode on (spark.sql.ansi.enabled),
    an unguarded division by zero raises an error; the `when` guard avoids that.
    """
    # `when` without `otherwise` evaluates to NULL for rows failing the condition.
    return when(col(denominator) != 0, col(numerator) / col(denominator))


# Derive the gold-layer rate columns from the silver data:
#   mortality_rate     - Deaths / Confirmed, as a percentage (2 decimals)
#   recovery_rate      - Recovered / Confirmed, as a percentage (2 decimals)
#   weekly_growth_rate - one_week_change / confirmed_last_week, as a percentage (2 decimals)
#   active_ratio       - Active / Confirmed, as a plain ratio (2 decimals, not a percentage)
df_gold = (
    df_silver
    .withColumn("mortality_rate", round(_safe_ratio("Deaths", "Confirmed") * 100, 2))
    .withColumn("recovery_rate", round(_safe_ratio("Recovered", "Confirmed") * 100, 2))
    .withColumn("weekly_growth_rate", round(_safe_ratio("one_week_change", "confirmed_last_week") * 100, 2))
    .withColumn("active_ratio", round(_safe_ratio("Active", "Confirmed"), 2))
)


df_gold.show()

In [0]:
# Spark SQL functions are accessed through the `F` namespace so that this cell
# does not shadow Python's built-in sum()/round() and does not depend on
# function imports made by any other cell.
from pyspark.sql import functions as F

# Aggregate the gold metrics to one row per `who_region`:
#   total_*  - sums of the case-count columns
#   avg_*    - simple (unweighted) means of the per-row rate columns in df_gold,
#              rounded to 2 decimals
df_gold_region = df_gold.groupBy("who_region").agg(
    F.sum("Confirmed").alias("total_confirmed"),
    F.sum("Deaths").alias("total_deaths"),
    F.sum("Recovered").alias("total_recovered"),
    F.sum("Active").alias("total_active"),
    F.round(F.avg("mortality_rate"), 2).alias("avg_mortality_rate"),
    F.round(F.avg("recovery_rate"), 2).alias("avg_recovery_rate"),
    F.round(F.avg("weekly_growth_rate"), 2).alias("avg_weekly_growth_rate"),
)

df_gold_region.show()

In [0]:
# Export each gold DataFrame as a single CSV file named `<name>.csv` at the
# root of the gold container.
#
# Spark writes a directory of part files, so each DataFrame is first written
# (as one partition) to a temporary directory; the single part file is then
# moved to its final name and the temporary directory is removed.
gold_container = "abfss://gold@covid2025storage.dfs.core.windows.net"

gold_dfs = [
    (df_gold, "countrywise"),
    (df_gold_region, "regionwise"),
]

for df, name in gold_dfs:
    temp_path = f"{gold_container}/temp_{name}/"
    final_output_path = f"{gold_container}/{name}.csv"

    # repartition(1) forces a single output part file.
    df.repartition(1).write.mode("overwrite").option("header", "true").csv(temp_path)

    # Locate the part file among the entries Spark wrote to the temp directory.
    part_file_path = next(
        (entry.path for entry in dbutils.fs.ls(temp_path) if entry.name.endswith(".csv")),
        None,
    )
    if part_file_path is None:
        # Fail loudly instead of hitting a NameError or reusing the previous
        # iteration's (already moved) path.
        raise FileNotFoundError(f"No .csv part file found in {temp_path}")

    dbutils.fs.mv(part_file_path, final_output_path)
    dbutils.fs.rm(temp_path, True)  # True = recursive delete of the temp directory

# Show the final contents of the gold container once all exports are done.
display(dbutils.fs.ls(f"{gold_container}/"))
