### Geospatial data with Cosmos DB
Azure Cosmos DB Geospatial example <br/>
https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/query/geospatial-intro

In [None]:
# Install required libraries
%pip install azure-cosmos
%pip install faker

# import azure.cosmos
# print(azure.cosmos.__version__)

In [1]:
# Initialization and config
from azure.cosmos import CosmosClient, PartitionKey
from configparser import ConfigParser
from faker import Faker

import os
import json
import uuid

# Assuming structure Notebooks/Models
parser = ConfigParser()
parser.read('../NotebookConfig.cfg')

cosmosAccountURI = parser.get('CosmosDB', 'COSMOSDB_ACCOUNT_URI')
cosmosAccountKey = parser.get('CosmosDB', 'COSMOSDB_ACCOUNT_KEY')

databaseName = 'Learn'
containerName = 'GeoJSON'
partitionKeypath = '/PartitionKey'
osPath = './OutputFiles/'

Faker.seed(0)
fake = Faker(['en-US'])

if not os.path.exists(osPath):
    os.mkdir(osPath)

In [2]:
client = CosmosClient(cosmosAccountURI, cosmosAccountKey)
db = client.create_database_if_not_exists(databaseName)

pkPath = PartitionKey(path=partitionKeypath)
ctr = db.create_container_if_not_exists(id=containerName, partition_key=pkPath, offer_throughput=1000)

In [8]:
# Reference list for documents that should have consistent values across cell/operations.
from collections import OrderedDict
maxRange = 10000
IOTSources = []

os.makedirs(os.path.dirname('./OutputFiles/'), exist_ok=True)
with open('./OutputFiles/' + containerName + '_referenceData.json', 'w') as jsonFile:
    for i in range(maxRange):
        entity = {            
            'Name': fake.bothify('????_############')
            , 'Type': fake.random_element(elements=('Type1', 'Type2', 'Type3', 'Type4', 'Type5', 'Type6'))
        }
        IOTSources.append(entity)

        # Save patients for reference
        json.dump(entity, jsonFile)
        if (i < maxRange):
            jsonFile.write(',\n')

In [5]:
# Load 100K documents into container
# Random locations based on geospatial coordiates around latitude 40.xxxxx and longitude -74.xxxxx

from datetime import datetime, timedelta

RUCharges = []
daysRange = 10
iotRange = 10000

dtBase = datetime(year=2022, month=12, day=1)

for iot in range(iotRange):
    IOTSrc = IOTSources[iot]

    for day in range(daysRange):
        docs = []        
        readings = []

        # *** Produce 10 readings - 1 for each minute
        for m in range(1):
            readings.append(
                {
                    'Dimension': fake.random_element(elements=('D1', 'D2', 'D3', 'D4', 'D5', 'D6', 'D7', 'D8', 'D9'))
                    , 'Metric': fake.random_number(digits=5)
                    , 'Timestamp': (dtBase + timedelta(days=day,minutes=m)).isoformat()  # fake.date_time_this_year().isoformat()
                })

        doc = {
            'id': str(uuid.uuid4())
            , 'PartitionKey': IOTSrc['Name'] + '_' + IOTSrc['Type'] + '_' + (dtBase + timedelta(days=day)).strftime('%Y_%m_%d')
            , 'Name': IOTSrc['Name']
            , 'Type': IOTSrc['Type']            
            , 'Entity': IOTSrc
            , 'Location': {
               "type":"Point",
                "coordinates":[ float(fake.numerify(text='40.#####')), float(fake.numerify(text='-74.#####')) ]     
            }
            , 'DateTime': (dtBase + timedelta(days=day)).isoformat()
            , 'Timestamp': (dtBase + timedelta(days=day)).timestamp()
            , 'Readings': readings
            , 'class': fake.random_element(elements=OrderedDict([("A", 0.40), ("B", 0.35), ("C", 0.15), ("D", 0.05), ("E", 0.05)]))
        }

        ctr.create_item(doc)
        RUCharges.append(float(ctr.client_connection.last_response_headers['x-ms-request-charge']))
        # print(ctr.client_connection.last_response_headers['x-ms-request-charge'])

In [6]:
# Average RU for each insert operation: 12.569
print('Average RU cost: ' + str(sum(RUCharges) / len(RUCharges)))

Average RU cost: 12.569999999991982


In [7]:
# Create one document close to the polygon that will be queried 
# Others may exist from the 100K randomized documents

IOTSrc = IOTSources[fake.random_int(min=0, max=9999)]
dtBase = datetime(year=2022, month=12, day=15)
docs = []        
readings = []

doc = {
    'id': str(uuid.uuid4())
    , 'PartitionKey': IOTSrc['Name'] + '_' + IOTSrc['Type'] + '_' + (dtBase + timedelta(days=day)).strftime('%Y_%m_%d')
    , 'Name': IOTSrc['Name']
    , 'Type': IOTSrc['Type']            
    , 'Entity': IOTSrc
    , 'Location': {
        "type":"Point",
        "coordinates":[ float(fake.numerify(text='40.80###')), float(fake.numerify(text='-73.96500')) ]     
    }
    , 'DateTime': (dtBase + timedelta(days=day)).isoformat()
    , 'Timestamp': (dtBase + timedelta(days=day)).timestamp()
    , 'Readings': {
            'Dimension': fake.random_element(elements=('D1', 'D2', 'D3', 'D4', 'D5', 'D6', 'D7', 'D8', 'D9'))
            , 'Metric': fake.random_number(digits=5)
            , 'Timestamp': fake.date_time_this_year().isoformat()
    }
    , 'class': fake.random_element(elements=OrderedDict([("A", 0.40), ("B", 0.35), ("C", 0.15), ("D", 0.05), ("E", 0.05)]))
}

print(doc)
ctr.create_item(doc)
print(ctr.client_connection.last_response_headers['x-ms-request-charge'])

{'id': '7db61c58-636b-46dd-8fcc-39e2b2a8bb57', 'PartitionKey': 'Nkbk_080864497860_Type5_2022_12_24', 'Name': 'Nkbk_080864497860', 'Type': 'Type5', 'Entity': {'Name': 'Nkbk_080864497860', 'Type': 'Type5'}, 'Location': {'type': 'Point', 'coordinates': [40.80183, -73.965]}, 'DateTime': '2022-12-24T00:00:00', 'Timestamp': 1671861600.0, 'Readings': {'Dimension': 'D8', 'Metric': 59475, 'Timestamp': '2022-07-04T22:49:38'}, 'class': 'C'}
12.19


In [None]:
# Sample queries that can be executed to validate number of documents in the container
#   and if geospatial polygon is valid

# SELECT COUNT(c.id) FROM c

# SELECT ST_ISVALIDDETAILED ({
#     'type':'Polygon',
#     'coordinates': [[[40.808023, -73.966375], [40.806159, -73.961795], [40.799321, -73.966796], [40.801040, -73.970726], [40.808023, -73.966375]]]
# })

### RU cost comparison, before and after geospatial indexing

In [3]:
import sys, logging

# Create a logger for the 'azure' SDK
logger = logging.getLogger('azure')
logger.setLevel(logging.DEBUG)

handler = logging.FileHandler(filename=osPath+'corelog.log')
logger.addHandler(handler)

In [4]:
# Execute a geospatial query
for item in ctr.query_items(query="SELECT c.id, c.location FROM c WHERE ST_WITHIN(c.Location, { 'type':'Polygon', 'coordinates': [[[40.808023, -73.966375], [40.806159, -73.961795], [40.799321, -73.966796], [40.801040, -73.970726], [40.808023, -73.966375]]]})" \
    , enable_cross_partition_query=True, logger=logger, logging_enable=True):
    print(item)
    # print ('RUs: ' + ctr.client_connection.last_response_headers['x-ms-request-charge'])
    # print ('ActivityId: ' + ctr.client_connection.last_response_headers['x-ms-activity-id'])

# The effective RU usage is different from the output below, as there may be multiple requests between the client/server
# Comparing execution below in terms of RU with output from query from the portal
#   Portal: ~ 9015 RUs / Index hit document count = 0

# With DEBUG logging enabled, is possible to get all the HTTP responses and sum 'x-ms-request-charge'
# Ex.: 'x-ms-request-charge': '1987.99', 'x-ms-request-charge': '2137.54', 'x-ms-request-charge': '1940.9', 'x-ms-request-charge': '2128.36', 'x-ms-request-charge': '820.14' => ~9015 RUs

{'id': '7db61c58-636b-46dd-8fcc-39e2b2a8bb57'}


In [5]:
# Change index policy to add GeoSpatial index
# Confirm that index rebuild has been completed before executing next query

indexPolicy = {
    "indexingMode":"consistent",
    "includedPaths":[
        {"path":"/*"}
        ]
    , "spatialIndexes": [
        {
            "path": "/Location/*",
            "types": [
                "Point"
            ]
        }
    ]
    , "excludedPaths":[{"path": "/\"_etag\"/?"}]
}

db.replace_container(containerName, pkPath, indexing_policy=indexPolicy)

<ContainerProxy [dbs/Learn/colls/GeoJSON]>

In [6]:
# Without index: 9015 RUs
# With geo index: ~0.2s / 7.11 RUs
for item in ctr.query_items(query="SELECT c.id, c.location FROM c WHERE ST_WITHIN(c.Location, { 'type':'Polygon', 'coordinates': [[[40.808023, -73.966375], [40.806159, -73.961795], [40.799321, -73.966796], [40.801040, -73.970726], [40.808023, -73.966375]]]})" \
    , enable_cross_partition_query=True):
    print(item)

print ('RUs: ' + ctr.client_connection.last_response_headers['x-ms-request-charge'])
print ('ActivityId: ' + ctr.client_connection.last_response_headers['x-ms-activity-id'])

{'id': '7db61c58-636b-46dd-8fcc-39e2b2a8bb57'}
RUs: 7.11
ActivityId: 36a4c36c-b307-4d00-b4dc-a1f1ceca69c8


In [12]:
# RU cost for single item insert was: 12.19
# RU cost after geo index: 12.38

from datetime import datetime, timedelta

IOTSrc = IOTSources[fake.random_int(min=0, max=9999)]
dtBase = datetime(year=2022, month=12, day=15)
docs = []
readings = []

doc = {
    'id': str(uuid.uuid4())
    , 'PartitionKey': IOTSrc['Name'] + '_' + IOTSrc['Type'] + '_' + (dtBase + timedelta(days=0)).strftime('%Y_%m_%d')
    , 'Name': IOTSrc['Name']
    , 'Type': IOTSrc['Type']            
    , 'Entity': IOTSrc
    , 'Location': {
        "type":"Point",
        "coordinates":[ float(fake.numerify(text='40.#####')), float(fake.numerify(text='-73.#####')) ]     
    }
    , 'DateTime': (dtBase + timedelta(days=0)).isoformat()
    , 'Timestamp': (dtBase + timedelta(days=0)).timestamp()
    , 'Readings': {
            'Dimension': fake.random_element(elements=('D1', 'D2', 'D3', 'D4', 'D5', 'D6', 'D7', 'D8', 'D9'))
            , 'Metric': fake.random_number(digits=5)
            , 'Timestamp': fake.date_time_this_year().isoformat()
    }
    , 'class': fake.random_element(elements=OrderedDict([("A", 0.40), ("B", 0.35), ("C", 0.15), ("D", 0.05), ("E", 0.05)]))
}

print(doc)
ctr.create_item(doc)
print(ctr.client_connection.last_response_headers['x-ms-request-charge'])

{'id': '268faf3b-9205-4bc7-97fa-9ff4c627a944', 'PartitionKey': 'LIOi_373139920146_Type3_2022_12_15', 'Name': 'LIOi_373139920146', 'Type': 'Type3', 'Entity': {'Name': 'LIOi_373139920146', 'Type': 'Type3'}, 'Location': {'type': 'Point', 'coordinates': [40.2479, -73.39276]}, 'DateTime': '2022-12-15T00:00:00', 'Timestamp': 1671084000.0, 'Readings': {'Dimension': 'D6', 'Metric': 41919, 'Timestamp': '2022-01-22T04:43:06'}, 'class': 'A'}
12.38


In [None]:
# Restore default indexing policy without GeoSpatial indexing
indexPolicy = {
    "indexingMode":"consistent",
    "includedPaths":[
        {"path":"/*"}
        ]
    , "excludedPaths":[{"path": "/\"_etag\"/?"}]
}

db.replace_container(containerName, pkPath, indexing_policy=indexPolicy)