# Sentiment Analysis of Twitter Data using PySpark

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Twitter_sentiment_analysis').getOrCreate()

In [3]:
data = spark.read.csv('twitter_dataset.csv', inferSchema=True)

In [4]:
data.show()

+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|  0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|  0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  0|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nop

In [5]:
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: long (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)



In [6]:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType

In [7]:
data = data.withColumnRenamed('_c0','target').withColumnRenamed('_c5','text')
data = data.withColumn('target', col('target').cast(StringType()))

In [8]:
data.show()

+------+----------+--------------------+--------+---------------+--------------------+
|target|       _c1|                 _c2|     _c3|            _c4|                text|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|     0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|     0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|     0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|     0|1467811795|Mon Apr 06 22:20:...|NO_

In [9]:
data.printSchema()

root
 |-- target: string (nullable = true)
 |-- _c1: long (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- text: string (nullable = true)



Selecting only few records from the total data

In [10]:
from pyspark.sql.functions import rand

data = data.orderBy(rand()).limit(600000)

In [11]:
data = data.select('target','text')

Removing special characters

In [12]:
from pyspark.sql.functions import col, lower, regexp_replace, split

def clean_text(c):
    c = lower(c)
    c = regexp_replace(c, "^rt ", "")
    c = regexp_replace(c, "(https?\://)\S+", "")
    c = regexp_replace(c, "[^a-zA-Z0-9\\s]", "")
    #c = split(c, "\\s+") tokenization...
    return c

In [13]:
clean_df = data.select(clean_text(col("text")).alias("text"),'target')

In [14]:
clean_df.show()

+--------------------+------+
|                text|target|
+--------------------+------+
|having coffee and...|     4|
|strawberry banana...|     4|
|enjoying amp conf...|     4|
|loljohnjk oh okay...|     4|
|kwymore09 yay  to...|     4|
|geminitwisted im ...|     4|
|running late for ...|     0|
|thanks for my new...|     4|
|insidebooks ah i ...|     4|
|   mtv movie awards |     4|
|going to tenerife...|     0|
|ladytwiglet thank...|     4|
|how am i meant to...|     0|
|im sorry j3ssucka...|     0|
|done swimming not...|     0|
|mrssosbourne welc...|     4|
|the hangover is p...|     4|
|amanilouella bad ...|     0|
|fbmook  aww thnx ...|     4|
|great idea tennis...|     4|
+--------------------+------+
only showing top 20 rows



In [15]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer

tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")

Creating a pipeline to tokenize and extract features

In [16]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[tokenizer,stopremove,count_vec,idf,label_stringIdx])

In [17]:
d = pipeline.fit(clean_df).transform(clean_df)

In [18]:
d = d.withColumnRenamed('tf_idf','features')

In [19]:
d = d.select('features','label')

In [20]:
d.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(262144,[12,117,2...|  1.0|
|(262144,[1,49,53,...|  1.0|
|(262144,[0,28,80,...|  1.0|
|(262144,[1,5,16,2...|  1.0|
|(262144,[0,13,42,...|  1.0|
|(262144,[0,1,88,2...|  1.0|
|(262144,[0,205,33...|  0.0|
|(262144,[25,30,69...|  1.0|
|(262144,[107,218,...|  1.0|
|(262144,[131,604,...|  1.0|
|(262144,[0,3,9,84...|  0.0|
|(262144,[0,30,509...|  1.0|
|(262144,[0,2,24,2...|  0.0|
|(262144,[1,54,150...|  0.0|
|(262144,[62,101,1...|  0.0|
|(262144,[0,4,42,2...|  1.0|
|(262144,[0,25,259...|  1.0|
|(262144,[52,220,2...|  0.0|
|(262144,[0,16,224...|  1.0|
|(262144,[38,73,75...|  1.0|
+--------------------+-----+
only showing top 20 rows



In [21]:
train,test = d.randomSplit([0.7,0.3])

Evaluation on a Logistic Regression Model

In [22]:
from pyspark.ml.classification import LogisticRegression

In [23]:
lr = LogisticRegression()

In [24]:
lrmodel = lr.fit(train)

In [25]:
lr_result = lrmodel.transform(test)

In [26]:
lr_result.show()

+--------------+-----+--------------------+--------------------+----------+
|      features|label|       rawPrediction|         probability|prediction|
+--------------+-----+--------------------+--------------------+----------+
|(262144,[],[])|  0.0|[-0.1504192736087...|[0.46246592529082...|       1.0|
|(262144,[],[])|  0.0|[-0.1504192736087...|[0.46246592529082...|       1.0|
|(262144,[],[])|  0.0|[-0.1504192736087...|[0.46246592529082...|       1.0|
|(262144,[],[])|  0.0|[-0.1504192736087...|[0.46246592529082...|       1.0|
|(262144,[],[])|  0.0|[-0.1504192736087...|[0.46246592529082...|       1.0|
|(262144,[],[])|  0.0|[-0.1504192736087...|[0.46246592529082...|       1.0|
|(262144,[],[])|  0.0|[-0.1504192736087...|[0.46246592529082...|       1.0|
|(262144,[],[])|  0.0|[-0.1504192736087...|[0.46246592529082...|       1.0|
|(262144,[],[])|  0.0|[-0.1504192736087...|[0.46246592529082...|       1.0|
|(262144,[],[])|  0.0|[-0.1504192736087...|[0.46246592529082...|       1.0|
|(262144,[],

In [27]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [28]:
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(lr_result)
print("Accuracy of model is: {}".format(acc))

Accuracy of model is: 0.7178642057053932


Evaluation on a Naive Bayes Classifier

In [29]:
from pyspark.ml.classification import NaiveBayes

In [30]:
nb = NaiveBayes()

In [31]:
nbmodel = nb.fit(train)

nb_result = nbmodel.transform(test)

In [32]:
nb_result.show()

+--------------+-----+--------------------+--------------------+----------+
|      features|label|       rawPrediction|         probability|prediction|
+--------------+-----+--------------------+--------------------+----------+
|(262144,[],[])|  0.0|[-0.6937760125253...|[0.49968568285399...|       1.0|
|(262144,[],[])|  0.0|[-0.6937760125253...|[0.49968568285399...|       1.0|
|(262144,[],[])|  0.0|[-0.6937760125253...|[0.49968568285399...|       1.0|
|(262144,[],[])|  0.0|[-0.6937760125253...|[0.49968568285399...|       1.0|
|(262144,[],[])|  0.0|[-0.6937760125253...|[0.49968568285399...|       1.0|
|(262144,[],[])|  0.0|[-0.6937760125253...|[0.49968568285399...|       1.0|
|(262144,[],[])|  0.0|[-0.6937760125253...|[0.49968568285399...|       1.0|
|(262144,[],[])|  0.0|[-0.6937760125253...|[0.49968568285399...|       1.0|
|(262144,[],[])|  0.0|[-0.6937760125253...|[0.49968568285399...|       1.0|
|(262144,[],[])|  0.0|[-0.6937760125253...|[0.49968568285399...|       1.0|
|(262144,[],

In [33]:
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(nb_result)
print("Accuracy of model is: {}".format(acc))

Accuracy of model is: 0.7275316721987981
