In [1]:
from pprint import pprint
import json

import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.documents as documents
import azure.cosmos.errors as errors

In [2]:
# --- Account connection settings --------------------------------------------
# NOTE(review): the brace-wrapped values look like template placeholders that
# must be replaced with a real endpoint URI and primary key - confirm.
COSMOSDB_ENDPOINT = '{AZURE_COSMOS_DB_ENDPOINT}'
COSMOSDB_PRIMARY_KEY = '{AZURE_COSMOS_DB_PRIMARY_KEY}'

# --- Resource identifiers ---------------------------------------------------
COSMOSDB_DB_ID = 'FinancialDatabase'
COSMOSDB_COLL_ID = 'TransactionsCollection'

# --- Container provisioning settings ----------------------------------------
partition_key = '/costCenter'
throughput = 10000

# --- Resource links, in the 'dbs/<database>/colls/<collection>' form --------
database_link = 'dbs/' + COSMOSDB_DB_ID
collection_link = database_link + '/colls/' + COSMOSDB_COLL_ID

# Client shared by every cell below, authenticated with the master key.
client = cosmos_client.CosmosClient(COSMOSDB_ENDPOINT, dict(masterKey=COSMOSDB_PRIMARY_KEY))

In [3]:
def create_database(id):
    """Create the database named ``id`` through the module-level ``client``.

    A 409 (conflict) response is reported as "already exists" instead of
    being treated as an error; any other ``HTTPFailure`` is re-raised.
    """
    print('Create database')

    try:
        client.CreateDatabase({"id": id})
    except errors.HTTPFailure as failure:
        # Only a conflict is expected here; everything else propagates.
        if failure.status_code != 409:
            raise
        print(f'A database with id "{id}" already exists')
    else:
        print(f'Database with id "{id}" created')

In [4]:
# Ensure the target database exists before the container is created.
create_database(id=COSMOSDB_DB_ID)

Create database
A database with id "FinancialDatabase" already exists


In [5]:
def create_container(id, throughput, partition_key):
    """Create a hash-partitioned container in the database at ``database_link``.

    Args:
        id: Identifier of the container (collection) to create.
        throughput: Value sent as the ``offerThroughput`` request option.
        partition_key: Partition key path, e.g. ``'/costCenter'``.

    Raises:
        errors.HTTPFailure: For any HTTP failure other than 409 (conflict),
            which is reported as "already exists" instead.
    """
    container_definition = {
        "id": id,
        "partitionKey": {
            "paths": [
                partition_key
            ],
            "kind": "Hash",
            "version": 2
        }
    }
    request_options = { 'offerThroughput': throughput }

    try:
        client.CreateContainer(database_link, container_definition, request_options)

        print(f'Collection with id "{id}" created')
        print(f'Partition Key - "{partition_key}"')

    # Catch HTTPFailure (as the other helpers in this notebook do) rather than
    # the CosmosError base class: only HTTP failures are inspected for a
    # status code here, so other error types should propagate unchanged.
    except errors.HTTPFailure as e:
        if e.status_code == 409:
            print(f'A collection with id "{id}" already exists')
        else:
            raise

In [6]:
# Provision the transactions container using the settings configured above.
create_container(
    id=COSMOSDB_COLL_ID,
    throughput=throughput,
    partition_key=partition_key,
)

A collection with id "TransactionsCollection" already exists


In [7]:
# Import transaction data with 'automotive' cost center.
# The context manager guarantees the file is closed even if parsing fails,
# and the explicit encoding keeps the read independent of the platform default.

with open('../data/automotive_transactions.json', encoding='utf-8') as json_file:
    transactions_json = json.load(json_file)

# Show the number of loaded documents and a small sample of them.
print(len(transactions_json))
pprint(transactions_json[0:5])

2400
[{'amount': 188.13,
  'costCenter': 'automotive',
  'paidBy': 'fritz.waters',
  'processed': True},
 {'amount': 115.37,
  'costCenter': 'automotive',
  'paidBy': 'hermann.hermann',
  'processed': True},
 {'amount': 177.97,
  'costCenter': 'automotive',
  'paidBy': 'lou.ledner',
  'processed': True},
 {'amount': 6.19,
  'costCenter': 'automotive',
  'paidBy': 'garnett.ullrich',
  'processed': False},
 {'amount': 234.81,
  'costCenter': 'automotive',
  'paidBy': 'sherwood.kozey',
  'processed': True}]


In [8]:
# This stored procedure uploads an array of documents in one batch. If the
# entire batch is not completed, the stored procedure will set the response
# body to the number of documents that were imported. Your client-side code
# is expected to call this stored procedure multiple times until all documents
# are imported.
#
# The procedure accepts either an array or a JSON string (which it parses),
# and creates the documents one at a time, chaining each create from the
# previous create's callback.

sproc_definition = {
    'id': 'bulkUpload',
    'body': '''
         function bulkUpload(docs) {
             var collection = getContext().getCollection();
             var collectionLink = collection.getSelfLink();
             var count = 0;
            if (typeof docs == "string") {
                docs = JSON.parse(docs);
            }
             if (!docs) throw new Error("The array is undefined or null.");
             var docsLength = docs.length;
             if (docsLength == 0) {
                 getContext().getResponse().setBody(0);
                 return;
             }
             tryCreate(docs[count], callback);
             function tryCreate(doc, callback) {
                 var isAccepted = collection.createDocument(collectionLink, doc, callback);
                 if (!isAccepted) getContext().getResponse().setBody(count);
             }
             function callback(err, doc, options) {
                 if (err) throw err;
                 count++;
                 if (count >= docsLength) {
                     getContext().getResponse().setBody(count);
                 } else {
                     tryCreate(docs[count], callback);
                 }
             }
         }
    '''
}

# Upsert instead of create so that re-running this cell against a collection
# that already holds a 'bulkUpload' procedure replaces it rather than failing
# with a 409 conflict.
bulk_upload_sproc = client.UpsertStoredProcedure(collection_link, sproc_definition)

In [9]:
# Upload all documents, re-invoking the stored procedure until every document
# has been imported. Each call's response body is the number of documents that
# call created, so the next call resumes from that offset. You should see the
# number '2400' printed: the total count of uploaded items.

uploaded_count = 0
while uploaded_count < len(transactions_json):
    created = client.ExecuteStoredProcedure(bulk_upload_sproc['_self'],
                                            json.dumps(transactions_json[uploaded_count:]),
                                            {'partitionKey':'automotive'})
    if not created:
        # The call made no progress; stop instead of looping forever.
        break
    uploaded_count += created

sproc_result = uploaded_count

pprint(sproc_result)

2400


In [10]:
def query_documents(collection_link, query, options=None):
    """Run ``query`` against a collection and return the matches as a list.

    Args:
        collection_link: Link of the collection to query.
        query: Query text passed to ``client.QueryItems``.
        options: Request options for the query. When omitted, a fresh
            ``{"enableCrossPartitionQuery": True}`` dict is used for each
            call, so no state is shared between calls.

    Returns:
        The list of matching documents. An empty list is returned when a
        404 or 400 failure is handled, so callers can always take ``len()``.

    Raises:
        errors.HTTPFailure: For any HTTP failure other than 404 or 400.
    """
    if options is None:
        options = { "enableCrossPartitionQuery": True }

    try:
        return list(client.QueryItems(collection_link, query, options))

    except errors.HTTPFailure as e:
        if e.status_code == 404:
            print("Document doesn't exist")
        elif e.status_code == 400:
            # Can occur when we are trying to query on excluded paths
            print("Bad Request exception occured: ", e)
        else:
            raise
        # A handled failure yields no documents.
        return []

    finally:
        print()

In [11]:
# Query for automotive cost center transactions whose amount is at most $10
# (the filter is inclusive). The recorded run of this notebook returned 33
# results.

query = '''
SELECT * 
FROM transaction t 
WHERE t.costCenter = 'automotive' 
    AND t.amount <= 10
'''
# Fall back to an empty list in case the helper returned nothing after
# handling an error, so the length check below cannot fail.
query_results = query_documents(collection_link, query) or []

print(f'Length of query results: {len(query_results)}')
# Only show a sample document when the query actually matched something.
if query_results:
    pprint(query_results[0])


Length of query results: 33
{'_attachments': 'attachments/',
 '_etag': '"04009116-0000-0100-0000-5e347c2d0000"',
 '_rid': '2bNRAM1NXPUEAAAAAAAACA==',
 '_self': 'dbs/2bNRAA==/colls/2bNRAM1NXPU=/docs/2bNRAM1NXPUEAAAAAAAACA==/',
 '_ts': 1580497965,
 'amount': 6.19,
 'costCenter': 'automotive',
 'id': '1d26ee9a-6b23-47d6-a119-840914c2a911',
 'paidBy': 'garnett.ullrich',
 'processed': False}


In [12]:
# This stored procedure iterates through all documents that match a specific
# query and deletes the documents. The response body is an object holding the
# number of deleted documents and a boolean 'continuation' flag. The flag
# stays true when the stored procedure stopped before deleting every match;
# your client-side code is expected to call the stored procedure repeatedly
# until the flag comes back false.

sproc_definition = {
    'id': 'bulkDelete',
    'body': '''
         function bulkDelete(query) {
             var collection = getContext().getCollection();
             var collectionLink = collection.getSelfLink();
             var response = getContext().getResponse();
             var responseBody = {
                 deleted: 0,
                 continuation: true
             };
             if (!query) throw new Error("The query is undefined or null.");
             tryQueryAndDelete();
             function tryQueryAndDelete(continuation) {
                 var requestOptions = {continuation: continuation};
                 var isAccepted = collection.queryDocuments(collectionLink, query, requestOptions, function (err, retrievedDocs, responseOptions) {
                     if (err) throw err;
                     if (retrievedDocs.length > 0) {
                         tryDelete(retrievedDocs);
                     } else if (responseOptions.continuation) {
                         tryQueryAndDelete(responseOptions.continuation);
                     } else {
                         responseBody.continuation = false;
                         response.setBody(responseBody);
                     }
                 });
                 if (!isAccepted) {
                     response.setBody(responseBody);
                 }
             }
             function tryDelete(documents) {
                 if (documents.length > 0) {
                     var isAccepted = collection.deleteDocument(documents[0]._self, {}, function (err, responseOptions) {
                         if (err) throw err;
                         responseBody.deleted++;
                         documents.shift();
                         tryDelete(documents);
                     });
                     if (!isAccepted) {
                         response.setBody(responseBody);
                     }
                 } else {
                     tryQueryAndDelete();
                 }
             }
         }
    '''
}

# Upsert instead of create so that re-running this cell against a collection
# that already holds a 'bulkDelete' procedure replaces it rather than failing
# with a 409 conflict.
bulk_delete_sproc = client.UpsertStoredProcedure(collection_link, sproc_definition)

In [13]:
# Run the bulk delete, re-invoking the stored procedure for as long as its
# response reports 'continuation' as True. The printed result holds the final
# continuation flag and the total number of documents deleted across calls.

query = '''
SELECT * 
FROM transaction t 
WHERE t.costCenter = 'automotive' 
    AND t.amount <= 10
'''

deleted_total = 0
while True:
    sproc_result = client.ExecuteStoredProcedure(bulk_delete_sproc['_self'],
                                                 query,
                                                 {'partitionKey':'automotive'})
    deleted_total += sproc_result['deleted']
    # Stop once the procedure reports completion, or when a call deleted
    # nothing (no progress), so the loop cannot spin forever.
    if not sproc_result['continuation'] or sproc_result['deleted'] == 0:
        break

sproc_result = {'continuation': sproc_result['continuation'],
                'deleted': deleted_total}

pprint(sproc_result)

{'continuation': False, 'deleted': 33}


In [14]:
# Query again for automotive cost center transactions whose amount is at most
# $10. After a successful bulk delete, there should be 0 results returned.

query = '''
SELECT * 
FROM transaction t 
WHERE t.costCenter = 'automotive' 
    AND t.amount <= 10
'''
# Fall back to an empty list in case the helper returned nothing after
# handling an error, so the length check below cannot fail.
query_results = query_documents(collection_link, query) or []

print(f'Length of query results: {len(query_results)}')


Length of query results: 0


In [15]:
def delete_database(id):
    """Delete the database named ``id`` through the module-level ``client``.

    A 404 (not found) response is reported as "does not exist" instead of
    being treated as an error.

    Raises:
        errors.HTTPFailure: For any HTTP failure other than 404.
    """
    print('Delete Database')

    try:
        client.DeleteDatabase(f'dbs/{id}')

        print(f'Database with id "{id}" was deleted')

    except errors.HTTPFailure as e:
        if e.status_code == 404:
            # f-string so the actual database id is shown in the message.
            print(f'A database with id "{id}" does not exist')
        else:
            raise

In [16]:
# Clean up: remove the database used by this notebook.
delete_database(id=COSMOSDB_DB_ID)

Delete Database
Database with id "FinancialDatabase" was deleted
