In [1]:
# import required libraries
import nltk
from nltk import word_tokenize
from nltk.corpus import stopwords

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split

import pandas as pd
import string

In [2]:
# download punctuation and stopwords from nltk
nltk.download('punkt')
nltk.download("stopwords")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [4]:
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.2.3'
# spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [                                                                               Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease [1,581 B]
Get:8 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Hit:9 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic I

In [50]:
# load tweets_df and view
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()
from pyspark import SparkFiles
url ="https://tweet-2022.s3.amazonaws.com/Tweets.csv"
spark.sparkContext.addFile(url)
spark_tweets_df = spark.read.csv(SparkFiles.get("Tweets.csv"), sep=",", header=True)
# tweets_df = spark.read.csv(SparkFiles.get("Tweets.csv"), sep=",", header=True)


# tweets_df = pd.read_csv("/content/Tweets.csv")
tweets_df=spark_tweets_df.toPandas()
tweets_df

Unnamed: 0,textID,text,selected_text,sentiment
0,cb774db0d1,"I`d have responded, if I were going","I`d have responded, if I were going",neutral
1,549e992a42,Sooo SAD I will miss you here in San Diego!!!,Sooo SAD,negative
2,088c60f138,my boss is bullying me...,bullying me,negative
3,9642c003ef,what interview! leave me alone,leave me alone,negative
4,358bd9e861,"Sons of ****, why couldn`t they put them on t...","Sons of ****,",negative
...,...,...,...,...
27476,4eac33d1c0,wish we could come see u on Denver husband l...,d lost,negative
27477,4f4c4fc327,I`ve wondered about rake to. The client has ...,", don`t force",negative
27478,f67aae2310,Yay good for both of you. Enjoy the break - y...,Yay good for both of you.,positive
27479,ed167662a5,But it was worth it ****.,But it was worth it ****.,positive


In [51]:
# get dataframe ready for processing

# make sure the tweets in column "text" are strings
tweets_df['text'] = tweets_df['text'].astype('str')

# delete the unneccessary columns
tweets_df = tweets_df.drop(columns=["textID", "selected_text"])
tweets_df=tweets_df.rename(columns={'sentiment':'class'})
tweets_df=tweets_df[['class','text']]


In [52]:
def process_tweets(tweet):
    tweet = tweet.lower()
    final_tweet = "".join(char for char in tweet if char not in string.punctuation)
    # tokenize_tweet = word_tokenize(tweet)
    # stopword = stopwords.words("english")
    # tweet_wo_stop = [word for word in tokenize_tweet if word not in stopword]
    # final_tweet = " ".join(tweet_wo_stop)
    return final_tweet

In [53]:
# # process tweets using above function
tweets_df['text'] = tweets_df['text'].apply(lambda x: process_tweets(x))

tweets_df['length']=""
for index,row in tweets_df.iterrows():
  tweet_length=len(row['text'])
  tweets_df.loc[index,'length']=tweet_length
tweets_df

Unnamed: 0,class,text,length
0,neutral,id have responded if i were going,34
1,negative,sooo sad i will miss you here in san diego,43
2,negative,my boss is bullying me,22
3,negative,what interview leave me alone,30
4,negative,sons of why couldnt they put them on the rel...,69
...,...,...,...
27476,negative,wish we could come see u on denver husband l...,76
27477,negative,ive wondered about rake to the client has ma...,115
27478,positive,yay good for both of you enjoy the break you...,109
27479,positive,but it was worth it,22


In [61]:
tweets_df=tweets_df.dropna()

In [54]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')


In [55]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [56]:
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [62]:
from pyspark.sql import SQLContext
sc = SparkSession.builder.getOrCreate()
sqlContext = SQLContext(sc)

spark_tweets = sqlContext.createDataFrame(tweets_df)



In [63]:
cleaner = data_prep_pipeline.fit(spark_tweets)
cleaned = cleaner.transform(spark_tweets)

In [45]:
# Show label and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[102382,1...|
|  2.0|(262145,[125638,1...|
|  2.0|(262145,[122399,1...|
|  2.0|(262145,[27308,16...|
|  2.0|(262145,[2306,660...|
|  0.0|(262145,[86752,91...|
|  1.0|(262145,[23087,53...|
|  0.0|(262145,[136020,1...|
|  0.0|(262145,[249180,2...|
|  1.0|(262145,[4631,440...|
|  0.0|(262145,[13007,31...|
|  1.0|(262145,[17734,11...|
|  2.0|(262145,[70143,15...|
|  2.0|(262145,[19153,73...|
|  0.0|(262145,[115148,1...|
|  2.0|(262145,[18184,28...|
|  2.0|(262145,[33123,97...|
|  2.0|(262145,[24698,11...|
|  2.0|(262145,[17893,21...|
|  0.0|(262145,[54591,26...|
+-----+--------------------+
only showing top 20 rows



In [64]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [65]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(truncate=False)

+--------+----------------------------------------------------------------------------------------------------------------------+------+-----+------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------

In [66]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.567164


Optimized - Removing tweets marked as 'neutral

In [78]:
optimized_df = spark_tweets_df.toPandas()
optimized_df=optimized_df.loc[optimized_df['sentiment']!="neutral"]
optimized_df=optimized_df.dropna()
optimized_df

Unnamed: 0,textID,text,selected_text,sentiment
1,549e992a42,Sooo SAD I will miss you here in San Diego!!!,Sooo SAD,negative
2,088c60f138,my boss is bullying me...,bullying me,negative
3,9642c003ef,what interview! leave me alone,leave me alone,negative
4,358bd9e861,"Sons of ****, why couldn`t they put them on t...","Sons of ****,",negative
6,6e0c6d75b1,2am feedings for the baby are fun when he is a...,fun,positive
...,...,...,...,...
27475,b78ec00df5,enjoy ur night,enjoy,positive
27476,4eac33d1c0,wish we could come see u on Denver husband l...,d lost,negative
27477,4f4c4fc327,I`ve wondered about rake to. The client has ...,", don`t force",negative
27478,f67aae2310,Yay good for both of you. Enjoy the break - y...,Yay good for both of you.,positive


In [79]:
# get dataframe ready for processing

# make sure the tweets in column "text" are strings
optimized_df['text'] = optimized_df['text'].astype('str')

# delete the unneccessary columns
optimized_df = optimized_df.drop(columns=["textID", "selected_text"])
optimized_df=optimized_df.rename(columns={'sentiment':'class'})
optimized_df=optimized_df[['class','text']]

In [80]:
optimized_df['text'] = optimized_df['text'].apply(lambda x: process_tweets(x))
optimized_df['length']=""
for index,row in optimized_df.iterrows():
  tweet_length=len(row['text'])
  optimized_df.loc[index,'length']=tweet_length
optimized_df

Unnamed: 0,class,text,length
1,negative,sooo sad i will miss you here in san diego,43
2,negative,my boss is bullying me,22
3,negative,what interview leave me alone,30
4,negative,sons of why couldnt they put them on the rel...,69
6,positive,2am feedings for the baby are fun when he is a...,64
...,...,...,...
27475,positive,enjoy ur night,16
27476,negative,wish we could come see u on denver husband l...,76
27477,negative,ive wondered about rake to the client has ma...,115
27478,positive,yay good for both of you enjoy the break you...,109


In [81]:
opt_spark_tweets = sqlContext.createDataFrame(optimized_df)

In [24]:
optimized_df.dtypes

class     object
text      object
length    object
dtype: object

In [82]:
cleaner = data_prep_pipeline.fit(opt_spark_tweets)
cleaned = cleaner.transform(opt_spark_tweets)

In [83]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
opt_training, opt_testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
opt_nb = NaiveBayes()
opt_predictor = opt_nb.fit(opt_training)

In [84]:
opt_test_results = opt_predictor.transform(opt_testing)
opt_test_results.show(truncate=False)

+--------+----------------------------------------------------------------------------------------------------------------------+------+-----+------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------

In [85]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(opt_test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.791064
