# Reading Event Hubs Capture Data from the Data Lake
## Clean Up the Mount if Needed

In [0]:
# Optional: uncomment the line below to remove an existing /mnt/telemetry mount,
# e.g. when the mount must be re-created against a different blob source.
# Leave it commented out on a normal run.
# dbutils.fs.unmount(mount_point = "/mnt/telemetry")

## Mount the Storage Using a Service Principal
See instructions in the [Azure Databricks Docs](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-use-databricks-spark#create-a-container-and-mount-it) 
- CLIENT ID is the Application (client) ID of the App Registration (service principal).
- CLIENT SECRET is a client secret generated for the App Registration under *Certificates & secrets*. Keep it out of the notebook source, for example by storing it in a Databricks secret scope.
- TENANT ID is the Directory (tenant) ID GUID shown on the App Registration overview screen.
- CONTAINER NAME is the Storage Account container that Event Hubs Capture writes to.
- STORAGE ACCOUNT NAME is the Storage Account that Event Hubs Capture writes to.
- FOLDER PATH TO FIRST DIGIT is the folder path inside the container up to, but not including, the first numeric folder (the partition ID). It is usually `<Event Hubs namespace>/<event hub name>`.

In [0]:
# Mount the Event Hubs Capture container at MOUNT_POINT using a service principal (OAuth).
# Replace every <...> placeholder before running.
# The client secret is read from a Databricks secret scope instead of being pasted
# into the notebook, so it never ends up in source control or in saved cell output.
MOUNT_POINT = "/mnt/telemetry"

configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<CLIENT ID>",
           "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="<SECRET SCOPE>", key="<CLIENT SECRET KEY>"),
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<TENANT ID>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

# Mounting an already-mounted path fails, which would break "Run All" on a cluster
# where this cell has already run. Skip the mount in that case.
# If the existing mount must point at a different source, unmount it first.
if any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()):
    print(f"{MOUNT_POINT} is already mounted; skipping mount.")
else:
    dbutils.fs.mount(
        source = "abfss://<CONTAINER NAME>@<STORAGE ACCOUNT NAME>.dfs.core.windows.net/<FOLDER PATH TO FIRST DIGIT>",
        mount_point = MOUNT_POINT,
        extra_configs = configs)

## Load All Events with Wildcards

In [0]:
# Get all files
avroDF= spark.read.format("avro").load("/mnt/telemetry/*/*/*/*/*/*/*.avro")
display(avroDF)

SequenceNumber,Offset,EnqueuedTimeUtc,SystemProperties,Properties,Body
12,1816,7/29/2021 6:39:45 PM,"Map(x-opt-enqueued-time -> List(1627583985893, null, null, null))",Map(),eyJpZCI6IjM4MDAxNDYwLWE3YTktNDRiYi04YTA0LWY2NjBkZDdiZDE3ZCIsIk5hbWUiOiJTYW1wbGUwIiwiRGF0ZSI6IjIwMjEtMDctMjlUMTE6Mzk6NDYuODkzODQ0Ni0wNzowMCJ9
13,1968,7/29/2021 6:39:46 PM,"Map(x-opt-enqueued-time -> List(1627583986868, null, null, null))",Map(),eyJpZCI6IjgyMmUzYjNkLTk0MmUtNDdhZi1iNDM3LTAzYWI5MDExOGNkMSIsIk5hbWUiOiJTYW1wbGUxIiwiRGF0ZSI6IjIwMjEtMDctMjlUMTE6Mzk6NDcuOTM0NDM0NS0wNzowMCJ9
14,2120,7/29/2021 6:39:47 PM,"Map(x-opt-enqueued-time -> List(1627583987665, null, null, null))",Map(),eyJpZCI6Ijk0OTU3Njc1LWNiYmEtNGQxZS1iN2M3LWZjNWYwMjg5ODRiYiIsIk5hbWUiOiJTYW1wbGUyIiwiRGF0ZSI6IjIwMjEtMDctMjlUMTE6Mzk6NDguNzQzNTc5Ni0wNzowMCJ9
15,2272,7/29/2021 6:41:54 PM,"Map(x-opt-enqueued-time -> List(1627584114099, null, null, null))",Map(),eyJpZCI6IjYzNzUwZDY3LTdmMTAtNGQ0Yy1iZWJmLTA3OTExNmM5MmI2ZiIsIk5hbWUiOiJTYW1wbGUwIiwiRGF0ZSI6IjIwMjEtMDctMjlUMTE6NDE6NTUuMTEzNjcxOS0wNzowMCJ9
16,2424,7/29/2021 6:41:55 PM,"Map(x-opt-enqueued-time -> List(1627584115193, null, null, null))",Map(),eyJpZCI6IjliOGQ3YWNkLWMyNmEtNGJmOC04NDNiLTAxNWRlZjgwMDg4NSIsIk5hbWUiOiJTYW1wbGUxIiwiRGF0ZSI6IjIwMjEtMDctMjlUMTE6NDE6NTYuMjgyMDk0Mi0wNzowMCJ9
17,2576,7/29/2021 6:41:56 PM,"Map(x-opt-enqueued-time -> List(1627584116168, null, null, null))",Map(),eyJpZCI6ImY5YTM4NjEzLTA5NTItNGFlMy1hM2RiLTdkMWIxOWY1MmZjMyIsIk5hbWUiOiJTYW1wbGUyIiwiRGF0ZSI6IjIwMjEtMDctMjlUMTE6NDE6NTcuMjQzMzkwNi0wNzowMCJ9
21,3176,7/29/2021 6:58:31 PM,"Map(x-opt-enqueued-time -> List(1627585111936, null, null, null))",Map(),eyJpZCI6IjFiMjcyODg3LThjZWQtNGRhMS05NTQ5LWNhMWI2OTdhZGMwYiIsIk5hbWUiOiJTYW1wbGUwIiwiRGF0ZSI6IjIwMjEtMDctMjlUMTE6NTg6MzIuODkwNDE5OC0wNzowMCJ9
22,3328,7/29/2021 6:58:32 PM,"Map(x-opt-enqueued-time -> List(1627585112780, null, null, null))",Map(),eyJpZCI6Ijc1NWQ1M2IxLTNhODItNDZkMC1iNDVlLTdiMzJhNjQzZTU4NyIsIk5hbWUiOiJTYW1wbGUxIiwiRGF0ZSI6IjIwMjEtMDctMjlUMTE6NTg6MzMuODU5ODk2Ny0wNzowMCJ9
23,3480,7/29/2021 6:58:33 PM,"Map(x-opt-enqueued-time -> List(1627585113702, null, null, null))",Map(),eyJpZCI6ImViZTEyNTMwLWVkNzctNDBjZi05YzgyLTdlZDA4NDdiNjhlNCIsIk5hbWUiOiJTYW1wbGUyIiwiRGF0ZSI6IjIwMjEtMDctMjlUMTE6NTg6MzQuNzgzMzUxMS0wNzowMCJ9
24,3632,7/29/2021 7:00:41 PM,"Map(x-opt-enqueued-time -> List(1627585241824, null, null, null))",Map(),eyJpZCI6ImQ2Mzg4Mzk4LWJlZWEtNDU5Yy05NzAyLTUzNTI0NmQwOTkwZCIsIk5hbWUiOiJTYW1wbGUwIiwiRGF0ZSI6IjIwMjEtMDctMjlUMTI6MDA6NDIuODQyMjEwMS0wNzowMCJ9


## Parse the JSON Messages from the Body Column
Much of the remainder of this example is adapted from [this blog post](https://simonlearningsqlserver.wordpress.com/2020/01/15/reading-event-hub-capture-avro-json-messages-using-azure-databricks/).

In [0]:
# clean up the body column to get the encoded JSON
bodyDF = avroDF.withColumn("Body", avroDF.Body.cast("string")).select("Body")
display(bodyDF)

Body
"{""id"":""38001460-a7a9-44bb-8a04-f660dd7bd17d"",""Name"":""Sample0"",""Date"":""2021-07-29T11:39:46.8938446-07:00""}"
"{""id"":""822e3b3d-942e-47af-b437-03ab90118cd1"",""Name"":""Sample1"",""Date"":""2021-07-29T11:39:47.9344345-07:00""}"
"{""id"":""94957675-cbba-4d1e-b7c7-fc5f028984bb"",""Name"":""Sample2"",""Date"":""2021-07-29T11:39:48.7435796-07:00""}"
"{""id"":""63750d67-7f10-4d4c-bebf-079116c92b6f"",""Name"":""Sample0"",""Date"":""2021-07-29T11:41:55.1136719-07:00""}"
"{""id"":""9b8d7acd-c26a-4bf8-843b-015def800885"",""Name"":""Sample1"",""Date"":""2021-07-29T11:41:56.2820942-07:00""}"
"{""id"":""f9a38613-0952-4ae3-a3db-7d1b19f52fc3"",""Name"":""Sample2"",""Date"":""2021-07-29T11:41:57.2433906-07:00""}"
"{""id"":""1b272887-8ced-4da1-9549-ca1b697adc0b"",""Name"":""Sample0"",""Date"":""2021-07-29T11:58:32.8904198-07:00""}"
"{""id"":""755d53b1-3a82-46d0-b45e-7b32a643e587"",""Name"":""Sample1"",""Date"":""2021-07-29T11:58:33.8598967-07:00""}"
"{""id"":""ebe12530-ed77-40cf-9c82-7ed0847b68e4"",""Name"":""Sample2"",""Date"":""2021-07-29T11:58:34.7833511-07:00""}"
"{""id"":""d6388398-beea-459c-9702-535246d0990d"",""Name"":""Sample0"",""Date"":""2021-07-29T12:00:42.8422101-07:00""}"


## Create a Schema to Split the JSON into DataFrame Columns

In [0]:
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

# Hand-written schema for the event JSON (fields: id, Name, Date).
# It is kept as a reference/alternative; the next cell parses with json_schema_auto.
# `Date` holds a full ISO-8601 timestamp with a UTC offset
# (e.g. 2021-07-29T11:39:46.8938446-07:00), so it is declared as TimestampType.
# DateType would drop the time-of-day and offset.
json_schema = StructType(
  [
    StructField("id", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Date", TimestampType(), True)
  ]
)

# Inferred schema: Spark scans the Body strings to derive field names and types.
# As the output below shows, the inferred fields come back ordered by name and
# `Date` stays a plain string rather than a timestamp.
json_schema_auto = spark.read.json(bodyDF.rdd.map(lambda row: row.Body)).schema

## Parse the JSON into a new Dataframe

In [0]:
from pyspark.sql.functions import col, from_json

# Parse each JSON string in Body into a struct using the inferred schema.
# The struct overwrites the Body column in place.
parsed_body = from_json(col("Body"), json_schema_auto)
json_df = bodyDF.withColumn("Body", parsed_body)
display(json_df)

Body
"List(2021-07-29T11:39:46.8938446-07:00, Sample0, 38001460-a7a9-44bb-8a04-f660dd7bd17d)"
"List(2021-07-29T11:39:47.9344345-07:00, Sample1, 822e3b3d-942e-47af-b437-03ab90118cd1)"
"List(2021-07-29T11:39:48.7435796-07:00, Sample2, 94957675-cbba-4d1e-b7c7-fc5f028984bb)"
"List(2021-07-29T11:41:55.1136719-07:00, Sample0, 63750d67-7f10-4d4c-bebf-079116c92b6f)"
"List(2021-07-29T11:41:56.2820942-07:00, Sample1, 9b8d7acd-c26a-4bf8-843b-015def800885)"
"List(2021-07-29T11:41:57.2433906-07:00, Sample2, f9a38613-0952-4ae3-a3db-7d1b19f52fc3)"
"List(2021-07-29T11:58:32.8904198-07:00, Sample0, 1b272887-8ced-4da1-9549-ca1b697adc0b)"
"List(2021-07-29T11:58:33.8598967-07:00, Sample1, 755d53b1-3a82-46d0-b45e-7b32a643e587)"
"List(2021-07-29T11:58:34.7833511-07:00, Sample2, ebe12530-ed77-40cf-9c82-7ed0847b68e4)"
"List(2021-07-29T12:00:42.8422101-07:00, Sample0, d6388398-beea-459c-9702-535246d0990d)"


## Select the JSON Fields into a New Tabular DataFrame (One Column per Field)

In [0]:
# Flatten the parsed struct by promoting each JSON field to a top-level column.
# Each column is named after the last path segment: id, Date, Name.
df = json_df.select("Body.id", "Body.Date", "Body.Name")

display(df)

id,Date,Name
38001460-a7a9-44bb-8a04-f660dd7bd17d,2021-07-29T11:39:46.8938446-07:00,Sample0
822e3b3d-942e-47af-b437-03ab90118cd1,2021-07-29T11:39:47.9344345-07:00,Sample1
94957675-cbba-4d1e-b7c7-fc5f028984bb,2021-07-29T11:39:48.7435796-07:00,Sample2
63750d67-7f10-4d4c-bebf-079116c92b6f,2021-07-29T11:41:55.1136719-07:00,Sample0
9b8d7acd-c26a-4bf8-843b-015def800885,2021-07-29T11:41:56.2820942-07:00,Sample1
f9a38613-0952-4ae3-a3db-7d1b19f52fc3,2021-07-29T11:41:57.2433906-07:00,Sample2
1b272887-8ced-4da1-9549-ca1b697adc0b,2021-07-29T11:58:32.8904198-07:00,Sample0
755d53b1-3a82-46d0-b45e-7b32a643e587,2021-07-29T11:58:33.8598967-07:00,Sample1
ebe12530-ed77-40cf-9c82-7ed0847b68e4,2021-07-29T11:58:34.7833511-07:00,Sample2
d6388398-beea-459c-9702-535246d0990d,2021-07-29T12:00:42.8422101-07:00,Sample0


## Filter by Date

In [0]:
from pyspark.sql.functions import col, lit, to_timestamp

# Half-open filter window [FILTER_START, FILTER_END).
# The boundary strings are interpreted in the Spark session time zone
# (spark.sql.session.timeZone).
FILTER_START = "2021-07-29"
FILTER_END = "2021-07-31"

# `Date` arrives as an ISO-8601 string with a UTC offset (see output above).
# Comparing that string to '2021-07-29' is lexicographic and ignores the offset, so
# events stamped with different offsets could land on the wrong side of a boundary.
# Parse it to a timestamp (an absolute instant) and compare instants instead.
event_time = to_timestamp(col("Date"))
filtered_df = df.where((event_time >= to_timestamp(lit(FILTER_START)))
                       & (event_time < to_timestamp(lit(FILTER_END))))

display(filtered_df)

id,Date,Name
38001460-a7a9-44bb-8a04-f660dd7bd17d,2021-07-29T11:39:46.8938446-07:00,Sample0
822e3b3d-942e-47af-b437-03ab90118cd1,2021-07-29T11:39:47.9344345-07:00,Sample1
94957675-cbba-4d1e-b7c7-fc5f028984bb,2021-07-29T11:39:48.7435796-07:00,Sample2
63750d67-7f10-4d4c-bebf-079116c92b6f,2021-07-29T11:41:55.1136719-07:00,Sample0
9b8d7acd-c26a-4bf8-843b-015def800885,2021-07-29T11:41:56.2820942-07:00,Sample1
f9a38613-0952-4ae3-a3db-7d1b19f52fc3,2021-07-29T11:41:57.2433906-07:00,Sample2
1b272887-8ced-4da1-9549-ca1b697adc0b,2021-07-29T11:58:32.8904198-07:00,Sample0
755d53b1-3a82-46d0-b45e-7b32a643e587,2021-07-29T11:58:33.8598967-07:00,Sample1
ebe12530-ed77-40cf-9c82-7ed0847b68e4,2021-07-29T11:58:34.7833511-07:00,Sample2
d6388398-beea-459c-9702-535246d0990d,2021-07-29T12:00:42.8422101-07:00,Sample0
