<a href="https://colab.research.google.com/github/zw2497/Twitter_Stream_Processing/blob/master/PySpark_Structured_Streaming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Java, Spark, and Findspark
This installs Apache Spark 2.4.2, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [1]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://www-us.apache.org/dist/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz
# !tar xf spark-2.4.2-bin-hadoop2.7.tgz
# !pip -q install findspark

#### Set Environment Variables
Set the locations where Spark and Java are installed.

In [None]:
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-2.4.2-bin-hadoop2.7"

# Start a SparkSession
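This cell writes the streaming job to `app.py`. The script creates the Spark session itself when it is run with `spark-submit` in the last cell.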

In [None]:
%%writefile ./app.py
from pyspark.sql.functions import udf, get_json_object, explode, window
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import pandas_udf, PandasUDFType

from textblob import TextBlob

from sklearn import linear_model

import pandas as pd
import numpy as np

import time

# Build (or reuse) the Spark session and only log messages at FATAL level.
spark = (
    SparkSession.builder
    .appName("StructuredNetworkWordCount")
    .getOrCreate()
)
spark.sparkContext.setLogLevel('FATAL')

# Streaming source: subscribe to the "tweepyv1" Kafka topic, start from the
# latest offsets and do not fail the query when data has been lost.
kafka_reader = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "35.243.144.79:9092")
    .option("subscribe", "tweepyv1")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
)
df = kafka_reader.load()

# Register the raw stream as the "raw" view, then keep only the message
# payload decoded as UTF-8 text together with the Kafka timestamp.
df.createOrReplaceTempView("raw")
df = spark.sql("""select decode(value, 'utf-8') as value, timestamp 
                  from raw""")

@udf(FloatType())
def senti(x):
    """Return the mean TextBlob sentence polarity of ``x``.

    Args:
        x: Text to score. May be ``None`` when the upstream JSON lookup
            finds no value.

    Returns:
        The average of ``sentence.sentiment.polarity`` over all sentences
        TextBlob finds in ``x``, or ``None`` (a SQL null) when ``x`` is
        missing/empty or contains no sentences.
    """
    # A null or empty input has nothing to score; returning None yields a
    # null column value instead of raising inside the streaming task.
    if not x:
        return None

    polarities = [
        sentence.sentiment.polarity for sentence in TextBlob(x).sentences
    ]
    # Guard the division: TextBlob may find no sentences at all.
    if not polarities:
        return None
    return sum(polarities) / len(polarities)

# Extract the first hashtag and the text from the JSON payload; the text is
# scored with the `senti` UDF.
first_hashtag = get_json_object('value', '$.entities.hashtags[0].text')
message_text = get_json_object('value', '$.text')
df = df.select(
    'timestamp',
    first_hashtag.alias("hashtag"),
    senti(message_text).alias("sentiment"),
)

# Keep only rows that carry a hashtag and expose them as the "datas" view
# used by the SQL queries below.
df = df.filter(df.hashtag.isNotNull())
df.createOrReplaceTempView("datas")

"""## Trend detection"""

# Per hashtag and sliding window (120 s long, sliding every 30 s): number of
# rows and mean sentiment.
#
# The row timestamp is the END of the window rather than now(): now() is
# evaluated once per micro-batch, so every row handed to the downstream
# regression would carry the same x value and the fitted slope could not
# reflect any change over time. Window end times differ per window.
# NOTE(review): assumes the trend is meant to be the slope of count over
# window time - confirm with the consumer of the "trend" topic.
dftrend = spark.sql("""
select distinct hashtag, count(*) as count_num, avg(sentiment) as sentiment, window.end as timestamp
from datas
group by hashtag, window(timestamp, "120 seconds", "30 seconds")
""")

@pandas_udf("key string, value double", PandasUDFType.GROUPED_MAP)
def trend_udf(key, pdf):
    """Fit ``count_num`` against ``timestamp`` for one group; return the slope.

    Args:
        key: Tuple holding the grouping key value(s) for this group.
        pdf: pandas DataFrame with the group's rows; the ``timestamp`` and
            ``count_num`` columns are read.

    Returns:
        A one-row pandas DataFrame made of the grouping key followed by the
        fitted linear-regression coefficient, matching the declared
        ``key string, value double`` schema.
    """
    # Convert timestamps to integer nanoseconds. int64 is requested
    # explicitly because a bare 'int' is platform-dependent and may resolve
    # to a 32-bit type that cannot hold datetime64 values.
    x = np.array(pd.to_datetime(pdf.timestamp).astype('int64')).reshape(-1, 1)
    y = np.array(pdf.count_num).reshape(-1, 1)

    reg = linear_model.LinearRegression()
    reg.fit(x, y)

    # coef_ has shape (1, 1) for a single feature and single target; emit
    # it as a plain Python float for the "value double" column.
    slope = float(reg.coef_[0][0])
    return pd.DataFrame([key + (slope,)])

# Run the trend regression once per hashtag.
dftrend = dftrend.groupby('hashtag').apply(trend_udf)

"""# Top k"""

# Both top-k queries share one statement and differ only in the sliding
# window's length and slide interval.
topk_sql = """
select distinct concat_ws(' ',hashtag, count(*), avg(sentiment)) as value, now() as key
from datas
group by hashtag, window(timestamp, "{length}", "{slide}")
"""

# Slow view: 600 s windows sliding every 120 s.
dfslow = spark.sql(topk_sql.format(length="600 seconds", slide="120 seconds"))

# Fast view: 30 s windows sliding every 5 s.
dffast = spark.sql(topk_sql.format(length="30 seconds", slide="5 seconds"))


def _start_kafka_sink(frame, topic, mode, checkpoint):
    """Start a streaming query writing ``frame`` to a Kafka topic.

    Args:
        frame: Streaming DataFrame exposing ``key`` and ``value`` columns.
        topic: Name of the Kafka topic to write to.
        mode: Output mode passed to ``outputMode``.
        checkpoint: Path used as the query's checkpoint location.

    Returns:
        The object returned by ``start()`` for the new query.
    """
    writer = (
        frame
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream
        .format("kafka")
        .outputMode(mode)
        .option("kafka.bootstrap.servers", "35.243.144.79:9092")
        .option("topic", topic)
        .option("checkpointLocation", checkpoint)
    )
    return writer.start()


# Start the three sinks in the same order as before: trend, slow, fast.
query = _start_kafka_sink(dftrend, "trend", "complete", "./logtrend")
query1 = _start_kafka_sink(dfslow, "slow", "update", "./logslow")
query2 = _start_kafka_sink(dffast, "fast", "update", "./logfast")

# Block on each query in turn.
# NOTE(review): a failure in a later query is only noticed once the earlier
# ones have terminated; consider spark.streams.awaitAnyTermination().
for running in (query, query1, query2):
    running.awaitTermination()

In [None]:
!./spark-2.4.2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.2 app.py