In [1]:
from pyspark.sql.types import *
from delta.tables import *
from pyspark.sql.utils import AnalysisException

In [2]:
customSchema = StructType([
    StructField("DATE", DateType(), True),        
    StructField("PROVINCE", StringType(), True),
    StructField("REGION", StringType(), True),
    StructField("AGEGROUP", StringType(), True),
    StructField("SEX", StringType(), True),
    StructField("CASES", DoubleType(), True)
])

In [3]:
def get_keys(keyName:str)->str:
  return dbutils.secrets.get(scope = 'formula1-scope', key = keyName)

In [4]:
file_format = 'csv'
connection_string = get_keys('storage-account-connectionString')
tenant_id = get_keys('databricks-app-tenant-id')
client_id = get_keys('databricks-app-client-id')
client_secret = get_keys('databricks-app-client-secret')
subscription_id = get_keys('re-subscriber-id')
resoruce_group = 'test-rg'





In [5]:
subscription_id

In [6]:
df = spark.readStream.format("cloudFiles")\
          .option("cloudFiles.format", "csv")\
          .option("cloudFiles.connectionString", connection_string)\
          .option("cloudFiles.resourceGroup", resoruce_group)\
          .option("cloudFiles.subscriptionId", subscription_id)\
          .option("cloudFiles.tenantId", tenant_id)\
          .option("cloudFiles.clientId", client_id)\
          .option("cloudFiles.clientSecret", client_secret)\
          .option("cloudFiles.validateOptions", "false")\
          .option("header","true")\
          .schema(customSchema)\
          .load("/mnt/f234dl/raw/covid_daily")

In [7]:
%sql
CREATE TABLE IF NOT EXISTS covid_data
   (DATE DATE,
   PROVINCE STRING,
   REGION STRING,
   AGEGROUP STRING,
   SEX STRING,
   CASES DOUBLE)
  USING DELTA
  LOCATION '/mnt/f234dl/processed/covid_processed_daily'

In [8]:
def upsert_data(df, epochId):
    deltaTable = DeltaTable.forPath(spark, "/mnt/f234dl/processed/covid_processed_daily")
    deltaTable.alias("data").merge(
      df.alias("newData"),
      "data.DATE = newData.DATE and data.PROVINCE = newData.PROVINCE and data.AGEGROUP = newData.AGEGROUP and data.SEX = newData.SEX") \
    .whenNotMatchedInsertAll() \
    .execute()  

In [9]:
df.writeStream\
  .format("delta")\
  .foreachBatch(upsert_data)\
  .option("checkpointLocation", "/mnt/f234dl/processed/checkpoints")\
  .start("/mnt/f234dl/processed/covid_processed_daily")

In [10]:
%sql
SELECT * FROM delta.`/mnt/f234dl/processed/covid_processed_daily`

DATE,PROVINCE,REGION,AGEGROUP,SEX,CASES
,PROVINCE,REGION,AGEGROUP,SEX,
2020-03-22,Liège,Wallonia,40-49,M,7.0
2020-03-22,Liège,Wallonia,50-59,F,8.0
2020-03-22,Liège,Wallonia,50-59,M,9.0
2020-03-22,Liège,Wallonia,60-69,M,2.0
2020-03-22,Liège,Wallonia,70-79,F,4.0
2020-03-22,Liège,Wallonia,70-79,M,3.0
2020-03-22,Liège,Wallonia,80-89,F,4.0
2020-03-22,Liège,Wallonia,80-89,M,1.0
2020-03-22,Liège,Wallonia,90+,F,2.0


In [11]:
%sql
select count(*) from delta.`/mnt/f234dl/processed/covid_processed_daily`

count(1)
5003
