# Establishing Connections (Linking to the Storage Account Services)

In [0]:
# List the secret scopes available to this notebook.
dbutils.secrets.listScopes()

[SecretScope(name='keyvault20241026scope')]

In [0]:
# List the secret keys (metadata only, not the values) stored in the scope.
dbutils.secrets.list("keyvault20241026scope")

[SecretMetadata(key='client-id'),
 SecretMetadata(key='client-secret'),
 SecretMetadata(key='directory-id')]

In [0]:
# Fetch the three service-principal values from the secret scope in one pass.
secret_scope = "keyvault20241026scope"
client_id, client_secret, directory_id = (
    dbutils.secrets.get(secret_scope, secret_key)
    for secret_key in ("client-id", "client-secret", "directory-id")
)

In [0]:
# OAuth client-credentials settings passed as `extra_configs` when mounting.
token_endpoint = f"https://login.microsoftonline.com/{directory_id}/oauth2/token"

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": token_endpoint,
}

## Mounting the storage containers locally

In [0]:
# Storage account and the two containers that get mounted.
storage_account = "datalake20241022"
container_raw, container_processed = "raw", "processed"

# abfss:// source URIs, one per container.
source_raw = f"abfss://{container_raw}@{storage_account}.dfs.core.windows.net/"
source_processed = f"abfss://{container_processed}@{storage_account}.dfs.core.windows.net/"

# Mount points under /mnt/<storage account>/<container>.
mount_point_raw = f"/mnt/{storage_account}/{container_raw}"
mount_point_processed = f"/mnt/{storage_account}/{container_processed}"

In [0]:
# (Re)mount the raw container: drop any mount already registered at this
# mount point, then mount it again with the OAuth settings in `configs`.
if any(mount.mountPoint == mount_point_raw for mount in dbutils.fs.mounts()):
    # Unmount the existing mount point
    dbutils.fs.unmount(mount_point_raw)

    print(f"Unmount existing mount at {mount_point_raw}")

try:
    dbutils.fs.mount(
        source = source_raw,
        mount_point = mount_point_raw,
        extra_configs = configs
    )
    print(f"Mounted successfully at {mount_point_raw}")
except Exception as e:
    # Report the underlying cause and re-raise: the extraction cell reads from
    # this mount, so continuing after a failed mount only hides the real error.
    print(f"Error mounting: {mount_point_raw}: {e}")
    raise

/mnt/datalake20241022/raw has been unmounted.
Unmount existing mount at /mnt/datalake20241022/raw
Mounted successfully at /mnt/datalake20241022/raw


In [0]:
# (Re)mount the processed container: drop any mount already registered at this
# mount point, then mount it again with the OAuth settings in `configs`.
if any(mount.mountPoint == mount_point_processed for mount in dbutils.fs.mounts()):
    # Unmount the existing mount point
    dbutils.fs.unmount(mount_point_processed)

    print(f"Unmount existing mount at {mount_point_processed}")

try:
    dbutils.fs.mount(
        source = source_processed,
        mount_point = mount_point_processed,
        extra_configs = configs
    )
    print(f"Mounted successfully at {mount_point_processed}")
except Exception as e:
    # Report the underlying cause and re-raise: the load step writes to this
    # mount, so continuing after a failed mount only hides the real error.
    print(f"Error mounting: {mount_point_processed}: {e}")
    raise

/mnt/datalake20241022/processed has been unmounted.
Unmount existing mount at /mnt/datalake20241022/processed
Mounted successfully at /mnt/datalake20241022/processed


# Extraction: Reading the CSV/TSV File

In [0]:
# Glob for the tab-separated source files on the raw mount.
csv_files = "population_by_age_*.tsv"

# Tab-delimited reader with a header row and schema inference enabled.
tsv_reader = (
    spark.read
    .option("delimiter", "\t")
    .option("header", True)
    .option("inferSchema", True)
)

df = tsv_reader.csv(f"{mount_point_raw}/{csv_files}")
df.show(15)

+-----------------+------+-----+------+------+-----+-----+------+-----+-----+-----+-----+-----+
|indic_de,geo\time| 2008 |2009 | 2010 | 2011 |2012 |2013 | 2014 |2015 |2016 |2017 |2018 |2019 |
+-----------------+------+-----+------+------+-----+-----+------+-----+-----+-----+-----+-----+
|     PC_Y50_64,PL| 19.6 |20.2 |21.0 b| 21.4 |21.5 |21.4 | 21.3 |21.1 |20.8 |20.4 |20.1 |19.7 |
|     PC_Y50_64,PT| 18.3 |18.6 | 18.8 | 19.1 |19.4 |19.8 | 20.0 |20.2 |20.4 |20.6 |20.7 |20.9 |
|     PC_Y50_64,RO| 18.9 |20.0 | 20.0 | 20.1 |20.0 |20.0 |19.8 e|19.3 |18.9 |18.6 |19.1 |19.7 |
|     PC_Y50_64,RS| 20.7 |21.0 | 21.4 |22.7 b|22.8 |22.8 | 22.6 |22.3 |21.9 |21.7 |21.3 |20.9 |
|     PC_Y50_64,RU| 17.7 |18.7 | 19.7 | 20.5 |   : |   : | 21.4 |   : |   : |   : |   : |   : |
|     PC_Y50_64,SE| 19.6 |19.4 | 19.1 | 18.8 |18.5 |18.3 | 18.1 |18.1 |18.1 |18.1 |18.1 |18.1 |
|     PC_Y50_64,SI|19.9 b|20.0 | 20.3 | 20.9 |21.2 |21.4 | 21.5 |21.6 |21.7 |21.8 |21.8 |21.6 |
|     PC_Y50_64,SK| 18.9 |19.3 | 19.6 | 

# Transformation

## Checking Schema and Column Names

In [0]:
# Print each column's name, type and nullability as read from the file.
df.printSchema()

root
 |-- indic_de,geo\time: string (nullable = true)
 |-- 2008 : string (nullable = true)
 |-- 2009 : string (nullable = true)
 |-- 2010 : string (nullable = true)
 |-- 2011 : string (nullable = true)
 |-- 2012 : string (nullable = true)
 |-- 2013 : string (nullable = true)
 |-- 2014 : string (nullable = true)
 |-- 2015 : string (nullable = true)
 |-- 2016 : string (nullable = true)
 |-- 2017 : string (nullable = true)
 |-- 2018 : string (nullable = true)
 |-- 2019 : string (nullable = true)



In [0]:
# Show the exact column names; the list repr makes stray whitespace visible.
df.columns

['indic_de,geo\\time',
 '2008 ',
 '2009 ',
 '2010 ',
 '2011 ',
 '2012 ',
 '2013 ',
 '2014 ',
 '2015 ',
 '2016 ',
 '2017 ',
 '2018 ',
 '2019 ']

> Note that each year column name ends with a trailing space (e.g. `'2008 '`).

## Renaming columns

In [0]:
# Map every raw column name to its cleaned-up name: the composite key column
# becomes `geo_time`, and each year column ("2008 " ... "2019 ", note the
# trailing space) becomes `y_<year>`.
column_renames = {"indic_de,geo\\time": "geo_time"}
column_renames.update({f"{year} ": f"y_{year}" for year in range(2008, 2020)})

# withColumnRenamed is a no-op for names that are not present, so the loop is
# equivalent to chaining one call per column.
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)

df.show(15)

+------------+------+------+------+------+------+------+------+------+------+------+------+------+
|    geo_time|y_2008|y_2009|y_2010|y_2011|y_2012|y_2013|y_2014|y_2015|y_2016|y_2017|y_2018|y_2019|
+------------+------+------+------+------+------+------+------+------+------+------+------+------+
|PC_Y50_64,PL| 19.6 | 20.2 |21.0 b| 21.4 | 21.5 | 21.4 | 21.3 | 21.1 | 20.8 | 20.4 | 20.1 | 19.7 |
|PC_Y50_64,PT| 18.3 | 18.6 | 18.8 | 19.1 | 19.4 | 19.8 | 20.0 | 20.2 | 20.4 | 20.6 | 20.7 | 20.9 |
|PC_Y50_64,RO| 18.9 | 20.0 | 20.0 | 20.1 | 20.0 | 20.0 |19.8 e| 19.3 | 18.9 | 18.6 | 19.1 | 19.7 |
|PC_Y50_64,RS| 20.7 | 21.0 | 21.4 |22.7 b| 22.8 | 22.8 | 22.6 | 22.3 | 21.9 | 21.7 | 21.3 | 20.9 |
|PC_Y50_64,RU| 17.7 | 18.7 | 19.7 | 20.5 |    : |    : | 21.4 |    : |    : |    : |    : |    : |
|PC_Y50_64,SE| 19.6 | 19.4 | 19.1 | 18.8 | 18.5 | 18.3 | 18.1 | 18.1 | 18.1 | 18.1 | 18.1 | 18.1 |
|PC_Y50_64,SI|19.9 b| 20.0 | 20.3 | 20.9 | 21.2 | 21.4 | 21.5 | 21.6 | 21.7 | 21.8 | 21.8 | 21.6 |
|PC_Y50_64

In [0]:
# Confirm the column names after renaming.
df.columns

['geo_time',
 'y_2008',
 'y_2009',
 'y_2010',
 'y_2011',
 'y_2012',
 'y_2013',
 'y_2014',
 'y_2015',
 'y_2016',
 'y_2017',
 'y_2018',
 'y_2019']

## Splitting columns

In [0]:
# Explicit imports instead of `import *`: the wildcard pulls in Spark functions
# such as `sum`, `min`, `max` and `round` that shadow the Python builtins.
# `col`, `length` and `regexp_replace` are used by later cells.
from pyspark.sql.functions import col, length, regexp_replace, split

# `geo_time` holds "<age range>,<country code>"; split it once and reuse it.
geo_time_parts = split(col("geo_time"), ",")

df = (
    df.withColumn("age_range", geo_time_parts.getItem(0))
      .withColumn("country_code", geo_time_parts.getItem(1))
)

df.show(15)

+------------+------+------+------+------+------+------+------+------+------+------+------+------+---------+------------+
|    geo_time|y_2008|y_2009|y_2010|y_2011|y_2012|y_2013|y_2014|y_2015|y_2016|y_2017|y_2018|y_2019|age_range|country_code|
+------------+------+------+------+------+------+------+------+------+------+------+------+------+---------+------------+
|PC_Y50_64,PL| 19.6 | 20.2 |21.0 b| 21.4 | 21.5 | 21.4 | 21.3 | 21.1 | 20.8 | 20.4 | 20.1 | 19.7 |PC_Y50_64|          PL|
|PC_Y50_64,PT| 18.3 | 18.6 | 18.8 | 19.1 | 19.4 | 19.8 | 20.0 | 20.2 | 20.4 | 20.6 | 20.7 | 20.9 |PC_Y50_64|          PT|
|PC_Y50_64,RO| 18.9 | 20.0 | 20.0 | 20.1 | 20.0 | 20.0 |19.8 e| 19.3 | 18.9 | 18.6 | 19.1 | 19.7 |PC_Y50_64|          RO|
|PC_Y50_64,RS| 20.7 | 21.0 | 21.4 |22.7 b| 22.8 | 22.8 | 22.6 | 22.3 | 21.9 | 21.7 | 21.3 | 20.9 |PC_Y50_64|          RS|
|PC_Y50_64,RU| 17.7 | 18.7 | 19.7 | 20.5 |    : |    : | 21.4 |    : |    : |    : |    : |    : |PC_Y50_64|          RU|
|PC_Y50_64,SE| 19.6 | 19

## Formatting Field Values

In [0]:
# Drop the leading "PC_" marker from the age range (e.g. PC_Y50_64 -> Y50_64).
age_range_without_prefix = regexp_replace("age_range", "^PC_", "")
df = df.withColumn("age_range", age_range_without_prefix)

df.show(15)

+------------+------+------+------+------+------+------+------+------+------+------+------+------+---------+------------+
|    geo_time|y_2008|y_2009|y_2010|y_2011|y_2012|y_2013|y_2014|y_2015|y_2016|y_2017|y_2018|y_2019|age_range|country_code|
+------------+------+------+------+------+------+------+------+------+------+------+------+------+---------+------------+
|PC_Y50_64,PL| 19.6 | 20.2 |21.0 b| 21.4 | 21.5 | 21.4 | 21.3 | 21.1 | 20.8 | 20.4 | 20.1 | 19.7 |   Y50_64|          PL|
|PC_Y50_64,PT| 18.3 | 18.6 | 18.8 | 19.1 | 19.4 | 19.8 | 20.0 | 20.2 | 20.4 | 20.6 | 20.7 | 20.9 |   Y50_64|          PT|
|PC_Y50_64,RO| 18.9 | 20.0 | 20.0 | 20.1 | 20.0 | 20.0 |19.8 e| 19.3 | 18.9 | 18.6 | 19.1 | 19.7 |   Y50_64|          RO|
|PC_Y50_64,RS| 20.7 | 21.0 | 21.4 |22.7 b| 22.8 | 22.8 | 22.6 | 22.3 | 21.9 | 21.7 | 21.3 | 20.9 |   Y50_64|          RS|
|PC_Y50_64,RU| 17.7 | 18.7 | 19.7 | 20.5 |    : |    : | 21.4 |    : |    : |    : |    : |    : |   Y50_64|          RU|
|PC_Y50_64,SE| 19.6 | 19

In [0]:
# List the distinct age ranges after the prefix has been stripped.
df.select("age_range").distinct().show()

+---------+
|age_range|
+---------+
|   Y65_79|
|   Y50_64|
|  Y80_MAX|
|   Y25_49|
|   Y15_24|
|    Y0_14|
+---------+



In [0]:
# List the distinct country codes (show() prints only the first 20 rows).
df.select("country_code").distinct().show()

+------------+
|country_code|
+------------+
|          LT|
|          AZ|
|          FI|
|          UA|
|          RO|
|          NL|
|          PL|
|          AM|
|          MK|
|          EE|
|          SM|
|          AT|
|          RU|
|          AD|
|          HR|
|          LI|
|          CZ|
|          PT|
|   EU27_2007|
|          MT|
+------------+
only showing top 20 rows



## Filtering Records

In [0]:
# Keep only rows whose country code is exactly two characters long; this
# drops longer codes such as EU27_2007.
is_two_letter_code = length(col("country_code")) == 2
df = df.where(is_two_letter_code)

df.show(15)

+------------+------+------+------+------+------+------+------+------+------+------+------+------+---------+------------+
|    geo_time|y_2008|y_2009|y_2010|y_2011|y_2012|y_2013|y_2014|y_2015|y_2016|y_2017|y_2018|y_2019|age_range|country_code|
+------------+------+------+------+------+------+------+------+------+------+------+------+------+---------+------------+
|PC_Y50_64,PL| 19.6 | 20.2 |21.0 b| 21.4 | 21.5 | 21.4 | 21.3 | 21.1 | 20.8 | 20.4 | 20.1 | 19.7 |   Y50_64|          PL|
|PC_Y50_64,PT| 18.3 | 18.6 | 18.8 | 19.1 | 19.4 | 19.8 | 20.0 | 20.2 | 20.4 | 20.6 | 20.7 | 20.9 |   Y50_64|          PT|
|PC_Y50_64,RO| 18.9 | 20.0 | 20.0 | 20.1 | 20.0 | 20.0 |19.8 e| 19.3 | 18.9 | 18.6 | 19.1 | 19.7 |   Y50_64|          RO|
|PC_Y50_64,RS| 20.7 | 21.0 | 21.4 |22.7 b| 22.8 | 22.8 | 22.6 | 22.3 | 21.9 | 21.7 | 21.3 | 20.9 |   Y50_64|          RS|
|PC_Y50_64,RU| 17.7 | 18.7 | 19.7 | 20.5 |    : |    : | 21.4 |    : |    : |    : |    : |    : |   Y50_64|          RU|
|PC_Y50_64,SE| 19.6 | 19

In [0]:
# Re-check the distinct country codes after filtering.
df.select("country_code").distinct().show()

+------------+
|country_code|
+------------+
|          LT|
|          AZ|
|          FI|
|          UA|
|          RO|
|          NL|
|          PL|
|          AM|
|          MK|
|          EE|
|          SM|
|          AT|
|          RU|
|          AD|
|          HR|
|          LI|
|          CZ|
|          PT|
|          MT|
|          BY|
+------------+
only showing top 20 rows



In [0]:
# Preview the first 15 rows of the filtered DataFrame.
df.show(15)

+------------+------+------+------+------+------+------+------+------+------+------+------+------+---------+------------+
|    geo_time|y_2008|y_2009|y_2010|y_2011|y_2012|y_2013|y_2014|y_2015|y_2016|y_2017|y_2018|y_2019|age_range|country_code|
+------------+------+------+------+------+------+------+------+------+------+------+------+------+---------+------------+
|PC_Y50_64,PL| 19.6 | 20.2 |21.0 b| 21.4 | 21.5 | 21.4 | 21.3 | 21.1 | 20.8 | 20.4 | 20.1 | 19.7 |   Y50_64|          PL|
|PC_Y50_64,PT| 18.3 | 18.6 | 18.8 | 19.1 | 19.4 | 19.8 | 20.0 | 20.2 | 20.4 | 20.6 | 20.7 | 20.9 |   Y50_64|          PT|
|PC_Y50_64,RO| 18.9 | 20.0 | 20.0 | 20.1 | 20.0 | 20.0 |19.8 e| 19.3 | 18.9 | 18.6 | 19.1 | 19.7 |   Y50_64|          RO|
|PC_Y50_64,RS| 20.7 | 21.0 | 21.4 |22.7 b| 22.8 | 22.8 | 22.6 | 22.3 | 21.9 | 21.7 | 21.3 | 20.9 |   Y50_64|          RS|
|PC_Y50_64,RU| 17.7 | 18.7 | 19.7 | 20.5 |    : |    : | 21.4 |    : |    : |    : |    : |    : |   Y50_64|          RU|
|PC_Y50_64,SE| 19.6 | 19

In [0]:
# List the available columns before selecting the final set.
df.columns

['geo_time',
 'y_2008',
 'y_2009',
 'y_2010',
 'y_2011',
 'y_2012',
 'y_2013',
 'y_2014',
 'y_2015',
 'y_2016',
 'y_2017',
 'y_2018',
 'y_2019',
 'age_range',
 'country_code']

## Selecting and Aliasing Columns

In [0]:
# Build the y_<year> -> percentage_<year> aliases for 2008-2019 instead of
# spelling out one line per year.
percentage_columns = [
    col(f"y_{year}").alias(f"percentage_{year}") for year in range(2008, 2020)
]

# Keep the country code, the age range (renamed to age_group) and the yearly
# percentages; `geo_time` is dropped by not being selected.
df = df.select(
    "country_code",
    col("age_range").alias("age_group"),
    *percentage_columns,
)

df.show(15)

+------------+---------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+
|country_code|age_group|percentage_2008|percentage_2009|percentage_2010|percentage_2011|percentage_2012|percentage_2013|percentage_2014|percentage_2015|percentage_2016|percentage_2017|percentage_2018|percentage_2019|
+------------+---------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+
|          PL|   Y50_64|          19.6 |          20.2 |         21.0 b|          21.4 |          21.5 |          21.4 |          21.3 |          21.1 |          20.8 |          20.4 |          20.1 |          19.7 |
|          PT|   Y50_64|          18.3 |          18.6 |          18.8 |          19.1 |          19.4 |          19.8 |          20

## Casting and Replacing Values

In [0]:
# Expose the DataFrame to Spark SQL under a temporary view name.
df.createOrReplaceTempView("population_by_age")

# One CAST expression per year (2008-2019): strip lowercase letters (flags such
# as "b" or "e" seen in the data) and cast what is left to DECIMAL(4,2).
# Values that are not numeric after stripping (e.g. ":") become NULL.
percentage_casts = ", ".join(
    f"CAST(REGEXP_REPLACE(percentage_{year}, '[a-z]', '') AS DECIMAL(4,2)) AS percentage_{year}"
    for year in range(2008, 2020)
)

df = spark.sql(
    f"SELECT country_code, age_group, {percentage_casts} "
    "FROM population_by_age "
    "WHERE LEN(country_code) == 2"
)

df.show(15)

+------------+---------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+
|country_code|age_group|percentage_2008|percentage_2009|percentage_2010|percentage_2011|percentage_2012|percentage_2013|percentage_2014|percentage_2015|percentage_2016|percentage_2017|percentage_2018|percentage_2019|
+------------+---------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+
|          PL|   Y50_64|          19.60|          20.20|          21.00|          21.40|          21.50|          21.40|          21.30|          21.10|          20.80|          20.40|          20.10|          19.70|
|          PT|   Y50_64|          18.30|          18.60|          18.80|          19.10|          19.40|          19.80|          20

In [0]:
df.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- age_group: string (nullable = true)
 |-- percentage_2008: decimal(4,2) (nullable = true)
 |-- percentage_2009: decimal(4,2) (nullable = true)
 |-- percentage_2010: decimal(4,2) (nullable = true)
 |-- percentage_2011: decimal(4,2) (nullable = true)
 |-- percentage_2012: decimal(4,2) (nullable = true)
 |-- percentage_2013: decimal(4,2) (nullable = true)
 |-- percentage_2014: decimal(4,2) (nullable = true)
 |-- percentage_2015: decimal(4,2) (nullable = true)
 |-- percentage_2016: decimal(4,2) (nullable = true)
 |-- percentage_2017: decimal(4,2) (nullable = true)
 |-- percentage_2018: decimal(4,2) (nullable = true)
 |-- percentage_2019: decimal(4,2) (nullable = true)



# Loading: Writing Partitioned Data Incrementally

In [0]:
# Path of the Delta table, relative to the processed mount point.
delta_file_location = "delta/population_by_age"

In [0]:
from delta.tables import DeltaTable

delta_table_path = f"{mount_point_processed}/{delta_file_location}"

# Rows are matched on the (country_code, age_group) pair.
merge_condition = "existing.country_code = new.country_code AND existing.age_group = new.age_group"

if not DeltaTable.isDeltaTable(spark, delta_table_path):
    # No Delta table at this path yet: write one, partitioned by the key columns.
    (
        df.write.format("delta")
        .mode("overwrite")
        .partitionBy("country_code", "age_group")
        .save(delta_table_path)
    )
else:
    # Table already exists: upsert - update matching rows, insert the rest.
    existing_data = DeltaTable.forPath(spark, delta_table_path)

    (
        existing_data.alias("existing")
        .merge(df.alias("new"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
# Read the Delta table back from storage and preview the first 15 rows.
spark.read.format("delta").load(f"{mount_point_processed}/{delta_file_location}").show(15)

+------------+---------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+
|country_code|age_group|percentage_2008|percentage_2009|percentage_2010|percentage_2011|percentage_2012|percentage_2013|percentage_2014|percentage_2015|percentage_2016|percentage_2017|percentage_2018|percentage_2019|
+------------+---------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+
|          BE|    Y0_14|          16.90|          16.90|          16.90|          17.00|          17.00|          17.00|          17.00|          17.00|          17.00|          17.00|          17.00|          16.90|
|          CY|   Y50_64|          16.90|          17.20|          17.40|          17.60|          17.70|          17.90|          18

In [0]:
# Row count of the transformed DataFrame `df` (not of the Delta table on disk).
print(df.count())

288
