# Structured Streaming with Azure Event Hubs

## Learning Objectives
By the end of this lesson, you should be able to:
* Establish a connection with Event Hubs in Spark
* Subscribe to and configure an Event Hubs stream
* Parse JSON records from Event Hubs

## Library Requirements

This lesson requires the Maven library with coordinate `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18`.

## Resources
- [Docs for Azure Event Hubs connector](https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/streaming-event-hubs)
- [Documentation on how to install Maven libraries](https://docs.azuredatabricks.net/user-guide/libraries.html#maven-or-spark-package)
- [Spark-EventHub debugging FAQ](https://github.com/Azure/azure-event-hubs-spark/blob/master/FAQ.md)

## Lab Setup

To use this notebook in your own Databricks environment, you will need to create libraries, using the [Create Library](https://docs.azuredatabricks.net/user-guide/libraries.html) interface in Azure Databricks. Follow the steps below to attach the `azure-eventhubs-spark` library to your cluster:

1. In the left-hand navigation menu of your Databricks workspace, select **Clusters**, then select your cluster in the list. If it's not running, start it now.

  ![Select cluster](https://databricksdemostore.blob.core.windows.net/images/10-de-learning-path/select-cluster.png)

2. Select the **Libraries** tab (1), then select **Install New** (2). In the Install Library dialog, select **Maven** under Library Source (3). Under Coordinates, paste **com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18** (4), then select **Install**.
  
  ![Databricks new Maven library](https://raw.githubusercontent.com/MicrosoftDocs/mslearn_databricks/main/images/install-eventhubs-spark-library.png)

3. Wait until the library successfully installs before continuing.

  ![Library installed](https://databricksdemostore.blob.core.windows.net/images/10-de-learning-path/eventhubs-spark-library-installed.png)

Once complete, return to this notebook to continue with the lesson.

### Getting Started

Run the following cell to configure our classroom and set up a local streaming file read that we'll be writing to Event Hubs.

In [0]:
%run ./Includes/Streaming-Demo-Setup

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Azure Event Hubs</h2>

Microsoft Azure Event Hubs is a fully managed, real-time data ingestion service.
You can stream millions of events per second from any source to build dynamic data pipelines and immediately respond to business challenges.
It integrates seamlessly with a host of other Azure services.

Event Hubs can be used in a variety of applications such as
* Anomaly detection (fraud/outliers)
* Application logging
* Analytics pipelines, such as clickstreams
* Archiving data
* Transaction processing
* User telemetry processing
* Device telemetry streaming
* <b>Live dashboarding</b>

### Define Connection Strings and Create Configuration Object

This cell uses a connection string to create a simple `EventHubsConf` object, which will be used to connect.

To run this notebook, you'll need to configure Event Hubs and provide the relevant connection information in the following format:
```
Endpoint=sb://<event_hubs_namespace>.servicebus.windows.net/;SharedAccessKeyName=<key_name>;SharedAccessKey=<signing_key>=;EntityPath=<event_hubs_instance>
```

Note that during the setup steps prior to this notebook, you were instructed to copy the `Connection string-primary key`. To connect successfully, append `;EntityPath=<event_hubs_instance>` to that copied string, using the name of your Event Hub instance.
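
As a minimal sketch of that append step, the plain-Scala helper below adds `EntityPath` only when it is missing. The name `withEntityPath` is hypothetical and not part of the connector, and the placeholder values stand in for your own namespace, key name, and key.

```scala
// Hypothetical helper (not part of the connector): appends EntityPath
// to a namespace-level connection string unless one is already present.
def withEntityPath(namespaceConnectionString: String, eventHubName: String): String = {
  // Drop surrounding whitespace and any trailing semicolon before appending.
  val trimmed = namespaceConnectionString.trim.stripSuffix(";")
  if (trimmed.split(";").exists(_.startsWith("EntityPath="))) trimmed
  else s"$trimmed;EntityPath=$eventHubName"
}

// Placeholder string in the same format as the copied primary-key connection string.
val copied = "Endpoint=sb://<event_hubs_namespace>.servicebus.windows.net/;SharedAccessKeyName=<key_name>;SharedAccessKey=<signing_key>="
val full = withEntityPath(copied, "<event_hubs_instance>")
```

Calling the helper again on `full` returns it unchanged, so it is safe to apply to a string that already names an Event Hub instance.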

In [0]:
%scala

import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}

val connectionString = "Endpoint=sb://<event_hubs_namespace>.servicebus.windows.net/;SharedAccessKeyName=<key_name>;SharedAccessKey=<signing_key>=;EntityPath=<event_hubs_instance>"

val ehWriteConf = EventHubsConf(connectionString)

### Write Stream to Event Hub to Produce Stream

Below, we configure a streaming write to Event Hubs. Refer to the docs for additional ways to [write data to Event Hubs](https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-integration.md#writing-data-to-eventhubs).
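
The Event Hubs sink takes the event payload from a column named `body`. The sketch below shows one common way to pack every column of a DataFrame into a JSON `body` column, in case your own data is not already in that shape. It assumes a Databricks notebook where `spark` is predefined; `sampleDF` and `bodyReadyDF` are hypothetical names, and the rows are made-up stand-ins for the activity data.

```scala
import org.apache.spark.sql.functions.{col, struct, to_json}
import spark.implicits._

// Hypothetical sample rows standing in for the activity stream.
val sampleDF = Seq(("nexus4_1", 0.5), ("nexus4_2", -1.2)).toDF("Device", "x")

// Pack all columns into a struct, then serialize the struct as a JSON string.
val bodyReadyDF = sampleDF.select(
  to_json(struct(sampleDF.columns.map(col): _*)).alias("body"))

bodyReadyDF.show(truncate = false)
```

The same `select` works on a streaming DataFrame, since `to_json` and `struct` are ordinary column functions.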

In [0]:
%scala
import org.apache.spark.sql.streaming.Trigger.ProcessingTime

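// Start from a clean checkpoint directory so the write begins fresh on each run.
val checkpointPath = userhome + "/event-hub/write-checkpoint"
dbutils.fs.rm(checkpointPath, true)

// activityStreamDF is defined by the setup notebook; the handle returned by
// start() is a StreamingQuery, so it gets its own name.
val activityStreamQuery = activityStreamDF
  .writeStream
  .format("eventhubs")
  .outputMode("update")
  .options(ehWriteConf.toMap)
  .trigger(ProcessingTime("25 seconds"))
  .option("checkpointLocation", checkpointPath)
  .start()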

## Event Hubs Configuration

Above, a simple `EventHubsConf` object is used to write data. There are [numerous additional options for configuration](https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-integration.md#eventhubsconf). Below, we specify an `EventPosition` ([docs](https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/streaming-event-hubs#eventposition)) and limit our throughput by setting `MaxEventsPerTrigger`.
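
For comparison, here is a sketch of an alternative configuration that reads only events arriving after the stream starts. It assumes the `connectionString` value defined in the earlier cell; the name `latestOnlyConf` and the limit of 100 are illustrative choices, and `$Default` is the consumer group that Event Hubs creates for every hub.

```scala
import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}

// Assumes `connectionString` from the earlier cell in this notebook.
val latestOnlyConf = EventHubsConf(connectionString)
  .setConsumerGroup("$Default")                        // default consumer group
  .setStartingPosition(EventPosition.fromEndOfStream)  // skip events already in the hub
  .setMaxEventsPerTrigger(100)                         // illustrative cap per micro-batch
```

Starting from the end of the stream suits dashboards that only care about new events, while `fromStartOfStream` replays everything the hub still retains.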

In [0]:
%scala

val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromStartOfStream)
  .setMaxEventsPerTrigger(10)

### Read Stream Using Event Hubs

The `readStream` method is a <b>transformation</b> that returns a streaming DataFrame. For the Event Hubs source, the connector fixes the schema, so no `.schema()` call is needed.

In [0]:
%scala

spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

val eventStreamDF = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()

eventStreamDF.printSchema()

Most of the fields in this response are metadata describing the state of the Event Hubs stream. We are specifically interested in the `body` field, which contains our JSON payload.

Because `body` is encoded as binary, we'll cast it to a string as we select it.
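
Spark's binary-to-string cast decodes the bytes as UTF-8. The plain-Scala sketch below shows the same round trip outside Spark; the payload is a made-up event, not real data from the stream.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical JSON payload standing in for one event.
val payload = """{"Device":"nexus4_1","x":0.5}"""

// What the producer sends: the payload as raw UTF-8 bytes.
val rawBody: Array[Byte] = payload.getBytes(StandardCharsets.UTF_8)

// What the cast recovers: the original text.
val decoded = new String(rawBody, StandardCharsets.UTF_8)
```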

In [0]:
%scala
val bodyDF = eventStreamDF.select('body.cast("STRING"))

Each line of the streaming data becomes a row in the DataFrame once an <b>action</b> such as `writeStream` is invoked.

Nothing is read until you invoke an action, for example `display()` or `writeStream`.

In [0]:
%scala
display(bodyDF, streamName = "bodyDF")

While we can see our JSON data now that it's cast to string type, we can't directly manipulate it.

Before proceeding, stop this stream. We'll continue building up transformations against this streaming DataFrame, and a new action will trigger an additional stream.

In [0]:
%scala
for (s <- spark.streams.active if s.name == "bodyDF") s.stop()

## <img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Parse the JSON payload

Event Hubs acts as a sort of "firehose" (or asynchronous buffer) and delivers the raw data in JSON format.

If desired, we could save this as raw bytes or strings and parse these records further downstream in our processing.

Here, we'll directly parse our data so we can interact with the fields.

The first step is to define the schema for the JSON payload.

:SIDENOTE: Both time fields are encoded as `LongType` here because of non-standard formatting.

In [0]:
%scala
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType, DoubleType}

lazy val schema = StructType(List(
  StructField("Arrival_Time", LongType),
  StructField("Creation_Time", LongType),
  StructField("Device", StringType),
  StructField("Index", LongType),
  StructField("Model", StringType),
  StructField("User", StringType),
  StructField("gt", StringType),
  StructField("x", DoubleType),
  StructField("y", DoubleType),
  StructField("z", DoubleType),
  StructField("geolocation", StructType(List(
    StructField("PostalCode", StringType),
    StructField("StateProvince", StringType),
    StructField("city", StringType),
    StructField("country", StringType)))),
  StructField("id", StringType)))

### Parse the data

Next we can use the function `from_json` to parse out the full message with the schema specified above.

When parsing a value from JSON, we end up with a single column containing a complex object.
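
To see that shape without waiting on a stream, the sketch below runs `from_json` on a small static DataFrame. It assumes a Databricks notebook where `spark` is predefined; `sampleSchema`, `sampleDF`, and `parsedSample` are hypothetical names, and the two-field schema is a cut-down stand-in for the full one above.

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import spark.implicits._

// Cut-down, hypothetical schema with just two of the payload fields.
val sampleSchema = StructType(List(
  StructField("Device", StringType),
  StructField("x", DoubleType)))

// One well-formed record and one that is not JSON at all.
val sampleDF = Seq("""{"Device":"nexus4_1","x":0.5}""", "not json").toDF("body")

// The result is a single struct column named "json".
val parsedSample = sampleDF.select(from_json(col("body"), sampleSchema).alias("json"))

// Nested fields are reached with dot notation; unparseable input yields nulls.
parsedSample.select(col("json.Device"), col("json.x")).show()
```

Records that fail to parse do not raise an error by default, so their fields come back as null and are worth filtering or counting in a real pipeline.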

In [0]:
%scala

import org.apache.spark.sql.functions.from_json

val parsedEventsDF = bodyDF.select(
  from_json('body, schema).alias("json"))

parsedEventsDF.printSchema()

Note that we can further parse this to flatten the schema entirely and properly cast our time fields.
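
The flattening cell divides `Arrival_Time` by 1000 and `Creation_Time` by 1E9, which suggests the first is in epoch milliseconds and the second in epoch nanoseconds. The plain-Scala sketch below checks that arithmetic with `java.time.Instant`; both sample values are made up for illustration.

```scala
import java.time.Instant

// Hypothetical sample values: milliseconds and nanoseconds since the epoch.
val arrivalMillis = 1424686735175L
val creationNanos = 1424686733176178965L

// Milliseconds convert directly.
val arrival = Instant.ofEpochMilli(arrivalMillis)

// Nanoseconds split into whole seconds plus the nanosecond remainder.
val creation = Instant.ofEpochSecond(
  creationNanos / 1000000000L,
  creationNanos % 1000000000L)

println(s"arrival=$arrival creation=$creation")
```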

In [0]:
%scala

import org.apache.spark.sql.functions.{from_unixtime, col}

val flatSchemaDF = parsedEventsDF
  .select(from_unixtime(col("json.Arrival_Time")/1000).cast("timestamp").alias("Arrival_Time"),  // scaled from milliseconds to seconds
          (col("json.Creation_Time")/1E9).cast("timestamp").alias("Creation_Time"),                // scaled from nanoseconds to seconds
          col("json.Device").alias("Device"),
          col("json.Index").alias("Index"),
          col("json.Model").alias("Model"),
          col("json.User").alias("User"),
          col("json.gt").alias("gt"),
          col("json.x").alias("x"),
          col("json.y").alias("y"),
          col("json.z").alias("z"),
          col("json.id").alias("id"),
          col("json.geolocation.country").alias("country"),
          col("json.geolocation.city").alias("city"),
          col("json.geolocation.PostalCode").alias("PostalCode"),
          col("json.geolocation.StateProvince").alias("StateProvince"))

This flat schema provides us the ability to view each nested field as a column.

In [0]:
%scala
display(flatSchemaDF)

### Stop all active streams

In [0]:
%scala
for (s <- spark.streams.active)
  s.stop()