## Transform Population By Age data by performing the transformations below
####-----------------------------------------------------------------------
1. Split the country code & age group
2. Exclude all data other than 2019
3. Remove non numeric data from percentage
4. Pivot the data by age group
5. Join to dim_country to get the country, 3 digit country code and the total population.

####-----------------------------------------------------------------------

### Replace **storage account name** with your storage account name before executing. 

In [0]:
from pyspark.sql.functions import *

### Read the population data & create a temp view

In [0]:
df_raw_population = spark.read.csv("/mnt/covidprojadls/raw/population", sep=r'\t', header=True)

In [0]:
df_raw_population.show(10)

+-----------------+-----+-----+-----+------+-----+-----+-----+-----+-----+-----+-----+-----+
|indic_de,geo\time|2008 |2009 |2010 | 2011 |2012 |2013 |2014 |2015 |2016 |2017 |2018 |2019 |
+-----------------+-----+-----+-----+------+-----+-----+-----+-----+-----+-----+-----+-----+
|      PC_Y0_14,AD|14.6 |14.5 |14.5 | 15.5 |15.5 |15.5 |   : |   : |   : |   : |   : |13.9 |
|      PC_Y0_14,AL|24.1 |23.3 |22.5 | 21.6 |20.7 |20.1 |19.6 |19.0 |18.5 |18.2 |17.7 |17.2 |
|      PC_Y0_14,AM|19.0 |18.6 |18.3 |    : |   : |   : |   : |19.4 |19.6 |20.0 |20.2 |20.2 |
|      PC_Y0_14,AT|15.4 |15.1 |14.9 | 14.7 |14.6 |14.4 |14.3 |14.3 |14.3 |14.4 |14.4 |14.4 |
|      PC_Y0_14,AZ|23.2 |22.6 |22.6 | 22.3 |22.2 |22.3 |22.4 |22.4 |22.5 |22.6 |22.6 |22.4 |
|      PC_Y0_14,BE|16.9 |16.9 |16.9 |17.0 b|17.0 |17.0 |17.0 |17.0 |17.0 |17.0 |17.0 |16.9 |
|      PC_Y0_14,BG|13.1 |13.1 |13.2 | 13.2 |13.4 |13.6 |13.7 |13.9 |14.0 |14.1 |14.2 |14.4 |
|      PC_Y0_14,BY|14.7 |14.6 |   : | 14.9 |15.1 |15.4 |15.7 |16.0 |16

In [0]:
df_raw_population1 = df_raw_population.withColumn('age_group', regexp_replace(split(df_raw_population['indic_de,geo\\time'], ',')[0], 'PC_', '')).withColumn('country_code', split(df_raw_population['indic_de,geo\\time'], ',')[1])

df_raw_population2 = df_raw_population1.select(col("country_code").alias("country_code"),
                                             col("age_group").alias("age_group"),
                                             col("2019 ").alias("percentage_2019"))
                                             
df_raw_population2.createOrReplaceTempView("raw_population")

In [0]:
spark.sql("select * from raw_population limit 10").show()

+------------+---------+---------------+
|country_code|age_group|percentage_2019|
+------------+---------+---------------+
|          AD|    Y0_14|          13.9 |
|          AL|    Y0_14|          17.2 |
|          AM|    Y0_14|          20.2 |
|          AT|    Y0_14|          14.4 |
|          AZ|    Y0_14|          22.4 |
|          BE|    Y0_14|          16.9 |
|          BG|    Y0_14|          14.4 |
|          BY|    Y0_14|          16.9 |
|          CH|    Y0_14|          15.0 |
|          CY|    Y0_14|          16.1 |
+------------+---------+---------------+



### Pivot the data by age group

In [0]:
# Create a data frame with pivoted percentages
df_raw_population_pivot = spark.sql("SELECT country_code, age_group, cast(regexp_replace(percentage_2019, '[a-z]', '') AS decimal(4,2)) AS percentage_2019 FROM raw_population WHERE length(country_code) = 2").groupBy("country_code").pivot("age_group").sum("percentage_2019").orderBy("country_code")
df_raw_population_pivot.createOrReplaceTempView("raw_population_pivot")

In [0]:
spark.sql("select * from raw_population_pivot limit 10").show()

+------------+-----+------+------+------+------+-------+
|country_code|Y0_14|Y15_24|Y25_49|Y50_64|Y65_79|Y80_MAX|
+------------+-----+------+------+------+------+-------+
|          AD|13.90| 10.60| 39.40| 22.50| 10.20|   3.40|
|          AL|17.20| 15.50| 33.00| 20.20| 11.40|   2.70|
|          AM|20.20| 11.80| 36.90| 19.10|  9.00|   3.00|
|          AT|14.40| 10.90| 34.00| 21.70| 13.80|   5.00|
|          AZ|22.40| 14.10| 39.10| 17.60|  5.30|   1.50|
|          BE|16.90| 11.40| 32.70| 20.10| 13.30|   5.60|
|          BG|14.40|  8.90| 35.00| 20.40| 16.50|   4.80|
|          BY|16.90|  9.90| 36.60| 21.30| 11.30|   3.90|
|          CH|15.00| 10.60| 35.00| 20.90| 13.30|   5.20|
|          CY|16.10| 12.80| 37.10| 17.90| 12.50|   3.70|
+------------+-----+------+------+------+------+-------+



### Read the country lookup

In [0]:
%fs
ls /mnt/covidprojadls/lookupdata/

path,name,size,modificationTime
dbfs:/mnt/covidprojadls/lookupdata/country_lookup.csv,country_lookup.csv,7020,1718971560000
dbfs:/mnt/covidprojadls/lookupdata/dim_date.csv,dim_date.csv,75435,1718971561000


In [0]:
# Create a data frame for the country lookup
df_dim_country = spark.read.csv("/mnt/covidprojadls/lookupdata/country_lookup.csv", sep=r',', header=True)
df_dim_country.createOrReplaceTempView("dim_country")

In [0]:
spark.sql("select * from dim_country limit 10").show()

+--------------------+--------------------+--------------------+---------+----------+
|             country|country_code_2_digit|country_code_3_digit|continent|population|
+--------------------+--------------------+--------------------+---------+----------+
|               Aruba|                  AW|                 ABW|  America|    106766|
|         Afghanistan|                  AF|                 AFG|     Asia|  38928341|
|              Angola|                  AO|                 AGO|   Africa|  32866268|
|            Anguilla|                  AI|                 AIA|  America|     15002|
|             Albania|                  AL|                 ALB|   Europe|   2862427|
|             Andorra|                  AD|                 AND|   Europe|     76177|
|United Arab Emirates|                  AE|                 ARE|     Asia|   9890400|
|           Argentina|                  AR|                 ARG|  America|  45195777|
|             Armenia|                  AM|           

### Join population data with country lookup

In [0]:
df_processed_population = spark.sql("""SELECT c.country,
       c.country_code_2_digit,
       c.country_code_3_digit,
       c.population,
       p.Y0_14  AS age_group_0_14,
       p.Y15_24 AS age_group_15_24,
       p.Y25_49 AS age_group_25_49,
       p.Y50_64 AS age_group_50_64, 
       p.Y65_79 AS age_group_65_79,
       p.Y80_MAX AS age_group_80_max
  FROM raw_population_pivot p
  JOIN dim_country c ON p.country_code = c.country_code_2_digit
 ORDER BY country""")

In [0]:
display(df_processed_population)

country,country_code_2_digit,country_code_3_digit,population,age_group_0_14,age_group_15_24,age_group_25_49,age_group_50_64,age_group_65_79,age_group_80_max
Albania,AL,ALB,2862427,17.2,15.5,33.0,20.2,11.4,2.7
Andorra,AD,AND,76177,13.9,10.6,39.4,22.5,10.2,3.4
Armenia,AM,ARM,2963234,20.2,11.8,36.9,19.1,9.0,3.0
Austria,AT,AUT,8858775,14.4,10.9,34.0,21.7,13.8,5.0
Azerbaijan,AZ,AZE,10139175,22.4,14.1,39.1,17.6,5.3,1.5
Belarus,BY,BLR,9449321,16.9,9.9,36.6,21.3,11.3,3.9
Belgium,BE,BEL,11455519,16.9,11.4,32.7,20.1,13.3,5.6
Bulgaria,BG,BGR,7000039,14.4,8.9,35.0,20.4,16.5,4.8
Croatia,HR,HRV,4076246,14.4,10.9,32.5,21.6,15.2,5.3
Cyprus,CY,CYP,875899,16.1,12.8,37.1,17.9,12.5,3.7


### Write output to the processed mount point

In [0]:
df_processed_population.write.format("csv").option("header","true").option("delimiter", ",").mode("overwrite").save("/mnt/covidprojadls/processed/population")