## 3.1 Spark - Cosmos Write and Update from 

https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3_2-12/Samples/Python/NYC-Taxi-Data/01_Batch.ipynb

This IDs there are dynamically set every time, so updates will fail, updated code:

```
df_input_withId = df_rawInput \
  .withColumn("id", f.expr("uuid()")) \
  ```

  I take 100 records and change the vendorId to be 0 and update those records

In [295]:
cosmosEndpoint = 'https://mmh.documents.azure.com:443/'
cosmosMasterKey = 'MyMasterKey=='


StatementMeta(pool02, 3, 8, Finished, Available)

In [296]:
import uuid
import pyspark.sql.functions as f
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.views.repositoryPath", "/viewDefinitions" + str(uuid.uuid4()))

StatementMeta(pool02, 3, 9, Finished, Available)

In [297]:
%%sql
CREATE DATABASE IF NOT EXISTS cosmosCatalog.SampleDatabase;

CREATE TABLE IF NOT EXISTS cosmosCatalog.SampleDatabase.GreenTaxiRecords
USING cosmos.oltp
TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '100000', indexingPolicy = 'OnlySystemProperties');

CREATE TABLE IF NOT EXISTS cosmosCatalog.SampleDatabase.GreenTaxiRecordsCFSink
USING cosmos.oltp
TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '100000', indexingPolicy = 'OnlySystemProperties');

/* NOTE: It is important to enable TTL (can be off/-1 by default) on the throughput control container */
CREATE TABLE IF NOT EXISTS cosmosCatalog.SampleDatabase.ThroughputControl
USING cosmos.oltp
OPTIONS(spark.cosmos.database = 'SampleDatabase')
TBLPROPERTIES(partitionKeyPath = '/groupId', autoScaleMaxThroughput = '4000', indexingPolicy = 'AllProperties', defaultTtlInSeconds = '-1');

StatementMeta(, 3, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [298]:
import datetime
import time
import uuid
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, LongType
import pyspark.sql.functions as f

print("Starting preparation: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))
# Azure storage access info
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "green"
blob_sas_token = r""
# Allow SPARK to read from Blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set(
  'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
  blob_sas_token)
print('Remote blob path: ' + wasbs_path)
# SPARK read parquet, note that it won't load any data yet by now
# NOTE - if you want to experiment with larger dataset sizes - consider switching to Option B (commenting code 
# for Option A/uncommenting code for option B) the lines below or increase the value passed into the 
# limit function restricting the dataset size below

#------------------------------------------------------------------------------------
# Option A - with limited dataset size
#------------------------------------------------------------------------------------
df_rawInputWithoutLimit = spark.read.parquet(wasbs_path)
partitionCount = df_rawInputWithoutLimit.rdd.getNumPartitions()
df_rawInput = df_rawInputWithoutLimit.limit(1_000_000).repartition(partitionCount) #1_000_000
df_rawInput.persist()

#------------------------------------------------------------------------------------
# Option B - entire dataset
#------------------------------------------------------------------------------------
#df_rawInput = spark.read.parquet(wasbs_path)


nowUdf= udf(lambda : int(time.time() * 1000),LongType())
df_input_withId = df_rawInput \
  .withColumn("id", f.expr("uuid()")) \
  .withColumn("insertedAt", nowUdf()) \

print('Register the DataFrame as a SQL temporary view: source')
df_input_withId.createOrReplaceTempView('source')
print("Finished preparation: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))

df = df_input_withId

StatementMeta(pool02, 3, 14, Finished, Available)

Starting preparation:  2022-05-04 22:29:26.325303
Remote blob path: wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/green
Register the DataFrame as a SQL temporary view: source
Finished preparation:  2022-05-04 22:29:32.132500

## Write Configuration - bulk.enabled with ItemOverwrite

In [299]:
import uuid
import datetime

print("Starting ingestion: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))

writeCfg = {
  "spark.cosmos.accountEndpoint": cosmosEndpoint,
  "spark.cosmos.accountKey": cosmosMasterKey,
  "spark.cosmos.database": "SampleDatabase",
  "spark.cosmos.container": "GreenTaxiRecords",
  "spark.cosmos.write.strategy": "ItemOverwrite",
  "spark.cosmos.write.bulk.enabled": "true",
  "spark.cosmos.throughputControl.enabled": "true",
  "spark.cosmos.throughputControl.name": "NYCGreenTaxiDataIngestion",
  "spark.cosmos.throughputControl.targetThroughputThreshold": "0.95",
  "spark.cosmos.throughputControl.globalControl.database": "SampleDatabase",
  "spark.cosmos.throughputControl.globalControl.container": "ThroughputControl",
}


df_input_withId \
  .write \
  .format("cosmos.oltp") \
  .mode("Append") \
  .options(**writeCfg) \
  .save()

print("Finished ingestion: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))

StatementMeta(pool02, 3, 15, Finished, Available)

Starting ingestion:  2022-05-04 22:29:39.166637
Finished ingestion:  2022-05-04 22:31:55.436754

In [300]:
count_source = spark.sql('SELECT * FROM source').count()
print("Number of records in source: ", count_source) 

StatementMeta(pool02, 3, 16, Finished, Available)

Number of records in source:  1000000

Read from Cosmos

## Read from Cosmos

In [301]:
from pyspark.sql.types import *
import pyspark.sql.functions as F

print("Starting validation via query: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))
readCfg = {
  "spark.cosmos.accountEndpoint": cosmosEndpoint,
  "spark.cosmos.accountKey": cosmosMasterKey,
  "spark.cosmos.database": "SampleDatabase",
  "spark.cosmos.container": "GreenTaxiRecords",
  "spark.cosmos.read.partitioning.strategy": "Restrictive",#IMPORTANT - any other partitioning strategy will result in indexing not being use to count - so latency and RU would spike up
  "spark.cosmos.read.inferSchema.enabled" : "true",
  "spark.cosmos.read.customQuery" : "SELECT COUNT(0) AS Count FROM c"
}

count_query_schema=StructType(fields=[StructField("Count", LongType(), True)])
query_df = spark.read.format("cosmos.oltp").schema(count_query_schema).options(**readCfg).load()
count_query = query_df.select(F.sum("Count").alias("TotalCount")).first()["TotalCount"]
print("Number of records retrieved via query: ", count_query) 
print("Finished validation via query: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))

assert count_source == count_query

StatementMeta(pool02, 3, 17, Finished, Available)

Starting validation via query:  2022-05-04 22:33:45.698941
Number of records retrieved via query:  1000000
Finished validation via query:  2022-05-04 22:33:50.464621

In [302]:
import math

print("identify documents: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))
readCfgwithRaw = {
  "spark.cosmos.accountEndpoint": cosmosEndpoint,
  "spark.cosmos.accountKey": cosmosMasterKey,
  "spark.cosmos.database": "SampleDatabase",
  "spark.cosmos.container": "GreenTaxiRecords",
  "spark.cosmos.read.partitioning.strategy": "Default",
  "spark.cosmos.read.inferSchema.enabled" : "true",
}

cosmos_df = spark.read.format("cosmos.oltp").options(**readCfgwithRaw).load()
print(cosmos_df.count())


StatementMeta(pool02, 3, 18, Finished, Available)

identify documents:  2022-05-04 22:33:51.128751
1000000

In [303]:
df_to_update = df.limit(100)

StatementMeta(pool02, 3, 19, Finished, Available)

In [304]:
df_to_update = df_to_update.withColumn('vendorId', f.lit(0))

StatementMeta(pool02, 3, 20, Finished, Available)

In [305]:


print("Number of records to be updated: ", df.count()) 

print("Starting to bulk update documents: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))
UpdateCfg = writeCfg.copy()

df_to_update \
        .write \
        .format("cosmos.oltp") \
        .mode("Append") \
        .options(**UpdateCfg) \
        .save()
print("Finished updating documents: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))

print("Starting count validation via query: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))
count_query_schema=StructType(fields=[StructField("Count", LongType(), True)])
readCfg["spark.cosmos.read.customQuery"] = "SELECT COUNT(0) AS Count FROM c"
query_df = spark.read.format("cosmos.oltp").schema(count_query_schema).options(**readCfg).load()
count_query = query_df.select(F.sum("Count").alias("TotalCount")).first()["TotalCount"]
print("Number of records retrieved via query: ", count_query) 
print("Finished count validation via query: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))

print(count_query)

StatementMeta(pool02, 3, 21, Finished, Available)

Number of records to be updated:  1000000
Starting to bulk update documents:  2022-05-04 22:34:49.854036
Finished updating documents:  2022-05-04 22:34:53.930036
Starting count validation via query:  2022-05-04 22:34:53.930300
Number of records retrieved via query:  1000000
Finished count validation via query:  2022-05-04 22:34:54.691137
1000000

## Check where your vendor ID is 0, there should be 100 rows

In [306]:
print("Starting count validation via query: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))
count_query_schema=StructType(fields=[StructField("Count", LongType(), True)])
readCfg["spark.cosmos.read.customQuery"] = "SELECT COUNT(0) AS Count FROM c where c.vendorId = 0"
query_df = spark.read.format("cosmos.oltp").schema(count_query_schema).options(**readCfg).load()
count_query = query_df.select(F.sum("Count").alias("TotalCount")).first()["TotalCount"]
print("Number of records retrieved via query: ", count_query) 
print("Finished count validation via query: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))

print(count_query)

StatementMeta(pool02, 3, 22, Finished, Available)

Starting count validation via query:  2022-05-04 22:34:58.486932
Number of records retrieved via query:  100
Finished count validation via query:  2022-05-04 22:34:59.486427
100