In [1]:
import threading

# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [2]:
sc

In [3]:
spark

In [4]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType
from pyspark.ml.feature import RegexTokenizer,Tokenizer,CountVectorizer
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover, HashingTF, IDF,Word2Vec
from pyspark.sql.functions import col, udf,regexp_replace,isnull
from pyspark.sql.types import StringType,IntegerType
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, LogisticRegression, DecisionTreeClassifier, GBTClassifier,OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from sklearn.metrics import classification_report, confusion_matrix

In [11]:
def pred(df):
    df = df.filter((df.label != 'null') | (df.review_text != 'null')|(df.review_text !=".")) 
    df=df.filter((df.label==1)|(df.label==0))
    df=df.dropna()  
    df=df.withColumn("text",regexp_replace(col('review_text'), '\d+', ''))
    df = df.withColumn("text2", regexp_replace(col('text'), "[\"$#,<>+@=?!'/%-]",''))
    df = df.dropna()
    tokenize = RegexTokenizer(inputCol="text2", outputCol="text3")
    df = tokenize.transform(df)
    remove = StopWordsRemover(inputCol="text3", outputCol="text4")
    df = remove.transform(df)
    wv = Word2Vec(vectorSize=100, minCount=0, inputCol="text4", outputCol="wv")
    wv1 = wv.fit(df)
    df = wv1.transform(df)
    model_lr = LogisticRegression(featuresCol = 'wv', labelCol='label')
    model_lr2=model_lr.fit(df)
    res = model_lr2.transform(df)
    res = res.select("label","review_text","prediction")
    res.show()
    return(res)

predict_udf = udf(pred, StringType())

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    
    # Utilize our predict function
    predictions = pred(df)
    predictions.show()

In [12]:
ssc = StreamingContext(sc, 10)

In [13]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [14]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|good game but a b...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|good game but a b...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|This is such a wo...|       1.0|
|    1|Everyone fights, ...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|This is such a wo...|       1.0|
|    1|Everyone fights, ...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+--------



In [15]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----
+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|Suffer not the he...|       1.0|
|    1|the gun makes lou...|       1.0|
|    1|Just an awesome r...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|Suffer not the he...|       1.0|
|    1|the gun makes lou...|       1.0|
|    1|Just an awesome r...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|Yesssssssssssssss...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|Yesssssssssssssss...|       1.0|
+-----+------------------

In [11]:
def pred2(df):
    df = df.filter((df.label != 'null') | (df.review_text != 'null')|(df.review_text !=".")) 
    df=df.filter((df.label==1)|(df.label==0))
    df=df.dropna()  
    df=df.withColumn("text",regexp_replace(col('review_text'), '\d+', ''))
    df = df.withColumn("text2", regexp_replace(col('text'), "[\"$#,<>+@=?!'/%-]",''))
    token=RegexTokenizer(inputCol="text2", outputCol="text3", pattern="\\W")
    remove= StopWordsRemover(inputCol="text3", outputCol="text4")
    htf=HashingTF(inputCol="text4", outputCol="features")
    idf=IDF(inputCol="features", outputCol="td")
    model=LogisticRegression(featuresCol="td",labelCol="label")
    pipeline = Pipeline(stages=[token,remove,htf,idf,model])
    model_lr2=pipeline.fit(df)
    res = model_lr2.transform(df)
    res = res.select("label","review_text","prediction")
    res.show()
    return(res)

predict_udf = udf(pred2, StringType())

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    
    # Utilize our predict function
    predictions = pred2(df)
    predictions.show()

In [12]:
ssc = StreamingContext(sc, 10)

In [13]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [14]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

Exception in thread Thread-13:
Traceback (most recent call last):
  File "C:\Users\RimJhim\anaconda3\lib\threading.py", line 973, in _bootstrap_inner
    self.run()
  File "C:\Users\RimJhim\AppData\Local\Temp/ipykernel_30444/265281298.py", line 10, in run
  File "C:\Users\RimJhim\Desktop\sp2\spark\spark-3.3.2-bin-hadoop2\python\pyspark\streaming\context.py", line 214, in start
    self._jssc.start()
  File "C:\Users\RimJhim\anaconda3\lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "C:\Users\RimJhim\Desktop\sp2\spark\spark-3.3.2-bin-hadoop2\python\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\RimJhim\anaconda3\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o652.start.
: java.lang.IllegalStateException: Only one StreamingContext may be started in this JVM. Currently running StreamingContext

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|I have no idea ho...|       1.0|
|    1|               great|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|I have no idea ho...|       1.0|
|    1|               great|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1| A fun, short, demo.|       1.0|
|    1|          Best Game.|       1.0|
|    1|It shares a name ...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1| A fun, short, demo.|       1.0|
|    1|          Best Game.|       1.0|
|    1|It shares a name ...|       1.

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|It's everything I...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|Perhaps the real ...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|Perhaps the real ...|       1.0|
+-----+--------------------+----------+



In [16]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----
+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|I'd recommend thi...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|I'd recommend thi...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|Watch the full re...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|Watch the full re...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|                 

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|You will be die n...|       1.0|
|    1|It's Town of Sale...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|You will be die n...|       1.0|
|    1|It's Town of Sale...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|This is a very gr...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|This is a very gr...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+--------

+-----+------------+----------+
|label| review_text|prediction|
+-----+------------+----------+
|    1|8/10 weeowek|       1.0|
+-----+------------+----------+

+-----+------------+----------+
|label| review_text|prediction|
+-----+------------+----------+
|    1|8/10 weeowek|       1.0|
+-----+------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|I enjoy running a...|       1.0|
|    1|I give it 7/10. I...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|I enjoy running a...|       1.0|
|    1|I give it 7/10. I...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|Warhammer 40k mix...|       1.0|
|    1|Holy Fuck i cant ...|       1

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|delete version.dl...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|delete version.dl...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|Beautiful graphic...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|Beautiful graphic...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|A fantastic littl...|       1.0|
+-----+--------------------+--------

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    0|The guns are terr...|       0.0|
|    1|                   8|       1.0|
+-----+--------------------+----------+

+-----+-----------+----------+
|label|review_text|prediction|
+-----+-----------+----------+
|    1|  very good|       1.0|
+-----+-----------+----------+

+-----+-----------+----------+
|label|review_text|prediction|
+-----+-----------+----------+
|    1|  very good|       1.0|
+-----+-----------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|At the current Ea...|       1.0|
|    1|               Scary|       1.0|
|    1|         Scawwy game|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|At the current Ea...|       1.0|
|    1

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|Gameplay is fun a...|       1.0|
|    1|Narrative game, w...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    0|First up I want t...|       0.0|
|    0|Played it for an ...|       0.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    0|First up I want t...|       0.0|
|    0|Played it for an ...|       0.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|The game is very ...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|predicti

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|it's everything I...|       1.0|
|    1|I've seen plenty ...|       1.0|
|    1|Game has definite...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|it's everything I...|       1.0|
|    1|I've seen plenty ...|       1.0|
|    1|Game has definite...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|I`M DOING MY PART...|       1.0|
|    1|I love this chaos...|       1.0|
|    1|I really like the...|       1.0|
|    1|Starship Troopers...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+---------

In [7]:
##Checking again:

def pred(df):
    df = df.filter((df.label != 'null') | (df.review_text != 'null')|(df.review_text !=".")) 
    df=df.filter((df.label==1)|(df.label==0))
    df=df.dropna()  
    df=df.withColumn("text",regexp_replace(col('review_text'), '\d+', ''))
    df = df.withColumn("text2", regexp_replace(col('text'), "[\"$#,<>+@=?!'/%-]",''))
    df = df.dropna()
    tokenize = RegexTokenizer(inputCol="text2", outputCol="text3")
    df = tokenize.transform(df)
    remove = StopWordsRemover(inputCol="text3", outputCol="text4")
    df = remove.transform(df)
    wv = Word2Vec(vectorSize=100, minCount=0, inputCol="text4", outputCol="wv")
    wv1 = wv.fit(df)
    df = wv1.transform(df)
    model_lr = LogisticRegression(featuresCol = 'wv', labelCol='label')
    model_lr2=model_lr.fit(df)
    res = model_lr2.transform(df)
    res = res.select("label","review_text","prediction")
    res.show()
    return(res)

predict_udf = udf(pred, StringType())

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    
    # Utilize our predict function
    predictions = pred(df)
    predictions.show()

In [8]:
ssc = StreamingContext(sc, 10)

In [9]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [11]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----
+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    0|Unless you have a...|       0.0|
|    1|Great game! Been ...|       1.0|
|    1|Outlast trials is...|       1.0|
|    1|As an outlast fan...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    0|Unless you have a...|       0.0|
|    1|Great game! Been ...|       1.0|
|    1|Outlast trials is...|       1.0|
|    1|As an outlast fan...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+--------------------+----------+
|    1|Probably one of t...|       1.0|
+-----+--------------------+----------+

+-----+--------------------+----------+
|label|         review_text|prediction|
+-----+------------------