# 1. Import Spark Libraries

In [147]:
import findspark
findspark.init("/usr/local/spark-3.2.0-bin-hadoop3.2")
from IPython.display import display, clear_output
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.streaming import DataStreamReader
from pyspark.sql import functions as f
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
import html
import numpy as np
import json
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
pd.options.display.max_columns = None
pd.options.display.max_rows = 250
pd.options.display.max_colwidth = 150
sns.set(color_codes=True)
from pyspark.ml import PipelineModel
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import warnings
warnings.filterwarnings('ignore')

# 2. Build SparkSession

In [49]:
spark = SparkSession.builder\
.master("local[*]") \
.appName("ml") \
.config("spark.memory.fraction","0.8") \
.config("spark.executor.memory","8g") \
.config("spark.driver.memory","8g") \
.config("spark.sql.hive.filesourcePartitionFileCacheSize", "621440000") \
.config("spark.sql.sources.bucketing.maxBuckets", "100000") \
.config("spark.sql.shuffle.partitions", "2000") \
.config("spark.driver.maxResultSize","2g") \
.config("spark.shuffle.file.buffer","64k") \
.config("spark.scheduler.listenerbus.eventqueue.capacity", "1000") \
.config("spark.broadcast.blockSize", "8m") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0')\
.getOrCreate()

# 3. Load Trained Model

In [148]:
MODEL_PATH = "/home/jovyan/work/lr_model/lr_sakarya_twitter_sentiment_analysis_model.pkl"
sentiment_model = PipelineModel.load(MODEL_PATH)

21/12/20 17:16:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
21/12/20 17:16:55 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
21/12/20 17:16:55 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
                                                                                

# 4. Build Spark Stream

In [30]:
df_stream = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "159.223.22.116:9092") \
  .option("subscribe", "tweet") \
  .option("startingOffsets", "latest") \
  .option("kafka.request.timeout.ms", "60000") \
  .option("kafka.session.timeout.ms", "30000") \
  .option("kafkaConsumer.pollTimeoutMs", "5120") \
  .option("failOnDataLoss", "true") \
  .option("fetchOffset.numRetries", "5") \
  .load()

# 5. Convert Data Coming from Kafka

In [None]:
df_schema = StructType() \
        .add("createdAt", IntegerType()) \
        .add("name", StringType()) \
        .add("likeCount", IntegerType()) \
        .add("quoteCount", IntegerType()) \
        .add("replyCount", IntegerType()) \
        .add("retweetCount", IntegerType()) \
        .add("text", StringType())

df_stream_cast = df_stream.selectExpr("CAST(value AS STRING)").select(from_json(col("value"), df_schema).alias("values"))
df = df_stream_cast.select("values.*")

# 6. Clean Data

In [149]:
@udf
def html_unescape(s: str):
    if isinstance(s, str):
        return html.unescape(s)
    return s


def clean_data(df: DataFrame):
    url_regex = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www.)+([-\w+&@#/%=~|$?!:,.]*)"
    email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"
    user_regex = r"(@\w{1,15})"
    
    return (
        df
        
        .withColumn("original_text", f.col("text"))
        
        .withColumn("text", f.regexp_replace(f.col("text"), url_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), email_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), user_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), "#", " "))
        
        .withColumn("text", html_unescape(f.col("text")))
        
        .withColumn("text", f.regexp_replace(f.col("text"), "[^a-zA-Z']", " "))
        .withColumn("text", f.regexp_replace(f.col("text"), " +", " "))
        .withColumn("text", f.trim(f.col("text")))
        
    
        .filter(f.col("text") != "").na.drop(subset="text")
    )

df_clean=clean_data(df)


# 7. Predict Sentiment Tweets 

In [150]:
raw_sentiment = sentiment_model.transform(df_clean)

# 8. Real Time Predicted Sentiment Data

In [151]:
sentiment = raw_sentiment.select(
    "createdAt", "name", "likeCount","quoteCount","replyCount",
    "retweetCount","text", "original_text",
    f.col("prediction").alias("user_sentiment")
)

# 9. Stream Aggregation Queries

In [8]:
negative_sentiment_count = (
    sentiment.filter("user_sentiment == 0")
    .select(f.col("user_sentiment").alias("negative_sentiment"))
    .agg(f.count("negative_sentiment"))
)

positive_sentiment_count = (
    sentiment.filter("user_sentiment == 1")
    .select(f.col("user_sentiment").alias("positive_sentiment"))
    .agg(f.count("positive_sentiment"))
)

average_sentiment = sentiment.agg(f.avg("user_sentiment"))

# 10. Real Time Predicted Data to Sink Kafka

In [84]:
stream_writer = (
    sentiment
    .selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .queryName("emotion_sentiment")
    .format("kafka") \
    .option("kafka.bootstrap.servers", "159.223.22.116:9092")
    .option("topic", "sentiment_topic")
    .option("checkpointLocation", "/home/jovyan/work/logs")
)

query = stream_writer.start()

21/12/20 12:58:13 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/12/20 12:58:16 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 12:58:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 12:58:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 12:58:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 12:58:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 12:58:16 WARN KafkaDataCo

# 11. Real Time Agg Queries

In [None]:
if df_clean.isStreaming:
    display(query.lastProgress)

In [87]:
stream_writer2 = (
    positive_sentiment_count
    .selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .outputMode("complete")
    .queryName("emotion_sentiment_agg")
    .format("kafka") \
    .option("kafka.bootstrap.servers", "159.223.22.116:9092")
    .option("topic", "sentiment_agg_topic")
    .option("checkpointLocation", "/home/jovyan/work/logs2")
)

query2 = stream_writer2.start()

21/12/20 13:01:08 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/12/20 13:01:08 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:01:08 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:01:09 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:01:09 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:01:09 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:01:09 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:01:10 WARN DAGScheduler: Broadcast

In [50]:
query.name

'emotion_sentiment'

In [89]:
df_stream_cast2 = df_stream.selectExpr("CAST(value AS STRING)", "timestamp") \
.select(from_json(col("value"), df_schema) \
        .alias("values"),"timestamp")
df2 = df_stream_cast2.select("values.*", "timestamp")

df_clean2=clean_data(df2)

raw_sentiment2 = sentiment_model.transform(df_clean2)


sentiment2 = raw_sentiment2.select(
    "createdAt", "name", "likeCount","quoteCount","replyCount",
    "retweetCount","text", "original_text", "timestamp",
    f.col("prediction").alias("user_sentiment")
)

21/12/20 13:04:45 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:04:45 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:04:45 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:04:46 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:04:46 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:04:46 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:04:46 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:04:46 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It 

In [90]:
sentiment_count = \
  sentiment2 \
    .withWatermark("timestamp", "2 minutes") \
    .groupBy(
      "user_sentiment", "timestamp") \
    .count()

21/12/20 13:04:53 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:04:53 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:04:53 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:04:53 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:04:53 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:04:53 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:04:53 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:04:54 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:04:54 

In [91]:
stream_writer3 = (
    sentiment_count
    .selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .outputMode("append")
    .queryName("emotion_sentiment_agg2")
    .format("kafka") \
    .option("kafka.bootstrap.servers", "159.223.22.116:9092")
    .option("topic", "sentiment_agg_groupby_topic")
    .option("checkpointLocation", "/home/jovyan/work/logs3")
)

query3 = stream_writer3.start()

21/12/20 13:05:18 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/12/20 13:05:20 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:05:20 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:05:20 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:05:20 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:05:21 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:05:22 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:05:22 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's method

In [93]:
words = df_clean2.withColumn('word', f.explode(f.split(f.col('original_text'), ' '))) \
    .withWatermark('timestamp', '2 minutes')
wordCounts = words.groupBy('word', 'timestamp').count()

[Stage 1588:(478 + 4) / 2000][Stage 1590:> (0 + 0) / 1][Stage 1592:> (0 + 0) / 1]

In [94]:
stream_writer4 = (
    wordCounts
    .selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .outputMode("append")
    .queryName("emotion_sentiment_word_agg")
    .format("kafka") \
    .option("kafka.bootstrap.servers", "159.223.22.116:9092")
    .option("topic", "sentiment_word_agg_topic")
    .option("checkpointLocation", "/home/jovyan/work/logs4")
)

query4 = stream_writer4.start()

21/12/20 13:11:14 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[Stage 1588:(1977 + 4) / 2000][Stage 1590:> (0 + 0) / 1][Stage 1592:> (0 + 0) / 1]

In [138]:
words2 = df_clean2.withColumn('word', f.explode(f.split(f.col('original_text'), ' ')))
wordCounts2 = words2.groupBy('word').count()

21/12/20 13:45:14 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:45:15 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


In [142]:
stream_writer5 = (
    wordCounts2
    .selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .outputMode("complete")
    .queryName("emotion_sentiment_word_agg3")
    .format("kafka") \
    .option("kafka.bootstrap.servers", "159.223.22.116:9092")
    .option("topic", "sentiment_word_agg_topic2")
    .option("checkpointLocation", "/home/jovyan/work/logs5")
)

query5 = stream_writer5.start()

21/12/20 13:46:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/12/20 13:46:41 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:46:41 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:46:42 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:46:42 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:46:42 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:46:42 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:46:42 WARN DAGScheduler: Broadcast

In [85]:
query.isActive

True

21/12/20 12:58:21 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 12:58:21 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 12:58:22 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 12:58:22 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 12:58:22 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 12:58:23 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 12:58:23 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 12:58:23 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It 

In [102]:
query2.isActive

False

21/12/20 13:23:44 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:23:44 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                                

In [103]:
query3.isActive

False

In [120]:
query4.isActive

False

21/12/20 13:33:25 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:33:25 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:33:25 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:33:25 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:33:25 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:33:25 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA

In [137]:
query5.isActive

21/12/20 13:44:39 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB


False

21/12/20 13:44:39 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:44:39 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:44:39 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:44:40 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:44:40 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:44:40 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:44:40 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/

In [None]:
df_clean.isStreaming

In [86]:
query.explain()

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5ff18ca9, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3800/0x0000000841669840@27382c44
+- Project [to_json(struct(createdAt, from_json(StructField(createdAt,IntegerType,true), cast(value#2908 as string), Some(Etc/UTC)).createdAt, name, from_json(StructField(name,StringType,true), cast(value#2908 as string), Some(Etc/UTC)).name, likeCount, from_json(StructField(likeCount,IntegerType,true), cast(value#2908 as string), Some(Etc/UTC)).likeCount, quoteCount, from_json(StructField(quoteCount,IntegerType,true), cast(value#2908 as string), Some(Etc/UTC)).quoteCount, replyCount, from_json(StructField(replyCount,IntegerType,true), cast(value#2908 as string), Some(Etc/UTC)).replyCount, retweetCount, from_json(StructField(retweetCount,IntegerType,true), cast(value#2908 as string), Some(Etc/UTC)).retweetCount, text, trim(regexp_replace(regexp_replace(pythonUDF0#413

21/12/20 12:59:24 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 12:59:24 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 12:59:25 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 12:59:25 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 12:59:25 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 12:59:25 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 12:59:30 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 12:59:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It 

In [None]:
display(query.lastProgress)

In [None]:
query.awaitTermination()

In [None]:
spark.streams.awaitAnyTermination()

In [80]:
query.stop()

In [99]:
query2.stop()

In [117]:
query3.stop()



In [118]:
query4.stop()

21/12/20 13:32:09 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:32:10 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:32:10 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:32:10 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:32:10 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:32:10 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA

In [134]:
query5.stop()

21/12/20 13:44:05 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:44:05 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:44:06 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:44:06 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:44:06 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:44:06 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/12/20 13:44:07 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
21/12/20 13:44:07 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It 

In [146]:
spark.stop()

21/12/20 14:00:35 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:591)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:577)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:442)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.