In [None]:
inventory_data = spark.read\
    .format("cosmos.olap")\
    .option("spark.synapse.linkedService", "CosmosDb")\
    .option("spark.cosmos.container", "inventory")\
    .load()
inventory_data = inventory_data.drop("_rid","_ts","_etag")
display(inventory_data.limit(2))

In [None]:
store_pots_stock = inventory_data[inventory_data["PRODUCT_NAME"] == "Pots"]\
    .groupBy("PRODUCT_NAME","STORE_ID")\
    .sum("STOCK")\
    .withColumnRenamed("SUM(STOCK)", "STORE_STOCK")
    
display(store_pots_stock)

In [None]:
from pyspark.sql.types import IntegerType

sales_data = spark.read\
    .format("cosmos.olap")\
    .option("spark.synapse.linkedService", "CosmosDb")\
    .option("spark.cosmos.container", "sales")\
    .load()

sales_data = sales_data.withColumn("QUANTITY", sales_data["QUANTITY"].cast(IntegerType()))

In [None]:
avg_sales = sales_data.groupBy("PRODUCT_NAME","STORE_ID")\
    .avg("QUANTITY")\
    .withColumnRenamed("avg(QUANTITY)", "AVG_SALES")

stock = inventory_data.groupBy("PRODUCT_NAME","STORE_ID")\
    .sum("STOCK")\
    .withColumnRenamed("SUM(STOCK)", "STORE_STOCK")\

draft_orders = stock.join(avg_sales, on=['STORE_ID','PRODUCT_NAME'], how='inner')\
    .orderBy("AVG_SALES", ascending=False)

draft_orders = draft_orders.where("STORE_STOCK < 20")
display(draft_orders)

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [None]:
def should_order(stock, sales):
  if   stock < sales: 
      return 'Y'
  else:
      return 'N'

should_order = F.udf(should_order, StringType())
draft_orders = draft_orders.withColumn("SHOULD_ORDER", should_order(draft_orders["STORE_STOCK"],draft_orders["AVG_SALES"])) 
display(draft_orders)

In [None]:
# Write a Spark DataFrame into a Cosmos DB container
# To select a preferred list of regions in a multi-region Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

draft_orders.write\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "CosmosDb")\
    .option("spark.cosmos.container", "draftorders")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()