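# Reference
- Spark Streaming (legacy DStream API) guide: `https://spark.apache.org/docs/latest/streaming-programming-guide.html`
- Structured Streaming guide (the `readStream` / `writeStream` API used in this notebook): `https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html`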

In [1]:
# Imports
# findspark is used so that the `pyspark` imports in later cells can be resolved
# from the local Spark installation.
import findspark
findspark.init()
# Returns the detected Spark home; as the last expression of the cell it is
# echoed as the cell output.
findspark.find()


'/home/prod/spark-3.3.1-bin-hadoop3'

In [2]:
from pyspark import SparkConf
# Application name and standalone-cluster master URL for the SparkContext that
# is created from this configuration.
conf = (
    SparkConf()
    .setAppName('Structured Streaming Model')
    .setMaster('spark://spark-master:7077')
)

In [3]:
# Setup spark environment
from pyspark import SparkContext
# NOTE(review): only DoubleType is used from this module in the visible cells;
# an explicit import would be clearer than the star import.
from pyspark.sql.types import *
from pprint import pprint, pformat
# Reuse the active SparkContext if there is one, otherwise start one from `conf`.
sc = SparkContext.getOrCreate(conf)
# Use the public `uiWebUrl` property rather than reaching through the private
# JVM handle (`sc._jsc`) to obtain the same URL.
print('Spark web UI link: ', sc.uiWebUrl)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/08 14:47:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark web UI link:  http://spark-master:4040


## Streaming

In [4]:
from pyspark.sql import SparkSession
# The builder is the documented way to obtain a session; getOrCreate() attaches
# to the already-running SparkContext instead of starting a second one.
spark = SparkSession.builder.getOrCreate()

In [5]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

# Structured Streaming source (an unbounded DataFrame, not a legacy DStream,
# despite the variable name) with a single string column, `value`.
dStream = (
    spark.readStream
    .format("socket")
    .option("host", "spark-master")
    .option("port", 9999)
    .load()
)

# The socket source already turns each received line into its own row, so the
# previous explode(split(value, '\n')) returned every row unchanged. A plain
# rename produces the same `text` column without the extra generator step.
# NOTE(review): explode/split are no longer used here; the imports are kept in
# case other cells rely on them.
lines = dStream.select(dStream.value.alias('text'))



22/11/08 14:47:40 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


### Model setup

In [6]:
# Unpickle, pkl file
# Read the pickled model through Spark: binaryFiles yields (path, bytes) pairs,
# and collect() brings them back to the driver as a list.
model_rdd_pkl = sc.binaryFiles(
    "./models/SentimentIntensityAnalyzer.pkl"
)
model_rdd_data = model_rdd_pkl.collect()

                                                                                

In [7]:
# Load and broadcast python object over spark nodes
import pickle

# Deserialize the bytes half of the first collected (path, bytes) pair.
# NOTE(review): pickle.loads can execute arbitrary code — only load model files
# from a trusted source.
_model = pickle.loads(model_rdd_data[0][1])  # driver-local object

# Broadcast one read-only copy to the executors; the UDF reads it via `model.value`.
model = sc.broadcast(_model)
print(model.value)

<nltk.sentiment.vader.SentimentIntensityAnalyzer object at 0x7f2dfebfd990>


In [8]:
from pyspark.sql.functions import udf

def predict(text):
    """Return the VADER compound sentiment score for one piece of text.

    Runs inside Spark tasks as a UDF and reads the analyzer from the broadcast
    variable `model`.

    :param text: the text to score, or None when the column value is NULL.
    :returns: the ``'compound'`` entry of ``polarity_scores`` as a Python float,
        or None for a NULL input.
    """
    # Spark hands SQL NULLs to Python UDFs as None. Return a NULL score rather
    # than passing None to the analyzer, which presumably expects a string; an
    # exception here would fail the whole micro-batch.
    if text is None:
        return None
    prediction = model.value.polarity_scores(text)['compound']
    # Plain Python float so the value matches the declared DoubleType.
    return float(prediction)

predict_udf = udf(predict, DoubleType())


### Output Merging function

In [12]:
import os

# Output CSV, relative to the notebook's working directory. out_file starts
# with '/', so it is concatenated rather than os.path.join-ed (join would
# discard cwd).
out_file = '/data/model-output.csv'
cwd = os.getcwd()
data_output_path = cwd + out_file
print('output path: ', data_output_path)

# mergeResult appends to this file, so start every run from a clean slate.
# Only a missing file is expected here. Any other failure (e.g. permissions)
# must surface, otherwise stale rows would silently be appended to.
try:
    os.remove(data_output_path)
except FileNotFoundError as e:
    print(e)


def mergeResult(batchDF, batchID):
    """Print one micro-batch and append its rows to the output CSV.

    Intended as the ``foreachBatch`` callback of the streaming query.

    :param batchDF: the rows of one micro-batch as a Spark DataFrame
        (columns ``text`` and ``score`` in this notebook).
    :param batchID: identifier of the micro-batch, printed for tracing.
    """
    # The batch is consumed by three actions (count, show, toPandas). Cache it
    # so the source rows and the sentiment UDF are not recomputed for each one.
    batchDF.persist()
    try:
        print('batchSize: ', batchDF.count(), 'id: ', batchID)
        batchDF.show()
        # No header: every batch is appended to the same file, which is read
        # back later with header=None.
        batchDF.toPandas().to_csv(data_output_path, mode='a', index=False, header=False)
    finally:
        batchDF.unpersist()

output path:  /home/prod/sparkimental/data/model-output.csv
[Errno 2] No such file or directory: '/home/prod/sparkimental/data/model-output.csv'


## Predict on the stream query

In [13]:
from pyspark.sql.functions import col
# Keep each streamed line and attach its sentiment score as a second column.
df = lines.select(
    'text',
    predict_udf(col('text')).alias('score'),
)


## Start Stream Query

In [14]:
from time import sleep
import time

# foreachBatch hands every micro-batch to mergeResult (which prints it and
# appends it to the output CSV). It replaces the sink entirely, so the former
# `.format("console")` had no effect and has been dropped.
query = (
    df.writeStream
    .outputMode("append")
    .queryName('stream-model-query')
    .foreachBatch(mergeResult)
    .start()
)


def stop_stream_query(query, wait_time, poll_interval=0.5):
    """Stop a running streaming query once it has gone idle, then wait for it.

    The query is stopped the first time a status snapshot shows no available
    data and no running trigger, provided at least one trigger has already
    reported progress (``query.lastProgress`` is not None).

    :param query: the streaming query to watch.
    :param wait_time: timeout passed to ``query.awaitTermination`` (seconds in
        PySpark).
    :param poll_interval: seconds to sleep between status checks.
    """
    while query.isActive:
        # Take one snapshot per iteration. Reading `query.status` separately for
        # each field may mix values from different moments.
        status = query.status
        idle = not status['isDataAvailable'] and not status['isTriggerActive']
        # Before the first trigger completes the query also looks idle (e.g. while
        # its sources are still initializing); do not stop it that early.
        started = (
            status['message'] != "Initializing StreamExecution"
            and query.lastProgress is not None
        )
        if idle and started:
            print('Stopping query...')
            query.stop()
            break
        time.sleep(poll_interval)

    # Okay wait for the stop to happen
    print('Awaiting termination...')
    query.awaitTermination(wait_time)

# Let the stream drain, stop it, then wait (awaitTermination timeout 5000).
stop_stream_query(query=query, wait_time=5000)

22/11/08 14:49:17 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-71778d2f-3ab4-4dc0-b169-6b6da85beeb0. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/11/08 14:49:17 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
batchSize:  0 id:  0
+----+-----+
|text|score|
+----+-----+
+----+-----+



                                                                                

batchSize:  11 id:  1
+--------------------+-------+
|                text|  score|
+--------------------+-------+
|Animal Crossing; ...| 0.6605|
|With a game this ...| 0.9769|
|Above all else, A...| 0.9628|
|Nintendo's comfor...| 0.4404|
|Animal Crossing: ...| 0.9929|
|Animal Crossing: ...| 0.6908|
|Know that if you’...|-0.2023|
|Animal Crossing: ...| 0.9423|
|Animal Crossing: ...| 0.9042|
|A beautiful, welc...| 0.8591|
|Similar to how Br...| 0.8807|
+--------------------+-------+

batchSize:  26 id:  2
+--------------------+-------+
|                text|  score|
+--------------------+-------+
|New Horizons is t...| 0.8271|
|If you've never p...| 0.9463|
|Animal Crossing: ...| 0.8834|
|Animal Crossing: ...| 0.9525|
|Start from scratc...| 0.9403|
|With fully custom...| 0.9424|
|Animal Crossing N...| 0.8316|
|The fresh thrill ...| 0.8735|
|Animal Crossing: ...| 0.9329|
|Animal Crossing N...|-8.0E-4|
|Animal Crossing: ...| 0.9659|
|Animal Crossing: ...| 0.2263|
|Animal Crossing: ...| 0.

[Stage 22:>                                                         (0 + 1) / 1]

batchSize:  7 id:  3


                                                                                

+--------------------+------+
|                text| score|
+--------------------+------+
|Animal Crossing N...|0.7783|
|Animal Crossing r...|0.8225|
|Animal Crossing: ...|0.2144|
|Animal Crossing: ...|0.8316|
|Under the illusio...|0.9001|
|The series has ma...| 0.841|
|Based on what I h...|   0.0|
+--------------------+------+

batchSize:  10 id:  4
+--------------------+------+
|                text| score|
+--------------------+------+
|Animal Crossing d...|   0.0|
|It’s a blissfully...|0.8779|
|In case of Animal...|0.9538|
|As in the past ga...|0.9578|
|Playing New Horiz...|0.9699|
|Nintendo's island...|0.7677|
|While New Horizon...|0.7769|
|New Horizons is p...|0.3716|
|Animal Crossing: ...| 0.541|
|Animal Crossing: ...| 0.872|
+--------------------+------+

batchSize:  7 id:  5
+--------------------+------+
|                text| score|
+--------------------+------+
|Animal Crossing: ...|0.8402|
|Animal Crossing: ...|0.9673|
|Animal Crossing: ...|0.8832|
|There’s no arguin...|0.2

In [15]:
import pandas as pd

# Read back exactly the file mergeResult appended to (written without a header),
# rather than re-deriving its path from out_file.
# NOTE(review): once the final cell has overwritten this file with a header row,
# re-running this cell would read that header as an extra data row.
df_merged = pd.read_csv(data_output_path, header = None)
df_init = pd.read_csv('./data/animal-crossing.csv')

In [16]:
# Sanity check: every row of the input CSV (presumably the data that was
# streamed to the socket) should appear exactly once in the model output.
print(df_merged.shape, df_init.shape)
print('Valid stream? ', len(df_merged) == len(df_init))

(107, 2) (107, 4)
Valid stream?  True


In [22]:
# Give the positional columns (the file was read with header=None) their names.
df_final = df_merged.rename(columns=dict(enumerate(['text', 'score'])))
df_final.head()

Unnamed: 0,text,score
0,"Animal Crossing; New Horizons, much like its p...",0.6605
1,"With a game this broad and lengthy, there’s mo...",0.9769
2,"Above all else, Animal Crossing: New Horizons ...",0.9628
3,Nintendo's comforting life sim is a tranquil h...,0.4404
4,Animal Crossing: New Horizons takes Animal Cro...,0.9929


In [24]:
# Overwrite the raw stream output with the labelled table (header row included).
# NOTE(review): not idempotent — re-running the header=None read afterwards
# picks up this header as a data row.
df_final.to_csv(path_or_buf=data_output_path, index=False, mode='w')