# 1. Import Spark Libraries

In [147]:
import findspark
findspark.init("/usr/local/spark-3.2.0-bin-hadoop3.2")
from IPython.display import display, clear_output
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.streaming import DataStreamReader
from pyspark.sql import functions as f
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
import html
import numpy as np
import json
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
pd.options.display.max_columns = None
pd.options.display.max_rows = 250
pd.options.display.max_colwidth = 150
sns.set(color_codes=True)
from pyspark.ml import PipelineModel
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import warnings
warnings.filterwarnings('ignore')

# 2. Build SparkSession

In [49]:
# Build (or reuse) the notebook-wide SparkSession with master "local[*]" and
# application name "ml". All tuning values are passed as strings, exactly as
# the builder receives them.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("ml")
    # --- memory ---
    .config("spark.memory.fraction", "0.8")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .config("spark.driver.maxResultSize", "2g")
    # --- SQL engine ---
    .config("spark.sql.hive.filesourcePartitionFileCacheSize", "621440000")
    .config("spark.sql.sources.bucketing.maxBuckets", "100000")
    .config("spark.sql.shuffle.partitions", "2000")
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    # --- shuffle / scheduler / broadcast ---
    .config("spark.shuffle.file.buffer", "64k")
    .config("spark.scheduler.listenerbus.eventqueue.capacity", "1000")
    .config("spark.broadcast.blockSize", "8m")
    # --- extra package required by the "kafka" format used further down ---
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0")
    .getOrCreate()
)

# 3. Load Trained Model

In [148]:
# Location of the persisted sentiment pipeline.
# NOTE(review): the path ends in ".pkl" but is handed to PipelineModel.load,
# not to pickle — presumably it is a directory written by Spark ML; confirm.
MODEL_PATH = "/home/jovyan/work/lr_model/lr_sakarya_twitter_sentiment_analysis_model.pkl"
# Fitted pipeline later applied to the cleaned stream via `.transform(...)`.
sentiment_model = PipelineModel.load(MODEL_PATH)

21/12/20 17:16:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
21/12/20 17:16:55 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
21/12/20 17:16:55 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
                                                                                

# 4. Build Spark Stream

In [30]:
# Streaming DataFrame backed by the Kafka topic "tweet".
df_stream = (
    spark.readStream
    .format("kafka")
    # --- where to read from ---
    .option("kafka.bootstrap.servers", "159.223.22.116:9092")
    .option("subscribe", "tweet")
    .option("startingOffsets", "latest")
    # --- timeouts ---
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafkaConsumer.pollTimeoutMs", "5120")
    # --- failure handling ---
    .option("failOnDataLoss", "true")
    .option("fetchOffset.numRetries", "5")
    .load()
)

# 5. Convert Data Coming from Kafka

In [None]:
# Schema applied to the JSON document carried in each Kafka record's value.
# NOTE(review): `createdAt` is declared as IntegerType — if the producer sends
# an epoch value in milliseconds it would not fit a 32-bit integer; confirm.
df_schema = StructType([
    StructField("createdAt", IntegerType()),
    StructField("name", StringType()),
    StructField("likeCount", IntegerType()),
    StructField("quoteCount", IntegerType()),
    StructField("replyCount", IntegerType()),
    StructField("retweetCount", IntegerType()),
    StructField("text", StringType()),
])

# Cast the Kafka `value` column to a string and parse it with the schema above;
# the parsed struct is exposed under the single column "values".
df_stream_cast = (
    df_stream
    .selectExpr("CAST(value AS STRING)")
    .select(f.from_json(f.col("value"), df_schema).alias("values"))
)

# Flatten the struct so each schema field becomes a top-level column.
df = df_stream_cast.select("values.*")

# 6. Clean Data

In [149]:
@udf
def html_unescape(s: str):
    """Decode HTML character references in ``s``.

    String inputs are passed through ``html.unescape``; any other value
    (including ``None``) is returned unchanged.
    """
    return html.unescape(s) if isinstance(s, str) else s


def clean_data(df: DataFrame):
    """Normalise the ``text`` column of ``df`` for the sentiment model.

    The untouched tweet is preserved in a new ``original_text`` column. The
    ``text`` column is then processed in this order:

    1. HTML character references are decoded (``html_unescape``).
    2. URLs, e-mail addresses and ``@user`` mentions are removed.
    3. ``#`` is replaced by a space, so the hashtag word itself is kept.
    4. Every character other than ASCII letters and ``'`` becomes a space,
       runs of spaces are collapsed and the result is trimmed.

    Rows whose cleaned ``text`` is empty or null are dropped.

    Args:
        df: DataFrame containing a string column named ``text``.

    Returns:
        A DataFrame with the cleaned ``text`` column and the added
        ``original_text`` column.
    """
    # The dot in `www\.` is escaped so that only a literal "www." prefix is
    # treated as a URL (an unescaped dot would match any character after "www").
    url_regex = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www\.)+([-\w+&@#/%=~|$?!:,.]*)"
    email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"
    user_regex = r"(@\w{1,15})"

    return (
        df
        # Keep the raw tweet next to the cleaned one.
        .withColumn("original_text", f.col("text"))
        # Decode entities BEFORE any stripping: removing "#" first would break
        # numeric references such as "&#39;" (apostrophe) so they could never be
        # decoded, and an escaped "&amp;" inside a URL would leave a fragment of
        # that URL behind.
        # NOTE(review): if the model was trained on text cleaned with the old
        # ordering, confirm the training-side cleaning is aligned with this.
        .withColumn("text", html_unescape(f.col("text")))
        .withColumn("text", f.regexp_replace(f.col("text"), url_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), email_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), user_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), "#", " "))
        # Only ASCII letters and apostrophes survive; then tidy the whitespace.
        .withColumn("text", f.regexp_replace(f.col("text"), "[^a-zA-Z']", " "))
        .withColumn("text", f.regexp_replace(f.col("text"), " +", " "))
        .withColumn("text", f.trim(f.col("text")))
        .filter(f.col("text") != "").na.drop(subset="text")
    )

# Apply the cleaning steps to the parsed Kafka stream.
df_clean = clean_data(df)


# 7. Predict Tweet Sentiment

In [150]:
# Run the loaded pipeline model over the cleaned stream; the "prediction"
# column it produces is selected in the next cell.
raw_sentiment = sentiment_model.transform(df_clean)

# 8. Real-Time Predicted Sentiment Data

In [151]:
# Keep the tweet fields plus the model output, exposing the "prediction"
# column under the name "user_sentiment".
sentiment = raw_sentiment.select(
    *(
        f.col(field)
        for field in (
            "createdAt",
            "name",
            "likeCount",
            "quoteCount",
            "replyCount",
            "retweetCount",
            "text",
            "original_text",
        )
    ),
    f.col("prediction").alias("user_sentiment"),
)

# 9. Stream Aggregation Queries

In [8]:
# Aggregations over the prediction stream. They are only defined here; none of
# the visible cells writes them to a sink.

# Count of rows whose prediction equals 0.
negative_sentiment_count = (
    sentiment
    .where(f.col("user_sentiment") == 0)
    .select(f.col("user_sentiment").alias("negative_sentiment"))
    .agg(f.count("negative_sentiment"))
)

# Count of rows whose prediction equals 1.
positive_sentiment_count = (
    sentiment
    .where(f.col("user_sentiment") == 1)
    .select(f.col("user_sentiment").alias("positive_sentiment"))
    .agg(f.count("positive_sentiment"))
)

# Mean of the prediction column over all rows.
average_sentiment = sentiment.agg(f.avg("user_sentiment"))

# 10. Write Real-Time Predictions to the Kafka Sink

In [None]:
# Serialise every row to one JSON string in a column named "value" and
# configure a Kafka writer for it; nothing runs until `.start()` is called.
stream_writer = (
    sentiment
    .selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "159.223.22.116:9092")
    .option("topic", "sentiment_topic")
    .option("checkpointLocation", "/home/jovyan/work/logs")
    .queryName("emotion_sentiment")
)

# Start the streaming query; the handle is used by the cells below.
query = stream_writer.start()

# 11. Query Information

In [None]:
# Show the query handle's `isActive` attribute as the cell output.
query.isActive

In [50]:
# Show the query's name (set with `.queryName("emotion_sentiment")` on the writer).
query.name

'emotion_sentiment'

In [None]:
# Show whether the cleaned DataFrame reports itself as a streaming DataFrame.
df_clean.isStreaming

In [None]:
# Ask the running query to explain itself.
# NOTE(review): presumably prints the physical plan — confirm against the Spark docs.
query.explain()

In [None]:
# Render the query's `lastProgress` attribute with IPython's rich display.
display(query.lastProgress)

# 12. Block Until the Query Terminates

In [None]:
# Wait on the query started above.
# NOTE(review): presumably this keeps the cell busy until the query stops or
# fails, so later cells cannot run in the meantime — confirm before Run All.
query.awaitTermination()

In [None]:
# Wait on the session's stream manager rather than on one specific query.
# NOTE(review): looks like an alternative to the previous cell, not a follow-up — confirm.
spark.streams.awaitAnyTermination()

# 13. Stop Stream Query

In [80]:
# Stop the streaming query that writes predictions to Kafka.
query.stop()

# 14. Stop SparkSession

In [None]:
# Stop the SparkSession created in section 2.
spark.stop()