In [2]:
import threading

# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [3]:
sc

In [12]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier,RandomForestClassifier
from pyspark.ml.feature import HashingTF, Tokenizer,RFormula,StopWordsRemover,RegexTokenizer
from pyspark.sql.functions import col, when
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.sql.types import StructType,StructField
from pyspark.ml.feature import CountVectorizer, IDF 
from pyspark.ml.feature import Word2Vec, Normalizer
from  pyspark.sql.functions import rand,countDistinct

from pyspark.ml import PipelineModel


In [5]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Import Dataset

In [6]:
import os
# assign directory
#directory = 'C:/Users/user/Desktop/spark/notebooks/twitch_data'
directory = '/Users/thibtd/Desktop/spark/notebooks/twitch_data'
direcs = []
# iterate over files in
# that directory
for root, dirs, files in os.walk(directory):
    #print(dirs)
    direcs.append(dirs)
    for filename in files:
        #print(os.path.join(root))
        direcs.append(dirs)
        #print(dirs)
        
#print(direcs[0])
emptyRDD = spark.sparkContext.emptyRDD()
schema = StructType([
  StructField('channel', StringType(), True),
  StructField('datetime', StringType(), True),
  StructField('message', StringType(), True),
  StructField('username', StringType(), True)
  ])
df =  spark.createDataFrame(emptyRDD, schema)
for d in direcs[0]:
    a =  spark.read.json('/Users/thibtd/Desktop/spark/notebooks/twitch_data/{}'.format(d))
    df = df.union(a)

                                                                                

In [7]:
df_c = df.alias('df_c')

df_c.count()

                                                                                

7542

In [8]:
#path to save the model 
path = '/Users/thibtd/Desktop/spark/notebooks'
model_path = path + "/model_nlp_final"

# Cleaning the Dataset

In [9]:
lim = int(df_c.count())
print(lim)

df2 = df_c.select(col("channel").alias("label"),col('message').alias('text'),col('username'))

#Order randomly the rows and select 200 rows
df3= df2.withColumn("label", when(df2.label =="#asmongold",0).otherwise(1)).orderBy(rand()).limit(lim)
df3.show(50, truncate=True)

                                                                                

7542




+-----+--------------------+----------------+
|label|                text|        username|
+-----+--------------------+----------------+
|    1|Chatting link por...|   realriotgrrrl|
|    0|                HUHH|      tobbster__|
|    1| BILLY ON THE STREET|        maw67890|
|    1|                 HUH|    peachington_|
|    0|so they still 'pr...|      grillbadly|
|    1|PLUMBER CRACK SHO...|thembo_swaggins_|
|    0|         LIKE JOHNNY|         ded_men|
|    1|                 HUH|     valkyboi369|
|    1|                 HUH|     ventusnova_|
|    0|          GET FUCKED|         kyorotv|
|    0|                  ja|     ladyinertia|
|    0|it's too much for...|   viciousarchon|
|    1|            OMEGALUL|          kid254|
|    0|               Madge|        forsen15|
|    0|           PepeLaugh|       alzalam13|
|    1|At least listen t...|   samanthonybob|
|    1|                KEKL|       dixonsm64|
|    1|@AustinShow peepoHey| helloitscharlie|
|    1|seth rogan is a f...|      

                                                                                

In [20]:
df3.groupBy('label').count().show()



+-----+-----+
|label|count|
+-----+-----+
|    1| 3343|
|    0| 4199|
+-----+-----+



                                                                                

# Pipeline building

## Model Creation

**Features Extraction**

In [30]:
tokenizer = Tokenizer(inputCol='text',outputCol='mess_tok')
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(),outputCol='filtered')
hashingtf = HashingTF(inputCol=remover.getOutputCol(),outputCol='features')
idf = IDF(inputCol=hashingtf.getOutputCol(), outputCol="title_tf_idf")
regexTokenizer = RegexTokenizer(inputCol='text',outputCol='mess_tok', pattern="\\W")
normalizer = Normalizer(inputCol=idf.getOutputCol(), outputCol="title_tf_idf_norm")

**Models**

In [31]:
#split dataset into train test 
train,test = df3.randomSplit([0.6, 0.4], 24)
train.show()



+-----+--------------------+-----------------+
|label|                text|         username|
+-----+--------------------+-----------------+
|    0|                    |   cody_forsythe_|
|    0|                    |            duohh|
|    0|                    |kevindelacrosss14|
|    0|                    |         mrmoreno|
|    0|                    |       nokrashy99|
|    0|                    |  pladypuslemming|
|    0|                    |          playha1|
|    0|                    |      samueyejack|
|    0|                    |          slopdop|
|    0|                    |          tajcove|
|    0|          !watchtime|       satanrubyx|
|    0|    "He's WONDERFUL"|    justinrampage|
|    0|"I'm here to defe...|             pffq|
|    0|"would you do and...|        jazz_corp|
|    0|#JUISTICEFORAMBER...|          geymear|
|    0|          #rekt time|      unenergetic|
|    0|                 ***|        mindccome|
|    0|           *sips* :)|       kurlymoose|
|    0|      

[Stage 303:>                                                        (0 + 1) / 1]                                                                                

In [32]:
print('train size: ', train.count(), 'test size: ', test.count())



train size:  4521 test size:  3021


                                                                                

Best model is Logistic Regression 

In [33]:
lr = LogisticRegression()

In [34]:
pip1 = Pipeline(stages=[tokenizer,remover,hashingtf,idf,normalizer,lr])
df_pip = pip1.fit(train)

22/05/18 21:32:34 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/05/18 21:32:34 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/05/18 21:32:34 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/05/18 21:32:34 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/05/18 21:32:40 WARN BlockManager: Asked to remove block broadcast_975, which does not exist


In [35]:
predictions = df_pip.transform(test)

In [36]:
tp = predictions.filter((predictions.label==1) & (predictions.prediction ==1)).count()
fp = predictions.filter((predictions.label==0) & (predictions.prediction ==1)).count()
tn = predictions.filter((predictions.label==0) & (predictions.prediction ==0)).count()
fn = predictions.filter((predictions.label==1) & (predictions.prediction ==0)).count()
print('true positive: ',tp, ' false positive: ', fp)
print('false negative: ', fn, 'true negative: ', tn)

                                                                                

true positive:  1051  false positive:  253
false negative:  277 true negative:  1440


In [37]:
predictions.select('label',"prediction").show()



+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 20 rows



                                                                                

In [38]:
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))

                                                                                

Test Area Under ROC 0.8895049334965301


In [39]:
#save the trained model 
df_pip.save(model_path)

22/05/18 21:35:20 WARN TaskSetManager: Stage 547 contains a task of very large size (4184 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

# Deployment phase

In [86]:
globals()['models_loaded'] = True
globals()['my_model'] = PipelineModel.load(model_path)

# Toy predict function. Normally you'd use your loaded globals()['my_model'] here
def predict(df):
    return 'predicted-name-of-channel'

predict_udf = udf(predict, StringType())
def clean(df):
    lim = 3000
    df2 = df.select(col("channel").alias("label"),col('message').alias('text'),col('username'))
    df3= df2.withColumn("label", when(df2.label =="#asmongold",0).otherwise(1)).limit(lim)
    return df3

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    df = clean(df)
    
    
    # Utilize our predict function
    #df_withpreds = df.withColumn("pred", predict_udf(
     #   struct([df[x] for x in df.columns])
    #))
    #df_withpreds.show()
    
    # Normally, you wouldn't use a UDF (User Defined Function) Python function to predict as we did here (you can)
    # but an MLlib model you've built and saved with Spark
    # In this case, you need to prevent loading your model in every call to "process" as follows:
    
    # Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = '***' # Replace '***' with:    [...].load('my_logistic_regression')
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model: 
    df_result = globals()['my_model'].transform(df)
    selected = df_result.select("text","probability","prediction",'label' )
    evaluator = BinaryClassificationEvaluator()
    print('Test Area Under ROC', evaluator.evaluate(df_result))
    tp = df_result.filter((df_result.label==1) & (df_result.prediction ==1)).count()
    fp = df_result.filter((df_result.label==0) & (df_result.prediction ==1)).count()
    tn = df_result.filter((df_result.label==0) & (df_result.prediction ==0)).count()
    fn = df_result.filter((df_result.label==1) & (df_result.prediction ==0)).count()
    print('true positive: ',tp, ' false positive: ', fp)
    print('false negative: ', fn, 'true negative: ', tn)
    selected.show(200)

In [91]:
ssc = StreamingContext(sc, 300)

In [92]:
lines = ssc.socketTextStream("localhost", 8889)
lines.foreachRDD(process)

In [93]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

In [94]:
ssc_t.stop()

Test Area Under ROC 0.6724762686660668
----- Stopping... this may take a few seconds -----


22/05/18 22:40:06 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
	at java.base/java.net.SocketInputStream.socketRead0(Native Method)
	at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
	at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.getNext(SocketInputDStream.scala:121)
	at org.a

true positive:  746  false positive:  63
false negative:  523 true negative:  176


                                                                                

+--------------------+--------------------+----------+-----+
|                text|         probability|prediction|label|
+--------------------+--------------------+----------+-----+
|    he was miserable|[0.51963015797327...|       0.0|    0|
|wtf a mod just ti...|           [1.0,0.0]|       0.0|    1|
|               Metal|[0.51963015797327...|       0.0|    0|
|yea same when i w...|[2.65942484782595...|       1.0|    0|
|black dress, blac...|[0.25681548436840...|       1.0|    0|
|are all of the wo...|[1.64494251865541...|       1.0|    1|
|Big Structural Ch...|[2.39565107070931...|       1.0|    1|
|johnny, lord of f...|[0.15916707604322...|       1.0|    0|
|                 kek|[0.72990057144581...|       0.0|    0|
|             despair|[0.51963015797327...|       0.0|    0|
|         Damnit elon|[2.00632095575932...|       1.0|    0|
|        asmonMOGGERS|[0.99447407573786...|       0.0|    0|
|          monkaStare|[0.46870293436313...|       1.0|    1|
|I would be that w...|[4

+----------+--------------------+--------------------+---------------+
|   channel|            datetime|             message|       username|
+----------+--------------------+--------------------+---------------+
| #hasanabi|2022-05-18T20:39:...|             GIGAHAS|      eric_ttfy|
| #hasanabi|2022-05-18T20:39:...|              LETSGO|   stefie_rella|
| #hasanabi|2022-05-18T20:39:...|dsaGIANT dsaGIANT...|        dambing|
|#asmongold|2022-05-18T20:39:...|Offline chat is l...|      jackvaine|
| #hasanabi|2022-05-18T20:40:...|LETSGO JOHN FETTE...|         zony66|
| #hasanabi|2022-05-18T20:40:...|             GIGAHAS|    burgah_boy_|
| #hasanabi|2022-05-18T20:40:...|Fetterman won eve...|   rektoroftroy|
| #hasanabi|2022-05-18T20:40:...|@HasanAbi How wou...|     semourhere|
| #hasanabi|2022-05-18T20:40:...|    LETSGO FETTERMAN|         bigg03|
| #hasanabi|2022-05-18T20:40:...|          LETSGOOOOO|    roy_lacroix|
| #hasanabi|2022-05-18T20:40:...|neffWammie neffWa...|  t33nage_squid|
| #has