# MACHINE LEARNING WITH SPARK

### Is it a Reddit or a Twitter post?

We decided to do classification with Spark. Our original idea was to classify tweets as pro-Trump or pro-Biden, but we had no way to label them. Instead, we built a binary classifier that predicts whether a given text comes from Reddit (label 0) or from Twitter (label 1).

In [1]:
# Dependencies for the whole notebook
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.functions import monotonically_increasing_id, desc, udf, lit, rand, when, col
import re


# ML
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [2]:
# SparkContext and SQLContext (getOrCreate() reuses an already running context instead of creating a second one)
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [3]:
# Reading our reddit data
data_r = sqlContext.read.csv(
    "./data/reddit.csv",
    header=True,
    mode="DROPMALFORMED",
    multiLine=True,
    inferSchema=True,
    sep=',',
    escape='"'
)

In [4]:
data_r = data_r.select('comment')
# Drop Reddit comments that were removed or deleted. The two conditions must be combined with AND (here NOT IN):
# "comment != '[removed]' OR comment != '[deleted]'" is true for every non-null row, so it filters nothing.
data_r = data_r.filter("comment NOT IN ('[removed]', '[deleted]')")
data_r = data_r.selectExpr("comment as text")
data_r = data_r.withColumn('label', lit(0.0))
data_r.show(6)
data_r.count()

+--------------------+-----+
|                text|label|
+--------------------+-----+
|Is there a bettin...|  0.0|
|DC and PR should ...|  0.0|
|Good statement Mi...|  0.0|
|I was wondering i...|  0.0|
|Usually that's th...|  0.0|
|Yeah, I honestly ...|  0.0|
+--------------------+-----+
only showing top 6 rows



80621
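A small plain-Python check of why the two `!=` conditions on the Reddit comments have to be combined with AND (or `NOT IN`) rather than OR: no value can equal both markers at once, so the OR condition is true for every non-null row. The sample values are made up, and `r is not None` stands in for SQL's rule that a comparison with NULL never passes a filter.

```python
rows = ["some comment", "[removed]", "[deleted]", None]  # hypothetical sample values
markers = ("[removed]", "[deleted]")

# OR: every non-null row passes, because no string equals both markers at once.
kept_or = [r for r in rows if r is not None and (r != markers[0] or r != markers[1])]

# AND / NOT IN: both markers are dropped.
kept_and = [r for r in rows if r is not None and r not in markers]

print(kept_or)   # ['some comment', '[removed]', '[deleted]']
print(kept_and)  # ['some comment']
```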

In [5]:
# Reading our twitter data
data_tw = sqlContext.read.csv(
    "./data/tweets.csv",
    header=True,
    mode="DROPMALFORMED",
    multiLine=True,
    inferSchema=True,
    sep=',',
    escape='"'
)

data_tw = data_tw.select('tweet_text').limit(data_r.count()) # keep as many tweets as there are Reddit comments, so both classes are the same size
data_tw = data_tw.selectExpr('tweet_text as text')
data_tw = data_tw.withColumn('label', lit(1.0))
data_tw.show(5)
data_tw.count()

+--------------------+-----+
|                text|label|
+--------------------+-----+
|@TscookT your Twe...|  1.0|
|@HollejHolle @Mat...|  1.0|
|@chen88888899 Tru...|  1.0|
|@thehorrorchick S...|  1.0|
|@jamesechoeson @e...|  1.0|
+--------------------+-----+
only showing top 5 rows



80621

In [6]:
df = data_tw.union(data_r) # stack the two DataFrames (union() matches columns by position, not by name)
df.count()

161242

In [7]:
# Let's clean our text column
def cleaner(text):
    """Removes mentions, links, hashtags and the retweet marker 'RT' from text"""
    
    if text is None: # null rows reach the UDF as None; return an empty string so later tokenizers always get a string
        return ''
    
    words = text.split(' ')
    cleaned = []
    for word in words:
        # 'http' also covers 'https' links
        if '@' in word or word.startswith('http') or word.startswith('#') or word == 'RT':
            continue
        cleaned.append(word)
    return ' '.join(cleaned)
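To see what these cleaning rules do without starting Spark, here is a plain-Python sketch that applies them to a few made-up strings: words containing `@`, words starting with `http` or `#`, and the retweet marker `RT` are dropped. `clean_text` and the sample strings are hypothetical and only mirror the UDF's logic.

```python
def clean_text(text):
    """Drop mentions, links, hashtags and the retweet marker 'RT' (hypothetical stand-alone helper)."""
    if text is None:
        return ""
    kept = [word for word in text.split(" ")
            if "@" not in word
            and not word.startswith("http")
            and not word.startswith("#")
            and word != "RT"]
    return " ".join(kept)

print(clean_text("RT @someone your Tweets are great https://t.co/xyz #tag"))  # your Tweets are great
print(clean_text("Trump will win."))  # Trump will win.
print(repr(clean_text(None)))         # ''
```

Splitting on a single space keeps the sketch identical to the UDF; note that a word like `word,#tag` survives because it does not start with `#`.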

In [8]:
remove_udf = udf(cleaner)
cleaned = df.withColumn('text', remove_udf(df.text))

In [9]:
cleaned.show(10)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|your Tweets are g...|  1.0|
|Their no match fo...|  1.0|
|     Trump will win.|  1.0|
|SON OF A BITCH. B...|  1.0|
|My Grandmother su...|  1.0|
|After Failed Atte...|  1.0|
|This is excellent...|  1.0|
|🚨🚨🚨
Homeland S...|  1.0|
|This brought me t...|  1.0|
|With that said Pr...|  1.0|
+--------------------+-----+
only showing top 10 rows



In [10]:
# Shuffle the rows so that taking the first N rows gives a random training set (we split this way because randomSplit() sometimes did not work for us)
shuffle = cleaned.orderBy(rand())
shuffle.show(19)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|That was a great ...|  0.0|
|      Had--&gt; lost|  0.0|
|Trump ran out of ...|  1.0|
|Mini tree is for ...|  1.0|
|Meltzer being ant...|  1.0|
|Can’t wait to see...|  1.0|
|CNN has had it wi...|  0.0|
|Am I the only one...|  1.0|
|Ah yes Lady Gaga!...|  0.0|
|If you can’t beat...|  0.0|
|Fauci and the CDC...|  0.0|
|Following Trump's...|  1.0|
|. Should have bou...|  1.0|
|Joe Biden es un c...|  1.0|
|TRUMP IS YOUR PRE...|  1.0|
|I like the way Bi...|  1.0|
|It's called diplo...|  0.0|
|The impeachment t...|  0.0|
|Shouldn't it be t...|  0.0|
+--------------------+-----+
only showing top 19 rows



In [11]:
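# Splitting without randomSplit(), since for us randomSplit() sometimes worked and sometimes returned blank columns.
# Note: subtract() is a set difference (SQL EXCEPT DISTINCT), so duplicate texts appear at most once in test_df
# and test rows identical to a training row are removed; exceptAll() would keep duplicates.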
training_df = shuffle.limit(110000)
test_df = shuffle.subtract(training_df)

training_df.show(5)
test_df.show(5)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|That was a great ...|  0.0|
|      Had--&gt; lost|  0.0|
|Trump ran out of ...|  1.0|
|Mini tree is for ...|  1.0|
|Meltzer being ant...|  1.0|
+--------------------+-----+
only showing top 5 rows

+--------------------+-----+
|                text|label|
+--------------------+-----+
|Trump WON 79Milli...|  1.0|
|It is despicable ...|  1.0|
|Georgia prosecuto...|  1.0|
|Basically the lef...|  1.0|
|No, Trump is NOT ...|  1.0|
+--------------------+-----+
only showing top 5 rows
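The difference between `subtract()` and `exceptAll()` matters here because the data contains repeated texts (for example, identical tweets). A plain-Python sketch of both semantics on hypothetical `(text, label)` rows shows that set difference loses rows, which is consistent with the later split reporting 130000 training rows but only 28975 test rows out of 161242. Spark does not guarantee row order for either operation; the sketch keeps input order only for readability.

```python
from collections import Counter

def subtract(left, right):
    """Set difference, like DataFrame.subtract (SQL EXCEPT DISTINCT): duplicates collapse."""
    right_rows = set(right)
    seen, out = set(), []
    for row in left:
        if row not in right_rows and row not in seen:
            seen.add(row)
            out.append(row)
    return out

def except_all(left, right):
    """Multiset difference, like DataFrame.exceptAll (SQL EXCEPT ALL): duplicates survive."""
    remaining = Counter(right)
    out = []
    for row in left:
        if remaining[row] > 0:
            remaining[row] -= 1  # this copy is matched by one training row
        else:
            out.append(row)
    return out

# Hypothetical (text, label) rows: "same tweet" and "retweeted" each appear twice.
shuffled = [("same tweet", 1.0), ("a", 0.0), ("same tweet", 1.0),
            ("b", 0.0), ("retweeted", 1.0), ("retweeted", 1.0)]
training = shuffled[:2]                # like shuffle.limit(2)
print(subtract(shuffled, training))    # [('b', 0.0), ('retweeted', 1.0)]
print(except_all(shuffled, training))  # [('same tweet', 1.0), ('b', 0.0), ('retweeted', 1.0), ('retweeted', 1.0)]
```

With `except_all` the training and test sizes always add up to the full dataset; with `subtract` they only do when every row is unique.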



In [12]:
test_df = test_df.orderBy(rand()) # subtract() does not preserve row order (it deduplicates via a shuffle), so we reshuffle the test rows to be safe
test_df.show(5)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|Donald Trump it’s...|  1.0|
|He didnt help Bid...|  1.0|
|No but there has ...|  0.0|
|Hi `Mamacrass`. T...|  0.0|
|Wow Mitch Mcconne...|  0.0|
+--------------------+-----+
only showing top 5 rows



## Naive Bayes

First we apply the Naive Bayes algorithm. This code is mostly taken from the lecture slides, since it was our starting point for machine learning with Spark.
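For intuition, here is a minimal multinomial Naive Bayes with additive (Laplace) smoothing in plain Python: each class gets a smoothed prior and smoothed word probabilities, and a text is assigned to the class with the highest sum of log-probabilities. It is a sketch of the technique, not Spark's implementation; `train_nb`, `predict_nb` and the toy corpus are hypothetical. The `smoothing` argument plays the role of the `smoothing` parameter that the grid below tries (0.0 and 1.0); in this sketch it must be positive, because a zero count with no smoothing gives `math.log(0)`, which is the zero-probability problem smoothing exists to prevent.

```python
import math
from collections import Counter

def train_nb(docs, labels, smoothing=1.0):
    """Multinomial Naive Bayes. docs: token lists, labels: class values. Needs smoothing > 0 here."""
    classes = sorted(set(labels))
    vocab = sorted({tok for doc in docs for tok in doc})
    log_prior, log_likelihood = {}, {}
    for c in classes:
        class_docs = [doc for doc, label in zip(docs, labels) if label == c]
        # smoothed class prior P(c)
        log_prior[c] = math.log((len(class_docs) + smoothing) / (len(docs) + len(classes) * smoothing))
        counts = Counter(tok for doc in class_docs for tok in doc)
        total = sum(counts.values())
        # smoothed word probabilities P(word | c)
        log_likelihood[c] = {tok: math.log((counts[tok] + smoothing) / (total + len(vocab) * smoothing))
                             for tok in vocab}
    return log_prior, log_likelihood

def predict_nb(model, doc):
    """Pick the class with the highest log P(c) + sum of log P(word | c); unseen words are ignored."""
    log_prior, log_likelihood = model
    scores = {c: log_prior[c] + sum(log_likelihood[c][tok] for tok in doc if tok in log_likelihood[c])
              for c in log_prior}
    return max(scores, key=scores.get)

# Hypothetical toy corpus: label 0.0 = Reddit, 1.0 = Twitter (as in this notebook).
docs = [["lol", "thread"], ["upvote", "thread"], ["rt", "follow"], ["follow", "retweet"]]
labels = [0.0, 0.0, 1.0, 1.0]
model = train_nb(docs, labels, smoothing=1.0)
print(predict_nb(model, ["thread", "upvote"]))  # 0.0
print(predict_nb(model, ["follow"]))            # 1.0
```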

In [13]:
# Building our pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
idf = IDF(minDocFreq=3, inputCol="features", outputCol="idf")
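# NaiveBayes reads its default featuresCol "features" (the raw term frequencies from HashingTF),
# so the "idf" column computed above is not used by the model; NaiveBayes(featuresCol="idf") would train on TF-IDF weights.
nb = NaiveBayes()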

pipeline = Pipeline(stages=[tokenizer,
                            hashing_tf,
                            idf,
                            nb])


paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 1.0]).build()

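# Cross-validation: for each smoothing value in paramGrid, CrossValidator splits the training data into numFolds folds,
# trains on all folds but one and scores the held-out fold, averages the scores (the evaluator's default metric is f1),
# and finally refits the best setting on the whole training set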
cv = CrossValidator(estimator = pipeline, 
                    estimatorParamMaps = paramGrid,
                    evaluator = MulticlassClassificationEvaluator(),
                    numFolds = 2
                   )
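The first three pipeline stages turn text into weighted term vectors. The sketch below reproduces the idea in plain Python: tokenize (Spark's `Tokenizer` lowercases and splits on whitespace), hash each token into one of `num_features` buckets and count it (the hashing trick), then weight each bucket by the inverse document frequency `log((m + 1) / (df + 1))`, with buckets seen in fewer than `min_doc_freq` documents set to 0, as Spark's `IDF` documents. Assumptions: `zlib.crc32` is a stand-in for the MurmurHash3 that `HashingTF` uses, so bucket ids differ from Spark's, and all function names and texts are hypothetical.

```python
import math
import zlib

def tokenize(text):
    """Roughly like Spark's Tokenizer: lowercase, then split on whitespace."""
    return text.lower().split()

def hashing_tf(tokens, num_features=2 ** 18):
    """Hashing trick: count tokens per bucket (sparse dict bucket -> count).
    zlib.crc32 is a stand-in; Spark's HashingTF uses MurmurHash3, so bucket ids differ."""
    vec = {}
    for tok in tokens:
        idx = zlib.crc32(tok.encode("utf-8")) % num_features
        vec[idx] = vec.get(idx, 0) + 1
    return vec

def fit_idf(tf_vectors, min_doc_freq=0):
    """idf = log((m + 1) / (df + 1)); buckets seen in fewer than min_doc_freq docs get 0."""
    m = len(tf_vectors)
    doc_freq = {}
    for vec in tf_vectors:
        for idx in vec:
            doc_freq[idx] = doc_freq.get(idx, 0) + 1
    return {idx: math.log((m + 1) / (df + 1)) if df >= min_doc_freq else 0.0
            for idx, df in doc_freq.items()}

def tf_idf(vec, idf_weights):
    """Scale each bucket's count by its IDF weight."""
    return {idx: count * idf_weights.get(idx, 0.0) for idx, count in vec.items()}

texts = ["Trump will win", "Biden will win", "Trump Trump"]  # hypothetical texts
tfs = [hashing_tf(tokenize(t)) for t in texts]
idf_weights = fit_idf(tfs, min_doc_freq=1)
print([tf_idf(v, idf_weights) for v in tfs])
```

Hashing avoids storing a vocabulary, at the cost of occasional collisions where two different words share a bucket.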

In [14]:
# Running model
print('Model started')
cvModel = cv.fit(training_df)
print('Model has finished')

Model started
Model has finished
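The `cv.fit(...)` call above does the parameter search. A plain-Python sketch of the same k-fold procedure: for each parameter set, train on k-1 folds, score the held-out fold, average, keep the best, then refit on all the data. Assumptions: rows are assigned to folds by `i % k` for simplicity (Spark assigns folds randomly), and the "model" is a hypothetical threshold rule so that the example stays tiny.

```python
def k_fold_indices(n, k):
    """Yield (train_idx, val_idx) pairs; row i goes to fold i % k (Spark assigns folds randomly)."""
    for fold in range(k):
        val = [i for i in range(n) if i % k == fold]
        train = [i for i in range(n) if i % k != fold]
        yield train, val

def cross_validate(X, y, param_grid, fit, score, k=2):
    """Average the validation score of each parameter set over k folds, then refit the best on all data."""
    best_params, best_score = None, float("-inf")
    for params in param_grid:
        fold_scores = []
        for train, val in k_fold_indices(len(X), k):
            model = fit([X[i] for i in train], [y[i] for i in train], **params)
            fold_scores.append(score(model, [X[i] for i in val], [y[i] for i in val]))
        avg = sum(fold_scores) / len(fold_scores)
        if avg > best_score:
            best_params, best_score = params, avg
    return fit(X, y, **best_params), best_params, best_score

# Hypothetical toy "model": predict 1 when x > threshold, so fitting just returns the threshold.
def fit_threshold(X, y, threshold):
    return threshold

def threshold_accuracy(threshold, X, y):
    return sum((1 if x > threshold else 0) == t for x, t in zip(X, y)) / len(X)

X = [1, 2, 3, 4, 5, 6, 7, 8]
y = [0, 0, 0, 0, 1, 1, 1, 1]
grid = [{"threshold": 2}, {"threshold": 4.5}, {"threshold": 7}]
final_model, best_params, best_score = cross_validate(X, y, grid, fit_threshold, threshold_accuracy, k=2)
print(best_params, best_score)  # {'threshold': 4.5} 1.0
```

The validation folds are never used to fit the model being scored, so the averaged score estimates how each setting generalizes; it does not make the final model itself better than one trained with the same setting directly.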


In [15]:
# Running model on test data
result = cvModel.transform(test_df)
result.select("text", "label", "prediction").show(10)

+--------------------+-----+----------+
|                text|label|prediction|
+--------------------+-----+----------+
|Donald Trump it’s...|  1.0|       1.0|
|He didnt help Bid...|  1.0|       1.0|
|No but there has ...|  0.0|       0.0|
|Hi `Mamacrass`. T...|  0.0|       0.0|
|Wow Mitch Mcconne...|  0.0|       0.0|
|Ted Cruz For his ...|  1.0|       1.0|
|Probably need to ...|  1.0|       1.0|
|Ignore that apolo...|  0.0|       0.0|
|Which is why ther...|  0.0|       0.0|
|New Capitol Video...|  1.0|       1.0|
+--------------------+-----+----------+
only showing top 10 rows



In [16]:
# Evaluating our results
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print('Accuracy:', evaluator.evaluate(result, {evaluator.metricName: "accuracy"}))

Accuracy: 0.8255926361802286


In [17]:
print('f1: ', evaluator.evaluate(result, {evaluator.metricName: "f1"}))

f1:  0.8256098029596373
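The two numbers above measure different things: accuracy is the share of correct predictions, while the evaluator's `f1` metric is a weighted F1, i.e. the F1 score of each class averaged with weights proportional to how often that class occurs among the true labels. A plain-Python sketch of both, on made-up labels and predictions:

```python
def accuracy(labels, preds):
    """Share of predictions equal to the true label."""
    return sum(l == p for l, p in zip(labels, preds)) / len(labels)

def weighted_f1(labels, preds):
    """Per-class F1, weighted by how often each class occurs among the true labels."""
    n = len(labels)
    total = 0.0
    for c in set(labels):
        tp = sum(l == c and p == c for l, p in zip(labels, preds))
        fp = sum(l != c and p == c for l, p in zip(labels, preds))
        fn = sum(l == c and p != c for l, p in zip(labels, preds))
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += (tp + fn) / n * f1
    return total

toy_labels = [1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0]  # hypothetical
toy_preds = [1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0]   # hypothetical
print(accuracy(toy_labels, toy_preds))     # 0.9
print(weighted_f1(toy_labels, toy_preds))  # about 0.9067
```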


## Logistic Regression

Next we apply a logistic regression model to distinguish between Reddit and Twitter posts.
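With `elasticNetParam=0`, Spark's `LogisticRegression` minimizes the mean log-loss plus a pure L2 penalty `regParam / 2 * ||w||^2` (the intercept is not penalized). The sketch below minimizes that objective with plain batch gradient descent on hypothetical word-count features; Spark itself uses a quasi-Newton optimizer and standardizes features by default, so its weights will differ. `train_logreg`, `predict_logreg`, `step_size` and the toy data are assumptions for illustration.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def train_logreg(X, y, reg_param=0.3, step_size=0.5, iters=200):
    """Batch gradient descent on mean log-loss + (reg_param / 2) * ||w||^2; the bias is not penalised."""
    n, d = len(X), len(X[0])
    w, b = [0.0] * d, 0.0
    for _ in range(iters):
        grad_w, grad_b = [0.0] * d, 0.0
        for xi, yi in zip(X, y):
            err = sigmoid(sum(wj * xj for wj, xj in zip(w, xi)) + b) - yi  # dLoss/dz for log-loss
            for j in range(d):
                grad_w[j] += err * xi[j] / n
            grad_b += err / n
        w = [wj - step_size * (gj + reg_param * wj) for wj, gj in zip(w, grad_w)]
        b -= step_size * grad_b
    return w, b

def predict_logreg(w, b, x):
    return 1.0 if sigmoid(sum(wj * xj for wj, xj in zip(w, x)) + b) >= 0.5 else 0.0

# Hypothetical word-count features [count of word A, count of word B]; label 1.0 = Twitter.
X_toy = [[2, 0], [1, 0], [0, 2], [0, 1]]
y_toy = [1.0, 1.0, 0.0, 0.0]
w, b = train_logreg(X_toy, y_toy, reg_param=0.3)
print(predict_logreg(w, b, [3, 0]), predict_logreg(w, b, [0, 3]))  # 1.0 0.0
```

A larger `reg_param` pulls the weights toward zero, which trades some training fit for less sensitivity to rare words.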

In [18]:
# Building our pipeline
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
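# setStopWords() replaces Spark's default English stop-word list with this short list.
# The tokenizer lowercases the text and splits on non-word characters, so "&gt;" reaches this stage
# as the token "gt" and the '&gt' entry below never matches anything.
add_stopwords = ["http","https","amp","rt","t","c","the", "s", '&gt']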
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

pipeline = Pipeline(stages=[regexTokenizer,
                            stopwordsRemover,
                            countVectors])


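# Pushing the whole dataframe through the pipeline. Note that CountVectorizer is fitted on all rows,
# test rows included, so the vocabulary and its minDF counts also see the test data.
pipelineFit = pipeline.fit(shuffle)
dataset = pipelineFit.transform(shuffle)

# A new split: the first 130000 shuffled rows for training (not the 110000-row split used for Naive Bayes)
training_df = dataset.limit(130000)
test_df = dataset.subtract(training_df)


print('Training Dataset len:', training_df.count())
print('Test Dataset len:', test_df.count()) # fewer than 161242 - 130000, because subtract() drops duplicate rows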

Training Dataset len: 130000
Test Dataset len: 28975
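The sketch below shows what the tokenizer, stop-word and `CountVectorizer` stages compute, in plain Python, with one change: the vocabulary is fitted on training texts only and then reused for test texts, whereas the pipeline above is fitted on all of `shuffle`. Tokens come from lowercasing and splitting on ASCII non-word characters (close to `RegexTokenizer(pattern="\\W")`). Only terms appearing in at least `min_df` documents are kept, ranked by total count and capped at `vocab_size`. The helper names and texts are hypothetical, and ties are broken alphabetically here, which Spark does not promise.

```python
import re
from collections import Counter

STOP_WORDS = {"http", "https", "amp", "rt", "t", "c", "the", "s", "&gt"}  # the notebook's list

def regex_tokenize(text):
    """Lowercase, split on ASCII non-word characters, drop empty tokens (roughly RegexTokenizer(pattern='\\W'))."""
    return [tok for tok in re.split(r"\W", text.lower(), flags=re.ASCII) if tok]

def fit_vocab(train_texts, vocab_size=10000, min_df=5):
    """Keep terms found in at least min_df documents, ranked by total count, capped at vocab_size."""
    term_freq, doc_freq = Counter(), Counter()
    for text in train_texts:
        tokens = [t for t in regex_tokenize(text) if t not in STOP_WORDS]
        term_freq.update(tokens)
        doc_freq.update(set(tokens))
    eligible = sorted((t for t in term_freq if doc_freq[t] >= min_df), key=lambda t: (-term_freq[t], t))
    return {t: i for i, t in enumerate(eligible[:vocab_size])}

def count_vector(text, vocab):
    """Sparse count vector (index -> count) over the fitted vocabulary; unknown words are dropped."""
    counts = Counter(t for t in regex_tokenize(text) if t in vocab)
    return {vocab[t]: n for t, n in counts.items()}

print(regex_tokenize("Yes &gt; no"))  # ['yes', 'gt', 'no']

# Fit on (hypothetical) training texts only, then apply the same vocabulary to test texts.
train_texts = ["Trump will win", "Biden will win", "the vote will count"]
vocab = fit_vocab(train_texts, vocab_size=10, min_df=2)
print(vocab)                                             # {'will': 0, 'win': 1}
print(count_vector("will they win, will they?", vocab))  # {0: 2, 1: 1}
```

The first print also shows why a `&gt` stop word cannot match after this tokenizer: the entity arrives as `gt`.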


In [19]:
# Building and training our model without cross validation
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(training_df)

# Running model on test df
predictions_df = lrModel.transform(test_df)

In [20]:
predictions_df.select('text', 'label','prediction').show(10)

+--------------------+-----+----------+
|                text|label|prediction|
+--------------------+-----+----------+
|Biden’s propagand...|  1.0|       1.0|
|I hope to god tha...|  0.0|       0.0|
|What a coincidenc...|  1.0|       1.0|
|It should be done...|  1.0|       1.0|
|Tomorrow is the *...|  1.0|       1.0|
|He went to the WH...|  1.0|       1.0|
|What a waste of s...|  1.0|       1.0|
|Listen if Biden’s...|  1.0|       0.0|
|I can give you de...|  1.0|       1.0|
|It's almost as if...|  0.0|       0.0|
+--------------------+-----+----------+
only showing top 10 rows



In [21]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print('Accuracy:', evaluator.evaluate(predictions_df, {evaluator.metricName: "accuracy"}))

Accuracy: 0.8951164797239
