## 1. Read Streaming Data, Define Its Schema, and Load It into the Bronze Table ##

In [None]:
# Databricks notebook source
import json
from datetime import datetime as dt
from datetime import timezone

import pyspark.sql.functions as F
from pyspark.sql.types import *

In [None]:
# Event Hubs connection string for the "cloud-hub" entity.
# SECURITY(review): this SharedAccessKey is committed in plain text. Rotate
# the key and load the string from a Databricks secret scope instead of
# hard-coding it here.
connectionString = "Endpoint=sb://cloud-namespace.servicebus.windows.net/;SharedAccessKeyName=policy;SharedAccessKey=jFkGlYICLs65V/xKnC0mBg4D4UL+S/uVI+AEhB+83vw=;EntityPath=cloud-hub"
ehConf = {}

# Start reading at offset "-1" (the connector's start-of-stream offset).
startOffset = "-1"
# The trailing "Z" in this format marks the timestamp as UTC, so take the
# current time in UTC rather than in the cluster's local time zone.
endTime = dt.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
startingEventPosition = {
    "offset": startOffset,
    "seqNo": -1,  # not in use
    "enqueuedTime": None,  # not in use
    "isInclusive": True,
}
# NOTE(review): the connector documents endingPosition for batch queries;
# this streaming read presumably ignores it -- confirm before relying on it.
endingEventPosition = {
    "offset": None,  # not in use
    "seqNo": -1,  # not in use
    "enqueuedTime": endTime,
    "isInclusive": True,
}
ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
ehConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)

# The connector is given the connection string in encrypted form.
ehConf["eventhubs.connectionString"] = (
    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
)
ehConf["eventhubs.consumerGroup"] = "$Default"

# Schema of the JSON payload carried in each event body.
json_schema = StructType(
    [
        StructField("Tweet", StringType(), True),
        StructField("polarity", FloatType(), True),
        StructField("subjectivity", FloatType(), True),
        StructField("Sentiment", StringType(), True),
    ]
)

df = spark.readStream.format("eventhubs").options(**ehConf).load()

# Cast the event body to a string and parse it as JSON with json_schema.
df = df.withColumn("body", F.from_json(df.body.cast("string"), json_schema))

# Flatten the parsed struct into one top-level column per schema field, in
# schema order, so the selection can never drift from json_schema.
df = df.select(*[F.col(f"body.{field.name}") for field in json_schema.fields])

# Append to the Bronze Delta table. The Delta sink does not recognise
# "database"/"table" options, so the previous writer never registered a
# metastore table; toTable() registers twitter_database.twitter_bronze
# (which later cells query by name) while "path" keeps the data at the
# same storage location.
# NOTE(review): this checkpoint directory is not stream-specific. Changing it
# restarts the stream from startOffset, so move it deliberately.
query = (
    df.writeStream.format("delta")
    .outputMode("append")
    .option("path", "dbfs:/user/hive/warehouse/twitter_database.db/twitter_bronze")
    .option("checkpointLocation", "/tmp/delta/_checkpoints/")
    .toTable("twitter_database.twitter_bronze")
)

# Blocks this cell until the streaming query stops or fails.
query.awaitTermination()

In [None]:
%sql
-- Make twitter_database the session's current database so the unqualified
-- table names used in later cells (twitter_bronze, twitter_silver,
-- twitter_gold) resolve inside it.
USE twitter_database

## 2. Drop Rows with Null Values ##

In [None]:
# Keep only the Bronze rows where all four fields are non-NULL; these rows
# become the Silver data. Each where() adds one IS NOT NULL predicate.
df_twitter_silver = (
    spark.table("twitter_bronze")
    .where(F.col("Tweet").isNotNull())
    .where(F.col("polarity").isNotNull())
    .where(F.col("subjectivity").isNotNull())
    .where(F.col("Sentiment").isNotNull())
)

## 3. Create the Silver Table for Non-Null Rows ##

In [None]:
%sql
-- Silver table: the Bronze rows with no NULL fields, in the same column
-- order as twitter_bronze (the Python insert matches columns by position).
-- USING DELTA matches the Delta-format Bronze stream instead of relying on
-- the runtime's default table format.
CREATE TABLE IF NOT EXISTS twitter_silver(
  Tweet STRING,
  polarity FLOAT,
  subjectivity FLOAT,
  Sentiment STRING
) USING DELTA

In [None]:
# Rebuild twitter_silver from the filtered Bronze snapshot.
# overwrite=True because df_twitter_silver is derived from the *entire*
# Bronze table: appending on every run would re-insert rows loaded earlier.
# insertInto matches columns by position, not by name -- assumes
# twitter_bronze's column order matches twitter_silver's.
df_twitter_silver.write.insertInto("twitter_silver", overwrite=True)

## 4. Create Gold Table in Database 'twitter_database' ##

In [None]:
%sql
-- Gold table: one row per Sentiment value with its row count.
-- NumberOfSentiment is BIGINT because COUNT(...) returns BIGINT; an INT
-- column would force a narrowing cast on every insert. (IF NOT EXISTS
-- leaves an already-created INT table unchanged.)
-- USING DELTA matches the Delta-format Bronze stream instead of relying on
-- the runtime's default table format.
CREATE TABLE IF NOT EXISTS twitter_gold(
  Sentiment STRING,
  NumberOfSentiment BIGINT
) USING DELTA

## 5. Populate the Gold Table with Each Sentiment and Its Count (NumberOfSentiment) ##

In [None]:
# Aggregate Silver into one row per Sentiment value, counting the non-NULL
# Sentiment entries in each group as NumberOfSentiment.
df_twitter_gold = (
    spark.table("twitter_silver")
    .groupBy("Sentiment")
    .agg(F.count("Sentiment").alias("NumberOfSentiment"))
)

In [None]:
# Write df_twitter_gold into table twitter_gold.
# overwrite=True: the aggregate is recomputed over all of twitter_silver on
# every run, so appending would leave stale, duplicated per-Sentiment rows.
# insertInto matches by position: (Sentiment, NumberOfSentiment).
df_twitter_gold.write.insertInto("twitter_gold", overwrite=True)