# Configuration

- Download the latest Azure Cosmos DB .jar files from https://search.maven.org/#search%7Cga%7C1%7Cazure-cosmosdb
- Copy the .jar files to a directory in your HDInsight storage layer (Blob storage or ADLS).
- Install the pydocumentdb library on all nodes using a script action:
    
    sudo /usr/bin/anaconda/envs/py35/bin/pip install pyDocumentDB
    
    sudo /usr/bin/anaconda/bin/pip install pyDocumentDB

Note that there are two Python environments (`base` and `py35`, listed below), so pydocumentdb is installed into each of them.
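Because the library has to be present in whichever environment the kernel actually uses, a quick check from inside the notebook can save some confusion. The sketch below uses only the standard library (`importlib.util.find_spec`). It prints the running interpreter's path and reports whether `pydocumentdb` can be imported from it. The helper name `module_available` is illustrative and not part of any library.

```python
import importlib.util
import sys


def module_available(name):
    """Return True if `name` can be imported by the current interpreter."""
    return importlib.util.find_spec(name) is not None


# Show which Python environment this kernel runs in (e.g. base vs. py35)
print(sys.executable)
# Check whether pydocumentdb was installed into *this* environment
print("pydocumentdb available:", module_available("pydocumentdb"))
```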

In [1]:
%%bash
conda env list
# /usr/bin/anaconda/envs/py35/bin/conda install -y pyDocumentDB
# /usr/bin/anaconda/envs/py35/bin/pip install pyDocumentDB

# conda environments:
#
base                  *  /usr/bin/anaconda
py35                     /usr/bin/anaconda/envs/py35



In [2]:
%%bash
source activate py35
conda list

# packages in environment at /usr/bin/anaconda/envs/py35:
#
# Name                    Version                   Build  Channel
_license                  1.1                      py35_1  
_nb_ext_conf              0.3.0                    py35_0  
alabaster                 0.7.9                    py35_0  
anaconda                  4.2.0               np111py35_0  
anaconda-clean            1.0.0                    py35_0  
anaconda-client           1.5.1                    py35_0  
anaconda-navigator        1.3.1                    py35_0  
argcomplete               1.0.0                    py35_1  
astroid                   1.4.7                    py35_0  
astropy                   1.2.1               np111py35_0  
babel                     2.3.4                    py35_0  
backports                 1.0                      py35_0  
beautifulsoup4            4.5.1                    py35_0  
bitarray                  0.8.1                    py35_0  
blaze                     0.10.1 

In [3]:
import sys
print(sys.version)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
8,application_1523308113254_0012,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.
3.5.2 |Anaconda 4.2.0 (64-bit)| (default, Jul  2 2016, 17:53:06) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]

I used airport data from https://github.com/jbrooksuk/JSON-Airports/blob/master/airports.json for this example.

In [4]:
# Import Necessary Libraries
import pydocumentdb
from pydocumentdb import document_client
from pydocumentdb import documents
import datetime

In [5]:
# Configure the connection policy (enable endpoint discovery and set the preferred region)
connectionPolicy = documents.ConnectionPolicy()
connectionPolicy.EnableEndpointDiscovery = True
connectionPolicy.PreferredLocations = ["East US 2"]


# Set the account key and endpoint used to connect to Azure Cosmos DB
masterKey = '<ENTER YOUR COSMOSDB KEY HERE>'
host = 'https://kmsparkcosmos.documents.azure.com:443/'
client = document_client.DocumentClient(host, {'masterKey': masterKey}, connectionPolicy)
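Hard-coding the master key in a notebook cell makes it easy to leak the key when the notebook is shared. The following minimal sketch reads the key from an environment variable and returns the `{'masterKey': ...}` dict that `DocumentClient` takes. The variable name `COSMOSDB_MASTER_KEY` and the helper `cosmos_auth` are hypothetical, so substitute whatever your cluster configuration provides.

```python
import os


def cosmos_auth(env_var="COSMOSDB_MASTER_KEY"):
    """Build the auth dict for DocumentClient from an environment variable.

    COSMOSDB_MASTER_KEY is a hypothetical variable name, not a Cosmos DB default.
    """
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError("Set the %s environment variable first" % env_var)
    return {"masterKey": key}


# Usage (assumes `host` and `connectionPolicy` from the cell above):
# client = document_client.DocumentClient(host, cosmos_auth(), connectionPolicy)
```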

# Reading

Adapted from https://docs.microsoft.com/en-us/azure/cosmos-db/spark-connector#pydocumentdb-implementation

In [6]:
# Configure Database and Collections
databaseId = 'airports'
collectionId = 'codes'

# Links the Azure Cosmos DB client uses to address the database and collection
dbLink = 'dbs/' + databaseId
collLink = dbLink + '/colls/' + collectionId

# Set query parameter
querystr = "SELECT * FROM c WHERE c.continent='NA'"

# Query documents
query = client.QueryDocuments(collLink, querystr, options=None, partition_key=None)

# Query for partitioned collections
# query = client.QueryDocuments(collLink, querystr, options={'enableCrossPartitionQuery': True}, partition_key=None)

# Push into list `elements`
elements = list(query)
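Building the SQL string by hand works for a fixed filter. When the value comes from a variable, a parameterized query spec avoids quoting problems. As I understand it, pydocumentdb's `QueryDocuments` accepts either a plain string or a dict with `query` and `parameters` keys. Verify this against the client documentation before relying on it. The helper `continent_query` is illustrative only.

```python
def continent_query(continent):
    """Build a parameterized Cosmos DB SQL query spec for one continent.

    `continent_query` is an illustrative helper; the dict shape follows the
    {'query': ..., 'parameters': [...]} query-spec format.
    """
    return {
        "query": "SELECT * FROM c WHERE c.continent = @continent",
        "parameters": [{"name": "@continent", "value": continent}],
    }


# Usage (assumes `client` and `collLink` from the cells above):
# query = client.QueryDocuments(collLink, continent_query("NA"))
# elements = list(query)
```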

In [7]:
elements[0]

{'name': 'False Island Seaplane Base', 'iata': 'FAK', 'continent': 'NA', '_attachments': 'attachments/', '_self': 'dbs/Dbs4AA==/colls/Dbs4AJfRKwA=/docs/Dbs4AJfRKwADAAAAAAAAAA==/', '_etag': '"090047f6-0000-0000-0000-5acbf6260000"', 'status': 1, 'size': None, 'iso': 'US', 'id': '096dc7f9-0a9e-45bf-b7fb-d0fd483259d8', '_ts': 1523316262, '_rid': 'Dbs4AJfRKwADAAAAAAAAAA==', 'type': 'seaplanes'}
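The returned document carries Cosmos DB system properties (`_rid`, `_self`, `_etag`, `_attachments`, `_ts`) alongside the airport fields. These end up as DataFrame columns below. If you only want the application data, a small filter can drop them first. This is a sketch that assumes the five properties seen in the output above are the only ones you want removed. It keeps `id`.

```python
# System properties seen in the Cosmos DB query output above
SYSTEM_PROPERTIES = ("_rid", "_self", "_etag", "_attachments", "_ts")


def strip_system_properties(doc):
    """Return a copy of `doc` without Cosmos DB system properties (keeps `id`)."""
    return {k: v for k, v in doc.items() if k not in SYSTEM_PROPERTIES}


# Usage: clean = [strip_system_properties(d) for d in elements]
```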

In [8]:
# Create `df` Spark DataFrame from `elements` Python list
df = spark.createDataFrame(elements)



In [9]:
df.show(10)

+------------+--------------------+--------------------+--------------------+----------+---------+----+--------------------+---+--------------------+------+------+---------+----------+---------+
|_attachments|               _etag|                _rid|               _self|       _ts|continent|iata|                  id|iso|                name|  size|status|     type|       lon|      lat|
+------------+--------------------+--------------------+--------------------+----------+---------+----+--------------------+---+--------------------+------+------+---------+----------+---------+
|attachments/|"090047f6-0000-00...|Dbs4AJfRKwADAAAAA...|dbs/Dbs4AA==/coll...|1523316262|       NA| FAK|096dc7f9-0a9e-45b...| US|False Island Seap...|  null|     1|seaplanes|      null|     null|
|attachments/|"090049f6-0000-00...|Dbs4AJfRKwAFAAAAA...|dbs/Dbs4AA==/coll...|1523316262|       NA| BWS|852c784f-382d-4fc...| US|Blaine Municipal ...|  null|     0|   closed|      null|     null|
|attachments/|"09004af6-0
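The `lon` and `lat` columns are `null` in the rows shown, which suggests that only some documents carry those fields. Spark infers the schema from the dictionaries. Newer Spark 2.x releases may warn that inferring a schema from `dict` is deprecated. One alternative is to give every record the same set of keys and pass tuples plus a list of column names. The helper `normalize_records` below is a hypothetical sketch of that approach. Sorting the column names is an arbitrary choice made to keep the order stable.

```python
def normalize_records(records):
    """Give every dict the same keys, filling missing fields with None.

    Returns (columns, rows): the sorted union of all keys, and one tuple per
    record with values ordered like `columns`.
    """
    columns = sorted(set().union(*(r.keys() for r in records)))
    rows = [tuple(r.get(c) for c in columns) for r in records]
    return columns, rows


# Usage on the cluster (assumes the `spark` session and `elements` list):
# columns, rows = normalize_records(elements)
# df = spark.createDataFrame(rows, schema=columns)
```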

# Writing

See the documentation for the `DocumentClient` object at http://azure.github.io/azure-documentdb-python/api/pydocumentdb.document_client.html for the available methods.

In [10]:
kc_airport = {'name': 'KC Airport',
  'iata': 'KCM22',
  'continent': 'NA',
  'status': 1,
  'size': None,
  'iso': 'US',
  'type': 'seaplanes'}

In [11]:
# Write to Cosmos DB using the UpsertDocument method of the client object
client.UpsertDocument(collLink, kc_airport)

{'name': 'KC Airport', 'iata': 'KCM22', 'continent': 'NA', '_attachments': 'attachments/', 'type': 'seaplanes', '_etag': '"0a00f454-0000-0000-0000-5acc0d510000"', 'status': 1, 'size': None, 'iso': 'US', 'id': 'b1a92fce-ed95-4679-979e-87aeba798845', '_ts': 1523322193, '_rid': 'Dbs4AJfRKwBJGgAAAAAAAA==', '_self': 'dbs/Dbs4AA==/colls/Dbs4AJfRKwA=/docs/Dbs4AJfRKwBJGgAAAAAAAA==/'}
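The response shows that a GUID `id` was generated, because `kc_airport` had no `id` field. An upsert matches existing documents by `id` (within a partition). Re-running the cell above is therefore likely to insert a second "KC Airport" document rather than update the first. The sketch below derives a deterministic `id` from the IATA code. Using `iata` as the key is an assumption that codes are unique in this collection, and `with_stable_id` is a hypothetical helper.

```python
def with_stable_id(doc, key_field="iata"):
    """Return a copy of `doc` whose `id` is taken from `key_field`.

    Assumes `key_field` values are unique within the collection.
    """
    if not doc.get(key_field):
        raise ValueError("document has no %r field to derive an id from" % key_field)
    stable = dict(doc)  # copy so the caller's dict is left unchanged
    stable["id"] = str(doc[key_field])
    return stable


# Usage (assumes `client`, `collLink` and `kc_airport` from the cells above):
# client.UpsertDocument(collLink, with_stable_id(kc_airport))
```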

In [12]:
# Verify the write by reading the document back

# Set query parameter
querystr2 = "SELECT * FROM c WHERE c.iata='KCM22'"

# Query documents
query2 = client.QueryDocuments(collLink, querystr2, options=None, partition_key=None)

# Push into list `elements2`
elements2 = list(query2)

In [13]:
elements2

[{'name': 'KC Airport', 'iata': 'KCM22', 'continent': 'NA', '_attachments': 'attachments/', 'type': 'seaplanes', '_etag': '"0a00f454-0000-0000-0000-5acc0d510000"', 'status': 1, 'size': None, 'iso': 'US', 'id': 'b1a92fce-ed95-4679-979e-87aeba798845', '_ts': 1523322193, '_rid': 'Dbs4AJfRKwBJGgAAAAAAAA==', '_self': 'dbs/Dbs4AA==/colls/Dbs4AJfRKwA=/docs/Dbs4AJfRKwBJGgAAAAAAAA==/'}]
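Eyeballing the output works for one document. The read-back check can also be made explicit by confirming that every field written comes back unchanged, while ignoring the system properties Cosmos DB adds. This is a minimal sketch. The helper name `matches_written` is illustrative, and the usage line assumes exactly one matching document, which stops being true if the upsert cell has been run more than once without a stable `id`.

```python
def matches_written(written, read_back):
    """True if every field in `written` has the same value in `read_back`.

    Extra fields in `read_back` (e.g. `_rid`, `_ts`, a generated `id`) are ignored.
    """
    return all(read_back.get(k) == v for k, v in written.items())


# Usage (assumes `kc_airport` and `elements2` from the cells above):
# assert len(elements2) == 1 and matches_written(kc_airport, elements2[0])
```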