Imports

In [0]:
import os

import dlt
from pyspark.sql import functions as F
from pyspark.sql.functions import *

Download dataset and store in a Unity Catalog volume

In [0]:
# Unity Catalog coordinates (catalog.schema.volume) of the volume that will
# hold the downloaded dataset.
catalog = 'main'
databaseName = 'default'
volumeName = 'unity_volume'

# IF NOT EXISTS keeps this cell idempotent: re-running the notebook after the
# volume has been created no longer fails with an "already exists" error.
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{databaseName}.{volumeName}")

DataFrame[]

In [0]:
# Publish the dataset location through environment variables; the bronze table
# definition reads these same keys to locate the file.
# The volume path is derived from the catalog/schema/volume names used to
# create the volume, so the two can never drift apart.
os.environ["UNITY_CATALOG_VOLUME_PATH"] = f"/Volumes/{catalog}/{databaseName}/{volumeName}/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

# Copy the remote CSV into the volume. Indexing os.environ directly (instead of
# formatting os.environ.get(...) into a string) raises KeyError on a missing
# variable rather than silently producing the text "None".
dbutils.fs.cp(
  os.environ["DATASET_DOWNLOAD_URL"],
  os.environ["UNITY_CATALOG_VOLUME_PATH"] + os.environ["DATASET_DOWNLOAD_FILENAME"],
)

True

Ingest raw data into a bronze (raw) table

In [0]:
@dlt.table(
  comment="Data ingested from NY State Department of Health"
)
def dlt_bronze():
  """Load the downloaded CSV from the Unity Catalog volume as the bronze table.

  The file location is read from the UNITY_CATALOG_VOLUME_PATH and
  DATASET_DOWNLOAD_FILENAME environment variables.

  Returns:
    DataFrame: the CSV contents with an inferred schema and the
    "First Name" column renamed to "First_Name".

  Raises:
    KeyError: if either environment variable is not set.
  """
  # Index os.environ directly: os.environ.get(...) inside an f-string would
  # turn a missing variable into the literal path "NoneNone" and defer the
  # failure to an obscure file-not-found error inside Spark.
  source_path = os.environ["UNITY_CATALOG_VOLUME_PATH"] + os.environ["DATASET_DOWNLOAD_FILENAME"]
  raw_df = spark.read.csv(source_path, header=True, inferSchema=True)
  # Replace the space so the column name is a plain identifier downstream.
  return raw_df.withColumnRenamed("First Name", "First_Name")

Name,Type
Year,int
First_Name,string
County,string
Sex,string
Count,int


Clean and prepare data

In [0]:
@dlt.table(
  comment="Data cleaned and prepared for analysis"
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def dlt_silver():
  """Project the bronze table down to the columns used for analysis.

  Reads dlt_bronze, renames "Year" to "Year_Of_Birth", and keeps only
  Year_Of_Birth, First_Name and Count. Data-quality expectations on
  First_Name and Count are declared by the decorators above.
  """
  bronze_df = dlt.read("dlt_bronze")
  with_birth_year = bronze_df.withColumnRenamed("Year", "Year_Of_Birth")
  kept_columns = ["Year_Of_Birth", "First_Name", "Count"]
  return with_birth_year.select(*kept_columns)

Name,Type
Year_Of_Birth,int
First_Name,string
Count,int


Aggregate data

In [0]:
@dlt.table(
  comment="A table summarizing counts of the top names for NY in 2021."
)
def dlt_gold():
  """Return the ten first names with the highest total count for 2021.

  Reads dlt_silver, keeps rows where Year_Of_Birth is 2021, sums Count per
  First_Name into Total_Count, and returns the ten largest totals.

  Returns:
    DataFrame: columns First_Name and Total_Count, at most ten rows, ordered
    by Total_Count descending and then First_Name ascending.
  """
  return (
    dlt.read("dlt_silver")
      .filter(F.col("Year_Of_Birth") == 2021)
      .groupBy("First_Name")
      # Use the explicit F namespace: a bare sum(...) only resolves to the
      # Spark aggregate while the star import shadows Python's builtin sum.
      .agg(F.sum("Count").alias("Total_Count"))
      # The secondary key makes the top-10 cut-off deterministic when several
      # names share the same total.
      .sort(F.desc("Total_Count"), F.asc("First_Name"))
      .limit(10)
  )

Name,Type
First_Name,string
Total_Count,bigint
