In [None]:
from pyspark.sql.functions import split, col

import os

# Connection settings for the Cosmos DB account.
# The endpoint and primary key are read from the environment (populate them
# from a secret store such as Key Vault) so the account key never has to be
# pasted into, and committed with, the notebook. The placeholder strings are
# used only when the variables are unset, which keeps the original behaviour.
cosmosEndpoint = os.environ.get("COSMOS_ENDPOINT", "https://{Cosmos Account Name}.documents.azure.com:443/")
cosmosMasterKey = os.environ.get("COSMOS_MASTER_KEY", "{Cosmos Primary Key}")
cosmosDatabaseName = "ContosoMobile"
cosmosContainerName = "CallRecords"               # source container (change feed is read from here)
cosmosDestContainerName = "UnBilledCallRecords"   # destination container (aggregates are patched here)

In [None]:
# Folder holding the batch change-feed offset files.
checkpointLocation = "/tmp/unbilled_checkpoint"
# Offset files under checkpointLocation. Presumably the connector resumes from
# startOffset/0 and records the newest offset it read in latestOffset/0; the
# .bak file is a manual backup of the previous start offset. Confirm against
# the connector's batch-checkpoint documentation.
startOffsetLocation = checkpointLocation + "/startOffset/0"
startOffsetBackupLocation = checkpointLocation + "/startOffset/0.bak"
latestOffsetLocation = checkpointLocation + "/latestOffset/0"

# Reader options for the source container's change feed (batch mode).
changeFeedCfg = {
  "spark.cosmos.accountEndpoint": cosmosEndpoint,
  "spark.cosmos.accountKey": cosmosMasterKey,
  "spark.cosmos.database": cosmosDatabaseName,
  "spark.cosmos.container": cosmosContainerName,
  "spark.cosmos.read.inferSchema.enabled": "true",
  "spark.cosmos.changeFeed.startFrom": "Now",
  "spark.cosmos.changeFeed.mode": "Incremental",
  "spark.cosmos.read.partitioning.strategy": "Restrictive",
  # NOTE(review): the connector's configuration reference describes
  # targetedCount as honored only with the "Custom" partitioning strategy, so
  # it is likely ignored alongside "Restrictive" — confirm before relying on it.
  "spark.cosmos.partitioning.targetedCount": "1",
  "spark.cosmos.changeFeed.itemCountPerTriggerHint": "100000",
  # The trailing comma below lets the optional lines that follow be uncommented
  # without producing a SyntaxError.
  "spark.cosmos.changeFeed.batchCheckpointLocation": checkpointLocation,
  # optional configuration for throughput control
  # "spark.cosmos.throughputControl.enabled": "true",
  # "spark.cosmos.throughputControl.name": "SourceContainerThroughputControl",
  # "spark.cosmos.throughputControl.targetThroughputThreshold": "0.30",
  # "spark.cosmos.throughputControl.globalControl.database": "database-v4",
  # "spark.cosmos.throughputControl.globalControl.container": "ThroughputControl",
}

# Writer options for the destination container. ItemPatch with op(increment)
# adds each row's incoming/outgoing/cnt values onto the existing document
# fields instead of replacing the whole document; writes are sent in bulk.
cfgIncrement_Incoming = {
  "spark.cosmos.accountEndpoint": cosmosEndpoint,
  "spark.cosmos.accountKey": cosmosMasterKey,
  "spark.cosmos.database": cosmosDatabaseName,
  "spark.cosmos.container": cosmosDestContainerName,
  "spark.cosmos.write.strategy": "ItemPatch",
  "spark.cosmos.write.bulk.enabled": "true",
  "spark.cosmos.write.patch.columnConfigs": "[col(incoming).op(increment),col(outgoing).op(increment),col(cnt).op(increment)]",
}


In [None]:
from notebookutils import mssparkutils

# Probe the checkpoint folder. Presumably it is absent on the very first run,
# before any offsets have been written, so a failure is reported, not raised.
checkpointFound = True
try:
  mssparkutils.fs.ls(checkpointLocation)
except Exception:
  checkpointFound = False

if not checkpointFound:
  print("Checkpoint location doesn't exist yet.")

In [None]:
import time
import datetime
from notebookutils import mssparkutils
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Poll the change feed in batch mode until it has been empty for
# maxEmptyBeforeStopListening consecutive polls. Each non-empty batch is
# aggregated per subscriber and written to the destination container using
# cfgIncrement_Incoming; after every successful poll (empty or not) the offset
# files are rotated so the next read starts where this one ended.

def _utc_timestamp():
  """Return the current UTC time as 'YYYY-MM-DD HH:MM:SS.ffffff' for log lines."""
  # datetime.utcnow() is deprecated since Python 3.12; the timezone-aware call
  # formats to exactly the same string.
  return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")


def _aggregate_call_durations(callRecords_DF):
  """Aggregate call records into one row per subscriber.

  Returns a DataFrame with columns:
    id       -- the record's SubscriberId (the record's own id column is dropped)
    incoming -- sum of DurationSec over rows with IsIncoming == 1 (0 if none)
    outgoing -- sum of DurationSec over rows with IsIncoming == 0 (0 if none)
    cnt      -- number of records for the subscriber
  Rows whose IsIncoming is neither 0 nor 1 contribute only to cnt.
  """
  isIncoming = F.col('IsIncoming')
  duration = F.col('DurationSec')
  return callRecords_DF \
    .withColumn('outgoingTemp',
                F.when(isIncoming == 0, duration * 1).when(isIncoming == 1, duration * 0)) \
    .withColumn('incomingTemp',
                F.when(isIncoming == 1, duration * 1).when(isIncoming == 0, duration * 0)) \
    .drop("id") \
    .withColumnRenamed('SubscriberId', 'id') \
    .groupBy("id") \
    .agg(
      # sum() is null when every contribution is null (IsIncoming outside
      # {0, 1}, or DurationSec missing); coalesce to 0 so the increment patch
      # always receives a number.
      F.coalesce(F.sum("incomingTemp"), F.lit(0)).alias("incoming"),
      F.coalesce(F.sum("outgoingTemp"), F.lit(0)).alias("outgoing"),
      F.count("*").alias("cnt"))


def _rotate_checkpoint():
  """Back up the current start offset, then promote the latest offset to be the next start offset."""
  print("Updating checkpoints...")
  # Start offset (and backup) might not exist yet, so these two steps are best-effort.
  try:
    mssparkutils.fs.rm(startOffsetBackupLocation, False)
  except Exception:
    print("StartOffset backup file doesn't exist yet.")

  try:
    mssparkutils.fs.cp(startOffsetLocation, startOffsetBackupLocation, False)
  except Exception:
    print("StartOffset doesn't exist.")

  # The two True flags are presumably create-parent-path and overwrite
  # (see mssparkutils.fs.mv). Unlike the steps above, failure here is not swallowed.
  mssparkutils.fs.mv(latestOffsetLocation, startOffsetLocation, True, True)


sourceRecordCount = 0
targetRecordCount = 0
emptyCount = 0
maxEmptyBeforeStopListening = 3
backoffStepSeconds = 5  # sleep grows linearly: 5s, 10s, ... per consecutive empty poll

while emptyCount < maxEmptyBeforeStopListening:
  changeFeed_DF = spark.read.format("cosmos.oltp.changefeed") \
      .options(**changeFeedCfg) \
      .load()
  # Cache the batch so the aggregation reuses the data materialized by count()
  # rather than reading the change feed again.
  changeFeed_DF.persist()
  try:
    sourceRecordCount = changeFeed_DF.count()
    print(_utc_timestamp(), ": Retrieved " + str(sourceRecordCount) + " records from change feed.")
    if sourceRecordCount == 0:
      emptyCount += 1
      sleepTime = emptyCount * backoffStepSeconds
      # No sleep after the final empty poll: the loop is about to exit.
      if emptyCount < maxEmptyBeforeStopListening:
        print("Sleeping " + str(sleepTime) + " seconds...")
        time.sleep(sleepTime)
    else:
      emptyCount = 0
      changeFeed_transformed_DF = _aggregate_call_durations(changeFeed_DF)
      targetRecordCount = changeFeed_transformed_DF.count()
      changeFeed_transformed_DF \
          .write \
          .format("cosmos.oltp") \
          .mode("Append") \
          .options(**cfgIncrement_Incoming) \
          .save()
      print(_utc_timestamp(), ": Processed " + str(targetRecordCount) + " subscriber records...")

    # Reached only when the write above did not raise, so a failed batch leaves
    # the start offset unchanged.
    # NOTE(review): increments are not idempotent; if a bulk write fails after
    # partially applying, re-reading the batch will double-count those rows.
    _rotate_checkpoint()
    print(_utc_timestamp(), ": Processed records and updated checkpoint.")
  finally:
    # Release the cached batch even if the read, write or checkpoint step fails.
    changeFeed_DF.unpersist()

print(_utc_timestamp(), ": STOPPED LISTENING FOR MORE CHANGES!")