## ADZD
# Cosmos DB Real-time Data Analysis


## Exercise 1 - dataframes
Read a DataFrame from the analytical store of the Cosmos DB container __invoices__

In [1]:
df = spark.read\
    .format("cosmos.olap")\
    .option("spark.synapse.linkedService", "CosmosDb1")\
    .option("spark.cosmos.container", "invoices")\
    .load()

In [3]:
df.printSchema()

root
 |-- _rid: string (nullable = true)
 |-- _ts: long (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- CustomerId: long (nullable = true)
 |-- Items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- StockCode: string (nullable = true)
 |    |    |-- Description: string (nullable = true)
 |    |    |-- Quantity: long (nullable = true)
 |    |    |-- UnitPrice: double (nullable = true)
 |-- id: string (nullable = true)
 |-- _etag: string (nullable = true)

Write a query that creates a new DataFrame with two columns: __id__ (the CustomerId) and __ClientMoneySpent__ (the total amount each client spent across all invoices). This requires flattening the Items array and summing Quantity times UnitPrice per customer.
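
The aggregation boils down to three steps: flatten each invoice's Items array, multiply Quantity by UnitPrice for every item, and sum the results per customer. The plain-Python sketch below mirrors that logic on a few hypothetical documents shaped like the schema above. It only illustrates the computation and is not a Spark solution (in PySpark the same shape is typically built from `explode`, `groupBy` and `agg` with `sum`). The helper name `client_money_spent` and the sample data are made up. Negative quantities are summed as-is, which would presumably explain negative totals such as the one for customer 17548 in the output further down.

```python
from collections import defaultdict

# Hypothetical sample documents shaped like the `invoices` schema above
invoices = [
    {"CustomerId": 17850, "Items": [
        {"StockCode": "A1", "Quantity": 6, "UnitPrice": 2.55},
        {"StockCode": "B2", "Quantity": 2, "UnitPrice": 7.65},
    ]},
    {"CustomerId": 17850, "Items": [
        {"StockCode": "A1", "Quantity": -1, "UnitPrice": 2.55},  # negative quantity, e.g. a return
    ]},
    {"CustomerId": 13047, "Items": [
        {"StockCode": "C3", "Quantity": 3, "UnitPrice": 4.25},
    ]},
]


def client_money_spent(docs):
    """Return {id: total}, where total is the sum of Quantity * UnitPrice per customer."""
    totals = defaultdict(float)
    for doc in docs:
        client_id = str(doc["CustomerId"])     # Cosmos DB `id` values are strings
        for item in doc.get("Items") or []:    # one row per array element, like explode()
            totals[client_id] += item["Quantity"] * item["UnitPrice"]
    return dict(totals)


print(client_money_spent(invoices))
```

Casting the customer id to a string up front matches the target schema, where __id__ is a string column.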

In [9]:
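# The wildcard import provides explode, col, sum, etc.; note that it shadows Python built-ins such as sum, min and max
from pyspark.sql.functions import *
from pyspark.sql.types import StringType

df_clients = ???  # exercise: aggregate df into (id, ClientMoneySpent)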

In [10]:
df_clients.printSchema()

root
 |-- id: string (nullable = true)
 |-- ClientMoneySpent: double (nullable = true)

Show dataframe:

In [11]:
df_clients.show()

+-----+-------------------+
|   id|   ClientMoneySpent|
+-----+-------------------+
|17850|  5391.210000000009|
|13047|  366.6300000000001|
|12583|             855.86|
|13748|              204.0|
|15100|              350.4|
|15291|              328.8|
|14688|             444.98|
|17809|             1251.5|
|15311|            1095.97|
|14527| 236.15999999999994|
|16098| 430.59999999999997|
|18074|              489.6|
|17420|             130.85|
|16029|            4271.52|
|16250|             226.14|
|12431|             358.25|
|17511|            1825.74|
|17548|-141.48000000000002|
|13705| 318.14000000000004|
|13747|               79.6|
+-----+-------------------+
only showing top 20 rows

In [12]:
# Write a Spark DataFrame into a Cosmos DB container
# To select a preferred list of regions in a multi-region Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df_clients.write\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "CosmosDb1")\
    .option("spark.cosmos.container", "clients")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

## Exercise 2 - streams
Read a Spark stream (change feed) from __invoices__ and write the aggregated stream to the __countries__ container.

In [13]:
# Load a streaming Spark DataFrame from a Cosmos DB container
# To select a preferred list of regions in a multi-region Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "CosmosDb1")\
    .option("spark.cosmos.container", "invoices")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

In [14]:
dfStream.printSchema()

root
 |-- Country: string (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- Items: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- UnitPrice: double (nullable = true)
 |    |    |-- Description: string (nullable = true)
 |    |    |-- Quantity: integer (nullable = true)
 |    |    |-- StockCode: string (nullable = true)
 |-- _attachments: string (nullable = true)
 |-- _etag: string (nullable = true)
 |-- _rid: string (nullable = true)
 |-- _self: string (nullable = true)
 |-- _ts: integer (nullable = true)
 |-- id: string (nullable = true)

Write the aggregations: the stream should return __id__ (Country) and __Invoices__ (the number of invoices from that country).<br>
Remember to create an additional column holding a timestamp parsed from 'InvoiceDate', and to set a watermark on that column, e.g. 10 minutes.
HINT: _to_timestamp(?)_, _withWatermark(?,?)_
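
The hint describes two steps: turn InvoiceDate into a proper timestamp, then declare a watermark on it, i.e. the latest event time seen minus a delay. The plain-Python sketch below models both for one micro-batch, alongside the per-country invoice count. Assumptions: InvoiceDate is formatted like `2010-12-01 08:26:00` (check the real data and adjust the format string), and the helper names `to_timestamp` and `process_batch` are hypothetical stand-ins, not the PySpark functions. One caveat: according to the Structured Streaming guide, watermark-based cleanup of aggregation state only applies when the grouping includes the event-time column or a window on it, so grouping by Country alone keeps every country's count in state regardless of the watermark.

```python
from collections import Counter
from datetime import datetime, timedelta

WATERMARK_DELAY = timedelta(minutes=10)  # the "e.g. 10 minutes" from the exercise


def to_timestamp(invoice_date):
    # ASSUMPTION: InvoiceDate looks like "2010-12-01 08:26:00"; change the format to match the real data
    return datetime.strptime(invoice_date, "%Y-%m-%d %H:%M:%S")


def process_batch(batch, counts, max_event_time):
    """Count invoices per Country for one micro-batch and recompute the watermark.

    Returns (countries updated in this batch, max event time seen, watermark).
    As in Spark, the new watermark only takes effect from the next micro-batch.
    """
    updated = set()
    for doc in batch:
        event_time = to_timestamp(doc["InvoiceDate"])
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        counts[doc["Country"]] += 1
        updated.add(doc["Country"])
    watermark = None if max_event_time is None else max_event_time - WATERMARK_DELAY
    return updated, max_event_time, watermark


counts = Counter()
batch = [
    {"Country": "France", "InvoiceDate": "2010-12-01 08:26:00"},
    {"Country": "France", "InvoiceDate": "2010-12-01 08:28:00"},
    {"Country": "Germany", "InvoiceDate": "2010-12-01 08:45:00"},
]
updated, max_seen, watermark = process_batch(batch, counts, None)
print(dict(counts), watermark)
```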

In [16]:
dfStream_countries = ???

In [17]:
dfStream_countries.printSchema()

root
 |-- id: string (nullable = true)
 |-- Invoices: long (nullable = false)

In [20]:
streamQuery = dfStream_countries\
    .writeStream\
    .format("cosmos.oltp")\
    .outputMode("update")\
    .option("checkpointLocation", "/localWriteCheckpointFolder")\
    .option("spark.synapse.linkedService", "CosmosDb1")\
    .option("spark.cosmos.container", "countries")\
    .option("spark.cosmos.connection.mode", "gateway")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .start()
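
With outputMode("update"), each trigger emits only the countries whose counts changed in that micro-batch, not the whole result table. Because every emitted row carries id = Country, an upsert-enabled write replaces the earlier document with the same id (within the same partition key), so the container converges to the full running totals. The plain-Python sketch below models this interplay. A dict stands in for the Cosmos DB container and partition keys are ignored for brevity (an assumption); the function names are hypothetical.

```python
def update_mode_rows(state, batch_counts):
    """Add one micro-batch's per-country counts to the running state and return
    only the rows whose aggregate changed, which is what update mode emits."""
    for country, n in batch_counts.items():
        state[country] = state.get(country, 0) + n
    return [{"id": country, "Invoices": state[country]} for country in batch_counts]


def upsert(container, rows):
    """Insert-or-replace by `id`; a dict stands in for the Cosmos DB container."""
    for row in rows:
        container[row["id"]] = row


state, container = {}, {}
upsert(container, update_mode_rows(state, {"France": 2, "Germany": 1}))
upsert(container, update_mode_rows(state, {"France": 3}))  # only France is re-emitted
print(container)
```

Without upserts, the second batch would try to create a second "France" document with an id that already exists, which Cosmos DB rejects.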

Execution plan:

In [21]:
streamQuery.explain()

== Physical Plan ==
*(5) HashAggregate(keys=[Country#332], functions=[count(1)])
+- StateStoreSave [Country#332], state info [ checkpoint = abfss://fdlsg2@dlsg2acc.dfs.core.windows.net/localWriteCheckpointFolder/state, runId = abca3a6e-7519-4f88-b787-7d7a39517c04, opId = 0, ver = 0, numPartitions = 200], Update, 0, 2
   +- *(4) HashAggregate(keys=[Country#332], functions=[merge_count(1)])
      +- StateStoreRestore [Country#332], state info [ checkpoint = abfss://fdlsg2@dlsg2acc.dfs.core.windows.net/localWriteCheckpointFolder/state, runId = abca3a6e-7519-4f88-b787-7d7a39517c04, opId = 0, ver = 0, numPartitions = 200], 2
         +- *(3) HashAggregate(keys=[Country#332], functions=[merge_count(1)])
            +- Exchange hashpartitioning(Country#332, 200), [id=#135]
               +- *(2) HashAggregate(keys=[Country#332], functions=[partial_count(1)])
                  +- *(2) Project [Country#332]
                     +- EventTimeWatermark ts#257: timestamp, interval 10 minutes
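
Read bottom-up, the plan counts rows per Country inside each input partition (partial_count), shuffles those partial counts by Country into 200 partitions (Exchange hashpartitioning), merges them (merge_count), combines them with the counts restored from the checkpointed state store (StateStoreRestore), saves the new totals back (StateStoreSave) and finalizes the count. The single-process sketch below imitates that flow to show why running totals survive across micro-batches. It is a simplified model, not Spark's implementation: it uses 4 shuffle partitions instead of 200 and folds the two post-shuffle merge stages into one loop.

```python
from collections import Counter

NUM_SHUFFLE_PARTITIONS = 4  # the plan above uses 200 (numPartitions = 200)


def run_micro_batch(input_partitions, state_store):
    """Simplified model of the physical plan for one micro-batch.

    input_partitions: list of lists of Country values (one list per input partition)
    state_store: dict Country -> running count, kept across micro-batches
    Returns the counts emitted for the countries seen in this batch.
    """
    # HashAggregate partial_count: count locally inside each input partition
    partials = [Counter(partition) for partition in input_partitions]

    # Exchange hashpartitioning(Country) + HashAggregate merge_count:
    # send every country to one shuffle partition and merge the partial counts
    shuffled = [Counter() for _ in range(NUM_SHUFFLE_PARTITIONS)]
    for partial in partials:
        for country, n in partial.items():
            shuffled[hash(country) % NUM_SHUFFLE_PARTITIONS][country] += n

    emitted = {}
    for partition in shuffled:
        for country, n in partition.items():
            # StateStoreRestore + merge_count: add the previously stored count
            total = state_store.get(country, 0) + n
            # StateStoreSave: persist the new running count for the next batch
            state_store[country] = total
            # final count: the value written out for this key
            emitted[country] = total
    return emitted


store = {}
run_micro_batch([["France", "Germany"], ["France"]], store)
print(run_micro_batch([["France"]], store))
```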
      

Current stream status:

In [24]:
streamQuery.status

{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}

Stop stream:

In [25]:
streamQuery.stop()