## 1. Read Streaming Data, Define Schema for Streaming Data and Load into Silver Table ##

In [0]:
# Databricks notebook source
import json
from datetime import datetime as dt
from datetime import timezone

import pyspark.sql.functions as F
from pyspark.sql.types import *

In [0]:
# --- Event Hubs source configuration ---
# The connection string carries a shared-access key, so it is fetched from a
# Databricks secret scope at run time rather than stored in the notebook.
# NOTE(review): the scope and key names are placeholders — point them at the
# secret that holds this hub's connection string. The key that used to be
# hardcoded here has been exposed and should be rotated.
connectionString = dbutils.secrets.get(
    scope="eventhubs", key="cloud-hub-connection-string"
)

ehConf = {}

# Offset the reader starts from; presumably "-1" is the connector's marker for
# the start of the stream — confirm against the connector documentation.
startOffset = "-1"

# The format string ends in a literal "Z" (UTC designator), so the timestamp is
# taken in UTC explicitly instead of from the driver's local clock.
endTime = dt.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

# Start position: identified by offset only.
startingEventPosition = {
    "offset": startOffset,
    "seqNo": -1,  # not in use
    "enqueuedTime": None,  # not in use
    "isInclusive": True,
}

# End position: identified by enqueued time only.
# NOTE(review): the connector documentation describes endingPosition for batch
# queries; confirm whether it has any effect on the streaming read below.
endingEventPosition = {
    "offset": None,  # not in use
    "seqNo": -1,  # not in use
    "enqueuedTime": endTime,
    "isInclusive": True,
}

# The connector takes both positions as JSON strings.
ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
ehConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)

# The connection string is passed through the connector's encrypt() helper
# before being placed in the options.
ehConf[
    "eventhubs.connectionString"
] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
ehConf["eventhubs.consumerGroup"] = "$Default"

# Schema of the JSON payload carried in each event body.
# Listed as (field name, Spark type) pairs; every field is declared nullable.
_json_fields = [
    ("VendorID", IntegerType()),
    ("tpep_pickup_datetime", TimestampType()),
    ("tpep_dropoff_datetime", TimestampType()),
    ("trip_distance", FloatType()),
    ("total_amount", FloatType()),
]
json_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _json_fields]
)



# Open a streaming read on the Event Hub described by ehConf.
raw_events = spark.readStream.format("eventhubs").options(**ehConf).load()

# Cast the event body to a string and parse it as JSON with json_schema,
# replacing the original `body` column with the parsed struct.
parsed_events = raw_events.withColumn(
    "body", F.from_json(raw_events.body.cast("string"), json_schema)
)

# Promote the parsed struct's fields to top-level columns.
silver_columns = [
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "trip_distance",
    "total_amount",
]
df = parsed_events.select(*[F.col("body." + name) for name in silver_columns])

# Start an append-mode streaming write of the parsed rows to the Delta location
# of the silver table. After this statement `df` no longer names a DataFrame:
# it is rebound to whatever start() returns (the handle of the running query).
# NOTE(review): the "database" and "table" options are passed through as-is;
# presumably the Delta sink does not interpret them — confirm that the
# taxi_silver table is registered in the metastore by some other means.
df = (
    df.writeStream.format("delta")
    .outputMode("append")
    .option("path", "dbfs:/user/hive/warehouse/taxi_database.db/taxi_silver")
    .option("checkpointLocation", "/tmp/delta/_checkpoints/")
    .option("database", "taxi_database")
    .option("table", "taxi_silver")
    .start()
)

# Block this cell until the streaming query terminates.
df.awaitTermination()

## 2. Create Gold Table in Database 'taxi_database' ##

In [0]:
%sql
-- Make taxi_database the current database, so the unqualified table names
-- used in the following cells (taxi_gold, taxi_silver) are looked up there.
USE taxi_database

In [0]:
%sql
-- Gold table: vendor id, pickup/dropoff timestamps and total amount.
-- IF NOT EXISTS leaves an already existing taxi_gold table untouched, so this
-- cell can be executed repeatedly.
-- The column order here is the same as the SELECT list that later feeds this
-- table through insertInto.
CREATE TABLE IF NOT EXISTS taxi_gold(
  VENDORID INT,
  TPEP_PICKUP_DATETIME TIMESTAMP,
  TPEP_DROPOFF_DATETIME TIMESTAMP,
  TOTAL_AMOUNT FLOAT
)

## 3. Filter Data to Load into Gold Table ##

In [0]:
# Project the silver table down to the four columns stored in the gold table
# (trip_distance is left out). No row filter is applied.
gold_projection_sql = '''SELECT VENDORID, TPEP_PICKUP_DATETIME, TPEP_DROPOFF_DATETIME, TOTAL_AMOUNT
                            FROM taxi_silver'''
df_taxi_gold = spark.sql(gold_projection_sql)

In [0]:
# Append the rows of df_taxi_gold to the existing taxi_gold table.
# overwrite=False spells out the append behaviour explicitly.
# NOTE(review): insertInto is documented as matching columns by position, so
# the DataFrame's column order must stay aligned with the table definition;
# every execution of this cell appends the selected rows again.
gold_writer = df_taxi_gold.write
gold_writer.insertInto("taxi_gold", overwrite=False)