# Getting started with Azure Cosmos DB's API for MongoDB and Synapse Link

## Key Information about this notebook

* This notebook is part of the Azure Synapse Link for Azure Cosmos DB analytical sample notebooks. For more information, click [here](../../../README.md).

* It was built for the Azure Cosmos DB API for MongoDB, but you can customize it for the Azure Cosmos DB SQL API yourself. Please read about the differences in analytical store schema inference between these two APIs [here](https://docs.microsoft.com/azure/cosmos-db/analytical-store-introduction#analytical-schema).

* This is a Synapse Notebook, created to run in Synapse Analytics workspaces. Please make sure that you have completed the prerequisites in the [README](/README.md) file. After that, execute the steps below in the order in which they are presented.

* From now on, all operations are case sensitive. Please be careful with everything you type.

In this sample we will execute the following tasks:

1. Insert a dataset using the traditional MongoDB client.
1. Execute aggregation queries against the Analytical Store from the transactional data we inserted.
1. Insert another dataset, but this time using a different datatype for the timestamp property.
1. Execute aggregation queries again, consolidating both datasets.

## Pre-requisites
1. Have you created a MongoDB API account in Azure Cosmos DB? If not, go to [Create an account for Azure Cosmos DB's API for MongoDB](https://docs.microsoft.com/azure/cosmos-db/mongodb-introduction).
1. For your Cosmos DB account, have you enabled Synapse Link? If not, go to [Enable Synapse Link for Azure Cosmos DB's API for MongoDB](https://docs.microsoft.com/azure/cosmos-db/configure-synapse-link).
1. Have you created a Synapse Workspace? If not, go to [Create Synapse Workspace account](https://docs.microsoft.com/azure/synapse-analytics/synapse-link/how-to-connect-synapse-link-cosmos-db). Please don't forget to add yourself as **Storage Blob Data Contributor** to the primary ADLS Gen2 account that is linked to the Synapse workspace.

## Create a Cosmos DB collection with analytical store enabled

Please be careful, all commands are case sensitive.

1. Create a database named `DemoSynapseLinkMongoDB`. 
1. Create a collection named `HTAP` with a Shard key called `item`. Make sure you set the `Analytical store` option to `On` when you create your collection.

## Optional - Connect your collection to Synapse

To accelerate future work, you can connect your collection to Synapse. **We won't use this capability in this demo**, but feel free to test and use it.

1. Go to your Synapse Analytics workspace.
1. Create a `Linked Data` connection for your MongoDB API account. 
    1. Under the `Data` blade, select the + (plus) sign.
    1. Select the `Connect to external data` option.
    1. Now select the `Azure Cosmos DB (MongoDB API)` option. 
    1. Enter all the information regarding your specific Azure Cosmos DB account either by using the dropdowns or by entering the connection string. Take note of the name you assigned to your `Linked Data` connection. 
    - Alternatively, you can also use the connection parameters from your account overview.
1. Test the connection by looking for your database accounts in the `Data` blade, under the `Linked` tab.
    - There should be a list that contains all accounts and collections.
    - Collections that have an `Analytical Store` enabled will have a distinctive icon.

### Let's get the environment ready

This environment allows you to install and use any Python libraries you want. For this sample, you need to add the following libraries to your Spark pool:

```
pymongo==3.5.1
aenum==2.2.4
```

Learn how to import libraries into your Spark pools in [this article](https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-azure-portal-add-libraries). Please use the `requirements.txt` file located in the same folder as this notebook to update your pool packages.

You can execute the following command to make sure all the libraries are installed correctly:

```python
import importlib.util

# bson ships with pymongo, so it is checked here as well.
packages = ['pymongo', 'bson', 'aenum']
for package in packages:
    if importlib.util.find_spec(package):
        print(package, "OK")
    else:
        print(package, "PROBLEM - NOK")
```


```
pymongo OK
bson OK
aenum OK
```

### Add your database account and collection details here!

```python
# Replace the placeholder values with your own account name and read/write key.
DATABASE_ACCOUNT_NAME = 'your-cosmos-db-mongodb-account-name'
DATABASE_ACCOUNT_READWRITE_KEY = 'your-cosmos-db-mongodb-account-key'
DATABASE_NAME = 'DemoSynapseLinkMongoDB'
COLLECTION_NAME = 'HTAP'
```


## Let's initialize the MongoDB client

You only need the following parameters from your account overview:
- Connection string.
- Primary or secondary read/write key.

Remember that we named our database `DemoSynapseLinkMongoDB` and our collection `HTAP`.

The code snippet below shows how to initialize the `MongoClient` object.

```python
from pymongo import MongoClient
from bson import ObjectId  # Lets you work with ObjectId values if needed

# Connect to your own database account endpoint (port 10255, SSL enabled).
client = MongoClient(
    "mongodb://{account}.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb".format(account=DATABASE_ACCOUNT_NAME)
)
db = client[DATABASE_NAME]  # Select the database
# Authenticate with your account name and any of your read/write keys.
# Database.authenticate() exists in the pinned pymongo 3.5.1 but was removed in PyMongo 4.0.
db.authenticate(name=DATABASE_ACCOUNT_NAME, password=DATABASE_ACCOUNT_READWRITE_KEY)
```


```
True
```
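`Database.authenticate()` was removed in PyMongo 4.0. If you use a newer driver than the pinned 3.5.1, a common alternative is to put the credentials in the connection string itself. Account keys are base64 strings that can contain `+`, `/` and `=`, so they must be percent-encoded first. The sketch below is a minimal example that assumes the same host, port and options as the cell above. `build_cosmos_mongo_uri` is a hypothetical helper name. The connection string shown in your portal typically carries extra options (such as `retrywrites=false`) that are worth copying as well.

```python
from urllib.parse import quote_plus


def build_cosmos_mongo_uri(account: str, key: str) -> str:
    """Build a MongoDB URI with embedded, percent-encoded credentials.

    Hypothetical helper: host, port and options mirror the notebook cell above.
    """
    user = quote_plus(account)
    password = quote_plus(key)  # base64 keys may contain '+', '/' and '='
    return (
        "mongodb://{user}:{password}@{account}.mongo.cosmos.azure.com:10255/"
        "?ssl=true&replicaSet=globaldb"
    ).format(user=user, password=password, account=account)


if __name__ == "__main__":
    print(build_cosmos_mongo_uri("my-account", "abc+/def=="))
```

With a newer driver you would then call `MongoClient(build_cosmos_mongo_uri(DATABASE_ACCOUNT_NAME, DATABASE_ACCOUNT_READWRITE_KEY))` and select `client[DATABASE_NAME]` without calling `authenticate()`.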

## Inserting data with the MongoClient driver

The following sample will generate 500 items based on random data. Each item will contain the following fields:
- item, string
- price, float
- rating, integer
- timestamp, [epoch time](http://unixtimestamp.50x.eu/about.php) in seconds, as a float

This data will be inserted into the transactional store of your collection. It emulates the transactional data that an application would generate.

```python
from random import choice, randint
import time

orders = db[COLLECTION_NAME]

items = ['Pizza', 'Sandwich', 'Soup', 'Salad', 'Tacos']
prices = [2.99, 3.49, 5.49, 12.99, 54.49]

# Insert 500 random orders, one document at a time.
for _ in range(500):
    order = {
        'item': choice(items),
        'price': choice(prices),
        'rating': randint(1, 5),
        'timestamp': time.time()  # Epoch time in seconds, as a float
    }
    orders.insert_one(order)

print('finished creating 500 orders')
```



```
finished creating 500 orders
```
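The insert cell sends one request per document. If you prefer to build the batch first and send it with PyMongo's `Collection.insert_many()`, here is a minimal sketch. `make_orders` and its `seed` and `now` parameters are hypothetical names, added so the random data can be reproduced and tested.

```python
import random
import time

ITEMS = ['Pizza', 'Sandwich', 'Soup', 'Salad', 'Tacos']
PRICES = [2.99, 3.49, 5.49, 12.99, 54.49]


def make_orders(n, seed=None, now=time.time):
    """Return n random order documents with an epoch (float) timestamp.

    Hypothetical helper: same fields and value ranges as the insert cell.
    """
    rng = random.Random(seed)  # seeded generator for reproducible data
    return [
        {
            'item': rng.choice(ITEMS),
            'price': rng.choice(PRICES),
            'rating': rng.randint(1, 5),
            'timestamp': now(),
        }
        for _ in range(n)
    ]


if __name__ == "__main__":
    batch = make_orders(500, seed=42)
    print(len(batch), batch[0])
```

You could then run `orders.insert_many(make_orders(500))`. With Cosmos DB, a large batch may be throttled if it exceeds your provisioned throughput, so sending smaller chunks can be safer.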

## Read data from the Analytical Store.

Now that we have inserted some transactional data, let's read it from the Azure Cosmos DB analytical store. Cosmos DB automatically transforms the BSON (Binary JSON) data into a columnar format. This makes it fast and easy to run aggregation workloads on your transactional data without consuming RUs or affecting transactional performance.

The cells below will:

1. Load the data from analytical store into a DataFrame.
1. Check the top rows. Yes, the BSON data was converted into a columnar, structured format.
1. Check the DataFrame schema.
1. Run aggregations.


> If you get a "no snapshot" error, please check whether your container was created with **analytical store** enabled.

If your DataFrame has no data, the auto-sync between the transactional and analytical stores probably hasn't completed yet. This usually takes about 2 minutes, but in some cases it can take up to 5 minutes. Please wait a few minutes and run the command below again.
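Instead of re-running the cell by hand, you can poll until the expected rows show up. Here is a minimal sketch. It assumes a hypothetical `load_df()` function that wraps the `spark.read.format("cosmos.olap")` call in the next cell. `wait_until` is also a hypothetical helper, and its `sleep` and `clock` functions are injectable so it can be tested without actually waiting.

```python
import time


def wait_until(predicate, timeout=300.0, interval=30.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call predicate() every `interval` seconds until it returns True.

    Returns True as soon as the predicate succeeds, or False once
    `timeout` seconds have elapsed without success.
    """
    deadline = clock() + timeout
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)
```

For example, `wait_until(lambda: load_df().count() >= 500, timeout=300, interval=30)` gives the sync up to the 5 minutes mentioned above.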

**Important: We are using random values for prices and ratings, so don't expect to get the same results as the outputs below. What you can expect is the same behavior and experience.**


```python
# Load the data from analytical store into a DataFrame.
df = spark.read.format("cosmos.olap")\
    .option("spark.cosmos.accountEndpoint", "https://{account}.documents.azure.com:443/".format(account = DATABASE_ACCOUNT_NAME))\
    .option("spark.cosmos.accountKey", DATABASE_ACCOUNT_READWRITE_KEY)\
    .option("spark.cosmos.database", DATABASE_NAME)\
    .option("spark.cosmos.container", COLLECTION_NAME)\
    .load()

# Checking the data
display(df)
```


## First Schema Analysis

Let's run the command below to check the schema of the `df` DataFrame that we just created and loaded. Note that every property of our documents (item, price, rating, and timestamp) is represented in the DataFrame as a `struct` with a single datatype inside it. This will change in the next cells, and understanding why is one of the learning objectives of this notebook.

```python
df.printSchema()
```


```
root
 |-- _rid: string (nullable = true)
 |-- _ts: long (nullable = true)
 |-- id: string (nullable = true)
 |-- _etag: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- objectId: string (nullable = true)
 |-- item: struct (nullable = true)
 |    |-- string: string (nullable = true)
 |-- price: struct (nullable = true)
 |    |-- float64: double (nullable = true)
 |-- rating: struct (nullable = true)
 |    |-- int32: integer (nullable = true)
 |-- timestamp: struct (nullable = true)
 |    |-- float64: double (nullable = true)
 |-- _partitionKey: struct (nullable = true)
 |    |-- string: string (nullable = true)
```

## Aggregations

Now let's run aggregations on top of the `df` DataFrame that we just created.

```python
# Run aggregations

df.groupBy(df.item.string).sum().show()
```


```
+------------+------------+
|item[string]|    sum(_ts)|
+------------+------------+
|        Soup|147570020190|
|       Pizza|166818283753|
|       Salad|150778064041|
|       Tacos|174838393606|
|    Sandwich|162006217845|
+------------+------------+
```
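`GroupedData.sum()` without arguments only sums numeric top-level columns. The document properties are `struct` columns, so the only numeric column it picks up is the system `_ts` property, which is why the output shows `sum(_ts)`. To aggregate the order data itself, reference the typed subfields explicitly, for example with `df.groupBy(df.item.string).agg(F.sum(df.price.float64), F.avg(df.rating.int32))` after `from pyspark.sql import functions as F`. The plain-Python sketch below mimics what that aggregation computes over rows shaped like the schema above. `aggregate_by_item` is a hypothetical helper, and it assumes every row has all three fields (unlike Spark, it does not skip nulls).

```python
from collections import defaultdict


def aggregate_by_item(rows):
    """Group full-fidelity rows by item.string; sum price.float64 and average rating.int32."""
    totals = defaultdict(lambda: {'price_sum': 0.0, 'rating_sum': 0, 'count': 0})
    for row in rows:
        acc = totals[row['item']['string']]
        acc['price_sum'] += row['price']['float64']
        acc['rating_sum'] += row['rating']['int32']
        acc['count'] += 1
    return {
        item: {'sum_price': acc['price_sum'],
               'avg_rating': acc['rating_sum'] / acc['count']}
        for item, acc in totals.items()
    }


if __name__ == "__main__":
    rows = [
        {'item': {'string': 'Pizza'}, 'price': {'float64': 2.99}, 'rating': {'int32': 4}},
        {'item': {'string': 'Pizza'}, 'price': {'float64': 12.99}, 'rating': {'int32': 2}},
        {'item': {'string': 'Soup'}, 'price': {'float64': 5.49}, 'rating': {'int32': 5}},
    ]
    print(aggregate_by_item(rows))
```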

## Important - Spark Syntax for Aggregations on DataFrames

For the aggregation above, a syntax that doesn't explicitly mention the datatypes, like `df.groupBy(df['item']).sum().show()`, also executes without an error. But it is **not recommended!**

It runs because Spark can group by a whole `struct` column, comparing all of its subfields at once. But, as you will see in the next cells, the same property can end up with more than one datatype inside its struct. Grouping or aggregating on the whole struct instead of an explicit typed subfield can then produce misleading results.

## Schema Representation - A quick note about the MongoDB schema in analytical store

Please note in the schema above that the `timestamp` field has only one datatype: `struct<float64:double>`.
This will change when we insert data with a different datatype for the `timestamp` field.

For Azure Cosmos DB API for MongoDB accounts, the analytical store uses a **Full Fidelity Schema** by default. This representation extends property names with their data types to represent their values accurately and avoid ambiguity.

This is why we used the datatype as a suffix when we referenced the fields above, as in the example below:

```
df.filter((df.item.string == "Pizza")).show(10)
```

Notice how we specified the `string` type after the property name. Here is a map of the supported data types and their suffixes in the analytical store:

| Original Data Type | Suffix | Example |
|---------------|----------------|--------|
| Double        | ".float64"     |  `24.99`   |
| Array         | ".array"       |  `["a", "b"]`   |
| Binary        | ".binary"      |  `0`   |
| Boolean       | ".bool"        |  `True`   |
| Int32         | ".int32"       |  `123`   |
| Int64         | ".int64"       |  `255486129307`   |
| Null          | ".null"        |  `null`   |
| String        | ".string"      |  `"ABC"`   |
| Timestamp     | ".timestamp"   |  `Timestamp(0, 0)`   |
| DateTime      | ".date"        |  `ISODate("2020-08-21T07:43:07.375Z")`   |
| ObjectId      | ".objectId"    |  `ObjectId("5f3f7b59330ec25c132623a2")`   |
| Document      | ".object"      |  `{"a": "a"}`   |
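To make the suffix mapping concrete, here is a minimal Python sketch that wraps a document's top-level values into full-fidelity structs. It is a hypothetical illustration, not Cosmos DB's implementation. It only covers plain Python types, leaving out ObjectId, dates, binary and BSON timestamps, and it does not wrap nested documents or array elements. It assumes integers that fit in 32 bits map to `int32` and larger ones to `int64`, which mirrors how PyMongo encodes Python `int` values.

```python
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1


def bson_suffix(value):
    """Return the full-fidelity suffix for a Python value (hypothetical mapping)."""
    if value is None:
        return 'null'
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return 'bool'
    if isinstance(value, int):
        return 'int32' if INT32_MIN <= value <= INT32_MAX else 'int64'
    if isinstance(value, float):
        return 'float64'
    if isinstance(value, str):
        return 'string'
    if isinstance(value, list):
        return 'array'
    if isinstance(value, dict):
        return 'object'
    raise TypeError('type not covered by this sketch: %s' % type(value).__name__)


def to_full_fidelity(doc):
    """Wrap each top-level value in a one-key struct named after its suffix."""
    return {name: {bson_suffix(value): value} for name, value in doc.items()}


if __name__ == "__main__":
    print(to_full_fidelity({'item': 'Pizza', 'price': 2.99, 'rating': 3}))
```

Applying it to one document from each insert cell shows why `timestamp` can end up with two subfields. The first batch yields `{'float64': ...}`, and a string timestamp yields `{'string': ...}`.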

These types are inferred from the data that is inserted in the transactional store. You can see the schema by executing the following command:

```
df.printSchema()
```

> For the Azure Cosmos DB Core (SQL) API, the default option is the **Well-defined Schema** representation. For more information about schema representation, click [here](https://docs.microsoft.com/azure/cosmos-db/analytical-store-introduction#schema-representation).



## Let's insert more orders!

This time we will use slightly different data. Each item will contain the following fields:
- item, string
- price, float
- rating, integer
- timestamp, string in an [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601)-style format (`YYYY-MM-DD HH:MM:SS`)

Notice that the `timestamp` field is now a string. This will help us understand how the same property is read when it holds values of different data types.

After that, we will load the data, check the schema, and run some queries.

```python
from random import choice, randint
from time import strftime

orders = db[COLLECTION_NAME]

items = ['Pizza', 'Sandwich', 'Soup', 'Salad', 'Tacos']
prices = [2.99, 3.49, 5.49, 12.99, 54.49]

# Insert 500 more random orders, this time with a string timestamp.
for _ in range(500):
    order = {
        'item': choice(items),
        'price': choice(prices),
        'rating': randint(1, 5),
        'timestamp': strftime("%Y-%m-%d %H:%M:%S")  # Local time of the Spark driver, as a string
    }
    orders.insert_one(order)

print('finished creating 500 orders')
```


```
finished creating 500 orders
```

## Let's reload the DataFrame and check the schema again!

```python
# Load the Analytical Store data into a dataframe
# Make sure to run the cell with the secrets to get the DATABASE_ACCOUNT_NAME and the DATABASE_ACCOUNT_READWRITE_KEY variables.
df = spark.read.format("cosmos.olap")\
    .option("spark.cosmos.accountEndpoint", "https://{account}.documents.azure.com:443/".format(account = DATABASE_ACCOUNT_NAME))\
    .option("spark.cosmos.accountKey", DATABASE_ACCOUNT_READWRITE_KEY)\
    .option("spark.cosmos.database", DATABASE_NAME)\
    .option("spark.cosmos.container", COLLECTION_NAME)\
    .load()

# Check the schema AGAIN. Try to find something different.
df.printSchema()
```


```
root
 |-- _rid: string (nullable = true)
 |-- _ts: long (nullable = true)
 |-- id: string (nullable = true)
 |-- _etag: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- objectId: string (nullable = true)
 |-- item: struct (nullable = true)
 |    |-- string: string (nullable = true)
 |-- price: struct (nullable = true)
 |    |-- float64: double (nullable = true)
 |-- rating: struct (nullable = true)
 |    |-- int32: integer (nullable = true)
 |-- timestamp: struct (nullable = true)
 |    |-- float64: double (nullable = true)
 |    |-- string: string (nullable = true)
 |-- _partitionKey: struct (nullable = true)
 |    |-- string: string (nullable = true)
```

## Schema Representation - What Changed?

Please note in the result above that the `timestamp` struct now has two datatypes: `float64:double` and `string:string`. That happened because we inserted data with a different datatype for the same property. This is the `Full Fidelity Schema` at work: Azure Cosmos DB keeps a full representation of your data, including every datatype you used.

> If the result doesn't show two datatypes for `timestamp`, wait a few minutes for the auto-sync between the transactional and analytical stores to complete, then run the cell again.
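To spot properties like `timestamp` that now carry more than one datatype without reading the whole tree, you can walk the schema as JSON. Here is a minimal sketch. It assumes the dictionary shape returned by PySpark's `df.schema.jsonValue()`, where each field has a `name` and a `type` that is either a type-name string or a nested `{"type": "struct", "fields": [...]}` dictionary. `mixed_type_properties` is a hypothetical helper name.

```python
def mixed_type_properties(schema_json):
    """Return {property: [subfield names]} for struct properties with 2+ datatypes.

    `schema_json` is assumed to have the shape produced by
    StructType.jsonValue(): {"type": "struct", "fields": [...]}.
    """
    result = {}
    for field in schema_json.get('fields', []):
        field_type = field['type']
        # Full-fidelity properties are structs whose subfields are the datatypes.
        if isinstance(field_type, dict) and field_type.get('type') == 'struct':
            subfields = [sub['name'] for sub in field_type.get('fields', [])]
            if len(subfields) > 1:
                result[field['name']] = subfields
    return result
```

For the schema printed above, `mixed_type_properties(df.schema.jsonValue())` should report only `timestamp`, with the subfields `float64` and `string`.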

## Queries

Now let's run some interesting queries, using the datatypes to filter the data.


```python
# SQL!!
# Let's see the data for pizzas that have a string timestamp
df.createOrReplaceTempView("Pizza")
sql_results = spark.sql("SELECT sum(price.float64),count(*) FROM Pizza where timestamp.string is not null and item.string = 'Pizza'")
sql_results.show()
```


```
+-------------------------------+--------+
|sum(price.float64 AS `float64`)|count(1)|
+-------------------------------+--------+
|             1736.4900000000007|     101|
+-------------------------------+--------+
```

```python
# SQL!!
# Let's see the data for pizzas that have a float (epoch) timestamp
df.createOrReplaceTempView("Pizza")
sql_results = spark.sql("SELECT sum(price.float64),count(*) FROM Pizza where timestamp.float64 is not null and item.string = 'Pizza'")
sql_results.show()
```


```
+-------------------------------+--------+
|sum(price.float64 AS `float64`)|count(1)|
+-------------------------------+--------+
|             1571.9600000000007|     104|
+-------------------------------+--------+
```

```python
# SQL!!
# Let's compare both timestamp columns
df.createOrReplaceTempView("Pizza")
sql_results = spark.sql("SELECT max(timestamp.float64),max(timestamp.string) FROM Pizza where item.string = 'Pizza'")
sql_results.show()
```


```
+-----------------------------------+---------------------------------+
|max(timestamp.float64 AS `float64`)|max(timestamp.string AS `string`)|
+-----------------------------------+---------------------------------+
|               1.6040219670579042E9|              2020-10-30 01:43:10|
+-----------------------------------+---------------------------------+
```

## Schema Representation - Last thoughts

Please note that the queries above return different data because of the filters on the `timestamp` subfields. From the user's perspective, it's as if there were two different columns: `timestamp.float64` and `timestamp.string`.
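Because the two representations live in separate subfields, consolidating both datasets needs an explicit step that maps each row to one common type. Here is a minimal plain-Python sketch of that step. It assumes each row carries one of the two subfields, and that the string timestamps are in UTC. The insert cell used `strftime`, which formats the Spark driver's local time, so check your pool's time zone. `normalize_timestamp` is a hypothetical helper. In Spark, `pyspark.sql.functions.coalesce` can play the same "first non-null subfield" role once both subfields are converted to timestamps.

```python
from datetime import datetime, timezone


def normalize_timestamp(ts):
    """Return a timezone-aware UTC datetime from a full-fidelity timestamp struct.

    Assumes `ts` looks like {'float64': <epoch seconds or None>,
    'string': <'YYYY-MM-DD HH:MM:SS' in UTC or None>}.
    Returns None when neither subfield is set.
    """
    epoch = ts.get('float64')
    if epoch is not None:
        return datetime.fromtimestamp(epoch, tz=timezone.utc)
    text = ts.get('string')
    if text is not None:
        return datetime.strptime(text, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
    return None
```

Once every row maps to the same type, a single `max()` across both batches compares like with like. By contrast, the last query needed two separate `max` columns.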

## Conclusion

Now you know how to use the Azure Synapse Link for Azure Cosmos DB analytical store with the API for MongoDB. You also know how to work with DataFrames, the full fidelity schema, and Spark SQL.