# Streaming Sample: Cosmos DB ChangeFeed - Databricks
In this notebook, you read a live stream of tweets stored in Cosmos DB by using Apache Spark to read the Cosmos DB Change Feed, and then run transformations on the data in a Databricks cluster.

## Prerequisites
- Databricks Cluster (Spark)
- Cosmos DB Spark Connector (azure-cosmosdb-spark)
  - Create a library using Maven coordinates: type `azure-cosmosdb-spark_2.2.0` into the search box and search for it, or create the library by uploading a JAR file downloaded from the Maven Central repository.
- Azure Cosmos DB Collection

## Test Feed Generator
- https://github.com/tknandu/TwitterCosmosDBFeed

## Links
- [Working with the change feed support in Azure Cosmos DB](https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed)
- [Twitter with Spark and Azure Cosmos DB Change Feed Sample](https://github.com/Azure/azure-cosmosdb-spark/blob/master/samples/notebooks/Twitter%20with%20Spark%20and%20Azure%20Cosmos%20DB%20Change%20Feed.ipynb)
- [Stream Processing Changes using Azure Cosmos DB Change Feed and Apache Spark](https://github.com/Azure/azure-cosmosdb-spark/wiki/Stream-Processing-Changes-using-Azure-Cosmos-DB-Change-Feed-and-Apache-Spark)

## Configure Connection to Cosmos DB Change Feed using azure-cosmosdb-spark
The parameters below connect to the Cosmos DB Change Feed; for more information, please refer to Change Feed Test Runs.

In [3]:
# Adding variables 
rollingChangeFeed = False
startFromTheBeginning = False
useNextToken = True 

database = "changefeedsource"
collection = "tweet_new"

tweetsConfig = {
"Endpoint" : "https://dbstreamdemo.documents.azure.com:443/",
"Masterkey" : "ekRLXkETPJ93s6XZz4YubZOw1mjSnoO5Bhz1Gk29bVxCbtgtKmiyRz4SogOSxLOGTouXbwlaAHcHOzct4JVwtQ==",
"Database" : database,
"Collection" : collection,
"ReadChangeFeed" : "true",
"ChangeFeedQueryName" : database + collection + " ",
"ChangeFeedStartFromTheBeginning" : str(startFromTheBeginning),
"ChangeFeedUseNextToken" : str(useNextToken),
"RollingChangeFeed" : str(rollingChangeFeed),
#"ChangeFeedCheckpointLocation" : "./changefeedcheckpointlocation",
"SamplingRatio" : "1.0"
}
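
The options above can also be assembled by a small function so that the account key is not pasted into the notebook. The following is a minimal sketch, not part of the connector: `build_change_feed_config` is a hypothetical helper, and the environment variable names `COSMOS_ENDPOINT` and `COSMOS_MASTER_KEY` are assumptions chosen for illustration. The option keys and the query-name scheme are copied from the cell above.

```python
import os


def build_change_feed_config(endpoint, master_key, database, collection,
                             start_from_beginning=False, use_next_token=True,
                             rolling=False):
    """Return the connector options as a dict of strings (hypothetical helper)."""
    return {
        "Endpoint": endpoint,
        "Masterkey": master_key,
        "Database": database,
        "Collection": collection,
        "ReadChangeFeed": "true",
        # Same naming scheme as the cell above: database + collection + " "
        "ChangeFeedQueryName": database + collection + " ",
        "ChangeFeedStartFromTheBeginning": str(start_from_beginning),
        "ChangeFeedUseNextToken": str(use_next_token),
        "RollingChangeFeed": str(rolling),
        "SamplingRatio": "1.0",
    }


# COSMOS_ENDPOINT and COSMOS_MASTER_KEY are assumed names, not connector settings.
# The fallback strings are placeholders and will not connect to anything.
example_config = build_change_feed_config(
    endpoint=os.environ.get("COSMOS_ENDPOINT", "https://<account>.documents.azure.com:443/"),
    master_key=os.environ.get("COSMOS_MASTER_KEY", "<master-key>"),
    database="changefeedsource",
    collection="tweet_new",
)
```

The resulting dict can be passed to the reader with `.options(**example_config)`. On Databricks, a secret scope read through `dbutils.secrets.get` is another place the key could come from.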

## Read a DataFrame

In [5]:
# Read a DataFrame
# SparkSession available as 'spark'.
tweets = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**tweetsConfig).load()


## Get the number of tweets
This returns the number of tweets; it starts at 0 and grows as you re-run the cell below.

In [7]:
# Get the number of tweets
tweets.count()
# display(tweets)
# tweets.printSchema()
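
Re-running the cell by hand can be replaced by a small polling loop. The sketch below is illustrative only: `poll_counts` and `fake_count` are hypothetical names, and the fake counter stands in for a real call such as `tweets.count` so that the example runs without a cluster.

```python
import time


def poll_counts(count_fn, iterations=3, delay_seconds=0.0):
    """Call count_fn repeatedly and return the list of observed counts."""
    observed = []
    for i in range(iterations):
        observed.append(count_fn())
        # Sleep between polls, but not after the last one
        if i < iterations - 1:
            time.sleep(delay_seconds)
    return observed


# Stand-in for a real count: each call pretends 5 more tweets have arrived.
_fake_total = {"n": 0}


def fake_count():
    _fake_total["n"] += 5
    return _fake_total["n"]


print(poll_counts(fake_count))  # prints [5, 10, 15]
```

In the notebook, `count_fn` could be `tweets.count` with a delay of a few seconds; the counts you actually observe depend on how quickly the feed generator writes documents.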

## Create tweets TempView
This way, we can run SQL statements within the notebook.

In [9]:
# Create tweets TempView
# This way we can run SQL statements within the notebook
tweets.createOrReplaceTempView("tweets")

In [10]:
%sql
select count(1) from tweets

## Show various attributes of the first 20 tweets

In [12]:
%sql
select 
  id,
  created_at,
  user.screen_name,
  user.location,
  text,
  retweet_count,
  entities.hashtags,
  entities.user_mentions,
  favorited,
  source
from tweets
limit 20

## Determine Top 10 hashtags for the tweets

In [14]:
%sql
select concat(concat((dense_rank() OVER (PARTITION BY 1 ORDER BY tweets DESC)-1), '. '), text) as hashtags, tweets
from (
select hashtags.text, count(distinct id) as tweets
from (
select 
  explode(entities.hashtags) as hashtags,
  id
from tweets
) a
group by hashtags.text
order by tweets desc
limit 10
) b
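
To make the logic of the query above easier to follow, here is a plain-Python sketch of the same steps on a tiny in-memory sample: explode the hashtags, count distinct tweet ids per hashtag, keep the top entries, and prefix each with its dense rank minus one (so labels start at 0, as in the query). `top_hashtags` and the `sample` records are hypothetical; the record shape is an assumption that mirrors only the columns the query touches.

```python
from collections import defaultdict


def top_hashtags(tweets, limit=10):
    """Return (label, tweet_count) rows, most frequent hashtag first."""
    ids_by_tag = defaultdict(set)
    for tweet in tweets:
        # "explode": one (hashtag, id) pair per hashtag entry
        for tag in tweet["entities"]["hashtags"]:
            ids_by_tag[tag["text"]].add(tweet["id"])  # count(distinct id)

    counts = sorted(((tag, len(ids)) for tag, ids in ids_by_tag.items()),
                    key=lambda row: row[1], reverse=True)[:limit]

    # dense_rank() over the counts, minus 1 as in the query
    distinct_counts = sorted({n for _, n in counts}, reverse=True)
    rank_of = {n: i for i, n in enumerate(distinct_counts)}
    return [("{}. {}".format(rank_of[n], tag), n) for tag, n in counts]


sample = [
    {"id": 1, "entities": {"hashtags": [{"text": "spark"}, {"text": "azure"}]}},
    {"id": 2, "entities": {"hashtags": [{"text": "spark"}]}},
    {"id": 3, "entities": {"hashtags": [{"text": "cosmosdb"}, {"text": "spark"}]}},
    {"id": 3, "entities": {"hashtags": [{"text": "azure"}]}},
]

print(top_hashtags(sample))
# prints [('0. spark', 3), ('1. azure', 2), ('2. cosmosdb', 1)]
```

Hashtags with equal counts share the same rank prefix, which is the behavior of `dense_rank()` in the SQL version.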

# [APPENDIX] Connecting to Cosmos DB using pydocumentdb

In [16]:
# Import Necessary Libraries
import pydocumentdb
from pydocumentdb import document_client
from pydocumentdb import documents
import datetime

# Configuring the connection policy (allowing for endpoint discovery)
connectionPolicy = documents.ConnectionPolicy()
connectionPolicy.EnableEndpointDiscovery = True
connectionPolicy.PreferredLocations = ["Japan East", "Japan West"]


# Set keys to connect to Cosmos DB 
masterKey = 'b3KPBHQvWTD8prYsQDiHlaM8kDzBholipD1sgshjT60ayDK9WkvRAT0Qywsi5FkcyKsYcvF4iIrUEBBzaZwJKw==' 
host = 'https://videoanalytics.documents.azure.com:443/'
client = document_client.DocumentClient(host, {'masterKey': masterKey}, connectionPolicy)


# Configure Database and Collections
databaseId = 'asset'
collectionId = 'meta'

# Configurations the Cosmos DB client will use to connect to the database and collection
dbLink = 'dbs/' + databaseId
collLink = dbLink + '/colls/' + collectionId


# Set query parameter
#querystr = "SELECT c.City FROM c WHERE c.State='WA'"
querystr= "SELECT * FROM c"
# Query documents
query = client.QueryDocuments(collLink, querystr, options=None, partition_key=None)

# Query for partitioned collections
# query = client.QueryDocuments(collLink, querystr, options={ 'enableCrossPartitionQuery': True }, partition_key=None)

# Push into list `elements`
elements = list(query)
print(elements)
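
The collection link and the query options used above can be built by two small helpers, which keeps the partitioned and non-partitioned cases in one place. This is a sketch under stated assumptions: `build_collection_link` and `build_query_options` are hypothetical names, `enableCrossPartitionQuery` is taken from the commented-out line above, and `maxItemCount` is assumed to be the page-size option accepted by the SDK.

```python
def build_collection_link(database_id, collection_id):
    """Return the link string in the 'dbs/<db>/colls/<coll>' form used above."""
    return "dbs/{}/colls/{}".format(database_id, collection_id)


def build_query_options(cross_partition=False, max_item_count=None):
    """Return an options dict for a query (hypothetical helper)."""
    options = {}
    if cross_partition:
        # Needed when querying a partitioned collection without a partition key
        options["enableCrossPartitionQuery"] = True
    if max_item_count is not None:
        # Assumed page-size option; omit it to use the service default
        options["maxItemCount"] = max_item_count
    return options


print(build_collection_link("asset", "meta"))     # prints dbs/asset/colls/meta
print(build_query_options(cross_partition=True))  # prints {'enableCrossPartitionQuery': True}
```

With these, the query call could read `client.QueryDocuments(build_collection_link(databaseId, collectionId), querystr, options=build_query_options(cross_partition=True))`.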