In [1]:
# 0.  Initialization  
# 
# 0.1. Create Cosmos DB container using the RetailDemo, see https://docs.microsoft.com/en-us/azure/cosmos-db/create-notebook-visualize-data
# 0.2. Configure Synapse Link for Cosmos DB, see https://docs.microsoft.com/en-us/azure/cosmos-db/configure-synapse-link
# 0.3. Add the cards.txt data set to the Storage account attached to Azure Synapse Analytics
# 0.4. Add a requirement.txt listing the azure-cosmos library to the Spark pool (this is needed in the last cell, where data is upserted to Cosmos DB)

StatementMeta(amlsparklibv4, 17, 1, Finished, Available)



In [2]:
# 1. Build a DataFrame on top of the Cosmos DB analytical store (HTAP).
#
# Spark evaluates lazily: nothing is actually read until an action such as display() runs.
# In a production scenario the display() call is not needed and the data is only fetched by the join.
#
# To select a preferred list of regions in a multi-region Cosmos DB account,
# add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

cosmos_reader = (
    spark.read
    .format("cosmos.olap")
    .option("spark.synapse.linkedService", "CosmosDbHTAP")
    .option("spark.cosmos.container", "WebsiteData3")
)
dfcosmosdb = cosmos_reader.load()

# Preview only the first 10 rows of the DataFrame.
display(dfcosmosdb.limit(10))

StatementMeta(amlsparklibv4, 17, 2, Finished, Available)

SynapseWidget(Synapse.DataFrame, 4ce5132d-e578-458a-b705-74a5b5bf57e7)

In [3]:
# 2. Load the card file from the Storage Account into a DataFrame.
#    The first line of the file is treated as the column header.

card_file_path = 'abfss://onpremcsv@testsynapserbrstor.dfs.core.windows.net/card1.txt'
dfcsv = spark.read.option('header', 'true').csv(card_file_path)

# Preview only the first 10 rows of the DataFrame.
display(dfcsv.limit(10))

StatementMeta(amlsparklibv4, 17, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, 055ea22c-86b5-469b-a0ca-d1b88bb936f7)

In [4]:
# 3. Inner-join the CSV rows with the Cosmos DB rows.
#    A CSV row matches a Cosmos DB row when its `cardid` equals the row's `CartID`.

join_condition = dfcsv["cardid"] == dfcosmosdb["CartID"]
inner_join = dfcsv.join(dfcosmosdb, on=join_condition, how="inner")

display(inner_join)

StatementMeta(amlsparklibv4, 17, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, 23ae99dd-7b14-4a3b-8a66-6abe71c2da73)

In [5]:
# 4. Collect the joined rows into a plain Python list of dicts (one dict per row).
#    Only the columns needed for the update step are kept.

update_columns = ["cardid", "name", "id", "_etag"]
update_rows = inner_join.select(*update_columns).toPandas()
update_list = update_rows.to_dict('records')

print(update_list)

StatementMeta(amlsparklibv4, 17, 5, Finished, Available)

[{'cardid': '5481', 'name': 'Rene', 'id': 'e5c05cb8-e962-4a18-bd21-e5ef659c9732', '_etag': '"0000eb78-0000-0c00-0000-60dc4e040000"'}, {'cardid': '5481', 'name': 'Rene', 'id': '817b9d81-7ba5-4f49-ad1b-f8c7477a47e8', '_etag': '"0000ec78-0000-0c00-0000-60dc4e040000"'}]

In [6]:
# 5. Write data from cards.txt to Cosmos DB.
#
# Each upsert is conditional: it sends the _etag collected in update_list together with
# match_condition=MatchConditions.IfNotModified, so it only succeeds if the document's etag
# has not changed since it was read. Executing this cell twice or more (without fetching
# the latest etag first) will therefore make the upsert fail.
#
# See also https://azuresdkdocs.blob.core.windows.net/$web/python/azure-cosmos/4.0.0b5/azure.cosmos.html
#
import json

from azure.core import MatchConditions
from azure.cosmos import exceptions, CosmosClient, PartitionKey

# WARNING: never commit a real account key to a notebook; replace the placeholder at run time
# (for example from a secret store) instead of pasting the key here.
endpoint = "https://<<your url>>.documents.azure.com:443/"
key = '<<your key>>'
database_name ='RetailDemo'
container_name = 'WebsiteData3'

client = CosmosClient(endpoint, key)
database = client.get_database_client(database_name)
container = database.get_container_client(container_name)


def _as_number(text):
    """Convert the textual card id from the CSV into a number.

    The value is bound to a query parameter that is compared against the numeric
    CartID field, so it must be sent as a number rather than as a string.

    Raises:
        ValueError: if `text` is not a valid integer or float literal.
    """
    try:
        return int(text)
    except ValueError:
        return float(text)


# Parameterized query: values are bound by the service instead of being concatenated into
# the SQL text, so quotes or other special characters in the data cannot alter the query.
ITEM_QUERY = 'SELECT * FROM WebsiteData3 c WHERE c.CartID = @cartId AND c.id = @id'

for update_item in update_list:
    query_parameters = [
        {"name": "@cartId", "value": _as_number(update_item['cardid'])},
        {"name": "@id", "value": update_item['id']},
    ]
    matching_items = container.query_items(
        query=ITEM_QUERY,
        parameters=query_parameters,
        enable_cross_partition_query=True)

    for item in matching_items:
        # Show the document as it is currently stored, before modifying it.
        print(json.dumps(item, indent=1))

        item['UserName'] = update_item['name'] + "_updated2"

        # Optimistic concurrency: the write is only accepted if the stored etag still equals
        # the etag that was captured when update_list was built.
        response = container.upsert_item(
            body=item,
            etag=update_item['_etag'],
            match_condition=MatchConditions.IfNotModified)

StatementMeta(amlsparklibv4, 17, 6, Finished, Available)

{
 "CartID": 5481,
 "Action": "Added",
 "Item": " Flip Flop Shoes",
 "Price": 14,
 "UserName": "Rene_updated",
 "Country": "Luxembourg",
 "EventDate": "2010-10-13T00:00:00",
 "Year": 1,
 "Latitude": -83.5582,
 "Longitude": 154.5919,
 "Address": "72471 Dee Rue, Lake Jacyntheview, Luxembourg",
 "id": "e5c05cb8-e962-4a18-bd21-e5ef659c9732",
 "_rid": "4MN6AJ3KlgQJAAAAAAAAAA==",
 "_self": "dbs/4MN6AA==/colls/4MN6AJ3KlgQ=/docs/4MN6AJ3KlgQJAAAAAAAAAA==/",
 "_etag": "\"0000eb78-0000-0c00-0000-60dc4e040000\"",
 "_attachments": "attachments/",
 "_ts": 1625050628
}
{
 "CartID": 5481,
 "Action": "Viewed",
 "Item": " Flip Flop Shoes",
 "Price": 14,
 "UserName": "Rene_updated",
 "Country": "Luxembourg",
 "EventDate": "0001-01-01T00:00:00",
 "Year": 1,
 "Latitude": -83.5582,
 "Longitude": 154.5919,
 "Address": "72471 Dee Rue, Lake Jacyntheview, Luxembourg",
 "id": "817b9d81-7ba5-4f49-ad1b-f8c7477a47e8",
 "_rid": "4MN6AJ3KlgQKAAAAAAAAAA==",
 "_self": "dbs/4MN6AA==/colls/4MN6AJ3KlgQ=/docs/4MN6AJ3KlgQKA