In [0]:
import delta
import time
from datetime import timedelta

#### Setting up cosmos db connection parameters

In [0]:
cosmos_endpoint = dbutils.secrets.get('cosmosmasterkey', 'cosmosdbendpoint')
cosmos_masterkey = dbutils.secrets.get('cosmosmasterkey', 'cosmosmasterkey')
cosmos_databasename = "nasdaq-stocks-oltp"
cosmos_containername = "nasdaq-stocks-rltm"

#### Initial extraction of delta table

In [0]:
# cosmos_read_config = {
#   "spark.cosmos.accountEndpoint": cosmos_endpoint,
#   "spark.cosmos.accountKey": cosmos_masterkey,
#   "spark.cosmos.database": cosmos_databasename,
#   "spark.cosmos.container": cosmos_containername,
#   "spark.cosmos.read.inferSchema.enabled" : "true",
#   "spark.cosmos.write.strategy": "ItemOverwrite"
# }

# cosmos_df = spark.read.format("cosmos.oltp").options(**cosmos_read_config).load()
# cosmos_df.write.mode("overwrite").saveAsTable("STOCK_PULL")

#### Run/Merge Change Feed Into Delta Table 

In [0]:
#using creation timestamp to consider for starting point of change data feed
#spark streaming will control checkpoint automatically and will only request changes since last extraction timestamp
table_creation_timestmap = (delta.DeltaTable.forName(spark,"STOCK_PULL").history().collect()[0]["timestamp"] - timedelta(hours=1)).isoformat() + "Z"

In [0]:
cosmos_change_feed_config = {
  "spark.cosmos.accountEndpoint": cosmos_endpoint,
  "spark.cosmos.accountKey": cosmos_masterkey,
  "spark.cosmos.database": cosmos_databasename,
  "spark.cosmos.container": cosmos_containername,
  "spark.cosmos.read.partitioning.strategy": "Default",
  "spark.cosmos.read.inferSchema.enabled" : "true",
  "spark.cosmos.changeFeed.startFrom" : table_creation_timestmap,
  "spark.cosmos.changeFeed.mode" : "Incremental",
}

df_change_feed = (
  spark
  .readStream
  .format("cosmos.oltp.changeFeed")
  .options(**cosmos_change_feed_config)
  .load()
)

def merge_delta(incremental, target): 
  incremental.createOrReplaceTempView("incremental")
  
  incremental._jdf.sparkSession().sql(f"""
    MERGE INTO {target} tgt
    USING incremental i
    ON (i.StRecievdTimeStamp=tgt.StRecievdTimeStamp AND i.ticker = tgt.ticker)
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
 
streaming_output = (
  df_change_feed
  .writeStream
  .trigger(availableNow=True) #it can be changed to run in streaming mode instead of batch
  .format('delta')
  .outputMode("update")
  .option("checkpointLocation", "/cosmos/checkpoint_changefeed_stocks_table")
  .foreachBatch(lambda i, b: merge_delta(i, "STOCK_PULL"))
  .start()
)

for s in spark.streams.active:
  while s.isActive:
    print('waiting for trigger once to finish')
    time.sleep(1)
    
if streaming_output.lastProgress:
  print(f"{streaming_output.lastProgress['numInputRows']} rows read")
else:
  print("no changes in cosmos since last execution")

waiting for trigger once to finish
waiting for trigger once to finish
waiting for trigger once to finish
waiting for trigger once to finish
70 rows read
