## Twitter sentiment analysis and prediction using PySpark

In [1]:
from IPython import display
import math
import pandas as pd
import numpy as np

from pyspark.sql import SQLContext
from pyspark import SparkContext

from pyspark.sql.types import *

### Create Spark Context and load dataset

In [2]:
# Reuse the active SparkContext when one already exists. A bare SparkContext()
# raises "Cannot run multiple SparkContexts at once" if this cell is executed
# a second time in the same kernel; getOrCreate() makes the cell re-runnable.
sc = SparkContext.getOrCreate()
# SQL entry point used by the cells below to read the CSV files.
sqlContext = SQLContext(sc)



In [3]:
# Explicit two-column schema for the input CSV files; both columns are read as
# strings (the label column is indexed to a numeric value later on).
customSchema = StructType([
    StructField(column_name, StringType())
    for column_name in ("clean_text", "category")
])

In [4]:
# Paths of the two input CSV files. Each is loaded with a header row and the
# two-column schema (clean_text, category).
# NOTE(review): judging by the file names these are Twitter and Reddit
# exports - confirm the provenance of both files.
filename1 = 'twtr_dataset.csv'

filename2 = 'redt_dataset.csv'

In [5]:
# Load the first CSV file: the first row is a header, and columns are typed by
# customSchema rather than inferred.
df1 = (
    sqlContext.read
    .format("csv")
    .option("header", "true")
    .schema(customSchema)
    .load(filename1)
)
# Number of rows read from the first file.
df1.count()

192131

In [6]:
# Load the second CSV file with the same reader settings as the first one.
df2 = (
    sqlContext.read
    .format("csv")
    .option("header", "true")
    .schema(customSchema)
    .load(filename2)
)
# Number of rows read from the second file.
df2.count()

38305

In [7]:
# Stack the two sources into a single DataFrame. union() matches columns by
# position; both frames were read with the same customSchema.
df = df1.union(df2)
# Total number of rows after stacking.
df.count() 

230436

In [8]:
# Keep only rows where neither clean_text nor category is null.
data = df.dropna(how="any")
# Preview the first few cleaned rows.
data.show(n=5)

+--------------------+--------+
|          clean_text|category|
+--------------------+--------+
|when modi promise...|      -1|
|talk all the nons...|       0|
|what did just say...|       1|
|asking his suppor...|       1|
|answer who among ...|       1|
+--------------------+--------+
only showing top 5 rows



In [9]:
# NOTE(review): this re-counts the unfiltered `df`, repeating the count taken
# right after the union; `data.count()` may have been intended here to show
# how many rows survive the null drop - confirm.
df.count() 

230436

In [10]:
# Print the column names, types and nullability of the null-filtered DataFrame.
data.printSchema()

root
 |-- clean_text: string (nullable = true)
 |-- category: string (nullable = true)



## Preprocessing

In [11]:
from pyspark.sql.functions import col

# Class balance: number of rows per category, most frequent category first.
(
    data
    .groupBy("category")
    .count()
    .orderBy(col("count").desc())
    .show()
)

+--------+-----+
|category|count|
+--------+-----+
|       1|86224|
|       0|66446|
|      -1|42908|
+--------+-----+



## Model Pipeline
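The Spark Machine Learning Pipelines API is similar to scikit-learn's. Our feature pipeline includes three steps (a `StringIndexer` stage that encodes the label is appended when the pipeline is built):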

regexTokenizer: Tokenization (with Regular Expression)

stopwordsRemover: Remove Stop Words

countVectors: Count vectors (“document-term vectors”)

In [12]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Stage 1 - tokenization: split clean_text on non-word characters.
regexTokenizer = RegexTokenizer(
    inputCol="clean_text",
    outputCol="words",
    pattern="\\W",
)

# Stage 2 - stop-word removal.
# NOTE(review): supplying this list as stopWords replaces Spark's default
# stop-word list instead of extending it, so only these seven tokens are
# removed - confirm this is intended given the name `add_stopwords`.
add_stopwords = ["http", "https", "amp", "rt", "t", "c", "the"]
stopwordsRemover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=add_stopwords,
)

# Stage 3 - bag-of-words counts: vocabulary capped at 30000 terms, and a term
# must appear in at least 5 documents to enter the vocabulary.
countVectors = CountVectorizer(
    inputCol="filtered",
    outputCol="features",
    vocabSize=30000,
    minDF=5,
)

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Encode the string category as a numeric `label` column for the classifiers.
label_stringIdx = StringIndexer(inputCol="category", outputCol="label")

# Feature stages followed by the label indexer.
pipeline = Pipeline(
    stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx]
)

# Fit every stage and materialise the feature / label columns.
# NOTE(review): the pipeline is fitted on all of `data`, so the CountVectorizer
# vocabulary and the label indexing also see rows that end up in the test
# split - consider fitting on the training split only.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(n=5)

+--------------------+--------+--------------------+--------------------+--------------------+-----+
|          clean_text|category|               words|            filtered|            features|label|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
|when modi promise...|      -1|[when, modi, prom...|[when, modi, prom...|(24654,[0,1,7,30,...|  2.0|
|talk all the nons...|       0|[talk, all, the, ...|[talk, all, nonse...|(24654,[0,1,2,8,1...|  1.0|
|what did just say...|       1|[what, did, just,...|[what, did, just,...|(24654,[0,2,3,20,...|  0.0|
|asking his suppor...|       1|[asking, his, sup...|[asking, his, sup...|(24654,[0,6,7,8,1...|  0.0|
|answer who among ...|       1|[answer, who, amo...|[answer, who, amo...|(24654,[0,22,69,1...|  0.0|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



## Partition Training & Test sets

In [14]:
# 70/30 train/test split; the seed is fixed so the split can be reproduced.
trainingData, testData = dataset.randomSplit(weights=[0.7, 0.3], seed=100)

# Report the size of each partition.
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 136742
Test Dataset Count: 58836


## Model Training and Evaluation
Logistic Regression using Count Vector Features 

Our model makes predictions and is scored on the test set; we then look at the 10 rows predicted as label 0 with the highest predicted probability.

In [15]:
# Logistic regression on the count-vector features; elasticNetParam=0 means
# the penalty is pure L2 (ridge).
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

# Score the held-out rows.
predictions = lrModel.transform(testData)

# Show 10 rows predicted as label 0, ordered by the probability column
# (descending).
(
    predictions
    .filter(predictions["prediction"] == 0)
    .select("clean_text", "category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30)
)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|part you have something min...|       1|[1.0,9.776313922009844E-46,...|  0.0|       0.0|
|even though iit and iim stu...|       1|[1.0,1.1625365806360648E-47...|  0.0|       0.0|
| what your opinion about so...|       1|[0.9999999999995639,4.19716...|  0.0|       0.0|
|she right she right she rig...|       1|[0.9999999999970124,4.54714...|  0.0|       0.0|
| 2002 narendra modi the cur...|       1|[0.9999999999903655,1.47477...|  0.0|       0.0|
| chennai super kings for th...|       1|[0.9999999999814224,3.28327...|  0.0|       0.0|
|ist aht teh chikcf rom retr...|       1|[0.9999999925389519,9.00234...|  0.0|       0.0|
|upa had one the highest gro...|       1|[0.9999999839171612,7.22355...|  0.0|       0.0|
|svt 1989 

In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the test-set predictions. labelCol and metricName are written out with
# their library defaults ("label" and "f1"), so the value shown is an F1 score.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(predictions)

0.7706588617183189

## Logistic Regression using TF-IDF Features

In [17]:
from pyspark.ml.feature import HashingTF, IDF

# Hash the filtered tokens into a 30000-dimensional term-frequency vector.
hashingTF = HashingTF(
    inputCol="filtered",
    outputCol="rawFeatures",
    numFeatures=30000,
)
# Re-weight by inverse document frequency; minDocFreq=5 filters out terms that
# occur in fewer than 5 documents.
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

# Same tokenizer / stop-word stages as before, with TF-IDF features in place of
# raw counts.
pipeline = Pipeline(
    stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx]
)

# NOTE(review): as with the count-vector pipeline, the stages (including IDF)
# are fitted on all of `data` before the split below.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

# Same 70/30 split and seed as used for the count-vector model.
trainingData, testData = dataset.randomSplit(weights=[0.7, 0.3], seed=100)

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

# Show 10 rows predicted as label 0, ordered by the probability column
# (descending).
(
    predictions
    .filter(predictions["prediction"] == 0)
    .select("clean_text", "category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30)
)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|part you have something min...|       1|[0.9999999999999978,8.00813...|  0.0|       0.0|
|even though iit and iim stu...|       1|[0.9999999999999871,3.72158...|  0.0|       0.0|
| what your opinion about so...|       1|[0.9999999999992681,1.11682...|  0.0|       0.0|
|she right she right she rig...|       1|[0.9999999999968667,4.01727...|  0.0|       0.0|
| 2002 narendra modi the cur...|       1|[0.9999999999350917,3.78642...|  0.0|       0.0|
|upa had one the highest gro...|       1|[0.9999999987477581,2.07874...|  0.0|       0.0|
|thanks for doing this ama t...|       1|[0.9999999729296293,1.95190...|  0.0|       0.0|
|this the first time was gre...|       1|[0.9999999344994687,2.21122...|  0.0|       0.0|
|ist aht t

In [18]:
# Score the current `predictions`. labelCol and metricName are written out with
# their library defaults ("label" and "f1"), so the value shown is an F1 score.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(predictions)

0.743005205397842

## Cross-Validation
Let’s now try cross-validation to tune our hyperparameters; we will only tune the Logistic Regression model trained on count vectors.

In [19]:
# Rebuild the count-vector dataset: `dataset`, `trainingData` and `testData`
# currently hold the TF-IDF features from the previous section.
pipeline = Pipeline(
    stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx]
)

pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

# Same 70/30 split and seed as before.
trainingData, testData = dataset.randomSplit(weights=[0.7, 0.3], seed=100)

# Base estimator whose regParam / elasticNetParam are tuned by cross-validation.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

In [20]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 3 x 3 search grid over the regularisation strength and the elastic-net
# mixing parameter (0.0 = pure ridge / L2).
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.3, 0.5])
    .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])
    .build()
)

# Build the evaluator before the CrossValidator that uses it, so this cell does
# not depend on an `evaluator` variable left over from an earlier cell.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# 5-fold cross-validation; the seed fixes the fold assignment so the selected
# model can be reproduced, mirroring the seeded train/test split.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=100,
)

# Fits one model per grid point and fold, then refits the best setting on the
# whole training split.
cvModel = cv.fit(trainingData)

# Evaluate the best model on the held-out test split.
predictions = cvModel.transform(testData)
evaluator.evaluate(predictions)

0.8266595436822157

## Naive Bayes

In [25]:
from pyspark.ml.classification import NaiveBayes

# Naive Bayes with additive smoothing of 1, trained on the current
# training split.
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)

# Score the held-out rows.
predictions = model.transform(testData)

# Show 10 rows predicted as label 0, ordered by the probability column
# (descending).
(
    predictions
    .filter(predictions["prediction"] == 0)
    .select("clean_text", "category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30)
)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|respected sir sar and madam...|       1|[1.0,7.308939495977858E-18,...|  0.0|       0.0|
| team strengths retained co...|       1|[1.0,1.2380549341069472E-18...|  0.0|       0.0|
|respected sir sar and madam...|       1|[1.0,2.0607743285077304E-19...|  0.0|       0.0|
|respected sir sar and madam...|       1|[1.0,1.2877618878208888E-21...|  0.0|       0.0|
|respected sir sar and madam...|       1|[1.0,9.280454666117545E-22,...|  0.0|       0.0|
|respected sir sar and madam...|       1|[1.0,2.0018099334290323E-22...|  0.0|       0.0|
|erosnow com has good actor ...|       1|[1.0,9.177524435135607E-23,...|  0.0|       0.0|
|respected sar ayam subash s...|       1|[1.0,9.08439754684448E-23,1...|  0.0|       0.0|
| don thin

In [26]:
# Score the current `predictions`. labelCol and metricName are written out with
# their library defaults ("label" and "f1"), so the value shown is an F1 score.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(predictions)

0.764224901722392

In [27]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree limited to depth 3, trained on the current training split.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    maxDepth=3,
)
dtModel = dt.fit(trainingData)

# Score the held-out rows.
predictions = dtModel.transform(testData)

# Show 10 rows predicted as label 0, ordered by the probability column
# (descending).
(
    predictions
    .filter(predictions["prediction"] == 0)
    .select("clean_text", "category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30)
)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
| bache log kab sudharenge h...|       1|[0.9040262941659819,0.01511...|  0.0|       0.0|
| more lies from this mahach...|       1|[0.9040262941659819,0.01511...|  0.0|       0.0|
| jawahar lal nehru was the ...|       1|[0.9040262941659819,0.01511...|  0.0|       0.0|
|acche din aane wale haivote...|       1|[0.9040262941659819,0.01511...|  0.0|       0.0|
| more than can handle she  ...|       1|[0.9040262941659819,0.01511...|  0.0|       0.0|
| naaa india  needs modi kha...|       1|[0.9040262941659819,0.01511...|  0.0|       0.0|
|after modi came nepal more ...|       1|[0.9040262941659819,0.01511...|  0.0|       0.0|
| wonderful modi once more t...|       1|[0.9040262941659819,0.01511...|  0.0|       0.0|
| imran kh

In [28]:
# Score the current `predictions`. labelCol and metricName are written out with
# their library defaults ("label" and "f1"), so the value shown is an F1 score.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(predictions)

0.2719464740349054

## Random Forest

In [29]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest of 100 trees, each limited to depth 4.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=100,
    maxDepth=4,
    maxBins=32,
)

# Train on the training split, then score the held-out rows.
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)

# Show 10 rows predicted as label 0, ordered by the probability column
# (descending).
(
    predictions
    .filter(predictions["prediction"] == 0)
    .select("clean_text", "category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30)
)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
| drafted few the points and...|       1|[0.6294088357828965,0.15557...|  0.0|       0.0|
| what your opinion about so...|       1|[0.6292305850361721,0.11076...|  0.0|       0.0|
|part you have something min...|       1|[0.6104036983813099,0.15759...|  0.0|       0.0|
|upa had one the highest gro...|       1|[0.5920418318663144,0.18309...|  0.0|       0.0|
| india approve the road nor...|       1|[0.5903777813784719,0.18743...|  0.0|       0.0|
|even though iit and iim stu...|       1|[0.5881194790622174,0.16108...|  0.0|       0.0|
|thanks for doing this few q...|       1|[0.5861969734854111,0.17704...|  0.0|       0.0|
|congratulations you underst...|       1|[0.5820136427703471,0.18347...|  0.0|       0.0|
| chennai 

In [30]:
# Score the current `predictions`. labelCol and metricName are written out with
# their library defaults ("label" and "f1"), so the value shown is an F1 score.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(predictions)

0.27065361002281063