# Twitter Sentiment Analysis using PySpark

### Introductory notes:

- Spark exposes three different data structures through its APIs: RDD, DataFrame (which is different from a pandas DataFrame), and Dataset.
- A DataFrame is generally much faster than an RDD because it carries metadata (information about the structure of the data), which allows Spark to optimize the query plan.
- `spark.mllib` (the RDD-based MLlib API) works with RDDs, while `spark.ml` works with DataFrames.

In [1]:
# findspark.init() is called before any pyspark import so that the local Spark
# installation it locates is on sys.path when pyspark is imported.
import findspark
findspark.init()
import pyspark as ps
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import StructType,StructField,IntegerType
from pyspark.ml.feature import CountVectorizer,HashingTF,IDF,Tokenizer,StringIndexer,Imputer

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression,NaiveBayes

import warnings

# Suppress all Python warnings for the rest of the session.
warnings.filterwarnings('ignore')

In [2]:
# Show the Spark installation directory that findspark resolved.
findspark.find()

'C:\\spark'

In [5]:
# Spark configuration for a local, single-machine run.
# "local[4]" runs Spark with 4 worker threads; "local[*]" would instead use as
# many worker threads as there are logical cores on this machine.
conf = (
    SparkConf()
    .setMaster("local[4]")
    .setAppName("Sentiment_Analysis")
    .set("spark.executor.memory", "4g")
    .set("spark.driver.memory", "4g")
)

# getOrCreate() hands back the already-running context when this cell is
# executed again; SparkContext(conf=conf) would raise on a second run because
# only one SparkContext may be active per kernel.
# NOTE: if a context already exists, it is returned as-is and `conf` is not
# re-applied -- restart the kernel to change the configuration.
sc = SparkContext.getOrCreate(conf=conf)
sqlcontext = SQLContext(sc)

In [6]:
# Display the active SparkContext.
sc

#### I will use the cleaned data from my Amazon Food Review project, which you can find in the same repository.

In [7]:
# Load the cleaned tweets CSV with Spark's built-in CSV reader: the first row
# is treated as the header and column types are inferred from the data.
path = "./Data/clean_tweet.csv"
data = sqlcontext.read.csv(path, header=True, inferSchema=True)
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- target: integer (nullable = true)



In [8]:
# Preview the first five rows; long strings are truncated in the display.
data.show(n=5, truncate=True)

+---+--------------------+------+
|_c0|                text|target|
+---+--------------------+------+
|  0|awww that s a bum...|     0|
|  1|is upset that he ...|     0|
|  2|i dived many time...|     0|
|  3|my whole body fee...|     0|
|  4|no it s not behav...|     0|
+---+--------------------+------+
only showing top 5 rows



#### Change the column types

In [9]:
# List the column names of the loaded DataFrame.
data.columns

['_c0', 'text', 'target']

In [10]:
# data_cast = data.withColumn('HelpfulnessNumerator',
#                              data['HelpfulnessNumerator'].cast(IntegerType()))

In [11]:
# data_cast = data_cast.withColumn('HelpfulnessDenominator',
#                              data['HelpfulnessDenominator'].cast(IntegerType()))

In [12]:
# data_cast = data_cast.withColumn('Score',
#                              data['Score'].cast(IntegerType()))

In [13]:
# data_cast.printSchema

In [14]:
# Total number of rows in `data` at this point in the notebook.
data.count()

200000

In [15]:
# Remove every row that contains a null in any column, then count what is left.
data = data.na.drop(how="any")
data.count()

199642

In [16]:
# def null_removal(df,column_name): 
#     null_before_removal = df.where(col(column_name).isNull()).count()
#     df=df.na.drop(subset=[column_name])
#     null_after_removal = df.where(col(column_name).isNull()).count()
#     df.printSchema()
#     return df,null_before_removal,null_after_removal

# data_cleaned,null_before_removal,null_after_removal = null_removal(data_cast , 'Text')
# print('Null values to be removed:',null_before_removal)

#### Split the data into:
- train: 80 %
- validation: 15 %
- test: 5 %

In [17]:
# Randomly split the rows with weights 0.8 / 0.15 / 0.05.
# randomSplit returns the pieces in the same order as the weights, so the
# targets are listed as train, validation, test to match the documented
# 80 % / 15 % / 5 % split. (The previous ordering bound the 15 % piece to
# test_set and the 5 % piece to validation_set.)
# seed=4 fixes the seed used for the random sampling.
train_set, validation_set, test_set = data.randomSplit([0.8, 0.15, 0.05], seed=4)

In [18]:
# train_set,null_before_removal,null_after_removal = null_removal(train_set , 'Text')
# print('Null values to be removed:',null_before_removal)

In [19]:
# tokenizer = Tokenizer(inputCol = 'Text',outputCol = 'words')
# tokenized = tokenizer.transform(train_set)

In [20]:
# tokenized.select('words').show(5)

In [21]:
# hashtf = HashingTF(inputCol = 'words',outputCol = 'tf')
# hashed = hashtf.transform(tokenized)

In [22]:
# hashed.select('tf').show(5)

In [23]:
# hashed,null_before_removal,null_after_removal = null_removal(hashed , 'tf')
# print('Null values to be removed:',null_before_removal)

#### TF-IDF Preprocessing

In [24]:
# --- Feature-extraction stages ---------------------------------------------

# 1) Split the `text` column into a list of tokens (`words`).
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# 2) Hash the tokens into a 2**16-dimensional term-frequency vector (`tf`).
#    Alternative kept for reference:
#    CountVectorizer(vocabSize=2**16, inputCol="words", outputCol="cv")
hashtf = HashingTF(inputCol="words", outputCol="tf", numFeatures=2**16)

# 3) Re-weight the term frequencies by inverse document frequency; minDocFreq=5
#    is the minimum number of documents a term must appear in to be kept.
idf = IDF(inputCol="tf", outputCol="features", minDocFreq=5)

# 4) Index the `target` column into `label`; "skip" drops rows whose label is
#    invalid/unseen instead of raising.
label_string_indexer = StringIndexer(
    inputCol="target",
    outputCol="label",
    handleInvalid="skip",
)

# No classifier stage (e.g. LogisticRegression / NaiveBayes) is included here;
# this pipeline only produces the `features` and `label` columns.
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_string_indexer])

# Fit on the training split only, then apply the fitted stages to the
# training and validation splits.
pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
validation_set_df = pipelineFit.transform(validation_set)
train_df.show(5)

+---+--------------------+------+--------------------+--------------------+--------------------+-----+
|_c0|                text|target|               words|                  tf|            features|label|
+---+--------------------+------+--------------------+--------------------+--------------------+-----+
|  0|awww that s a bum...|     0|[awww, that, s, a...|(65536,[8436,8847...|(65536,[8436,8847...|  0.0|
|  1|is upset that he ...|     0|[is, upset, that,...|(65536,[1444,2071...|(65536,[1444,2071...|  0.0|
|  2|i dived many time...|     0|[i, dived, many, ...|(65536,[2548,2888...|(65536,[2548,2888...|  0.0|
|  3|my whole body fee...|     0|[my, whole, body,...|(65536,[158,11650...|(65536,[158,11650...|  0.0|
|  4|no it s not behav...|     0|[no, it, s, not, ...|(65536,[1968,4488...|(65536,[1968,4488...|  0.0|
+---+--------------------+------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



#### Logistic Regression

In [25]:
# Fit a logistic-regression classifier on the TF-IDF features and score the
# validation split.
# NOTE(review): the saved run of this cell aborted inside fit() with
# "Py4JJavaError ... TaskResultLost (result lost from block manager)".
# The cause is not visible from the notebook -- possibly related to the size
# of the 2**16-dimensional feature vectors or to local memory settings;
# TODO confirm before relying on `log_reg_model` / `preds`.
log_reg = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=100,
)
log_reg_model = log_reg.fit(train_df)
preds = log_reg_model.transform(validation_set_df)

Py4JJavaError: An error occurred while calling o279.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 11.0 failed 1 times, most recent failure: Lost task 1.0 in stage 11.0 (TID 30, localhost, executor driver): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1098)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1092)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1161)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1137)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1.apply(LogisticRegression.scala:520)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1.apply(LogisticRegression.scala:494)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:185)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:185)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:494)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:489)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:279)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Binary-classification evaluator with its default settings spelled out:
# it reads the `rawPrediction` and `label` columns and reports area under ROC.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)