## Initialize Spark and import necessary packages

In [2]:
# create a local SparkContext object named "MyApp" that you can use to interact with Spark // for working with RDDs
from pyspark import SparkContext
sc = SparkContext("local", "MyApp")

23/05/01 22:15:05 WARN Utils: Your hostname, thinkpad-t470 resolves to a loopback address: 127.0.1.1; using 192.168.0.161 instead (on interface wlp4s0)
23/05/01 22:15:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/01 22:15:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# create a SparkSession object named spark with an app name of "MyApp". You can then use it to interact with Spark. // for working with dataframes
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()

In [4]:
import threading
from pyspark.streaming import StreamingContext
from pyspark.sql import DataFrame
from pyspark.sql.types import * 
from pyspark.sql import functions as F
from pyspark import StorageLevel
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from functools import reduce
import pandas 
import numpy as np
import os

## Constructing data set using the provided stream

In [5]:
# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)
        

# Start streaming and saving
ssc = StreamingContext(sc, 10)
lines = ssc.socketTextStream("seppe.net", 7778)
lines.saveAsTextFiles("/home/linas/Desktop/kul/$em 2/advanced analytics in business/assignment 3/streaming_test/")
ssc_t = StreamingThread(ssc)
ssc_t.start()

23/05/01 22:15:09 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.


In [6]:
# Stop streaming
ssc_t.stop()

----- Stopping... this may take a few seconds -----


23/05/01 22:15:11 WARN ReceiverSupervisorImpl: Skip stopping receiver because it has not yet stared
                                                                                

In [7]:
# Define empty RDD
empty_rdd = spark.sparkContext.emptyRDD()

# Define schema for the dataframe
columns = StructType([StructField('review_id', StringType(), False),
                        StructField('app_id', StringType(), False), 
                        StructField('review_text', StringType(), False), 
                        StructField('label', StringType(), False)])

# Create empty dataframe
df = spark.createDataFrame(data=empty_rdd, schema=columns)
df.show()

+---------+------+-----------+-----+
|review_id|app_id|review_text|label|
+---------+------+-----------+-----+
+---------+------+-----------+-----+



In [8]:
# Populate empty dataframe by reading the files saved after streaming
def scan_folder(parent, schema):
    for file_name in os.listdir(parent):
        if file_name.startswith("-168"):
            for text_name in os.listdir(parent+"/"+file_name):
                if text_name.startswith("part"):
                    sub_df=spark.read.format("json").schema(schema).load(parent+"/"+file_name+"/"+text_name)
                    globals()["df"]=globals()["df"].union(sub_df)

scan_folder("/home/linas/Desktop/kul/$em 2/advanced analytics in business/assignment 3/streaming_test/", columns)

In [9]:
# Check if the dataframe is populated correctly
df.show()
print((df.count(), len(df.columns)))
# NOTE: shape() is not valid for pyspark dataframes

+---------+-------+--------------------+-----+
|review_id| app_id|         review_text|label|
+---------+-------+--------------------+-----+
|137107618|2302820|The game is cool ...|    1|
|137109815|1663220|Thank you Toge fo...|    1|
|137109044|1928870|My son is a minec...|    1|
|137110988|1764870|[quote] Love RPG?...|    1|
|137109572|1457080|Love the way they...|    1|
|137108291|1681060|Good for about 10...|    1|
|137108423|1457080|I don't recommend...|    0|
|137109720|1862690|v fn,lj/m,naBWhsj...|    1|
|136902437|2337890|     Fuскіng awesome|    1|
|137108867|2282480|spoiler alert: no...|    0|
|137107440|1537830|                  ok|    1|
|137110497|1663220|Same setting/game...|    1|
|137110332|1743650|DEVELOPER FEAR NO...|    1|
|137110615|1457080|This game is fine...|    0|
|137110548|1457080|Oyun bazı hikayes...|    1|
|137109712|1967630|Personally I did ...|    0|
|137109649|1608640|You can play fetc...|    1|
+---------+-------+--------------------+-----+

(17, 4)


In [10]:
# Set label column to integer type
df = df.withColumn("label", df["label"].cast(IntegerType()))

## Constructing predictive model

In [11]:
# Split into train and test set
df, test= df.randomSplit(weights = [0.70, 0.30])

In [12]:
# Preprocess review data: tokenize
step1 = RegexTokenizer(inputCol="review_text", outputCol="tokens", pattern="\\W")

# Preprocess review data: remove stopwords and add slang words generated by ChatGPT
swr = StopWordsRemover(inputCol='tokens', outputCol='filtered_words')
w_slang = swr.getStopWords() + ['af', 'aight', 'amirite', 'anyways', 'awesomesauce', 'bae', 'bruh', 'btw', 'dope', 'fam', 'fr', 'gg', 'gj', 'glhf', 'grats', 'imho', 'imo', 'lmao', 'lmfao', 'lol', 'nvm', 'omg', 'pls', 'rip', 'salty', 'savage', 'smh', 'tbh', 'thx', 'ttyl', 'ty', 'wp', 'wtf', 'yeet', 'yolo']
step2 = StopWordsRemover(inputCol='tokens', outputCol='filtered_words', stopWords=w_slang)

# Preprocess review data: vectorize
step3 = CountVectorizer(inputCol="filtered_words", outputCol="features")

# Initiate logistic regression model
step4 = LogisticRegression(featuresCol='features', labelCol='label')

# Create a pipeline
pipeline = Pipeline(stages=[step1, step2, step3, step4])

In [13]:
# Fit pipeline to training data
pipelineFit = pipeline.fit(df)

                                                                                

In [None]:
# Validate model on test data
predictions = pipelineFit.transform(test)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(test.count())
accuracy

In [17]:
# Save trained model
pipelineFit.save("/home/linas/Desktop/kul/$em 2/advanced analytics in business/assignment 3/model/")

                                                                                

## Using trained model to make predictions as the stream comes in

In [20]:
# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)
        

# Add capability to predict as the stream comes in
globals()['models_loaded'] = False
globals()['my_model'] = None

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    # Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        globals()['my_model'] = PipelineModel.load('/home/linas/Desktop/kul/$em 2/advanced analytics in business/assignment 3/model/')
        globals()['models_loaded'] = True
    # Make predictions with a loaded model
    df_result = globals()['my_model'].transform(df)
    df_result.select("app_id", "label", "review_id", "review_text", "prediction").show()

# Start streaming and predicting
ssc = StreamingContext(sc, 10)
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)
ssc_t = StreamingThread(ssc)
ssc_t.start()

23/05/01 22:25:05 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.


23/05/01 22:25:06 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/05/01 22:25:06 WARN BlockManager: Block input-0-1682969106200 replicated to only 0 peer(s) instead of 1 peers
23/05/01 22:25:08 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/05/01 22:25:08 WARN BlockManager: Block input-0-1682969108200 replicated to only 0 peer(s) instead of 1 peers
23/05/01 22:25:10 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/05/01 22:25:10 WARN BlockManager: Block input-0-1682969110200 replicated to only 0 peer(s) instead of 1 peers
23/05/01 22:25:14 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/05/01 22:25:14 WARN BlockManager: Block input-0-1682969114200 replicated to only 0 peer(s) instead of 1 peers
23/05/01 22:25:17 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
23/05/01 22:25:17 WARN BlockManager: Block input-0-1682969117200 replicated to

In [21]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----


23/05/01 22:26:00 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:171)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.io.BufferedReader.readLine(BufferedReader.java:324)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.getNext(SocketInputDStream.scala:121)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.getNext(SocketInputDStream.scala:119)
	at org.apache.spar

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|1324130|    1|137627587|Much more shallow...|
|1324130|    1|137627505|                   .|
+-------+-----+---------+--------------------+

+-------+-----+---------+--------------------+----------+
| app_id|label|review_id|         review_text|prediction|
+-------+-----+---------+--------------------+----------+
|1324130|    1|137627587|Much more shallow...|       1.0|
|1324130|    1|137627505|                   .|       1.0|
+-------+-----+---------+--------------------+----------+

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|2166060|    1|137630077|a very great game...|
|2166060|    1|137629952|Awesome horror in...|
|2166060|    1|137628445|bro, i beat the g...|
+-------+-----+---------+--------------------+

+-------+-----+---------+-------------