In [1]:
import threading

# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [2]:
sc

In [3]:
spark

In [133]:
import pandas as pd
import numpy as np
from textblob import TextBlob
from nltk.corpus import stopwords
from nltk import download
from nltk import word_tokenize
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors
import pyarrow as pa # for creating spark dataframe
from nltk import sent_tokenize
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from gensim import models
import gensim.downloader as api
from gensim.models import TfidfModel
from gensim.corpora import Dictionary

In [5]:
download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\peter\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [6]:
# Print list of stopwords for informational purposes
stops = set(stopwords.words('english'))
print(stops)
# We might consider removing some of these or making our own list since our text is weird

{'out', 'we', 'was', 'how', 'myself', 'for', 'they', 'about', "hasn't", 'then', 'both', 'so', 're', 'don', 'm', 'as', 'any', 'mightn', 'after', 'you', 'wouldn', 'why', 'been', 'where', 'by', "isn't", 'yourself', 'wasn', 'a', "haven't", 'did', "hadn't", 'their', 'hasn', 'doing', 'be', 'further', 'ours', 'now', 'am', 'her', "you'll", 'yourselves', 'that', 'my', 'what', 'to', 'd', 'not', "won't", "couldn't", 'own', 'there', 'this', 'each', 'all', 'haven', 'more', 'me', 've', 'weren', 'which', 'himself', 'nor', 'other', "shouldn't", 'who', "should've", 'same', 'at', 'such', 't', 'up', 'than', 'can', "you've", 'too', 'these', 'while', "wasn't", 'ourselves', 'before', 'i', 'he', "didn't", 'our', 'its', 'but', 'with', "wouldn't", 'those', 'because', 'the', 'y', 'shouldn', 'it', 'mustn', 'hers', 'just', 'doesn', 'ain', 'between', 'over', 'had', 'aren', "mightn't", 'does', 'have', 'and', 'or', 'some', "mustn't", 'only', 'won', 'when', 'needn', 'below', 'in', 'if', 'theirs', "needn't", "aren't",

<a href="https://pandas.pydata.org/Pandas_Cheat_Sheet.pdf" target="_blank">Pandas cheat sheet</a><BR>

<A HREF="https://spark.apache.org/docs/latest/ml-guide.html" target="_blank">Pyspark ML guide</A><BR>

In [7]:
df = pd.read_csv("messages.csv")
# Replace message NaNs with zero length string ''
df.fillna('', inplace=True)

In [8]:
df.head()

Unnamed: 0,username,channel,message
0,y0urm0mg4y_19391,#xqcow,PagMan 🪣
1,heroo117,#xqcow,THIS IS JUST LIKE OW2 BatChest
2,deeznipzzz,#summit1g,blue is crashing at some point can already tel...
3,masterchiefkief,#xqcow,sheep
4,bugballo,#xqcow,press e


In [9]:
df

Unnamed: 0,username,channel,message
0,y0urm0mg4y_19391,#xqcow,PagMan 🪣
1,heroo117,#xqcow,THIS IS JUST LIKE OW2 BatChest
2,deeznipzzz,#summit1g,blue is crashing at some point can already tel...
3,masterchiefkief,#xqcow,sheep
4,bugballo,#xqcow,press e
...,...,...,...
6003,upgraydded,#summit1g,Pornhub in VR Pog
6004,loukangbang,#summit1g,INSTALL ITTTT PauseChamp
6005,drethaprince,#summit1g,Did something to Fortnite?
6006,okiasmegalos,#xqcow,The daleks


In [10]:
# Only for 2 channel's of data
# codes channel in 1st row as 0, other channel is 1
df['target'] = pd.DataFrame(np.where(df.channel==df.channel[0], 0, 1))
print('channel 0 is ',df.channel[0])

channel 0 is  #xqcow


<a href="https://www.programiz.com/python-programming/list-comprehension" target="_blank">List comprehension</a><br>

In [11]:
# Tokenize words
df['tokens'] = [word_tokenize(string) for string in df['message']]
df['tokens'] = [[word.lower() for word in token] for token in df['tokens'] ]

In [12]:
# Remove stop words
df['tokens'] = [[word for word in token if word not in stopwords.words('english') ] for token in df['tokens']]

In [13]:
# remove blanks ''
df['tokens'] = [[word for word in token if len(word) > 0] for token in df['tokens']]

In [14]:
# Count number of words in tokens
df['count'] = [len(token) for token in df['tokens']]

Since many of these aren't words, I think stemming and lemmatization are not useful here

In [15]:
df

Unnamed: 0,username,channel,message,target,tokens,count
0,y0urm0mg4y_19391,#xqcow,PagMan 🪣,0,"[pagman, 🪣]",2
1,heroo117,#xqcow,THIS IS JUST LIKE OW2 BatChest,0,"[like, ow2, batchest]",3
2,deeznipzzz,#summit1g,blue is crashing at some point can already tel...,1,"[blue, crashing, point, already, tell, wait, lol]",7
3,masterchiefkief,#xqcow,sheep,0,[sheep],1
4,bugballo,#xqcow,press e,0,"[press, e]",2
...,...,...,...,...,...,...
6003,upgraydded,#summit1g,Pornhub in VR Pog,1,"[pornhub, vr, pog]",3
6004,loukangbang,#summit1g,INSTALL ITTTT PauseChamp,1,"[install, itttt, pausechamp]",3
6005,drethaprince,#summit1g,Did something to Fortnite?,1,"[something, fortnite, ?]",3
6006,okiasmegalos,#xqcow,The daleks,0,[daleks],1


In [16]:
ch0 = df.query('target == 0')
ch1 = df.query('target == 1')

In [17]:
ch1.head()

Unnamed: 0,username,channel,message,target,tokens,count
2,deeznipzzz,#summit1g,blue is crashing at some point can already tel...,1,"[blue, crashing, point, already, tell, wait, lol]",7
31,jabrony67,#summit1g,HELL YEAH,1,"[hell, yeah]",2
46,mekerakesh,#summit1g,"Tyres up to temp, should be a nice smooth race...",1,"[tyres, temp, ,, nice, smooth, race]",6
59,temporarilyoutoforder,#summit1g,holy shit i havnt seen him play this much but ...,1,"[holy, shit, havnt, seen, play, much, man, got...",9
70,filcher___,#summit1g,1,1,[1],1


OK, try to make a Bag-O-Words for both channels' tokens

In [18]:
allwords0 = [[word for word in token[0].split() ] for token in ch0['tokens'] if len(token)>0]
# flat_list = [item for sublist in list_of_lists for item in sublist]
allwords0 = np.unique(np.array([item for sublist in allwords0 for item in sublist]))
allwords1 = [[word for word in token[0].split() ] for token in ch1['tokens'] if len(token)>0]
# flat_list = [item for sublist in list_of_lists for item in sublist]
allwords1 = np.unique(np.array([item for sublist in allwords1 for item in sublist]))

In [19]:
# Ok let's make a master list of all words
allwords = [[word for word in token[0].split() ] for token in df['tokens'] if len(token)>0 ]
# flat_list = [item for sublist in list_of_lists for item in sublist]
allwords = np.unique(np.array([item for sublist in allwords for item in sublist]))

In [20]:
allwords

array(['\x01action', '!', '#', ..., '🧹', '🩰', '🪣'], dtype='<U30')

In [96]:
tfidf_vectorizer = TfidfVectorizer(max_df=0.95, min_df=2,stop_words='english')
tfidf_matrix = tfidf_vectorizer.fit_transform(df['message'])
dfTFIDF = tfidf_matrix.toarray()
tfid_feature_names = tfidf_vectorizer.get_feature_names_out()
for i in range(dfTFIDF.shape[0]):
    dfTFIDF[i].flatten
dfTFIDF

array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]])

<A HREF="https://radimrehurek.com/gensim/models/tfidfmodel.html?highlight=tfidfmodel">TF-IDF Model docs</A>

#### Ok, time to try a spark ML model. I'm working off the example <a href='https://spark.apache.org/docs/latest/ml-pipeline.html#code-examples'>here</a>

I'm disliking pandas' dataframe it doesn't understand what I want, using numpy
<A HREF="http://www.cheat-sheets.org/saved-copy/numpy-cheat-sheet.20210604.pdf">Numpy cheat sheet</A>

In [140]:
# move pandas df to spark df and tell it what are the features and what is the label
df4spark = np.array(dfTFIDF)
print(df4spark.shape)
target = np.array(df['target'])
target = np.reshape(target, (len(dfTFIDF), 1))
print(target.shape)
df4spark = np.concatenate((df4spark,target),axis=1) 
df4spark = pd.DataFrame(df4spark)
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', "true")
training = spark.createDataFrame(df4spark, ["features", "label"])


(6008, 1132)
(6008, 1)


In [126]:
training

DataFrame[features: array<bigint>, label: int]

In [127]:
# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)

In [128]:
# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)

IllegalArgumentException: requirement failed: Column features must be of type class org.apache.spark.ml.linalg.VectorUDT:struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually class org.apache.spark.sql.types.ArrayType:array<bigint>.

# All Spark All the Time!

<A HREF="https://spark.apache.org/docs/latest/ml-features.html#feature-extractors">Spark examples with text</A><BR>
<A HREF="https://datacamp-community-prod.s3.amazonaws.com/02213cb4-b391-4516-adcd-57243ced8eed">Spark dataframe cheat sheet</A><BR>

In [179]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql.functions import *

In [201]:
sdf = spark.read.csv('messages.csv', header=True)
sdf.na.drop()
print(sdf)

DataFrame[username: string, channel: string, message: string]


In [202]:
sdf1 = sdf.withColumn('target', F.when(sdf.channel == df.channel[0], 0).otherwise(1) )
sdf1.show()

+----------------+---------+--------------------+------+
|        username|  channel|             message|target|
+----------------+---------+--------------------+------+
|y0urm0mg4y_19391|   #xqcow|           PagMan 🪣|     0|
|        heroo117|   #xqcow|THIS IS JUST LIKE...|     0|
|      deeznipzzz|#summit1g|blue is crashing ...|     1|
| masterchiefkief|   #xqcow|               sheep|     0|
|        bugballo|   #xqcow|             press e|     0|
|          sevrys|   #xqcow|because its the s...|     0|
|          xannyi|   #xqcow|         GIGACHAD 🪣|     0|
|         xfiishy|   #xqcow|           PagMan 🪣|     0|
|         thawght|   #xqcow|The bucket can ch...|     0|
|            dqzz|   #xqcow|           PepeLaugh|     0|
|         malrysn|   #xqcow|PeepoGlad remembe...|     0|
|    gamestarioni|   #xqcow|           PagMan 🪣|     0|
| johnnystick5738|   #xqcow|               RIGHT|     0|
|         sharis3|   #xqcow|       OW 2 OMEGALUL|     0|
| swedish_remixer|   #xqcow|just li

In [203]:
sdf1 = sdf1.withColumn('messages', ltrim(sdf.message) ).show()

+----------------+---------+--------------------+------+--------------------+
|        username|  channel|             message|target|            messages|
+----------------+---------+--------------------+------+--------------------+
|y0urm0mg4y_19391|   #xqcow|           PagMan 🪣|     0|           PagMan 🪣|
|        heroo117|   #xqcow|THIS IS JUST LIKE...|     0|THIS IS JUST LIKE...|
|      deeznipzzz|#summit1g|blue is crashing ...|     1|blue is crashing ...|
| masterchiefkief|   #xqcow|               sheep|     0|               sheep|
|        bugballo|   #xqcow|             press e|     0|             press e|
|          sevrys|   #xqcow|because its the s...|     0|because its the s...|
|          xannyi|   #xqcow|         GIGACHAD 🪣|     0|         GIGACHAD 🪣|
|         xfiishy|   #xqcow|           PagMan 🪣|     0|           PagMan 🪣|
|         thawght|   #xqcow|The bucket can ch...|     0|The bucket can ch...|
|            dqzz|   #xqcow|           PepeLaugh|     0|           Pep

In [204]:

tokenizer = Tokenizer(inputCol="message", outputCol="words")
wordsDat = tokenizer.transform(sdf1)

AttributeError: 'NoneType' object has no attribute '_jdf'

In [182]:
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

AttributeError: 'NoneType' object has no attribute '_jdf'