In [4]:
# ### Initialize a Spark session --- need when run locally
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("AuHealthExpnd").getOrCreate()

In [28]:
# Include shared notebooks
%run "../includes/configuration.ipynb"

In [None]:
%run "../includes/common_functions.ipynb"

In [18]:
#### Step 0. explore data initially

population_df = df = spark.read.csv("/mnt/auhealthexpnd23dl/raw/au_population.csv", header=True, inferSchema=True)
population_df.show()

In [8]:
#### Step 1. Define file schema and read file
# will failed if the input file does not follow the defined schema. To prevent populating corrupted file

from pyspark.sql.types import StructType, StructField, LongType, StringType

population_schema = StructType(fields=[StructField("Time", StringType(), False),
                                    StructField("Population-New South Wales", LongType(), False),
                                    StructField("Population-Victoria", LongType(), False),
                                    StructField("Population-Queensland", LongType(), False),
                                    StructField("Population-South Australia", LongType(), False),
                                    StructField("Population-Western Australia", LongType(), False),
                                    StructField("Population-Tasmania", LongType(), False),
                                    StructField("Population-Northern Territory", LongType(), False),
                                    StructField("Population-Australian Capital Territory", LongType(), False),
                                    # Aus population can be nullable, as we can calculate from other columns
                                    StructField("Population-Australia", LongType(), True)
                                    ])

population_df = spark.read.schema(population_schema) \
    .option("header", True) \
    .csv("/mnt/auhealthexpnd23dl/raw/au_population.csv")

In [9]:
population_df.show()

+------+--------------------------+-------------------+---------------------+--------------------------+----------------------------+-------------------+-----------------------------+---------------------------------------+--------------------+
|  Time|Population-New South Wales|Population-Victoria|Population-Queensland|Population-South Australia|Population-Western Australia|Population-Tasmania|Population-Northern Territory|Population-Australian Capital Territory|Population-Australia|
+------+--------------------------+-------------------+---------------------+--------------------------+----------------------------+-------------------+-----------------------------+---------------------------------------+--------------------+
|Jun-81|                   5234889|            3946917|              2345208|                   1318769|                     1300056|             427224|                       122616|                                 227581|            14923260|
|Sep-81|            

In [10]:
#### Step 2. Filter rows from 2011 onwards to 2059
from pyspark.sql.functions import col

# only need data in Jun (end of financial year), from 2011. Limit to 2059 for regex, as Jun-94 is Jun 1984 not 2094
pattern = "^(Jun)-([1-5][1-9])$"

population_filtered_df = df.filter(col("Time").rlike(pattern))

population_filtered_df.show()

+------+--------------------------+-------------------+---------------------+--------------------------+----------------------------+-------------------+-----------------------------+---------------------------------------+--------------------+
|  Time|Population-New South Wales|Population-Victoria|Population-Queensland|Population-South Australia|Population-Western Australia|Population-Tasmania|Population-Northern Territory|Population-Australian Capital Territory|Population-Australia|
+------+--------------------------+-------------------+---------------------+--------------------------+----------------------------+-------------------+-----------------------------+---------------------------------------+--------------------+
|Jun-11|                   7218529|            5537817|              4476778|                   1639614|                     2353409|             511483|                       231292|                                 367985|            22340024|
|Jun-12|            

In [29]:
#### Step 3. Rename columns
population_renamed_df = population_filtered_df.withColumnRenamed("Population-New South Wales", "NSW") \
    .withColumnRenamed("Population-Victoria", "VIC") \
    .withColumnRenamed("Population-Queensland", "QLD") \
    .withColumnRenamed("Population-South Australia", "SA") \
    .withColumnRenamed("Population-Western Australia", "WA") \
    .withColumnRenamed("Population-Tasmania", "TAS") \
    .withColumnRenamed("Population-Northern Territory", "NT") \
    .withColumnRenamed("Population-Australian Capital Territory", "ACT") \
    .withColumnRenamed("Population-Australia", "AU") \
    .withColumnRenamed("Time", "time")

In [30]:
population_renamed_df.show()

+------+-------+-------+-------+-------+-------+------+------+------+--------+
|  time|    NSW|    VIC|    QLD|     SA|     WA|   TAS|    NT|   ACT|      AU|
+------+-------+-------+-------+-------+-------+------+------+------+--------+
|Jun-11|7218529|5537817|4476778|1639614|2353409|511483|231292|367985|22340024|
|Jun-12|7304244|5651091|4568687|1656725|2425507|511724|235915|376539|22733465|
|Jun-13|7404032|5772669|4652824|1671488|2486944|512231|241722|383257|23128129|
|Jun-14|7508353|5894917|4719653|1686945|2517608|513621|242894|388799|23475686|
|Jun-15|7616168|6022322|4777692|1700668|2540672|515117|244692|395813|23815995|
|Jun-16|7732858|6173172|4845152|1712843|2555978|517514|245678|403104|24190907|
|Jun-17|7855316|6302608|4926380|1728673|2585720|526762|247412|415046|24592588|
|Jun-18|7954476|6423038|5006623|1746137|2617792|537291|247095|426081|24963258|
|Jun-19|8046748|6537305|5088847|1767395|2659625|547841|246559|435730|25334826|
|Jun-21|8097062|6547822|5215814|1802601|2749365|5672

In [31]:
#### Step 4. Unpivot, convert df to 3 columns only (Time, Jurisdiction, Population). And add ingestion_date column
population_unpivot_df = population_renamed_df.unpivot("time", ["NSW", "VIC", "QLD", "SA", "WA", "TAS", "NT", "ACT", "AU"], "jurisdiction", "population")
population_final_df = add_ingestion_date(population_unpivot_df)
population_final_df.show()

+------+------------+----------+--------------------+
|  time|jurisdiction|population|      ingestion_date|
+------+------------+----------+--------------------+
|Jun-11|         NSW|   7218529|2023-10-31 15:36:...|
|Jun-11|         VIC|   5537817|2023-10-31 15:36:...|
|Jun-11|         QLD|   4476778|2023-10-31 15:36:...|
|Jun-11|          SA|   1639614|2023-10-31 15:36:...|
|Jun-11|          WA|   2353409|2023-10-31 15:36:...|
|Jun-11|         TAS|    511483|2023-10-31 15:36:...|
|Jun-11|          NT|    231292|2023-10-31 15:36:...|
|Jun-11|         ACT|    367985|2023-10-31 15:36:...|
|Jun-11|          AU|  22340024|2023-10-31 15:36:...|
|Jun-12|         NSW|   7304244|2023-10-31 15:36:...|
|Jun-12|         VIC|   5651091|2023-10-31 15:36:...|
|Jun-12|         QLD|   4568687|2023-10-31 15:36:...|
|Jun-12|          SA|   1656725|2023-10-31 15:36:...|
|Jun-12|          WA|   2425507|2023-10-31 15:36:...|
|Jun-12|         TAS|    511724|2023-10-31 15:36:...|
|Jun-12|          NT|    235

In [None]:
#### Step 5. Save to Parquet file and store in Processed folder
population_final_df.write.mode("overwrite").parquet(f"{processed_folder_path}/population")