In [0]:
%pip install azure-cosmos

In [0]:
from pyspark.sql import functions as f
from pyspark.sql.functions import lit
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, when

from azure.cosmos import exceptions, CosmosClient, PartitionKey
import pandas as pd
import numpy as np
import scipy
from scipy.stats import linregress
import os
import math
import time

# --- Cosmos DB connection settings -------------------------------------------
cosmosEndpoint = "https://bhoojal-cosmos.documents.azure.com:443/"
# SECURITY: the account master key must never live in the notebook source.
# It is read from the COSMOS_MASTER_KEY environment variable instead.
# NOTE(review): on Databricks this variable can presumably be populated from a
# secret scope in the cluster configuration -- confirm for this workspace.
# NOTE(review): the key that used to be hard-coded on this line has been
# exposed to everyone with read access to the notebook and should be rotated.
cosmosMasterKey = os.environ.get("COSMOS_MASTER_KEY")
if not cosmosMasterKey:
    raise RuntimeError(
        "COSMOS_MASTER_KEY is not set; provide the Cosmos DB account key "
        "through the environment instead of hard-coding it in the notebook."
    )
cosmosDatabaseName = "bhoojal_outlets"
cosmosContainerName = "outlet"

# --- Notebook parameter: the city whose outlets are processed ----------------
dbutils.widgets.text('city', '')
query_city = dbutils.widgets.get('city')
print("Starting notebook for city", query_city)

# --- Cosmos Python SDK handles (used by the per-outlet spatial queries) ------
client = CosmosClient(cosmosEndpoint, cosmosMasterKey)

database = client.create_database_if_not_exists(id=cosmosDatabaseName)

# Outlets, partitioned by city.
container = database.create_container_if_not_exists(
    id=cosmosContainerName,
    partition_key=PartitionKey(path="/city"),
    offer_throughput=400
)

# Depth readings per region.
container_depth = database.create_container_if_not_exists(
    id="region_depth",
    partition_key=PartitionKey(path="/city"),
    offer_throughput=400
)

# Rainfall readings per region (no explicit throughput is requested here).
container_rain = database.create_container_if_not_exists(
    id="region_rain",
    partition_key=PartitionKey(path="/city"),
)

# --- Spark catalog configuration for the Cosmos connector --------------------
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# --- Load every outlet of the requested city through Spark -------------------
# The widget value is user input that ends up inside a SQL string literal, so
# backslashes and single quotes are escaped first; ordinary city names yield
# exactly the same query text as plain concatenation would.
escaped_city = query_city.replace("\\", "\\\\").replace("'", "\\'")
all_outlets_cfg = {
    "spark.cosmos.accountEndpoint": cosmosEndpoint,
    "spark.cosmos.accountKey": cosmosMasterKey,
    "spark.cosmos.database": cosmosDatabaseName,
    "spark.cosmos.container": cosmosContainerName,
    "spark.cosmos.read.customQuery": "SELECT * FROM c WHERE c.city = '" + escaped_city + "'"
}

df = (
    spark.read.format("cosmos.oltp")
    .options(**all_outlets_cfg)
    .option("spark.cosmos.read.inferSchema.enabled", "true")
    .load()
)

# Expose the city's outlets to Spark SQL under a temporary view.
df.createOrReplaceTempView("CityOutlets")

# Pull the outlets into pandas so they can be iterated one by one.
outlets_df = spark.sql("SELECT * FROM CityOutlets")
city_outlets = outlets_df.toPandas()
city_outlets

In [0]:
# This is a hack. Look away!
# Using Cosmos Python SDK as Spark isn't supporting dynamic spatial queries. This routine can be moved to spark by creating a spatial query in native SQL.

# Search radii compared against ST_DISTANCE around each outlet.
# NOTE(review): ST_DISTANCE is documented to return metres -- confirm.
DEPTH_SEARCH_RADIUS = 600
RAIN_SEARCH_RADIUS = 2000
# A trend line cannot be fitted through fewer than two quarterly points.
MIN_QUARTERS = 2


def _quarterly_mean(items, field):
  """Return one row per distinct `scannedIn` value holding the mean of `field`.

  items: list of dicts returned by a Cosmos query.
  field: name of the numeric attribute to average ('depth' or 'rainFall').
  """
  frame = pd.DataFrame(items, columns=['scannedIn', field])
  return frame.groupby('scannedIn', as_index=False).agg({field: "mean"})


def _normalized_slope(values):
  """Min-max normalise `values` and return their least-squares slope.

  The x axis is 1..n where n is the number of values, so four quarters use
  x = [1, 2, 3, 4]. A constant series normalises to NaN (0/0) and therefore
  yields a NaN slope, which the caller treats as an invalid score.
  """
  series = pd.Series(values)
  normalized = (series - series.min()) / (series.max() - series.min())
  x_axis = np.arange(1, len(normalized) + 1)
  return linregress(x_axis, normalized).slope


for i in city_outlets.index:
  # The spatial expression is identical for both queries, so build it once.
  outlet_coordinates = city_outlets['location'][i]['coordinates']
  distance_query = "ST_DISTANCE(f.boundary, {\"type\":\"Point\",\"coordinates\": [" + str(outlet_coordinates[0]) + ", " + str(outlet_coordinates[1]) + "]})"
  depth_query = "SELECT f.id,f.depth,f.scannedIn," + distance_query + " as distance FROM f WHERE " + distance_query + " < " + str(DEPTH_SEARCH_RADIUS)
  rain_query = "SELECT f.id,f.rainFall,f.scannedIn," + distance_query + " as distance FROM f WHERE " + distance_query + " < " + str(RAIN_SEARCH_RADIUS)
  print(rain_query)

  # Query depth data within range of outlet
  depth_result = list(container_depth.query_items(
    query=depth_query,
    enable_cross_partition_query=True
  ))
  if not depth_result:
    continue

  # Aggregate depths per quarter for the outlet
  depth_df = _quarterly_mean(depth_result, 'depth')
  print(len(depth_df), "quarters of depth data found for outlet", i)

  # Query rain data within range of outlet
  rain_result = list(container_rain.query_items(
    query=rain_query,
    enable_cross_partition_query=True
  ))
  if not rain_result:
    continue

  # Aggregate rain per quarter for the outlet
  rain_df = _quarterly_mean(rain_result, 'rainFall')
  print(len(rain_df), "quarters of rain data found for outlet", i)

  # Both series must cover the same number of quarters to be combined.
  if len(rain_df) != len(depth_df):
    continue
  # The x axis is sized from the data; a fixed four-point axis would make
  # linregress raise for any outlet that does not have exactly four quarters.
  if len(depth_df) < MIN_QUARTERS:
    print("Not enough quarters to fit a trend for outlet", i)
    continue

  # Invert depth slope as higher values mean lesser water
  depth_slope = _normalized_slope(depth_df['depth']) * -1
  print("Depth slope:", depth_slope)

  rain_slope = _normalized_slope(rain_df['rainFall'])
  print("Rain slope:", rain_slope)

  # Score is summation of both slopes
  score = depth_slope + rain_slope
  print("Score:", score)
  if math.isnan(score):
    print("Invalid score")
    continue

  # Persist the score on the outlet document.
  outlet_item = container.read_item(item=city_outlets['id'][i], partition_key=city_outlets['city'][i])
  outlet_item['quantity_score'] = score
  container.upsert_item(body=outlet_item)