## Twitter sentiment analysis and prediction using pyspark

In [2]:
#Install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=bb0d31e1278c5cbfe3a8dee587e88627ceee586989c1446842f79402f8bef974
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [3]:
from IPython import display
import math
import pandas as pd
import numpy as np

from pyspark.sql import SQLContext, SparkSession
from pyspark import SparkContext

from pyspark.sql.types import *

### Create Spark Context and load dataset

In [4]:
sc = SparkContext()
sqlContext = SQLContext(sc)



In [5]:
customSchema = StructType([
    StructField("clean_text", StringType()), 
    StructField("category", StringType())])

In [6]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
#Import twitter and reddit datasets
filename1 = '/content/drive/MyDrive/BigDataProject/twtr_dataset.csv'
filename2 = '/content/drive/MyDrive/BigDataProject/redt_dataset.csv'

In [8]:
df1 = sqlContext.read.format("csv").option("header", "true").schema(customSchema).load(filename1)
df1.count()

192131

In [9]:
df2 = sqlContext.read.format("csv").option("header", "true").schema(customSchema).load(filename2)
df2.count()

38305

In [10]:
#df = df1.union(df2)
df = df2
df.count() 

38305

In [11]:
data = df.na.drop(how='any')
data.show(5)

+--------------------+--------+
|          clean_text|category|
+--------------------+--------+
| family mormon ha...|       1|
|buddhism has very...|       1|
|seriously don say...|      -1|
|what you have lea...|       0|
|for your own bene...|       1|
+--------------------+--------+
only showing top 5 rows



In [12]:
df.count() 

38305

In [13]:
data.printSchema()

root
 |-- clean_text: string (nullable = true)
 |-- category: string (nullable = true)



## Preprocessing

In [14]:
from pyspark.sql.functions import col

data.groupBy("category").count().orderBy(col("count").desc()).show()

+--------+-----+
|category|count|
+--------+-----+
|       1|15749|
|       0|12895|
|      -1| 8244|
+--------+-----+



# Preprocessing of Tweets


*   Remove punctuations and alphanumeric characters
*   Remove hyperlinks
*   Remove mentions
*   Convert text to lower case


*   Remove stopwords
*   Fix abbreviated text
*   Part of Speech Tagging



*   Lemmatization







In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import re
import string

##Remove punctuations mentions and alphanumeric characters
def remove_features(data_str):
# compile regex
    url_re = re.compile('https?://(www.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile('(\\d+)')
    mention_re = re.compile('@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
# convert to lowercase
    data_str = data_str.lower()
# remove hyperlinks
    data_str = url_re.sub(' ', data_str)
# remove @mentions
    data_str = mention_re.sub(' ', data_str)
# remove puncuation
    data_str = punc_re.sub(' ', data_str)
# remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
# remove non a-z 0-9 characters and words shorter than 1 characters
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word):
                cleaned_str = word
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word):
                cleaned_str = cleaned_str + ' ' + word
            else:
                cleaned_str += ' '
        list_pos += 1
# remove unwanted space, *.split() will automatically split on
# whitespace and discard duplicates, the " ".join() joins the
# resulting list into one string.
    return ' '.join(cleaned_str.split())

In [16]:
import nltk
nltk.download('stopwords')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
from nltk.corpus import stopwords
from nltk.stem.wordnet import WordNetLemmatizer
from nltk import pos_tag

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...


In [17]:
# non Ascii remover
remove_features_udf = udf(remove_features, StringType())
data = data.withColumn('clean_text',remove_features_udf(data['clean_text']))

In [18]:
##Fix abbreviations
def fix_abbreviation(data_str):
    data_str = data_str.lower()
    data_str = re.sub(r'\bthats\b', 'that is', data_str)
    data_str = re.sub(r'\bive\b', 'i have', data_str)
    data_str = re.sub(r'\bim\b', 'i am', data_str)
    data_str = re.sub(r'\bya\b', 'yeah', data_str)
    data_str = re.sub(r'\bcant\b', 'can not', data_str)
    data_str = re.sub(r'\bdont\b', 'do not', data_str)
    data_str = re.sub(r'\bwont\b', 'will not', data_str)
    data_str = re.sub(r'\bid\b', 'i would', data_str)
    data_str = re.sub(r'wtf', 'what the fuck', data_str)
    data_str = re.sub(r'\bwth\b', 'what the hell', data_str)
    data_str = re.sub(r'\br\b', 'are', data_str)
    data_str = re.sub(r'\bu\b', 'you', data_str)
    data_str = re.sub(r'\bk\b', 'OK', data_str)
    data_str = re.sub(r'\bsux\b', 'sucks', data_str)
    data_str = re.sub(r'\bno+\b', 'no', data_str)
    data_str = re.sub(r'\bcoo+\b', 'cool', data_str)
    data_str = re.sub(r'rt\b', '', data_str)
    data_str = data_str.strip()
    return data_str

In [19]:
fix_abbreviation_udf = udf(fix_abbreviation, StringType())
data = data.withColumn('clean_text', fix_abbreviation_udf(data['clean_text']))

In [20]:
# Part-of-Speech Tagging
def tag_and_remove(data_str):
    cleaned_str = ''
# noun tags
    nn_tags = ['NN', 'NNP', 'NNP', 'NNPS', 'NNS']
# adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
# verbs
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = nn_tags + jj_tags + vb_tags
# break string into 'words'
    text = data_str.split()
# tag the text and keep only those with the right tags
    tagged_text = pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            cleaned_str = cleaned_str + ' ' + tagged_word[0]
    return cleaned_str

In [21]:
tag_and_remove_udf = udf(tag_and_remove, StringType())

In [22]:
data = data.withColumn('clean_text', tag_and_remove_udf(data['clean_text']))

In [23]:
##Lemmatization
def lemmatize(data_str):
# expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer()
    text = data_str.split()
    tagged_words = pos_tag(text)
    for word in tagged_words:
        if 'v' in word[1].lower():
            lemma = lmtzr.lemmatize(word[0], pos='v')
        else:
            lemma = lmtzr.lemmatize(word[0], pos='n')
        if list_pos == 0:
            cleaned_str = lemma
        else:
            cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1
    return cleaned_str

In [24]:
lemmatize_udf = udf(lemmatize, StringType())
data = data.withColumn('clean_text', lemmatize_udf(data['clean_text']))

## Model Pipeline
Spark Machine Learning Pipelines API is similar to Scikit-Learn. Our pipeline includes three steps:

regexTokenizer: Tokenization (with Regular Expression)

stopwordsRemover: Remove Stop Words

countVectors: Count vectors (“document-term vectors”)

In [25]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="clean_text", outputCol="words", pattern="\\W")

# stop words
stop_words = stopwords.words('english')

stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(stop_words)

# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=30000, minDF=5)

In [26]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

+--------------------+--------+--------------------+--------------------+--------------------+-----+
|          clean_text|category|               words|            filtered|            features|label|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
|family mormon hav...|       1|[family, mormon, ...|[family, mormon, ...|(9686,[9,11,18,42...|  0.0|
|buddhism have muc...|       1|[buddhism, have, ...|[buddhism, much, ...|(9686,[2,5,6,10,1...|  0.0|
|don say thing win...|      -1|[don, say, thing,...|[say, thing, win,...|(9686,[0,2,5,10,1...|  2.0|
|have learn yours ...|       0|[have, learn, you...|[learn, want, tea...|(9686,[22,148,182...|  1.0|
|own benefit want ...|       1|[own, benefit, wa...|[benefit, want, r...|(9686,[6,22,28,49...|  0.0|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [236]:
from nltk.tokenize import RegexpTokenizer
from nltk.stem.porter import PorterStemmer
from gensim import corpora, models
import gensim

# Create p_stemmer of class PorterStemmer
p_stemmer = PorterStemmer()

texts = [row.filtered for row in dataset.toLocalIterator()]

# turn our tokenized documents into a id <-> term dictionary
dictionary = corpora.Dictionary(texts)
    
# convert tokenized documents into a document-term matrix
corpus = [dictionary.doc2bow(text) for text in texts]

# generate LDA model
ldamodel = gensim.models.ldamodel.LdaModel(corpus, num_topics=3, id2word = dictionary, passes=20)

In [217]:
tweets = dataset.toPandas()
tweets.head(5)

Unnamed: 0,clean_text,category,words,filtered,features,label
0,family mormon have try explain stare puzzle ti...,1,"[family, mormon, have, try, explain, stare, pu...","[family, mormon, try, explain, stare, puzzle, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",0.0
1,buddhism have much lot compatible christianity...,1,"[buddhism, have, much, lot, compatible, christ...","[buddhism, much, lot, compatible, christianity...","(0.0, 0.0, 2.0, 0.0, 0.0, 1.0, 2.0, 0.0, 0.0, ...",0.0
2,don say thing win get complex explain normal p...,-1,"[don, say, thing, win, get, complex, explain, ...","[say, thing, win, get, complex, explain, norma...","(1.0, 0.0, 2.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0, ...",2.0
3,have learn yours yours want teach different fo...,0,"[have, learn, yours, yours, want, teach, diffe...","[learn, want, teach, different, focus, goal, w...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0
4,own benefit want read live live christ thich h...,1,"[own, benefit, want, read, live, live, christ,...","[benefit, want, read, live, live, christ, thic...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ...",0.0


In [237]:
from nltk.tokenize import word_tokenize
nltk.download('punkt')

topic_distributions = []

for index, row in tweets.iterrows():
    tokens = word_tokenize(row['clean_text'].lower())
    bow = dictionary.doc2bow(tokens)
    topic_distribution = ldamodel.get_document_topics(bow)
    topic_distributions.append(topic_distribution)
    
topic_probs = []

for distribution in topic_distributions:
    topic_probs.append([(topic, prob) for topic, prob in distribution])


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [238]:
distinct_topics = {}
for i in range(len(topic_probs)):
  for (topic, prob) in topic_probs[i]:
    distinct_topics[topic] = 1
print(len(distinct_topics), ldamodel.num_topics)

3 3


## Partition Training & Test sets

In [239]:
from pyspark.sql.functions import udf, lit
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from gensim.models import LdaModel
from gensim.corpora import Dictionary
import pandas as pd

# Set threshold for probability
threshold = 0.05

# Create dictionary to store trained models for each subset and topic
model_dict = {}

# Iterate through each topic
for topic_id in range(ldamodel.num_topics):

    # Select tweets with high probability for current topic
    tweets_subset = pd.DataFrame(columns = ['clean_text', 'category', 'words', 'filtered', 'features', 'label'])
    for index, row in tweets.iterrows():
        tweet_bow = dictionary.doc2bow(row['clean_text'].lower().split())
        topic_prob = ldamodel.get_document_topics(tweet_bow, minimum_probability=0.0)
        topic_prob = dict(topic_prob)
        if topic_id in topic_prob and topic_prob[topic_id] > threshold:
          tweets_subset = tweets_subset.append(row, ignore_index=True)
    
    # Create training subset
    if tweets_subset.shape[0] > 0:
        pyspark_tweets_subset = sqlContext.createDataFrame(tweets_subset)
        
        ml_model = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
        trained_model = ml_model.fit(pyspark_tweets_subset)

        # Store trained model in dictionary
        model_dict[topic_id] = trained_model

  tweets_subset = tweets_subset.append(row, ignore_index=True)
  tweets_subset = tweets_subset.append(row, ignore_index=True)
  tweets_subset = tweets_subset.append(row, ignore_index=True)


In [240]:
print(topic_probs[0])

[(0, 0.060553323), (1, 0.9120973), (2, 0.027349388)]


In [228]:
sentiment_labels = {0: "Positive", 1: "Neutral", 2: "Negative"}

In [229]:
# define sentiment estimation function
def estimate_sentiment(tweet, threshold=0.05):
  
  input_data = pd.DataFrame({"clean_text": [tweet]})
  tweet_df = sqlContext.createDataFrame(input_data)

  # Extract features using HashingTF and IDF
  regexTokenizer = RegexTokenizer(inputCol="clean_text", outputCol="words", pattern="\\W")
  tokenized_data = regexTokenizer.transform(tweet_df)
  hashing_tf = HashingTF(inputCol='words', outputCol='rawFeatures')
  featurized_data = hashing_tf.transform(tokenized_data)
  idf = IDF(inputCol='rawFeatures', outputCol='features')
  idf_model = idf.fit(featurized_data)
  rescaled_data = idf_model.transform(featurized_data)
  # initialize empty array for sentiment probabilities
  sentiment_probs = [0] * ldamodel.num_topics
  
  sentiments = []
  for topic in range(ldamodel.num_topics):
    sentiments.append(model_dict[topic].transform(rescaled_data))

  #Get topics related to the tweet
  tweet_bow = dictionary.doc2bow(tweet.lower().split())
  topic_prob = ldamodel.get_document_topics(tweet_bow, minimum_probability=0.0)
  topic_prob = dict(topic_prob)

  for topic, prob in topic_prob.items():
    if prob > threshold:
      sentiment_probs[topic] = sentiments[topic].select("prediction").collect()[0][0]

  # set sentiment estimation to the one with highest probability
  cnt = {}
  for num in sentiment_probs:
    if num not in cnt:
      cnt[num] = 1
    else:
      cnt[num] += 1
  
  mode = -1
  maxValue = -1
  for key, value in cnt.items():
    if(value > maxValue):
      maxValue = value
      mode = key
  
  sentiment_label = sentiment_labels.get(int(mode), "Unknown")
  return sentiment_label

In [212]:
value = estimate_sentiment("I am awesome!", 0.05)
print(value)

Positive


In [213]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 25892
Test Dataset Count: 10996


## Model Training and Evaluation
Logistic Regression using Count Vector Features 

Our model will make predictions and score on the test set; we then look at the top 10 predictions from the highest probability.

In [None]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0).select("clean_text","category","probability","label","prediction")\
.orderBy("probability", ascending=False).show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|iit iim student be much dif...|       1|[0.9999999999999931,2.60521...|  0.0|       0.0|
|ist aht teh chikcf rom retr...|       1|[0.9999998357271883,5.40996...|  0.0|       0.0|
|chennai super king ipl fini...|       1|[0.9999997571296896,1.19526...|  0.0|       0.0|
|svt murder case except culp...|       1|[0.9999992589312988,1.03962...|  0.0|       0.0|
|draft few point frustration...|       1|[0.9999989701361153,2.91350...|  0.0|       0.0|
|lol acting be good good goo...|       1|[0.9999980311801355,5.06028...|  0.0|       0.0|
|chennai super king season c...|       1|[0.9999979769953309,1.21011...|  0.0|       0.0|
|couple reason see though ha...|       1|[0.9999950052385942,8.21087...|  0.0|       0.0|
|author co

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.7117525129668887

# Subset partitioning training result for N = 1.

In [253]:
predictions = model_dict[0].transform(testData)
predictions.filter(predictions['prediction'] == 0).select("clean_text","category","probability","label","prediction")\
.orderBy("probability", ascending=False).show(n = 10, truncate = 30)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|amazonize state bid india i...|       1|[0.9999999927096903,8.04325...|  0.0|       0.0|
|go churlish puncture baloon...|       1|[0.9999999848105883,4.39544...|  0.0|       0.0|
|site attack put modi strong...|       1|[0.9999999788968086,4.57450...|  0.0|       0.0|
|people have right know sour...|       1|[0.9999999514236283,3.95174...|  0.0|       0.0|
|holy shit gand muslim gangs...|      -1|[0.9999999188591101,2.47698...|  2.0|       0.0|
|many point such welfare edu...|       1|[0.9999999179972027,2.63643...|  0.0|       0.0|
|first time be great gesture...|       1|[0.999999885852927,1.060683...|  0.0|       0.0|
|author conclusion buddhism ...|       1|[0.9999998071385555,8.71114...|  0.0|       0.0|
|svt murde

0.7549026625358487

# Subset partitioning training result for N = 3.

In [245]:
predictions = []
for i in range(3):
  predictions.append(model_dict[i].transform(testData))

In [247]:
from pyspark.sql.functions import mode

for i in range(3):
  predictions[i].select("clean_text","category","probability","label","prediction")\
.orderBy("probability", ascending=False).show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|amazonize state bid india i...|       1|[0.9999999927096903,8.04325...|  0.0|       0.0|
|go churlish puncture baloon...|       1|[0.9999999848105883,4.39544...|  0.0|       0.0|
|site attack put modi strong...|       1|[0.9999999788968086,4.57450...|  0.0|       0.0|
|people have right know sour...|       1|[0.9999999514236283,3.95174...|  0.0|       0.0|
|holy shit gand muslim gangs...|      -1|[0.9999999188591101,2.47698...|  2.0|       0.0|
|many point such welfare edu...|       1|[0.9999999179972027,2.63643...|  0.0|       0.0|
|first time be great gesture...|       1|[0.999999885852927,1.060683...|  0.0|       0.0|
|author conclusion buddhism ...|       1|[0.9999998071385555,8.71114...|  0.0|       0.0|
|svt murde

In [248]:
combined_predictions = predictions[0].union(predictions[1]).union(predictions[2])

In [251]:
from pyspark.sql.functions import first
result_predictions = combined_predictions.groupBy('clean_text') \
                      .agg(mode(col('prediction')).alias('mode_prediction'),
                           first(col('category')).alias('category'),
                           first(col('label')).alias('label'))
result_predictions.show(5)

+--------------------+---------------+--------+-----+
|          clean_text|mode_prediction|category|label|
+--------------------+---------------+--------+-----+
|   aaaand streak end|            1.0|       0|  1.0|
|aadhaar privacy i...|            0.0|       1|  0.0|
|              aadhar|            1.0|       0|  1.0|
|aadhar exasperate...|            1.0|       0|  1.0|
|aadhar secure sec...|            0.0|       1|  0.0|
+--------------------+---------------+--------+-----+
only showing top 5 rows



In [252]:
evaluator = MulticlassClassificationEvaluator(predictionCol='mode_prediction')
evaluator.evaluate(result_predictions)

0.7893339287131915

## Logistic Regression using TF-IDF Features

In [None]:
from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=30000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("clean_text","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|iit iim student be much dif...|       1|[0.9999999999950364,1.92636...|  0.0|       0.0|
|chennai super king ipl fini...|       1|[0.9999999997584104,3.77359...|  0.0|       0.0|
|ist aht teh chikcf rom retr...|       1|[0.9999998028760817,5.94457...|  0.0|       0.0|
|lol acting be good good goo...|       1|[0.9999981055490093,4.86939...|  0.0|       0.0|
|svt murder case except culp...|       1|[0.9999962708666431,1.02497...|  0.0|       0.0|
|author conclusion buddhism ...|       1|[0.9999952314738392,1.12138...|  0.0|       0.0|
|be good good good good good...|       1|[0.9999934857018989,1.68422...|  0.0|       0.0|
|actor be good good good goo...|       1|[0.9999921351462316,2.06716...|  0.0|       0.0|
|think sam

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.6937969783479848

## Cross-Validation
Let’s now try cross-validation to tune our hyper parameters, and we will only tune the count vectors Logistic Regression.

In [None]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)

cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
#print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

0.752211153632236

## Naive Bayes

In [None]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("clean_text","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|respect sir sar madam sar g...|       1|[1.0,9.182746314341452E-17,...|  0.0|       0.0|
|respect sir sar madam good ...|       1|[1.0,1.7338009126881107E-17...|  0.0|       0.0|
|respect sar ayam subash sar...|       1|[1.0,1.6868856139650975E-17...|  0.0|       0.0|
|respect sir sar madam ayam ...|       1|[1.0,5.650218080012626E-18,...|  0.0|       0.0|
|team be gun srh kane willia...|       1|[1.0,1.0491190511005182E-18...|  0.0|       0.0|
|csk dhoni raina be support ...|       1|[1.0,3.7655437920326383E-19...|  0.0|       0.0|
|team be gun srh start back ...|       1|[1.0,1.219565138353032E-19,...|  0.0|       0.0|
|team be gun toss csk srh ta...|       1|[1.0,1.2271904441956659E-20...|  0.0|       0.0|
|delhi dar

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.7002298764346058

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(trainingData)
predictions = dtModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("clean_text","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|agree everyone get somethin...|      -1|[0.9114864864864864,0.00270...|  2.0|       0.0|
|accord be good reliable sou...|       1|[0.9114864864864864,0.00270...|  0.0|       0.0|
|bhechodh mirgi diya aur bol...|       1|[0.9114864864864864,0.00270...|  0.0|       0.0|
|agree say fail state countr...|       1|[0.9114864864864864,0.00270...|  0.0|       0.0|
|agree people give vote basi...|       1|[0.9114864864864864,0.00270...|  0.0|       0.0|
|accord role lose arm tyr fe...|       1|[0.9114864864864864,0.00270...|  0.0|       0.0|
|damn rahul good lookin dude...|       1|[0.9114864864864864,0.00270...|  0.0|       0.0|
|accurate learn high school ...|       1|[0.9114864864864864,0.00270...|  0.0|       0.0|
|arnabs la

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.27266082368144756

## Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("clean_text","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|iit iim student be much dif...|       1|[0.6153750155588668,0.13838...|  0.0|       0.0|
|brilliant point point other...|       1|[0.596061641963918,0.162264...|  0.0|       0.0|
|draft few point frustration...|       1|[0.5884963895732044,0.19201...|  0.0|       0.0|
|be simpleton have be victim...|       1|[0.5743760151164825,0.15607...|  0.0|       0.0|
|dhruv frustration misinform...|       1|[0.5570935262753622,0.23898...|  0.0|       0.0|
|spent sta afghanistan first...|       1|[0.5564944588299858,0.18814...|  0.0|       0.0|
|bjp different other politic...|       1|[0.5558701803561679,0.19732...|  0.0|       0.0|
|chennai super king season c...|       1|[0.5552815543887953,0.22082...|  0.0|       0.0|
|see pitch

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.27207601721338165

In [None]:
spark = SparkSession.builder.appName("StreamingExample").getOrCreate()
csv_stream = spark.readStream.format("csv") \
    .option("header", "true") \
    .option("maxFilesPerTrigger", 1) \
    .schema(customSchema) \
    .load(filename1)