https://docs.microsoft.com/en-us/learn/modules/query-azure-cosmos-db-with-apache-spark-for-azure-synapse-analytics/

In [None]:
# Load the "Customer" container through the AdventureWorksSQL linked service,
# using the cosmos.olap format.
dfCustomer = (
    spark.read
    .format("cosmos.olap")
    .option("spark.synapse.linkedService", "AdventureWorksSQL")
    .option("spark.cosmos.container", "Customer")
    .load()
)

# Load the "SalesOrder" container through the AdventureWorksMongoDB linked
# service, using the same cosmos.olap format.
dfSalesOrder = (
    spark.read
    .format("cosmos.olap")
    .option("spark.synapse.linkedService", "AdventureWorksMongoDB")
    .option("spark.cosmos.container", "SalesOrder")
    .load()
)

In [None]:
# Preview at most ten rows of the customer DataFrame.
display(dfCustomer.limit(10))

In [None]:
# Preview at most ten rows of the sales order DataFrame.
display(dfSalesOrder.limit(10))

In [None]:
# Document properties to drop from the customer DataFrame.
system_document_properties = {'_attachments','_etag','_rid','_self','_ts'}

# Filter with a comprehension rather than a set difference: a set difference
# returns the remaining columns in arbitrary order, which would shuffle the
# DataFrame's column layout from run to run.
customer_columns = [
    column_name
    for column_name in dfCustomer.columns
    if column_name not in system_document_properties
]
dfCustomer = dfCustomer.select(customer_columns)

display(dfCustomer.limit(10))

In [None]:
# Document properties to drop from the sales order DataFrame; unlike the
# customer cell, this set also removes 'id'.
system_document_properties = {'_attachments','_etag','_rid','_self','_ts','id'}

# Filter with a comprehension rather than a set difference: a set difference
# returns the remaining columns in arbitrary order, which would shuffle the
# DataFrame's column layout from run to run.
salesorder_columns = [
    column_name
    for column_name in dfSalesOrder.columns
    if column_name not in system_document_properties
]
dfSalesOrder = dfSalesOrder.select(salesorder_columns)

display(dfSalesOrder.limit(10))

In [None]:
# Print the column names and types of the customer DataFrame.
dfCustomer.printSchema()

In [None]:
# Print the column names and types of the sales order DataFrame.
dfSalesOrder.printSchema()

In [None]:
# Print the total number of rows in the customer DataFrame.
print(dfCustomer.count())

In [None]:
# Count customers per (country, city) pair and show the ten largest groups.
display(
    dfCustomer
    .groupBy("address.country", "address.city")
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)

In [None]:
# Register the customer DataFrame as a temporary view so that it can be
# queried by name from Spark SQL.
dfCustomer.createOrReplaceTempView("CustomerTempView")

# Query the view through spark.sql and preview at most ten rows of the result.
dfResult = spark.sql("SELECT * FROM CustomerTempView")
display(dfResult.limit(10))

In [None]:
%%sql
-- Preview ten rows of the temporary customer view.
SELECT *
FROM CustomerTempView
LIMIT 10

In [None]:
%%sql
-- Register the Customer container (cosmos.olap format) as the Spark table Customers.
-- IF NOT EXISTS keeps this cell re-runnable: the table definition persists
-- between sessions, so a plain CREATE TABLE fails on the second run.
CREATE TABLE IF NOT EXISTS Customers USING cosmos.olap OPTIONS (
    spark.synapse.linkedService 'AdventureWorksSQL',
    spark.cosmos.container 'Customer'
)

In [None]:
%%sql
-- Register the SalesOrder container (cosmos.olap format) as the Spark table SalesOrders.
-- IF NOT EXISTS keeps this cell re-runnable: the table definition persists
-- between sessions, so a plain CREATE TABLE fails on the second run.
CREATE TABLE IF NOT EXISTS SalesOrders USING cosmos.olap OPTIONS (
    spark.synapse.linkedService 'AdventureWorksMongoDB',
    spark.cosmos.container 'SalesOrder'
)

In [None]:
%%sql
-- Ten most frequent city/country pairs among customer addresses.
SELECT
    address.city    AS City_Name,
    address.country AS Country_Name,
    COUNT(*)        AS Address_Count
FROM Customers
GROUP BY address.city, address.country
ORDER BY Address_Count DESC
LIMIT 10

In [None]:
%%sql
-- One row per sales order, joined to its customer to pick up country and city.
-- The order and ship dates are converted from strings with to_date.
CREATE OR REPLACE TEMPORARY VIEW SalesOrderView AS
SELECT
    s._id.string                AS SalesOrderId,
    c.id                        AS CustomerId,
    c.address.country           AS Country,
    c.address.city              AS City,
    to_date(s.orderdate.string) AS OrderDate,
    to_date(s.shipdate.string)  AS ShipDate
FROM Customers c
INNER JOIN SalesOrders s
    ON c.id = s.CustomerId.string

In [None]:
%%sql
-- Preview ten rows of the joined sales order view.
SELECT *
FROM SalesOrderView
LIMIT 10

In [None]:
%%sql
-- Number of orders per country/city, largest first.
SELECT
    Country,
    City,
    COUNT(*) AS Total_Orders
FROM SalesOrderView
GROUP BY Country, City
ORDER BY Total_Orders DESC

In [None]:
%%sql
-- Show the raw details column next to the order id.
SELECT
    _id.string AS SalesOrderId,
    details
FROM SalesOrders
LIMIT 10

In [None]:
%%sql
-- Expand the details array into one output row per array element.
SELECT
    _id.string AS SalesOrderId,
    explode(details.array)
FROM SalesOrders
LIMIT 10

In [None]:
%%sql
-- Same expansion as explode, but posexplode also returns each element's position.
SELECT
    _id.string AS SalesOrderId,
    posexplode(details.array)
FROM SalesOrders
LIMIT 10

In [None]:
%%sql
-- One row per order line: the details array is expanded with posexplode, and
-- the zero-based position is shifted by one to give a line number.
CREATE OR REPLACE TEMPORARY VIEW SalesOrderDetailsView AS
SELECT
    lines.SalesOrderId,
    pos + 1                   AS SalesOrderLine,
    col.object.sku.string     AS SKUCode,
    col.object.price.float64  AS Price,
    col.object.quantity.int32 AS Quantity
FROM (
    SELECT
        _id.string AS SalesOrderId,
        posexplode(details.array)
    FROM SalesOrders
) lines

In [None]:
%%sql
-- Preview ten rows of the order-line view.
-- The %%sql magic is required: without it the cell is parsed as Python and fails.
SELECT *
FROM SalesOrderDetailsView
LIMIT 10

In [None]:
%%sql
-- List the columns of the order-line view.
-- The %%sql magic is required: without it the cell is parsed as Python and fails.
DESCRIBE SalesOrderDetailsView

In [None]:
%%sql
-- Sales statistics per country/city (customers, orders, order lines, revenue)
-- with overall ranks and a per-country revenue rank; top ten by revenue.
--
-- Both Country and City must be non-NULL (AND, not OR): the document id built
-- later with concat(Country, '-', replace(City, ' ', '')) is NULL as soon as
-- either part is NULL.
SELECT o.Country, o.City,
    COUNT(DISTINCT o.CustomerId) Total_Customers,
    COUNT(DISTINCT d.SalesOrderId) Total_Orders,
    COUNT(d.SalesOrderId) Total_OrderLines,
    SUM(d.Quantity*d.Price) AS Total_Revenue,
    dense_rank() OVER (ORDER BY SUM(d.Quantity*d.Price) DESC) as Rank_Revenue,
    dense_rank() OVER (ORDER BY COUNT(DISTINCT d.SalesOrderId) DESC) as Rank_Orders,
    dense_rank() OVER (ORDER BY COUNT(d.SalesOrderId) DESC) as Rank_OrderLines,
    dense_rank() OVER (PARTITION BY o.Country ORDER BY SUM(d.Quantity*d.Price) DESC) as Rank_Revenue_Country
FROM SalesOrderView o
INNER JOIN SalesOrderDetailsView d
    ON o.SalesOrderId = d.SalesOrderId
WHERE o.Country IS NOT NULL AND o.City IS NOT NULL
GROUP BY o.Country, o.City
ORDER BY Total_Revenue DESC
LIMIT 10

In [None]:
%%sql
-- Sales statistics per country/city (customers, orders, order lines, revenue)
-- with overall ranks and a per-country revenue rank, stored as a temporary view.
--
-- Both Country and City must be non-NULL (AND, not OR): the document id built
-- from this view with concat(Country, '-', replace(City, ' ', '')) is NULL as
-- soon as either part is NULL.
CREATE OR REPLACE TEMPORARY VIEW SalesOrderStatsView
AS
SELECT o.Country, o.City,
    COUNT(DISTINCT o.CustomerId) Total_Customers,
    COUNT(DISTINCT d.SalesOrderId) Total_Orders,
    COUNT(d.SalesOrderId) Total_OrderLines,
    SUM(d.Quantity*d.Price) AS Total_Revenue,
    dense_rank() OVER (ORDER BY SUM(d.Quantity*d.Price) DESC) as Rank_Revenue,
    dense_rank() OVER (ORDER BY COUNT(DISTINCT d.SalesOrderId) DESC) as Rank_Orders,
    dense_rank() OVER (ORDER BY COUNT(d.SalesOrderId) DESC) as Rank_OrderLines,
    dense_rank() OVER (PARTITION BY o.Country ORDER BY SUM(d.Quantity*d.Price) DESC) as Rank_Revenue_Country
FROM SalesOrderView o
INNER JOIN SalesOrderDetailsView d
    ON o.SalesOrderId = d.SalesOrderId
WHERE o.Country IS NOT NULL AND o.City IS NOT NULL
GROUP BY o.Country, o.City
ORDER BY Total_Revenue DESC

In [None]:
%%sql
-- Preview the statistics rows with the two extra columns added before writing:
-- an id built from country and city (spaces removed from the city) and a fixed type.
SELECT
    concat(Country,'-',replace(City,' ','')) AS id,
    'SalesOrderStatistic' AS type,
    *
FROM SalesOrderStatsView
LIMIT 10

In [None]:
# Build the rows to write: every column of SalesOrderStatsView plus an id
# derived from country/city and a constant type value.
dfSalesOrderStatistic = spark.sql("SELECT concat(Country,'-',replace(City,' ','')) AS id, \
                                    'SalesOrderStatistic' AS type, \
                                    * FROM SalesOrderStatsView")

# Write the rows to the "Sales" container using the cosmos.oltp format,
# with the upsert option enabled and save mode 'append'.
(
    dfSalesOrderStatistic.write
    .format("cosmos.oltp")
    .option("spark.synapse.linkedService", "AdventureWorksSQL")
    .option("spark.cosmos.container", "Sales")
    .option("spark.cosmos.write.upsertEnabled", "true")
    .mode('append')
    .save()
)

# Let’s validate that these records are now visible in our Cosmos DB container.

#### Navigate to your previously created Azure Cosmos DB Core (SQL) API account and select Data Explorer in the left-hand menu to run the following SQL queries.

In [None]:
SELECT *
FROM c
WHERE c.type = "SalesOrderStatistic"

In [None]:
SELECT COUNT(c.id) FROM c WHERE c.type = 'SalesOrderStatistic'

# Read data from the transactional store


In [None]:
# Load the "Customer" container again, this time with the cosmos.oltp format.
# Note that this rebinds dfCustomer, replacing the earlier cosmos.olap DataFrame.
dfCustomer = (
    spark.read
    .format("cosmos.oltp")
    .option("spark.synapse.linkedService", "AdventureWorksSQL")
    .option("spark.cosmos.container", "Customer")
    .load()
)

# Preview at most ten rows of the result.
display(dfCustomer.limit(10))

In [None]:
%%sql
-- Register the Customer container (cosmos.oltp format) as the Spark table CustomersOLTP.
-- IF NOT EXISTS keeps this cell re-runnable: the table definition persists
-- between sessions, so a plain CREATE TABLE fails on the second run.
CREATE TABLE IF NOT EXISTS CustomersOLTP USING cosmos.oltp OPTIONS (
    spark.synapse.linkedService 'AdventureWorksSQL',
    spark.cosmos.container 'Customer'
)

In [None]:
%%sql
-- Preview ten rows of the cosmos.oltp-backed customer table.
SELECT *
FROM CustomersOLTP
LIMIT 10