In [5]:
#import modules
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

# Build the Spark session used by every cell below; getOrCreate() hands back
# an already-running session instead of starting a second one.
appName = "Sentiment Analysis in Spark"
spark = (
    SparkSession.builder
    .appName(appName)
    .getOrCreate()
)

23/11/23 12:08:57 WARN Utils: Your hostname, Akshays-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.87 instead (on interface en0)
23/11/23 12:08:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/23 12:08:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Read the tweets file: the first row supplies the column names and the
# column types are inferred from the data.
tweets_csv = spark.read.csv(
    'tweets.csv',
    header=True,
    inferSchema=True,
)
# Preview the first three rows without cutting off long tweet text.
tweets_csv.show(3, truncate=False)

+------+---------+---------------+---------------------------------+
|ItemID|Sentiment|SentimentSource|SentimentText                    |
+------+---------+---------------+---------------------------------+
|1038  |1        |Sentiment140   |that film is fantastic #brilliant|
|1804  |1        |Sentiment140   |this music is really bad #myband |
|1693  |0        |Sentiment140   |winter is terrible #thumbs-down  |
+------+---------+---------------+---------------------------------+
only showing top 3 rows



In [7]:
# Keep only the tweet text and the sentiment flag; the flag is cast to an
# integer and renamed to "label", the column name the classifier is given
# as labelCol further down.
data = tweets_csv.select(
    "SentimentText",
    col("Sentiment").cast("Int").alias("label"),
)
data.show(5, truncate=False)

+---------------------------------+-----+
|SentimentText                    |label|
+---------------------------------+-----+
|that film is fantastic #brilliant|1    |
|this music is really bad #myband |1    |
|winter is terrible #thumbs-down  |0    |
|this game is awful #nightmare    |0    |
|I love jam #loveit               |1    |
+---------------------------------+-----+
only showing top 5 rows



## Train Test Split

In [9]:
# Divide the data: roughly 70% for training and 30% for testing.
# randomSplit samples rows at random, so without a seed every run produces a
# different partition (and different row counts, model and accuracy).
# Fixing the seed keeps the split repeatable for the same input data.
SPLIT_SEED = 42
dividedData = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
trainingData, testingData = dividedData  # index 0 = training, index 1 = testing
train_rows = trainingData.count()
test_rows = testingData.count()
print("Training data rows:", train_rows, "; Testing data rows:", test_rows)

Training data rows: 1302 ; Testing data rows: 630


## Prepare training data

Split the "SentimentText" column into individual words using a tokenizer.

In [10]:
# Split each tweet into a list of words, stored in a new "SentimentWords"
# column next to the original text.
tokenizer = Tokenizer(
    inputCol="SentimentText",
    outputCol="SentimentWords",
)
tokenizedTrain = tokenizer.transform(trainingData)
tokenizedTrain.show(5, truncate=False)

+---------------------------------+-----+---------------------------------------+
|SentimentText                    |label|SentimentWords                         |
+---------------------------------+-----+---------------------------------------+
|I adore cheese #bestever         |1    |[i, adore, cheese, #bestever]          |
|I adore cheese #brilliant        |1    |[i, adore, cheese, #brilliant]         |
|I adore cheese #loveit           |1    |[i, adore, cheese, #loveit]            |
|I adore cheese #thumbs-up        |1    |[i, adore, cheese, #thumbs-up]         |
|I adore classical music #bestever|1    |[i, adore, classical, music, #bestever]|
+---------------------------------+-----+---------------------------------------+
only showing top 5 rows



### Removing stop words (common words that carry little meaning and make poor features)



In [11]:
# Drop stop words from the tokenized tweets. The input column is taken from
# the tokenizer itself so the two stages cannot drift apart.
swr = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol="MeaningfulWords",
)
SwRemovedTrain = swr.transform(tokenizedTrain)
SwRemovedTrain.show(5, truncate=False)

+---------------------------------+-----+---------------------------------------+------------------------------------+
|SentimentText                    |label|SentimentWords                         |MeaningfulWords                     |
+---------------------------------+-----+---------------------------------------+------------------------------------+
|I adore cheese #bestever         |1    |[i, adore, cheese, #bestever]          |[adore, cheese, #bestever]          |
|I adore cheese #brilliant        |1    |[i, adore, cheese, #brilliant]         |[adore, cheese, #brilliant]         |
|I adore cheese #loveit           |1    |[i, adore, cheese, #loveit]            |[adore, cheese, #loveit]            |
|I adore cheese #thumbs-up        |1    |[i, adore, cheese, #thumbs-up]         |[adore, cheese, #thumbs-up]         |
|I adore classical music #bestever|1    |[i, adore, classical, music, #bestever]|[adore, classical, music, #bestever]|
+---------------------------------+-----+-------

### Converting the word features into numerical features

In [13]:
# Hash the remaining words into a term-frequency vector ("features"), then
# keep only the columns needed for training and inspection.
hashTF = HashingTF(
    inputCol=swr.getOutputCol(),
    outputCol="features",
)
hashedTrain = hashTF.transform(SwRemovedTrain)
numericTrainData = hashedTrain.select('label', 'MeaningfulWords', 'features')
numericTrainData.show(3, truncate=False)

+-----+---------------------------+-------------------------------------------+
|label|MeaningfulWords            |features                                   |
+-----+---------------------------+-------------------------------------------+
|1    |[adore, cheese, #bestever] |(262144,[1689,91011,100089],[1.0,1.0,1.0]) |
|1    |[adore, cheese, #brilliant]|(262144,[1689,45361,100089],[1.0,1.0,1.0]) |
|1    |[adore, cheese, #loveit]   |(262144,[1689,100089,254974],[1.0,1.0,1.0])|
+-----+---------------------------+-------------------------------------------+
only showing top 3 rows



In [15]:
# Fit a logistic-regression classifier on the hashed training features.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    regParam=0.01,
    maxIter=10,
)
model = lr.fit(numericTrainData)
print("Training is done!")

Training is done!


In [16]:
# Push the held-out tweets through the same tokenizer, stop-word remover and
# hasher that prepared the training data, so both sets share one feature space.
tokenizedTest = tokenizer.transform(testingData)
SwRemovedTest = swr.transform(tokenizedTest)
# Select the label column by its real name ('label', as in the training
# path). The previous 'Label' spelling only resolved because Spark matches
# column names case-insensitively by default.
numericTest = hashTF.transform(SwRemovedTest).select(
    'label', 'MeaningfulWords', 'features')
numericTest.show(truncate=False, n=2)

+-----+---------------------------+-------------------------------------------+
|Label|MeaningfulWords            |features                                   |
+-----+---------------------------+-------------------------------------------+
|1    |[adore, cheese, #favorite] |(262144,[1689,100089,108624],[1.0,1.0,1.0])|
|1    |[adore, cheese, #toptastic]|(262144,[1689,42010,100089],[1.0,1.0,1.0]) |
+-----+---------------------------+-------------------------------------------+
only showing top 2 rows



In [19]:
# Score the test set and keep the words, the predicted class and the true label.
prediction = model.transform(numericTest)
predictionFinal = prediction.select(
    "MeaningfulWords", "prediction", "label")
predictionFinal.show(n=40, truncate=False)

# Count the rows where the predicted class equals the true label.
correctPrediction = predictionFinal.filter(
    predictionFinal['prediction'] == predictionFinal['label']).count()
totalData = predictionFinal.count()

# A random split can leave the test set empty; report NaN instead of
# failing with ZeroDivisionError in that case.
accuracy = correctPrediction / totalData if totalData else float("nan")
print("correct prediction:", correctPrediction, ", total data:", totalData,
      ", accuracy:", accuracy)

+-------------------------------------+----------+-----+
|MeaningfulWords                      |prediction|Label|
+-------------------------------------+----------+-----+
|[adore, cheese, #favorite]           |1.0       |1    |
|[adore, cheese, #toptastic]          |1.0       |1    |
|[adore, coffee, #brilliant]          |1.0       |1    |
|[adore, coffee, #favorite]           |1.0       |1    |
|[adore, coffee, #toptastic]          |1.0       |1    |
|[adore, jam, #bestever]              |1.0       |1    |
|[adore, jam, #favorite]              |1.0       |1    |
|[adore, pop, music, #thumbs-up]      |1.0       |1    |
|[adore, skiing, #bestever]           |1.0       |1    |
|[adore, skiing, #loveit]             |1.0       |1    |
|[adore, skiing, #toptastic]          |1.0       |1    |
|[adore, summer, #bestever]           |1.0       |1    |
|[adore, summer, #brilliant]          |1.0       |1    |
|[adore, summer, #favorite]           |1.0       |1    |
|[adore, summer, #thumbs-up]   