# Sentiment Analysis of Twitter data using SparkML

Aim: To understand and explore the SparkML library of PySpark.  
Dataset: Annotated tweets from the Sentiment140 data set (http://help.sentiment140.com/for-students/), provided by Stanford.

In [3]:
import pyspark as ps
import warnings
from pyspark.sql import SQLContext

In [4]:
import pandas as pd  
import numpy as np

In [5]:
import re
from bs4 import BeautifulSoup
from nltk.tokenize import WordPunctTokenizer

In [7]:
# The raw CSV has no header row; only column 0 (polarity) and column 5 (tweet
# text) are loaded. Polarity 4 is recoded to 1 and 0 stays 0.
df = pd.read_csv(
    "data/training.1600000.processed.noemoticon.csv",
    header=None,
    encoding="ISO-8859-1",
    usecols=[0, 5],
    names=['senti', 'data'],
)
df['senti'] = df['senti'].map({0: 0, 4: 1})
df.head()

Unnamed: 0,senti,data
0,0,"@switchfoot http://twitpic.com/2y1zl - Awww, t..."
1,0,is upset that he can't update his Facebook by ...
2,0,@Kenichan I dived many times for the ball. Man...
3,0,my whole body feels itchy and like its on fire
4,0,"@nationwideclass no, it's not behaving at all...."


In [9]:
# Number of tweets per sentiment label.
df['senti'].value_counts()

1    800000
0    800000
Name: senti, dtype: int64

# Cleaning the dataset

In [10]:
# Shared tokenizer instance; clean_tweets uses it to split the cleaned text.
tok = WordPunctTokenizer()

# Regular expressions and lookup table consumed by clean_tweets.
p1 = r'@[A-Za-z0-9_]+'      # @mentions
p2 = r'https?://[^ ]+'      # http:// and https:// URLs, up to the next space
pat = r'|'.join((p1, p2))
# The dot must be escaped: with a bare "." the pattern also matched ordinary
# words containing four or more consecutive w's (e.g. "awwwww") and deleted
# them as if they were URLs.
www_p = r'www\.[^ ]+'
# Contraction -> expanded negation. Keys are lower-case because clean_tweets
# lower-cases the text before applying neg_p.
neg_d = {"isn't":"is not", "aren't":"are not", "wasn't":"was not", "weren't":"were not",
                "haven't":"have not","hasn't":"has not","hadn't":"had not","won't":"will not",
                "wouldn't":"would not", "don't":"do not", "doesn't":"does not","didn't":"did not",
                "can't":"can not","couldn't":"could not","shouldn't":"should not","mightn't":"might not",
                "mustn't":"must not"}
# Matches any key of neg_d as a whole word.
neg_p = re.compile(r'\b(' + '|'.join(neg_d.keys()) + r')\b')

In [11]:
def clean_tweets(data):
    """Normalise one raw tweet into lower-case, space-separated alphabetic tokens.

    Steps, in order: strip HTML markup with BeautifulSoup, remove @mentions
    and URLs (``pat``, ``www_p``), lower-case, expand negation contractions
    (``neg_p`` / ``neg_d``), replace every non-letter with a space, tokenize
    with ``tok`` and keep only tokens longer than one character.

    Args:
        data: Raw tweet text.

    Returns:
        The cleaned tweet as a single string; empty when no token survives.
    """
    souped = BeautifulSoup(data, 'lxml').get_text()
    # Only bytes can be decoded. The previous unconditional ``.decode`` inside
    # a bare ``except:`` failed on every text input and silently skipped the
    # replacement below; the bare except also swallowed KeyboardInterrupt
    # while the long cleaning loop was running.
    if isinstance(souped, bytes):
        try:
            souped = souped.decode("utf-8-sig")
        except UnicodeDecodeError:
            souped = souped.decode("utf-8", "replace")
    bom_removed = souped.replace(u"\ufffd", "?")
    stp = re.sub(pat, '', bom_removed)
    stp = re.sub(www_p, '', stp)
    lower_case = stp.lower()
    # Expand contractions before punctuation is stripped, otherwise the
    # apostrophes that neg_p relies on would already be gone.
    neg_handled = neg_p.sub(lambda x: neg_d[x.group()], lower_case)
    letters_only = re.sub("[^a-zA-Z]", " ", neg_handled)
    words = [x for x in tok.tokenize(letters_only) if len(x) > 1]
    return (" ".join(words)).strip()

In [12]:
%%time
print("Tweets Cleaning\n")
# Iterate over the column values directly instead of looking each row up with
# df['data'][i]: that avoids one label lookup per row and does not rely on the
# index being exactly 0..n-1.
total = len(df)
clean_tweet = []
for done, tweet in enumerate(df['data'], start=1):
    clean_tweet.append(clean_tweets(tweet))
    # Report progress after the row has actually been processed.
    if done % 100000 == 0:
        print("Done %d of %d" % (done, total))

Tweets Cleaning

Done 100000 of 1600000
Done 200000 of 1600000
Done 300000 of 1600000
Done 400000 of 1600000
Done 500000 of 1600000
Done 600000 of 1600000
Done 700000 of 1600000
Done 800000 of 1600000
Done 900000 of 1600000
Done 1000000 of 1600000
Done 1100000 of 1600000
Done 1200000 of 1600000
Done 1300000 of 1600000
Done 1400000 of 1600000
Done 1500000 of 1600000
Done 1600000 of 1600000
CPU times: user 11min 21s, sys: 12.8 s, total: 11min 34s
Wall time: 13min 24s


In [14]:
# Pair every cleaned tweet with its sentiment label; the label column is
# aligned with the cleaned text through the shared row index.
cdf = pd.DataFrame({'data': clean_tweet}, columns=['data'])
cdf['class'] = df['senti']

In [15]:
# Persist the cleaned tweets and labels so the Spark section can load them from
# disk. The index is written as well; it shows up as the `_c0` column when the
# file is read back.
cdf.to_csv('data/cleaned_tweets.csv',encoding='utf-8')

# Creating SparkContext and loading the data as DataFrame

In [16]:
# Start a local SparkContext (master 'local[2]'). If one already exists, reuse
# it instead of leaving `sc` / `sqlContext` unbound, so that the following
# cells work no matter which branch ran.
try:
    sc = ps.SparkContext('local[2]')
    print("Just created a SparkContext")
except ValueError:
    warnings.warn("SparkContext already exists in this scope")
    sc = ps.SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

  


In [None]:
# Display the master URL of the active SparkContext.
sc.master

In [1]:
# Load the cleaned CSV as a Spark DataFrame: the first line is the header and
# column types are inferred. Note that `df` is rebound from the pandas frame.
df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .load('data/cleaned_tweets.csv')
)

In [2]:
# Confirm that `df` now refers to a Spark DataFrame rather than a pandas one.
type(df)

pyspark.sql.dataframe.DataFrame

In [3]:
# Preview the first five rows.
df.show(5)

+---+--------------------+-----+
|_c0|                data|class|
+---+--------------------+-----+
|  0|awww that bummer ...|    0|
|  1|is upset that he ...|    0|
|  2|dived many times ...|    0|
|  3|my whole body fee...|    0|
|  4|no it not behavin...|    0|
+---+--------------------+-----+
only showing top 5 rows



In [4]:
# Drop rows that contain a null value.
# NOTE(review): presumably these are tweets whose cleaned text ended up empty - confirm.
df = df.dropna()

In [5]:
# Number of rows currently in df.
df.count()

1596041

In [6]:
# Hold-out split (a single random split, not k-fold cross-validation): roughly
# 98% of the rows go to training and 2% to testing. The seed is fixed so the
# split is reproducible.
(train_data, test_data) = df.randomSplit([0.98, 0.02], seed = 2000)

# Exploring SparkML

# TF-IDF and Logistic Regression
TF-IDF is used to extract features from the tweets, and a Logistic Regression model is trained on those features to classify tweets as positive or negative. The accuracy is then evaluated on the test split.

In [7]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

In [27]:
%%time
# Pipeline: tokenize -> hashed term frequencies (2**16 buckets) -> IDF
# weighting (minDocFreq=5 drops very rare terms) -> index the class column
# into `label` -> logistic regression.
tokenizer = Tokenizer(inputCol="data", outputCol="words")
hashtf = HashingTF(inputCol="words", outputCol='tf', numFeatures=2**16)
idf = IDF(minDocFreq=5, inputCol='tf', outputCol="features")
label_stringIdx = StringIndexer(inputCol="class", outputCol="label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx, lr])

# Fit on the training split, then score the test split.
pipelineFit = pipeline.fit(train_data)
predictions = pipelineFit.transform(test_data)

# Accuracy = correctly predicted rows / all test rows.
is_correct = predictions.label == predictions.prediction
accuracy = predictions.filter(is_correct).count() / float(test_data.count())
print("Accuracy :{0:.3%}".format(accuracy))

Accuracy :78.934%
CPU times: user 91.6 ms, sys: 9.22 ms, total: 101 ms
Wall time: 5min 54s


# CountVectorizer, IDF and Logistic Regression

In [8]:
from pyspark.ml.feature import CountVectorizer

In [9]:
%%time
# Same pipeline as the TF-IDF one, but term counts come from a CountVectorizer
# with a vocabulary capped at 2**16 entries instead of feature hashing.
tokenizer = Tokenizer(inputCol="data", outputCol="words")
cv = CountVectorizer(inputCol="words", outputCol='cv', vocabSize=2**16)
idf = IDF(minDocFreq=5, inputCol='cv', outputCol="features")  # minDocFreq drops very rare terms
label_stringIdx = StringIndexer(inputCol="class", outputCol="label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

# Fit on the training split, then score the test split.
pipelineFit = pipeline.fit(train_data)
predictions = pipelineFit.transform(test_data)
is_correct = predictions.label == predictions.prediction
accuracy = predictions.filter(is_correct).count() / float(test_data.count())
print("Accuracy :{0:.4f}".format(accuracy))

Accuracy :0.7946
CPU times: user 86.9 ms, sys: 14.8 ms, total: 102 ms
Wall time: 6min 35s


# Word2vec and Logistic Regression

In [13]:
from pyspark.ml.feature import Word2Vec

# In case you get java.lang.OutOfMemoryError, start pyspark with the options
# below, or set them permanently in the spark-defaults.conf file:
#   pyspark --driver-memory 5g --executor-memory 7g

In [14]:
%%time
# Pipeline: tokenize -> Word2Vec features (vectorSize=1000, minCount=5)
# -> index the class column into `label` -> logistic regression.
tokenizer = Tokenizer(inputCol="data", outputCol="words")
word2Vec = Word2Vec(inputCol="words", outputCol="features", vectorSize=1000, minCount=5)
label_stringIdx = StringIndexer(inputCol="class", outputCol="label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, word2Vec, label_stringIdx, lr])

# Fit on the training split, then score the test split.
pipelineFit = pipeline.fit(train_data)
predictions = pipelineFit.transform(test_data)
is_correct = predictions.label == predictions.prediction
accuracy = predictions.filter(is_correct).count() / float(test_data.count())

print("Accuracy :{0:.4f}".format(accuracy))

Accuracy :0.7625
CPU times: user 1.83 s, sys: 332 ms, total: 2.16 s
Wall time: 6h 41min 6s


# NGram Features and Logistic Regression

In [9]:
from pyspark.ml.feature import NGram, VectorAssembler

In [12]:

%%time
def gen_ngram(inputCol=["data","class"], n=3):
    """Build an unfitted Pipeline: 1..n-gram TF-IDF features + logistic regression.

    Stages: tokenizer, one NGram / CountVectorizer / IDF stage per n-gram
    order, a VectorAssembler that concatenates the per-order TF-IDF vectors
    into ``features``, a StringIndexer producing ``label``, and a
    LogisticRegression.

    Args:
        inputCol: Two-item sequence ``[text_column, label_column]``. It was
            previously ignored in favour of hard-coded "data" / "class"; the
            default keeps that behaviour.
        n: Highest n-gram order; orders 1..n are all used.

    Returns:
        The assembled, not yet fitted, ``Pipeline``.
    """
    text_col, label_col = inputCol
    orders = list(range(1, n + 1))

    tokenizer = [Tokenizer(inputCol=text_col, outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in orders
    ]
    # One count vectorizer per n-gram order, each with its own capped
    # vocabulary (these are not cross-validators, despite the former name).
    count_vectorizers = [
        CountVectorizer(vocabSize=5460, inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]
    idf = [
        IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
        for i in orders
    ]
    # Collate the per-order TF-IDF vectors into a single feature column.
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol=label_col, outputCol="label")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + count_vectorizers + idf + assembler + label_stringIdx + lr)


# Fit the n-gram + logistic-regression pipeline on the training split and
# measure accuracy on the test split.
ngram_pipelineFit = gen_ngram().fit(train_data)
predictions = ngram_pipelineFit.transform(test_data)
is_correct = predictions.label == predictions.prediction
accuracy = predictions.filter(is_correct).count() / float(test_data.count())

print("Accuracy :{0:.4f}".format(accuracy))

Accuracy :0.8107
CPU times: user 249 ms, sys: 54 ms, total: 303 ms
Wall time: 14min 34s


# Random Forest

Among the above models, the feature set with the highest accuracy (NGram) is selected and used to train a Random Forest classifier.

In [10]:
from pyspark.ml.classification import RandomForestClassifier

In [11]:
def ngram_features(inputCol=["data","class"], n=3):
    """Build an unfitted feature Pipeline producing 1..n-gram TF-IDF vectors.

    Stages: tokenizer, one NGram / CountVectorizer / IDF stage per n-gram
    order, a VectorAssembler that concatenates the per-order TF-IDF vectors
    into ``features``, and a StringIndexer producing ``label``. No classifier
    is attached, so the output can be fed to any estimator.

    Args:
        inputCol: Two-item sequence ``[text_column, label_column]``. It was
            previously ignored in favour of hard-coded "data" / "class"; the
            default keeps that behaviour.
        n: Highest n-gram order; orders 1..n are all used.

    Returns:
        The assembled, not yet fitted, ``Pipeline``.
    """
    text_col, label_col = inputCol
    orders = list(range(1, n + 1))

    # Transformers: tokenizer and one n-gram extractor per order.
    tokenizer = [Tokenizer(inputCol=text_col, outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in orders
    ]
    # One count vectorizer per n-gram order, each with its own capped
    # vocabulary (these are not cross-validators, despite the former name).
    count_vectorizers = [
        CountVectorizer(vocabSize=5460, inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]
    idf = [
        IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
        for i in orders
    ]
    # Create a single column with all the features collated together.
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol=label_col, outputCol="label")]
    return Pipeline(stages=tokenizer + ngrams + count_vectorizers + idf + assembler + label_stringIdx)

In [21]:
%%time
# Fit the n-gram feature pipeline on the training split and apply it to both
# splits, so that the classifier sees identical feature columns.
ngram_pipelineFit = ngram_features().fit(train_data)
training_df = ngram_pipelineFit.transform(train_data)
testing_df = ngram_pipelineFit.transform(test_data)

# Random forest: 100 trees, depth limited to 4.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=100,
    maxDepth=4,
    maxBins=32,
)
rfModel = rf.fit(training_df)
predictions = rfModel.transform(testing_df)

is_correct = predictions.label == predictions.prediction
accuracy = predictions.filter(is_correct).count() / float(testing_df.count())
print("Accuracy :{0:.4f}".format(accuracy))

Accuracy :0.6998
CPU times: user 565 ms, sys: 52.8 ms, total: 618 ms
Wall time: 1h 4min 17s


# Naive Bayes Classifier
Among the above models, the feature set with the highest accuracy (NGram) is selected and used to train a Naive Bayes classifier.

In [1]:
from pyspark.ml.classification import NaiveBayes

In [17]:
%%time
# Fit the n-gram feature pipeline on the training split and apply it to both
# splits.
ngram_pipelineFit = ngram_features().fit(train_data)
training_df = ngram_pipelineFit.transform(train_data)
testing_df = ngram_pipelineFit.transform(test_data)

# Multinomial Naive Bayes with smoothing set to 1.
nb = NaiveBayes(modelType="multinomial", smoothing=1)
nbmodel = nb.fit(training_df)
predictions = nbmodel.transform(testing_df)

is_correct = predictions.label == predictions.prediction
accuracy = predictions.filter(is_correct).count() / float(testing_df.count())
print("Accuracy :{0:.4f}".format(accuracy))

Accuracy :0.7820
CPU times: user 287 ms, sys: 77.5 ms, total: 364 ms
Wall time: 17min 35s


# Gradient Boosting Machines
Among the above models, the feature set with the highest accuracy (NGram) is selected and used to train a gradient-boosted tree classifier (GBTClassifier).

In [14]:

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
%%time
# Fit the n-gram feature pipeline on the training split and apply it to both
# splits.
ngram_pipelineFit = ngram_features().fit(train_data)
training_df = ngram_pipelineFit.transform(train_data)
testing_df = ngram_pipelineFit.transform(test_data)

# Gradient-boosted trees, 50 boosting iterations.
gbt = GBTClassifier(maxIter=50, labelCol="label", featuresCol="features")
model = gbt.fit(training_df)
predictions = model.transform(testing_df)

is_correct = predictions.label == predictions.prediction
accuracy = predictions.filter(is_correct).count() / float(testing_df.count())
print("Accuracy :{0:.4f}".format(accuracy))
