# Getting Started with Azure Databricks - A Census Data Example

This notebook is a simple example of working with data in Azure Databricks.

If you are reading this on the wiki, you can find the working Notebook in the Azure Databricks environment at the following path: [/Shared/Getting Started with Azure Databricks - A Census Data Example](https://adb-5195694350474952.12.azuredatabricks.net/?o=5195694350474952#notebook/3876325875947739)

## Explore the ADLS folders

You can explore filesystems directly through Notebooks by using the [`dbutils.fs` client](https://docs.microsoft.com/en-us/azure/databricks/dev-tools/databricks-utils).

Below, we explore the CSVs for the census night population counts:

In [0]:
%python
display(dbutils.fs.ls("/"))

In [0]:
%python
display(dbutils.fs.ls("abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/"))

In [0]:
%python
display(dbutils.fs.ls("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz"))

As the listings above show, the ADLS Gen2 container for the project (`abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/`) is set as the default filesystem for clusters associated with that project (in this case `sandbox`). Paths without a scheme, such as `/data/raw/stats_nz_census`, therefore resolve to that container.
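Here is a minimal plain-Python sketch of what "default filesystem" means in this notebook: a path without a scheme is resolved against the container URI. `qualify_path` and `DEFAULT_FS` are hypothetical names used only for illustration. We assume the clusters are configured through the Hadoop `fs.defaultFS` setting. Hadoop's real resolution handles more cases than this sketch, such as relative paths and other schemes.

```python
# Container URI taken from the listing above; the helper itself is hypothetical
DEFAULT_FS = "abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/"


def qualify_path(path: str, default_fs: str = DEFAULT_FS) -> str:
    """Resolve a scheme-less absolute path against the default filesystem."""
    if "://" in path:
        # Already fully qualified (e.g. abfss://...), so use it as-is
        return path
    if not path.startswith("/"):
        raise ValueError("expected an absolute path such as /data/raw/stats_nz_census")
    # Join without doubling the slash between the container root and the path
    return default_fs.rstrip("/") + path


print(qualify_path("/data/raw/stats_nz_census"))
# abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/data/raw/stats_nz_census
```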

## Read in the CSVs and explore the data

We can read the source CSVs into Spark as DataFrames and explore the data.

We use the [DataFrame API](https://spark.apache.org/docs/3.0.0/sql-getting-started.html) to transform the DataFrames.

In [0]:
%python
df = spark.read.csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv")

In [0]:
%python
display(df)

The datatypes and headers look incorrect. Every column is read as a string, and the header row is treated as data, so the columns are named `_c0`, `_c1`, and so on. We can fix this by passing in [reader options](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#manually-specifying-options).
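To see what the `header` and `inferSchema` options change, here is a toy model in plain Python using the standard `csv` module. It is not Spark's implementation. `read_csv` and the sample data are made up, and its type inference only tries integers, whereas Spark also tries other numeric, boolean and timestamp types. Notice how inference turns a code like `01` into the integer `1`. This becomes important later.

```python
import csv
import io

# Made-up sample shaped like a lookup CSV (not real census data)
SAMPLE = "Code,Description,SortOrder\n01,A,1\n02,B,2\n999999,C,3\n"


def _is_int(value):
    try:
        int(value)
        return True
    except ValueError:
        return False


def read_csv(text, header=False, infer_schema=False):
    """Toy model of the header and inferSchema reader options."""
    rows = list(csv.reader(io.StringIO(text)))
    if header:
        names, data = rows[0], rows[1:]
    else:
        # Without a header, columns are named _c0, _c1, ... and row 1 stays as data
        names, data = [f"_c{i}" for i in range(len(rows[0]))], rows
    data = [list(r) for r in data]
    if infer_schema:
        for i in range(len(names)):
            if all(_is_int(r[i]) for r in data):
                for r in data:
                    r[i] = int(r[i])  # "01" becomes 1: the leading zero is lost
    return names, data


print(read_csv(SAMPLE))
print(read_csv(SAMPLE, header=True, infer_schema=True))
```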

In [0]:
%python
df = spark.read.option("header", True).option("inferSchema", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv")

In [0]:
%python
display(df)

In [0]:
%python
df.count()

Let's take a look at a couple of the dimension tables too.

In [0]:
%python
df = spark.read.option("header", True).option("inferSchema", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupAge8317.csv")

In [0]:
%python
display(df)

In [0]:
%python
df = spark.read.option("header", True).option("inferSchema", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupEthnic8317.csv")

In [0]:
%python
display(df)

Observation: the dimension tables seem to share a consistent schema (`Code`, `Description`, `SortOrder`).

## Denormalise the source tables
Since we are using Apache Spark, we want to limit the number of joins, and therefore the data transferred between nodes. Filters and aggregations suit the architecture better. The data will also be stored in compressed columnar files, so it makes sense to denormalise it.
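Conceptually, denormalising swaps each code in the fact table for its matching lookup row. The plain-Python sketch below uses made-up data. `fact`, `lookups` and `denormalise` are illustrative names, not part of the notebook. The sketch mirrors the column renames in the Spark join loop that follows: the description takes the dimension's name, and the sort order becomes `<dim>SortOrder`.

```python
# Made-up fact rows and lookups (hypothetical codes and values, not the census data)
fact = [
    {"Year": "2018", "Sex": "1", "Count": "120"},
    {"Year": "2013", "Sex": "2", "Count": "95"},
]
lookups = {
    # code -> (Description, SortOrder)
    "Year": {"2013": ("2013", "1"), "2018": ("2018", "2")},
    "Sex": {"1": ("Male", "1"), "2": ("Female", "2")},
}


def denormalise(rows, dims):
    """Replace each dimension code with its description and sort order."""
    out = []
    for row in rows:
        wide = dict(row)
        for dim, lookup in dims.items():
            # A missing code raises KeyError here; an inner join would drop the row instead
            description, sort_order = lookup[wide.pop(dim)]
            wide[dim] = description
            wide[f"{dim}SortOrder"] = sort_order
        out.append(wide)
    return out


print(denormalise(fact, lookups))
```

In Spark, the lookup tables are small. The join can often be done by broadcasting them to every node (see `pyspark.sql.functions.broadcast`) rather than shuffling the large fact table.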

Let's join all the data into one large DataFrame:

In [0]:
%python
from pyspark.sql.functions import col

denorm_df = spark.read.option("header", True).option("inferSchema", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv")
for dim in ["Age", "Area", "Ethnic", "Sex", "Year"]:
  dim_df = spark.read.option("header", True).option("inferSchema", True).csv(f"/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookup{dim}8317.csv")
  denorm_df = denorm_df.join(dim_df, col(dim) == col("Code")).drop("Code", dim).withColumnRenamed("Description", dim).withColumnRenamed("SortOrder", f"{dim}SortOrder")

In [0]:
%python
display(denorm_df)

In [0]:
%python
denorm_df.count()

## Investigate duplicates 🕵️‍♀️

>Pre-join count: 34959673

>Post-join count: 48585735

The counts look incorrect: the dimension joins are only lookups and should not produce additional rows.

Let's look into why this has happened.

Hypothesis: the `Code` columns shouldn't be inferred as integers.

With schema inference:

In [0]:
%python
df = spark.read.option("header", True).option("inferSchema", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupAge8317.csv")

In [0]:
%python
display(df)

Without schema inference:

In [0]:
%python
df = spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupAge8317.csv")

In [0]:
%python
display(df)

Schema inference shot us in the foot! 🦶🔫
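Inference can create extra rows if a lookup holds codes that differ only by leading zeros. We assume that is the case here; the Area lookup is checked for `1` and `01` further down. Once both sides are cast to integers, `"1"` and `"01"` become the same key, so one fact row matches two lookup rows. Here is a plain-Python sketch with made-up codes. `inner_join_count` is an illustrative helper.

```python
# Made-up data: the lookup has both "1" and "01" as distinct codes; "2" has no entry
lookup = [("1", "Description for 1"), ("01", "Description for 01")]
fact_codes = ["1", "01", "2"]


def inner_join_count(codes, lookup_rows, key):
    """Count the rows an inner join would produce after applying `key` to both sides."""
    return sum(
        1
        for fact_code in codes
        for lookup_code, _description in lookup_rows
        if key(fact_code) == key(lookup_code)
    )


print(inner_join_count(fact_codes, lookup, str))  # 2: codes compared as strings
print(inner_join_count(fact_codes, lookup, int))  # 4: "1" and "01" both become 1
```

Three fact rows go in. Comparing as integers produces four rows out. Comparing as strings also shows the other symptom seen next: the fact code `2` has no lookup entry, so an inner join drops it.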

Let's try again without inferring any datatypes (we can cast columns manually later):

In [0]:
%python
from pyspark.sql.functions import col

denorm_df = spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv")
for dim in ["Age", "Area", "Ethnic", "Sex", "Year"]:
  dim_df = spark.read.option("header", True).csv(f"/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookup{dim}8317.csv")
  denorm_df = denorm_df.join(dim_df, col(dim) == col("Code")).drop("Code", dim).withColumnRenamed("Description", dim).withColumnRenamed("SortOrder", f"{dim}SortOrder")

In [0]:
%python
denorm_df.count()

>Pre-join count: 34959673

>Post-join count: 34885323

Closer, but it looks like we lost a few rows this time.

Let's try a left join:

In [0]:
%python
from pyspark.sql.functions import col

denorm_df = spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv")
for dim in ["Age", "Area", "Ethnic", "Sex", "Year"]:
  dim_df = spark.read.option("header", True).csv(f"/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookup{dim}8317.csv")
  denorm_df = denorm_df.join(dim_df, col(dim) == col("Code"), how="left").drop("Code", dim).withColumnRenamed("Description", dim).withColumnRenamed("SortOrder", f"{dim}SortOrder")

In [0]:
%python
denorm_df.count()

Bingo! The counts match. However, this implies that some rows don't join to their dimension lookup.
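The plain-Python sketch below shows the difference between the two joins. It uses made-up data in which the fact code `999` has no lookup entry. `inner_join` and `left_join` are illustrative helpers, not Spark APIs. An inner join drops the unmatched row. A left join keeps it with a missing description, which Spark represents as null.

```python
# Made-up fact codes and lookup; "999" has no lookup entry
fact = ["1", "2", "999"]
lookup = {"1": "One", "2": "Two"}


def inner_join(codes, table):
    # Rows without a matching code are dropped
    return [(c, table[c]) for c in codes if c in table]


def left_join(codes, table):
    # Every fact row is kept; unmatched rows get None, like Spark's null
    return [(c, table.get(c)) for c in codes]


inner = inner_join(fact, lookup)
left = left_join(fact, lookup)
unmatched = [row for row in left if row[1] is None]
print(len(fact), len(inner), len(left))  # 3 2 3
print(unmatched)                          # [('999', None)]
```

A left join only preserves the row count because each code appears at most once in the lookup. Duplicate codes would still fan out, as they did in the inferred-schema join earlier.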

Let's hunt for nulls:

In [0]:
%python
display(denorm_df)

In [0]:
%python
denorm_df_nulls = denorm_df.filter(col("Age").isNull() | col("Area").isNull() | col("Ethnic").isNull() | col("Sex").isNull() | col("Year").isNull())

In [0]:
%python
denorm_df_nulls.count()

In [0]:
%python
display(denorm_df_nulls)

In [0]:
%python
denorm_df_nulls.filter(col("Age") == "Median age").count()

All of the unmatched rows come from the median age category.

We should take some time to understand our data!

In [0]:
%python
display(spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupAge8317.csv"))

Let's filter the fact table by the top two codes (`999999` and `888`), as they look odd:

In [0]:
%python
df = spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv").filter(col("Age").isin(["999999", "888"]))

In [0]:
%python
display(df)

In [0]:
%python
display(df.filter(col("Age") == "888"))

It's getting a little hard to trace columns and codes, so let's denormalise whilst retaining the code columns:

In [0]:
%python
from pyspark.sql.functions import col

denorm_df = spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv")
for dim in ["Age", "Area", "Ethnic", "Sex", "Year"]:
  dim_df = spark.read.option("header", True).csv(f"/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookup{dim}8317.csv")
  denorm_df = denorm_df.join(dim_df, col(dim) == col("Code"), how="left").drop("Code").withColumnRenamed(dim, f"{dim}Code").withColumnRenamed("Description", dim).withColumnRenamed("SortOrder", f"{dim}SortOrder")

In [0]:
%python
denorm_df_nulls = denorm_df.filter(col("Age").isNull() | col("Area").isNull() | col("Ethnic").isNull() | col("Sex").isNull() | col("Year").isNull())

In [0]:
%python
display(denorm_df_nulls)

Some area codes don't match an entry in the area lookup.
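Keeping the code columns makes a mismatch like this traceable, because an unmatched row still shows which raw code failed to look up. The plain-Python sketch below illustrates the idea on made-up data: the lookup only knows `01`, but the fact table also uses `1`. `left_join_keep_code` is a hypothetical helper that mirrors the column renames in the Spark loop above.

```python
from collections import Counter


def left_join_keep_code(rows, lookup, dim):
    """Keep the raw code as <dim>Code next to the looked-up description."""
    out = []
    for row in rows:
        wide = dict(row)
        code = wide.pop(dim)
        wide[f"{dim}Code"] = code
        wide[dim] = lookup.get(code)  # None when the code is missing from the lookup
        out.append(wide)
    return out


# Made-up data: the lookup only knows "01", but the fact table also uses "1"
fact = [{"Area": "01", "Count": "5"}, {"Area": "1", "Count": "7"}, {"Area": "1", "Count": "2"}]
area_lookup = {"01": "Area one"}

joined = left_join_keep_code(fact, area_lookup, "Area")
# Count unmatched rows by their raw code; without AreaCode we would only see Area == None
unmatched = Counter(r["AreaCode"] for r in joined if r["Area"] is None)
print(unmatched)  # Counter({'1': 2})
```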

Let's dig deeper:

In [0]:
%python
display(spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupArea8317.csv").filter(col("Code").isin(["1", "01"])))

It seems some area codes don't look up correctly, possibly because of inconsistent leading zeros (`1` vs `01`).

Let's also look at sex:

In [0]:
%python
display(denorm_df_nulls)

In [0]:
%python
display(spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupSex8317.csv"))

Conclusion: the data looks a little odd and needs investigating further. For now, let's continue with the denormalisation as we can fix these issues later by keeping the raw CSVs.

## Creating output files, Hive databases and tables

When writing out DataFrames, we can write them out in [various formats](https://spark.apache.org/docs/3.0.0/sql-data-sources.html), and optionally add a Hive table over these files.

Hive tables are 'virtual' SQL tables over stored data (in this case, data stored on ADLS).
We can use either:
* External tables: create tables over existing data
* Hive-managed tables: create tables and data at the same time

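Both options produce a queryable table, and the difference is subtle. Dropping a managed table deletes its data files as well as its metadata. Dropping an external table removes only the metadata and leaves the files in place.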

When we create Hive tables, we are really writing out the DataFrame to ADLS and adding a schema and file path reference to Hive.

Let's create a Hive database:

In [0]:
%sql
create database if not exists sandbox;
use sandbox;

Now, let's create our final DataFrame we would like to write out:

In [0]:
%python
denorm_df = spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv")
for dim in ["Age", "Area", "Ethnic", "Sex", "Year"]:
  dim_df = spark.read.option("header", True).csv(f"/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookup{dim}8317.csv")
  denorm_df = denorm_df.join(dim_df, col(dim) == col("Code"), how="left").drop("Code").withColumnRenamed(dim, f"{dim}Code").withColumnRenamed("Description", dim).withColumnRenamed("SortOrder", f"{dim}SortOrder")

We can write the files out directly as Parquet (with no Hive table):

In [0]:
%python
denorm_df.write.mode("overwrite").parquet("/data/derived/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz_denorm/")

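Or we can write the files out and create a Hive table in one step. `saveAsTable` uses the format set by `spark.sql.sources.default`. That format is Parquet in open-source Spark, while newer Databricks Runtime versions default to Delta Lake: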

In [0]:
%python
denorm_df.write.mode("overwrite").saveAsTable("sandbox.age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz")

We can now query the Hive table using SQL:

In [0]:
%sql
select * from sandbox.age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz

There are three ways to open the Hive table as a DataFrame:

In [0]:
%python
df_s = spark.sql("select * from sandbox.age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz")
df_h = spark.read.table("sandbox.age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz")
df_p = spark.read.parquet("/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/")

The underlying Hive-backed table metadata and data files look like the following:

In [0]:
%sql
describe formatted sandbox.age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz;

In [0]:
%python
display(dbutils.fs.ls("/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/"))

## Summary

We now have a virtual Hive table we can query using SQL, and we can create a DataFrame using an SQL query, a reference to the Hive table or by reading the underlying Parquet files.

Because the underlying files are Snappy-compressed Parquet, the data has shrunk from 800 MB of CSVs (100 MB compressed) to 50 MB of Parquet files, even though we denormalised!