<b>Mount a Blob storage container </b>

In [None]:
# Notebook parameter: path of the input, relative to the mount point below.
dbutils.widgets.text("inputPath", "", "")
input_path = dbutils.widgets.get("inputPath")

# DBFS locations used throughout the notebook.
mnt_path = "/mnt/raw/calendars"
delta_table_path = "/mnt/delta/calendars"

# Unmount first when the mount point is already in use, so the mount below
# always runs against a free mount point.
if mnt_path in {mount.mountPoint for mount in dbutils.fs.mounts()}:
  dbutils.fs.unmount(mnt_path)

# Mount the blob container; the SAS token is read from the secret scope at
# call time rather than being stored in a notebook variable.
dbutils.fs.mount(
  source = "wasbs://demo@demoStorage.blob.core.windows.net",
  mount_point = mnt_path,
  extra_configs = {
    "fs.azure.sas.demo.demoStorage.blob.core.windows.net":
      dbutils.secrets.get(scope = "demoBlogStorageScope", key = "blobStorageSAS")
  }
)

<b>Transform calendar view data</b>

In [None]:
import re
from pyspark.sql.functions import col, udf
 
def exists(obj, chain):
    """Follow the keys in ``chain`` through ``obj`` and return the value reached.

    Args:
        obj: Container supporting ``in`` and ``[]`` lookups by key; may be None.
        chain: Ordered sequence of keys to follow. It is left unmodified.

    Returns:
        The value found at the end of the chain, or None when ``obj`` or any
        intermediate value is None, or when a key along the way is absent.
        An empty chain returns ``obj`` itself.
    """
    current = obj
    # Iterate rather than pop so the caller's list is not consumed.
    for key in chain:
        # `key in None` would raise TypeError, so treat None as "not found".
        if current is None or key not in current:
            return None
        current = current[key]
    return current

def extractEmails(attendees):
  """Build a comma-separated string of attendee e-mail addresses.

  For each attendee the value at ``emailAddress`` -> ``address`` is looked up
  via ``exists``; attendees for which that lookup yields None are skipped.
  Returns an empty string when ``attendees`` is None or empty.
  """
  if not attendees or len(attendees) == 0:
    return ""

  addresses = []
  for attendee in attendees:
    address = exists(attendee, ["emailAddress", "address"])
    if address is not None:
      addresses.append(address)
  return ",".join(addresses)
  
def extractUTCDate(date_col):
  """Return the value stored under the ``dateTime`` key of ``date_col``.

  The lookup is delegated to ``exists``, which yields None when the key is
  absent. No conversion is applied here; the value is returned as found.
  """
  date_time = exists(date_col, ["dateTime"])
  return date_time
  

def getLatestTableVersion():
  """Return the highest version listed in the history of the Delta table.

  Reads the module-level ``delta_table_path`` and runs the query through the
  notebook's ``spark`` session; the result is the first column of the single
  aggregated row.
  """
  history_query = f"SELECT max(version) as lastVersion  FROM (DESCRIBE HISTORY delta.`{delta_table_path}`)"
  latest_row = spark.sql(history_query).head()
  return latest_row[0]


# Wrap the extraction helpers as Spark UDFs so they can be applied to columns.
# The return type is spelled out; "string" is also what udf() uses by default.
getEmailsUDF = udf(lambda attendees: extractEmails(attendees), returnType="string")
getDateUDF = udf(lambda date_col: extractUTCDate(date_col), returnType="string")



In [None]:
# Load the JSON input found under the mounted path.
df = spark.read.json(f"{mnt_path}/{input_path}")

# Flatten each record to the columns stored in the Delta table and drop
# rows that are exact duplicates of one another.
df_calendar = (
    df.select(
        getEmailsUDF(col("attendees")).alias("emails"),
        getDateUDF(col("start")).alias("startDatetime"),
        getDateUDF(col("end")).alias("endDatetime"),
        col("iCalUId"),
        col("isCancelled"),
    )
    .dropDuplicates()
)

# Preview a few rows without truncating long values.
df_calendar.show(5, truncate=False)


<b>Create a Delta table (SCD Type 1) with Change Data Feed enabled, then merge the newly extracted data into the Delta table</b>

In [None]:
from delta.tables import *

# Create the target table on first run, with Change Data Feed switched on.
if not DeltaTable.isDeltaTable(spark, delta_table_path):
  spark.sql(f"CREATE TABLE delta.`{delta_table_path}` (iCalUId STRING, emails STRING, startDatetime STRING, endDatetime STRING, isCancelled Boolean)  TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# Null-safe change detection: a plain `<>` evaluates to NULL when either side
# is NULL, so a value changing to or from NULL would never trigger the update.
# `<=>` treats two NULLs as equal and NULL vs. a value as different.
changed_condition = (
  "NOT (calendar.emails <=> updates.emails) "
  "OR NOT (calendar.startDatetime <=> updates.startDatetime) "
  "OR NOT (calendar.endDatetime <=> updates.endDatetime)"
)

deltaTable = DeltaTable.forPath(spark, delta_table_path)

# SCD Type 1 merge keyed on iCalUId:
#   - matched and cancelled      -> delete the row
#   - matched and changed        -> overwrite with the incoming row
#   - not matched, not cancelled -> insert
(deltaTable.alias("calendar")
  .merge(df_calendar.alias("updates"), "calendar.iCalUId = updates.iCalUId")
  .whenMatchedDelete(condition="updates.isCancelled = true")
  .whenMatchedUpdateAll(condition=changed_condition)
  .whenNotMatchedInsertAll(condition="updates.isCancelled = false")
  .execute())

In [None]:
# Read the change feed starting at the table's most recent version.
latest_version = getLatestTableVersion()
df_changefeed = spark.sql(f"SELECT * FROM table_changes_by_path('{delta_table_path}', {latest_version})")
df_changefeed.show(5)


<b>Configure the AWS S3 connection, and load the change feed data to an S3 bucket</b>

In [None]:
AWS_REGION = "ap-southeast-2"

# AWS credentials are read from the Databricks secret scope.
AWS_ACCESS_KEY = dbutils.secrets.get(scope = "demoBlogStorageScope", key = "aws-access-key")
AWS_SECRET_KEY = dbutils.secrets.get(scope = "demoBlogStorageScope", key = "aws-secret-key")

# Register the credentials with the Hadoop configuration for s3n:// paths.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", AWS_SECRET_KEY)

In [None]:
# Write the change feed to S3 as a single CSV part file, but only when it
# contains at least one row.
# NOTE(review): no save mode is set, so Spark's default applies, which raises
# if the target path already exists. Confirm whether reruns should append or
# overwrite.
if df_changefeed.first() is None:
  print("No calendar event loaded")
else:
  df_changefeed.repartition(1).write.csv("s3n://officd365.calendarView/update")