In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# ADLS Gen2 access through a service principal using the OAuth client-credentials
# token provider. Credentials come from the Databricks secret scope, never from code.
storage_account_name = "covidreportingdladf"

secret_scope = "covid-reporting-scope"
client_id, client_secret, tenant_id = (
    dbutils.secrets.get(scope=secret_scope, key=secret_key)
    for secret_key in ("client-id", "client-secret", "tenant-id")
)

# Spark runtime settings for this storage account, applied in insertion order.
adls_oauth_settings = {
    f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net":
        "OAuth",
    f"fs.azure.account.oauth.provider.type.{storage_account_name}.dfs.core.windows.net":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    f"fs.azure.account.oauth2.client.id.{storage_account_name}.dfs.core.windows.net":
        client_id,
    f"fs.azure.account.oauth2.client.secret.{storage_account_name}.dfs.core.windows.net":
        client_secret,
    f"fs.azure.account.oauth2.client.endpoint.{storage_account_name}.dfs.core.windows.net":
        f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}
for conf_key, conf_value in adls_oauth_settings.items():
    spark.conf.set(conf_key, conf_value)

### 1) Ingestion

Reading Country Lookup File

In [0]:
# Country reference table; it supplies `country_code_2_digit`, which is used for the
# join with the population data. Presumably one row per country — verify.
# The path is built from `storage_account_name` so the read always targets the same
# account that the OAuth settings were configured for (previously hard-coded).
df_country_lookup = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(f"abfss://lookup@{storage_account_name}.dfs.core.windows.net/country_lookup/country_lookup.csv")
)

Reading Dim Date File

In [0]:
# Date dimension lookup table, read with a header row and inferred column types.
# The path is built from `storage_account_name` so the read always targets the same
# account that the OAuth settings were configured for (previously hard-coded).
df_dim_date = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(f"abfss://lookup@{storage_account_name}.dfs.core.windows.net/dim_date/dim_date.csv")
)

Reading the Population File

In [0]:
# Raw population-by-age file. It is tab-separated, so the CSV reader needs sep='\t'.
# The path is built from `storage_account_name` so the read always targets the same
# account that the OAuth settings were configured for (previously hard-coded).
df_population_raw = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", "\t")
    .load(f"abfss://raw@{storage_account_name}.dfs.core.windows.net/population/population_by_age.tsv")
)

### 2) Transformation 

Removing unnecessary columns (the 2008–2018 year columns; only 2019 is used)

In [0]:
# Only the 2019 figures are used downstream, so drop the 2008–2018 year columns.
# The raw header names carry a trailing space ("2008 ", ..., "2018 ").
unused_year_columns = [f"{year} " for year in range(2008, 2019)]
df_population = df_population_raw.drop(*unused_year_columns)

Cleaning and transforming the columns: splitting the combined key into age group and country code, and extracting the numeric 2019 value

In [0]:
# The first raw column packs "<indicator>,<geo>" into one string:
#   - age_group    = indicator with its "PC_" prefix removed
#   - country_code = first two characters of the geo part
# The "2019 " cell may contain non-numeric decoration, so only the first
# integer/decimal token is kept (as a string; the cast happens in a later cell).
raw_key_parts = split(col("indic_de,geo\\time"), ",")

df_population = (
    df_population
    .withColumn("age_group", regexp_replace(raw_key_parts.getItem(0), "PC_", ""))
    .withColumn("country_code", substring(raw_key_parts.getItem(1), 1, 2))
    .withColumn("2019_numeric", regexp_extract(col("2019 "), r"([0-9]+\.[0-9]+|[0-9]+)", 1))
    .drop("indic_de,geo\\time", "2019 ")
)

Replacing blank strings ('') with null

In [0]:
# regexp_extract yields '' when no numeric token was found; turn those into real
# nulls so the value is treated as missing rather than as an empty string.
df_population = df_population.replace(to_replace="", value=None, subset=["2019_numeric"])

Casting the field to double type

In [0]:
# The extracted value is still a string at this point; make it numeric (double).
df_population = df_population.withColumn(
    "2019_numeric",
    df_population["2019_numeric"].cast(DoubleType()),
)

Pivoting the data so that each age group becomes its own column (one row per country)

In [0]:
# One row per country_code and one column per age group, holding the summed 2019
# value rounded to 1 decimal place.
# The expected age groups are listed explicitly. This lets Spark skip the extra job
# it otherwise runs to discover the distinct pivot values. It also guarantees that
# every column selected downstream exists: a group missing from the data becomes a
# null column instead of making the later select fail. Any other age_group values
# are dropped here; they were never selected downstream anyway.
age_group_pivot_values = ["Y0_14", "Y15_24", "Y25_49", "Y50_64", "Y65_79", "Y80_MAX"]

df_population_pivot = (
    df_population
    .groupBy("country_code")
    .pivot("age_group", age_group_pivot_values)
    .agg(round(sum("2019_numeric"), 1))
)

Joining the Population with Country Lookup

In [0]:
# Attach country attributes from the lookup. Because this is an inner join,
# population rows whose 2-letter code has no lookup match are dropped.
country_join_condition = (
    df_population_pivot["country_code"] == df_country_lookup["country_code_2_digit"]
)
df_processed_population = df_population_pivot.join(
    df_country_lookup,
    on=country_join_condition,
    how="inner",
)

Renaming and selecting the required columns

In [0]:
# Keep the country attributes, presumably from the lookup ("population" is not
# produced by the pivot), then the age-group columns renamed to age_group_* in a
# fixed order.
age_group_column_renames = {
    "Y0_14": "age_group_0_14",
    "Y15_24": "age_group_15_24",
    "Y25_49": "age_group_25_49",
    "Y50_64": "age_group_50_64",
    "Y65_79": "age_group_65_79",
    "Y80_MAX": "age_group_80_max",
}
country_columns = ["country", "country_code_2_digit", "country_code_3_digit", "population"]

df_processed_population = df_processed_population.select(
    *country_columns,
    *[col(source).alias(target) for source, target in age_group_column_renames.items()],
)

Sorting the data by country name in ascending order

In [0]:
# Alphabetical order by country name for the output file(s).
df_processed_population = df_processed_population.orderBy(col("country").asc())

### 3) Writing

In [0]:
# Write the final table as CSV with a header row, replacing any previous output.
# Spark writes a directory of part files at this path, not a single CSV file.
# The path is built from `storage_account_name` so the write always targets the same
# account that the OAuth settings were configured for (previously hard-coded).
(
    df_processed_population.write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save(f"abfss://processed@{storage_account_name}.dfs.core.windows.net/population/")
)