# Stream data from Kafka to Cosmos DB

This notebook uses Spark Structured Streaming to retrieve data from Kafka on HDInsight and store it in Azure Cosmos DB. It uses the [Azure CosmosDB Spark Connector] to write to a Cosmos DB CQL/SQL API database.

## Requirements

* An Azure Virtual Network
* A Spark on HDInsight 3.6 cluster, inside the virtual network
* A Kafka on HDInsight cluster, inside the virtual network
* A Cosmos DB SQL API database

## Load packages

* spark-sql-kafka-0-10_2.11, version 2.2.0 - Used to read from Kafka.
* azure-cosmosdb-spark_2.2.0_2.11, version 1.0.0 - The Spark connector used to communicate with Azure Cosmos DB.
* azure-documentdb, version 1.15.1 - The DocumentDB SDK. This is used by the connector to communicate with Cosmos DB.
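
As a rough illustration of how the package list maps onto the `spark.jars.packages` setting, the sketch below joins Maven coordinates into the comma-separated `group:artifact:version` form. The names `packages` and `sparkJarsPackages` are illustrative only, and the versions are copied from the `%%configure` cell in this notebook.

```scala
// Maven coordinates (group, artifact, version); values copied from the %%configure cell in this notebook.
val packages = Seq(
  ("org.apache.spark", "spark-sql-kafka-0-10_2.11", "2.2.0"),
  ("com.microsoft.azure", "azure-cosmosdb-spark_2.2.0_2.11", "1.0.0"),
  ("com.microsoft.azure", "azure-documentdb", "1.15.1")
)

// spark.jars.packages takes a comma-separated list of group:artifact:version entries.
val sparkJarsPackages = packages.map { case (g, a, v) => s"$g:$a:$v" }.mkString(",")

println(sparkJarsPackages)
```

Keeping the coordinates in one list makes it easier to see that the Spark version embedded in the connector's artifact name matches the Kafka source version.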

In [1]:
%%configure -f
{
    "name":"Spark-to-Cosmos_DB_Connector", 
    "executorMemory": "8G", 
    "executorCores": 2, 
    "numExecutors":9,
    "driverMemory" : "2G",
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,com.microsoft.azure:azure-cosmosdb-spark_2.2.0_2.11:1.0.0,com.microsoft.azure:azure-documentdb:1.15.1", 
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
7,application_1543170503042_0008,spark,busy,Link,Link,


## Set the Kafka broker hosts information

The broker hosts are specified as a comma-separated list of `host:port` pairs, for example:

"wn0-kafka.yr21tghw1lbedojd2c1yzdkfqb.dx.internal.cloudapp.net:9092,wn1-kafka.yr21tghw1lbedojd2c1yzdkfqb.dx.internal.cloudapp.net:9092"

In [2]:
// The Kafka broker hosts and topic used to read from Kafka
val kafkaBrokers="wn0-kafka.yr21tghw1lbedojd2c1yzdkfqb.dx.internal.cloudapp.net:9092,wn1-kafka.yr21tghw1lbedojd2c1yzdkfqb.dx.internal.cloudapp.net:9092"
val kafkaTopic="trafficdata"

println("broker and topic set.")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
8,application_1543170503042_0009,spark,idle,Link,Link,✔


SparkSession available as 'spark'.
broker and topic set.
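
A malformed broker string tends to surface only later, when the stream fails to connect. The following is a minimal sketch of a sanity check that could run before the stream is created. `parseBrokers` is a hypothetical helper, not part of Spark or Kafka, and the host names are placeholders.

```scala
// Hypothetical helper: split a Kafka bootstrap string into (host, port) pairs,
// failing fast on entries that are not of the form host:port.
def parseBrokers(brokers: String): Seq[(String, Int)] = {
  brokers.split(",").map(_.trim).filter(_.nonEmpty).toSeq.map { entry =>
    entry.split(":") match {
      case Array(host, port) if host.nonEmpty && port.nonEmpty && port.forall(_.isDigit) =>
        (host, port.toInt)
      case _ =>
        throw new IllegalArgumentException(s"Expected host:port but got '$entry'")
    }
  }
}

// Placeholder host names; substitute the worker nodes of your own Kafka cluster.
val parsed = parseBrokers("wn0-kafka.example.internal:9092, wn1-kafka.example.internal:9092")
println(parsed)
```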

## Configure the Cosmos DB connection information

Using the information in [Create a document database using Java and the Azure portal](https://docs.microsoft.com/en-us/azure/cosmos-db/create-sql-api-java), create a database and collection, then retrieve the endpoint, master key, and preferred region information.

In [3]:
// Import Necessary Libraries
import org.joda.time._
import org.joda.time.format._

// Current version of the connector
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider
import com.microsoft.azure.cosmosdb.spark.config.Config

var configMap = Map(
    "Endpoint" -> "https://bigdatanew.documents.azure.com:443/",
    "Masterkey" -> "NLDkyJR9hDdRs4eexlqXvKPUVUCM5YuXebHrhQj9kkNpZg1pMXF3731FLSmPqEo7vDWuqAuzKJs3e8p7CtvDkA==",
    "Database" -> "trafficdata",
    // use a ';' to delimit multiple regions
    "PreferredRegions" -> "West US;",
    "Collection" -> "trafficcollection"
)

println("Cosmos DB configuration set.")

Cosmos DB configuration set.
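
One possible refinement is to keep the master key out of the notebook and to check the map before starting the stream. The sketch below assumes the key is supplied through an environment variable. The variable name `COSMOSDB_MASTER_KEY`, the helper `missingKeys`, and the decision to treat these four settings as mandatory are all assumptions of this example, not requirements stated by the connector.

```scala
// Settings used in the configuration map above; treating all four as mandatory is an assumption.
val requiredKeys = Seq("Endpoint", "Masterkey", "Database", "Collection")

// Hypothetical helper: returns the required settings that are absent or blank.
def missingKeys(config: Map[String, String]): Seq[String] =
  requiredKeys.filter(k => config.get(k).forall(_.trim.isEmpty))

// COSMOSDB_MASTER_KEY is a hypothetical environment variable name; the endpoint is a placeholder.
val safeConfig = Map(
  "Endpoint" -> "https://<your-account>.documents.azure.com:443/",
  "Masterkey" -> sys.env.getOrElse("COSMOSDB_MASTER_KEY", ""),
  "Database" -> "trafficdata",
  "Collection" -> "trafficcollection"
)

println(s"Missing settings: ${missingKeys(safeConfig)}")
```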

## Cassandra API configuration in Spark2

The following steps are based on the instructions at https://docs.microsoft.com/en-us/azure/cosmos-db/cassandra-spark-hdinsight.

The Spark connector for Cassandra requires the Cassandra connection details to be initialized as part of the Spark context. When you launch a Jupyter notebook, the Spark session and context are already initialized. Stopping and reinitializing the Spark context is not advisable unless the new context carries every configuration setting that the HDInsight default Jupyter notebook start-up applies. One workaround is to add the Cassandra instance details directly to the Spark2 service configuration in Ambari. This is a one-time activity per cluster and requires a Spark2 service restart.

1. In Ambari, go to the Spark2 service and select Configs.

2. Go to Custom spark2-defaults, add the following properties, and then restart the Spark2 service.

spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
spark.cassandra.connection.port=10350<br>
spark.cassandra.connection.ssl.enabled=true<br>
spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
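
To show how the five properties relate to a single account name and key, here is a small sketch that derives them in code. `cassandraDefaults` is a hypothetical helper written for this example. It only builds the key/value pairs listed above and does not talk to Ambari.

```scala
// Hypothetical helper: builds the spark2-defaults properties listed above for one Cosmos DB account.
def cassandraDefaults(accountName: String, key: String): Seq[(String, String)] = Seq(
  "spark.cassandra.connection.host" -> s"$accountName.cassandra.cosmosdb.azure.com",
  "spark.cassandra.connection.port" -> "10350",
  "spark.cassandra.connection.ssl.enabled" -> "true",
  "spark.cassandra.auth.username" -> accountName,
  "spark.cassandra.auth.password" -> key
)

// Print the pairs in the key=value form used in the Ambari property list.
cassandraDefaults("YOUR_COSMOSDB_ACCOUNT_NAME", "YOUR_COSMOSDB_KEY").foreach { case (k, v) => println(s"$k=$v") }
```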

In [None]:
package com.microsoft.azure.cosmosdb.cassandra

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

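// NOTE: The enclosing object and the randomDataPerPartitionId helper are not shown in the original cell.
// The versions below are assumptions added so that the sample compiles.
object SampleCosmosDBCassandraApp extends Serializable {

  // Hypothetical helper: one random row per id in [start, end), spread over DoP partitions.
  def randomDataPerPartitionId(sc: SparkContext, DoP: Int, start: Int, end: Int, col1: Int, col2: Int): RDD[(Int, Int, Int, String)] = {
    sc.parallelize(start until end, DoP).map(id => (id, id, Random.nextInt(col1), "text_" + Random.nextInt(col2)))
  }

  // MAIN
  def main (arg: Array[String]): Unit = {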

    // CONFIG. NOTE: Please read the README.md for more details regarding each conf value.
    val conf = new SparkConf(true)
      .setAppName("SampleCosmosDBCassandraApp")
      // Cosmos DB Cassandra API Connection configs
      .set("spark.cassandra.connection.host", "<COSMOSDB_CASSANDRA_ENDPOINT>")
      .set("spark.cassandra.connection.port", "10350")
      .set("spark.cassandra.connection.ssl.enabled", "true")
      .set("spark.cassandra.auth.username", "<COSMOSDB_ACCOUNTNAME>")
      .set("spark.cassandra.auth.password", "<COSMOSDB_KEY>")
      // Parallelism and throughput configs.
      .set("spark.cassandra.output.batch.size.rows", "1")
      // NOTE: The values below are meant as defaults for a sample workload. Please read the README.md for more information on fine-tuning these conf values.
      .set("spark.cassandra.connection.connections_per_executor_max", "10")
      .set("spark.cassandra.output.concurrent.writes", "100")
      .set("spark.cassandra.concurrent.reads", "512")
      .set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
      .set("spark.cassandra.connection.keep_alive_ms", "60000")
      // Cosmos DB Connection Factory, configured with retry policy for rate limiting.
      .set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.CosmosDbConnectionFactory")


    // SPARK CONTEXT
    val sc = new SparkContext(conf)

    // CREATE KEYSPACE/TABLE, AND ANY ARBITRARY QUERY STRING.
    CassandraConnector(conf).withSessionDo { session =>
      session.execute("CREATE KEYSPACE kspc WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute("CREATE TABLE kspc.tble (id1 int, id2 int, col1 int, col2 text, PRIMARY KEY(id1, id2))")
    }

    // INSERT DATA
    val collection = sc.parallelize(Seq((1, 1, 1, "text_1"), (2, 2, 2, "text_2")))
    collection.saveToCassandra("kspc", "tble", SomeColumns("id1", "id2", "col1", "col2"))

    // INSERT GENERATED DATA
    // NOTE: The keyspace "large" and table "large" are not created above and must already exist.
    randomDataPerPartitionId(sc, DoP= 10, start = 0, end = 10000, col1 = 100000, col2 = 100000).
      saveToCassandra("large", "large", SomeColumns("id1", "id2", "col1", "col2"))
      
    sc.stop()
  }
}

## Define the schema and source stream

The following cell creates the stream that reads from Kafka. Data read from Kafka contains several columns. In this case, we only use the `value` column, as it contains the traffic data written by the other notebook. To make this data easier to work with, a schema is applied.

In [4]:
// Import bits used for declaring schemas and working with JSON data
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Define a schema for the data
val schema = (new StructType).add("_direction", StringType).add("_fromst", StringType).add("_last_updt",StringType).add("_length",StringType).add("_lif_lat",StringType).add("_lit_lat",StringType).add("_lit_lon",StringType).add("_strheading",StringType).add("_tost",StringType).add("_traffic",StringType).add("segmentid",StringType).add("start_lon",StringType).add("street",StringType)
// Reproduced here for readability
//val schema = (new StructType)
//   .add("_direction", StringType)
//   .add("_fromst", StringType)
//   .add("_last_updt", StringType)
//   .add("_length", StringType)
//   .add("_lif_lat",StringType)
//   .add("_lit_lat",StringType)
//   .add("_lit_lon",StringType)
//   .add("_strheading",StringType)
//   .add("_tost",StringType)
//   .add("_traffic",StringType)
//   .add("segmentid",StringType)
//   .add("start_lon",StringType)
//   .add("street",StringType)

// Read from the Kafka stream source
val kafka = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets","earliest").load()

// Select the value of the Kafka message and apply the traffic schema to it
val trafficData = kafka.select(
    from_json(col("value").cast("string"), schema) as "traffic")

// The output of this cell is similar to the following value:
// trafficData: org.apache.spark.sql.DataFrame = [traffic: struct<_direction: string, _fromst: string ... 11 more fields>]

trafficData: org.apache.spark.sql.DataFrame = [traffic: struct<_direction: string, _fromst: string ... 11 more fields>]
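
The schema can be tried out without a running Kafka cluster by applying `from_json` to a static DataFrame. This is a minimal sketch under stated assumptions: it creates its own local session (`localSpark`; in the notebook you would reuse the existing `spark`), uses a two-field subset of the schema (`miniSchema`), and parses a hand-written sample record.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

// Assumption: a local session for an offline check; in the notebook, reuse the existing `spark`.
val localSpark = SparkSession.builder.master("local[1]").appName("schema-check").getOrCreate()
import localSpark.implicits._

// Two of the thirteen fields are enough to show the mechanics.
val miniSchema = (new StructType).add("_direction", StringType).add("street", StringType)

// A static DataFrame with a `value` column stands in for the Kafka source.
val sample = Seq("""{"_direction":"EB","street":"55th"}""").toDF("value")
val parsedSample = sample.select(from_json(col("value"), miniSchema) as "traffic")

// Nested fields are addressed with dot notation once the struct has been parsed.
val street = parsedSample.select("traffic.street").head().getString(0)
println(street)
```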

## Write the data to Cosmos DB

The following cell selects the traffic data from the stream and writes it to Cosmos DB. This is the data structure that was created in the previous cell by applying a schema to the value data retrieved from Kafka.

The cell waits on the stream for 10 seconds (10000 ms). `awaitTermination` with a timeout returns once that interval has elapsed; it does not stop the streaming query itself.

In [5]:
trafficData.select("traffic").writeStream.format(classOf[CosmosDBSinkProvider].getName).outputMode("append").options(configMap).option("checkpointLocation", "cosmoscheckpointlocation").start.awaitTermination(10000)
println("Stream finished.")

Stream finished.
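
To bound how long a streaming query runs, it can help to keep the handle returned by `start` and stop it explicitly. The sketch below illustrates the behaviour with stand-ins: a local session (`demoSpark`), Spark's built-in `rate` test source (available from Spark 2.2) instead of Kafka, and the in-memory sink instead of Cosmos DB. These substitutions are assumptions made so the example can run on its own.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; the "rate" source and "memory" sink replace Kafka and Cosmos DB.
val demoSpark = SparkSession.builder.master("local[2]").appName("await-demo").getOrCreate()

val query = (demoSpark.readStream.format("rate").option("rowsPerSecond", "1").load()
  .writeStream.format("memory").queryName("rate_demo").outputMode("append").start())

// Waits at most 3 seconds; the return value reports whether the query terminated within that time.
val terminated = query.awaitTermination(3000)

// Record whether the query is still active after the wait, then stop it explicitly.
val activeAfterWait = query.isActive
query.stop()
val activeAfterStop = query.isActive

println(s"terminated=$terminated activeAfterWait=$activeAfterWait activeAfterStop=$activeAfterStop")
```

Applied to the cell above, the same pattern would mean assigning the result of `start` to a value and calling `stop()` on it after `awaitTermination(10000)` returns.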

## Verify that the data is in Cosmos DB

In the [Azure portal](https://portal.azure.com), select your Cosmos DB account, and then select __Data Explorer__. From the dropdown, select the database and collection that the data is written to. You may need to select __Refresh__ before the data appears. Select the id of one of the entries to view the data in Cosmos DB. The document contains data similar to the following:

```json
{
  "traffic": {
    "_direction": "EB",
    "_fromst": "Pulaski",
    "_last_updt": "2018-12-01 01:10:18.0",
    "_length": "0.5",
    "_lif_lat": "41.7930671862",
    "_lit_lon": "-87.7136071496",
    "_strheading": "W",
    "_tost": "Central Park",
    "_traffic": "-1",
    "segmentid": "1",
    "start_lon": "-87.7231602513",
    "street": "55th"
  },
  "id": "abfe6ff1-51a7-46a6-9600-1c330166cf12"
}
```

## Connect Power BI and Azure Cosmos DB to create the dashboard

1. Run Power BI Desktop.
2. From the welcome screen, you can Get Data, see Recent Sources, or Open Other Reports directly. Select the "X" at the top right corner to close the screen.
3. The Report view of Power BI Desktop is displayed.
4. Select the Home ribbon, then click Get Data. The Get Data window appears.
5. Click Azure, select Azure Cosmos DB (Beta), and then click Connect.
6. On the Preview Connector page, click Continue. The Azure Cosmos DB window appears.
7. Specify the Azure Cosmos DB account endpoint URL to retrieve the data from, and then click OK.
8. You can retrieve the URL from the URI box in the Keys blade of the Azure portal.
9. When the account is successfully connected, the Navigator pane appears. The Navigator shows a list of databases under the account.
10. Use the data loaded from Cosmos DB to create the dashboard in Power BI, and schedule it for refresh.