In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
# Reuse the active Spark session if one exists; otherwise start one for this app.
spark = (
    SparkSession.builder
    .appName('spark_pipeline')
    .getOrCreate()
)

In [3]:
# Load the raw tweets; the first CSV line supplies the column names and the
# column types are inferred from the data.
df = spark.read.csv(
    'Tweets.csv',
    header=True,
    inferSchema=True,
)

In [4]:
# Peek at the first five raw rows.
df.show(n=5)

+----------+--------------------+--------------------+---------+
|    textID|                text|       selected_text|sentiment|
+----------+--------------------+--------------------+---------+
|cb774db0d1| I`d have respond...|I`d have responde...|  neutral|
|549e992a42| Sooo SAD I will ...|            Sooo SAD| negative|
|088c60f138|my boss is bullyi...|         bullying me| negative|
|9642c003ef| what interview! ...|      leave me alone| negative|
|358bd9e861| Sons of ****, wh...|       Sons of ****,| negative|
+----------+--------------------+--------------------+---------+
only showing top 5 rows



In [5]:
# Keep only the text column and the sentiment column.
tweets = df.select(['selected_text', 'sentiment'])

In [6]:
# Show five rows without truncating the cell contents.
tweets.show(n=5, truncate=False)

+-----------------------------------+---------+
|selected_text                      |sentiment|
+-----------------------------------+---------+
|I`d have responded, if I were going|neutral  |
|Sooo SAD                           |negative |
|bullying me                        |negative |
|leave me alone                     |negative |
|Sons of ****,                      |negative |
+-----------------------------------+---------+
only showing top 5 rows



In [7]:
# Number of rows in the two-column selection, before any null handling.
tweets.count()

27481

In [8]:
#tweets=tweets.toPandas()

In [9]:
#tweets.isna().sum()

In [10]:
#tweets=tweets.dropna()

In [11]:
#tweets.isna().sum()

In [12]:
# Drop the rows that contain nulls and rebind `tweets` to the result.
# NOTE(review): this starts again from `df`, not from the two-column `tweets`
# selection made in cell 5, so textID and text are kept and also take part in
# the null check -- confirm that is intended.
tweets=df.dropna()

In [13]:
# Number of rows left after the null-containing rows were removed.
tweets.count()

27478

In [14]:
from pyspark.ml.feature import StringIndexer 

In [15]:
# Encode the string `sentiment` column as a numeric `label` column.
# The indexer is fitted on, and then applied to, the same frame.
tweets = (
    StringIndexer(inputCol='sentiment', outputCol='label')
    .fit(tweets)
    .transform(tweets)
)


In [16]:
# Show four rows, untruncated, to check the new label column.
tweets.show(n=4, truncate=False)

+----------+----------------------------------------------+-----------------------------------+---------+-----+
|textID    |text                                          |selected_text                      |sentiment|label|
+----------+----------------------------------------------+-----------------------------------+---------+-----+
|cb774db0d1| I`d have responded, if I were going          |I`d have responded, if I were going|neutral  |0.0  |
|549e992a42| Sooo SAD I will miss you here in San Diego!!!|Sooo SAD                           |negative |2.0  |
|088c60f138|my boss is bullying me...                     |bullying me                        |negative |2.0  |
|9642c003ef| what interview! leave me alone               |leave me alone                     |negative |2.0  |
+----------+----------------------------------------------+-----------------------------------+---------+-----+
only showing top 4 rows



In [17]:
# Print column names, types and nullability of the labelled frame.
tweets.printSchema()

root
 |-- textID: string (nullable = true)
 |-- text: string (nullable = true)
 |-- selected_text: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- label: double (nullable = false)



In [18]:
# Split the labelled tweets into ~70% training and ~30% test rows.
# The seed is fixed so that the same input data yields the same split on every
# run; without it the train/test membership -- and therefore the accuracy
# reported at the end of the notebook -- changes each time the cell is run.
train_df, test_df = tweets.randomSplit([0.7, 0.3], seed=42)

In [19]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF,IDF
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.classification import LogisticRegression 
from pyspark.ml import Pipeline

In [20]:
# Text preparation: tokenize selected_text, then filter stop words out of the tokens.
tokenizer = Tokenizer(inputCol='selected_text', outputCol='tokens')
stopwordsremover = StopWordsRemover(inputCol='tokens', outputCol='tokenized_words')

# Two featurizers over the filtered tokens.
# hashingtf is defined here but is not one of the stages passed to the
# Pipeline in cell 21; only countvectorizer is.
hashingtf = HashingTF(inputCol='tokenized_words', outputCol='hash_num')
countvectorizer = CountVectorizer(inputCol='tokenized_words', outputCol='vec_num')

# IDF consumes the CountVectorizer output column; the classifier reads the
# IDF output column as features and `label` as the target.
idf = IDF(inputCol='vec_num', outputCol='inverse_num')
lr = LogisticRegression(featuresCol='inverse_num', labelCol='label')

In [21]:
# Chain the stages in execution order: tokens -> filtered tokens -> counts -> IDF -> classifier.
pipeline = Pipeline(
    stages=[
        tokenizer,
        stopwordsremover,
        countvectorizer,
        idf,
        lr,
    ]
)

In [22]:
# Fit every pipeline stage on the training split only.
model = pipeline.fit(dataset=train_df)

In [23]:
# Run the fitted pipeline over the held-out test split.
predictions = model.transform(dataset=test_df)

In [24]:
# Compare true labels with predicted labels for ten test rows, untruncated.
(
    predictions
    .select('tokenized_words', 'label', 'prediction')
    .show(n=10, truncate=False)
)

+----------------------------------------------------------------------------------------+-----+----------+
|tokenized_words                                                                         |label|prediction|
+----------------------------------------------------------------------------------------+-----+----------+
|[sorry]                                                                                 |2.0  |2.0       |
|[headed, eat, hubby, n, mommy!!, , ready, drinks...]                                    |0.0  |0.0       |
|[love]                                                                                  |1.0  |1.0       |
|[music, collection, coming, 250gb, soon., , haha]                                       |0.0  |1.0       |
|[bless]                                                                                 |1.0  |1.0       |
|[last, night, fun, w]                                                                   |1.0  |2.0       |
|[im, kind, sad..]          

In [25]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [26]:
# Evaluator that scores `prediction` against `label` using accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)

In [27]:
# Score the test-set predictions with the accuracy evaluator.
model_accuracy = evaluator.evaluate(dataset=predictions)

In [28]:
 # Display the accuracy value returned by the evaluator for the test-set predictions.
 model_accuracy

0.7173093297652354