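This notebook reads the raw COVID data from ADLS, creates a staging table with the last 30 days of data, and creates (first run) or incrementally updates (later runs) the silver table.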

# Read the raw data from ADLS

In [0]:
from pyspark.sql.functions import col, lit, current_date, date_sub
from delta.tables import DeltaTable

In [0]:
# Raw COVID data landed as Parquet in the bronze container of the storage account.
raw_covid_path = 'abfss://bronze@storagegeneral00001.dfs.core.windows.net/volumes/raw_covid/'
df = spark.read.parquet(raw_covid_path)

In [0]:
# Columns kept from the raw data, in output order, paired with the Spark type
# each one is cast to (None = keep the column as read).
selected_columns = [
    ('Date_reported', 'date'),
    ('Country_code', None),
    ('Country', None),
    ('WHO_Region', None),
    ('New_cases', 'int'),
    ('Cumulative_cases', 'bigint'),
    ('New_deaths', 'int'),
    ('Cumulative_deaths', 'bigint'),
]
projection = [
    col(name) if spark_type is None else col(name).cast(spark_type)
    for name, spark_type in selected_columns
]

# Every row is stamped with current_date() in the Last_update column.
df = df.select(*projection).withColumn('Last_update', current_date())

df.display()

In [0]:
# Only when the silver table does not exist yet: the full history in df is what
# gets written as the initial silver table, so drop rows missing a country or a
# report date and treat missing counts as 0.
if not spark.catalog.tableExists('cat_covid.silver.covid'):
    df = (
        df.dropna(subset=['Country', 'Date_reported'])
        .fillna(0, subset=['New_cases', 'Cumulative_cases', 'New_deaths', 'Cumulative_deaths'])
    )

# Create staging table for the last 30 days

In [0]:
# Staging window: rows reported on or after (today - 30 days). A NULL
# Date_reported makes the comparison NULL, so such rows are excluded as well.
window_start = date_sub(current_date(), 30)
count_columns = ['New_cases', 'Cumulative_cases', 'New_deaths', 'Cumulative_deaths']

df_staging = (
    df.where(col('Date_reported') >= window_start)
    .dropna(subset=['Country'])
    .na.fill(0, subset=count_columns)
)

# The staging table (including its schema) is fully replaced on every run.
(
    df_staging.write
    .format('delta')
    .mode('overwrite')
    .option('overwriteSchema', True)
    .saveAsTable('cat_covid.bronze.stagingcovid')
)


# Incremental loading
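The incremental load uses the last 30 days of data (the staging table): records that already exist in the silver table are updated and new records are inserted. On the first run, when the silver table does not exist yet, the full cleaned history is written instead.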

In [0]:
# Upsert the 30-day staging window into the silver table when it exists;
# otherwise write the full (cleaned) history in df as the initial silver table.
if spark.catalog.tableExists('cat_covid.silver.covid'):
    target = DeltaTable.forName(spark, 'cat_covid.silver.covid')
    # Key: (Date_reported, Country_code). Country_code is never null-checked
    # upstream (only Country / Date_reported are), so it is compared with the
    # null-safe operator `<=>`. With plain `=`, a row whose code is NULL never
    # matches its existing silver copy and whenNotMatchedInsertAll would insert
    # it again on every run, silently duplicating it.
    # NOTE(review): this assumes at most one NULL-code row per date; otherwise
    # Delta rejects the merge because several source rows match one target row.
    target.alias('t').merge(
        df_staging.alias('s'),
        't.Date_reported = s.Date_reported AND t.Country_code <=> s.Country_code'
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
else:
    df.write.mode('overwrite').format('delta').option('overwriteSchema', True).saveAsTable('cat_covid.silver.covid')
