## **Setting up a PySpark environment in Colab**

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz
!tar xf spark-3.2.2-bin-hadoop3.2.tgz

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.2-bin-hadoop3.2"

!pip install -q findspark

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
            Hit:2 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
[33m0% [Connecting to archive.ubuntu.com (91.189.91.39)] [1 InRelease 43.1 kB/88.7 [0m[33m0% [2 InRelease gpgv 15.9 kB] [Connecting to archive.ubuntu.com (91.189.91.39)][0m                                                                               Hit:3 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
[33m0% [2 InRelease gpgv 15.9 kB] [Connecting to archive.ubuntu.com (91.189.91.39)][0m                                                                               Get:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
[33m0% [2 InRelease gpgv 15.9 kB] [Connecting to archive.ubuntu.com (91.189.91.39)][0m[33m0% [2 InRelease gpgv 15.9 kB] [Connecting to archive.ubuntu.com (91.189.91.39)][0m[33m0% [2 InRelease gpgv 15.9 kB] [

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
# set up spark session 
spark = SparkSession.builder\
        .master("local[10]")\
        .appName("twitter Sentiment Analysis")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

## **Loading data**

In [3]:
import zipfile
!wget http://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip

with zipfile.ZipFile('trainingandtestdata.zip') as zfile:
  zfile.extractall('Dataset/')

--2022-10-22 01:24:58--  http://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip
Resolving cs.stanford.edu (cs.stanford.edu)... 171.64.64.64
Connecting to cs.stanford.edu (cs.stanford.edu)|171.64.64.64|:80... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip [following]
--2022-10-22 01:24:58--  https://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip
Connecting to cs.stanford.edu (cs.stanford.edu)|171.64.64.64|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 81363704 (78M) [application/zip]
Saving to: ‘trainingandtestdata.zip’


2022-10-22 01:25:03 (15.7 MB/s) - ‘trainingandtestdata.zip’ saved [81363704/81363704]



## **Understanding data**

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("sentiment", IntegerType(), True),
    StructField("id", IntegerType(), True),
    StructField("date", StringType(), True),
    StructField("query_string", StringType(), True),
    StructField("user", StringType(), True),
    StructField("text", StringType(), True)],
  )

df=spark.read.csv('Dataset/training.1600000.processed.noemoticon.csv',header=False,schema=schema)
df.show(5)

+---------+----------+--------------------+------------+---------------+--------------------+
|sentiment|        id|                date|query_string|           user|                text|
+---------+----------+--------------------+------------+---------------+--------------------+
|        0|1467810369|Mon Apr 06 22:19:...|    NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|        0|1467810672|Mon Apr 06 22:19:...|    NO_QUERY|  scotthamilton|is upset that he ...|
|        0|1467810917|Mon Apr 06 22:19:...|    NO_QUERY|       mattycus|@Kenichan I dived...|
|        0|1467811184|Mon Apr 06 22:19:...|    NO_QUERY|        ElleCTF|my whole body fee...|
|        0|1467811193|Mon Apr 06 22:19:...|    NO_QUERY|         Karoli|@nationwideclass ...|
+---------+----------+--------------------+------------+---------------+--------------------+
only showing top 5 rows



In [5]:
df = df.drop(*('id','date','query_string','user'))
df.show(5)

+---------+--------------------+
|sentiment|                text|
+---------+--------------------+
|        0|@switchfoot http:...|
|        0|is upset that he ...|
|        0|@Kenichan I dived...|
|        0|my whole body fee...|
|        0|@nationwideclass ...|
+---------+--------------------+
only showing top 5 rows



In [6]:
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan('sentiment') | col('sentiment').isNull() , True))]).show()

+------------------------------------------------------------------------+
|count(CASE WHEN (isnan(sentiment) OR (sentiment IS NULL)) THEN true END)|
+------------------------------------------------------------------------+
|                                                                       0|
+------------------------------------------------------------------------+



In [7]:
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan('text') | col('text').isNull() , True))]).show()

+--------------------------------------------------------------+
|count(CASE WHEN (isnan(text) OR (text IS NULL)) THEN true END)|
+--------------------------------------------------------------+
|                                                             0|
+--------------------------------------------------------------+



In [8]:
df.groupBy('sentiment').count().orderBy('count',ascending=False).show()

+---------+------+
|sentiment| count|
+---------+------+
|        0|800000|
|        4|800000|
+---------+------+



In [9]:
df.filter(df.sentiment == 0).select("text").take(20)

[Row(text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D"),
 Row(text="is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!"),
 Row(text='@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds'),
 Row(text='my whole body feels itchy and like its on fire '),
 Row(text="@nationwideclass no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over there. "),
 Row(text='@Kwesidei not the whole crew '),
 Row(text='Need a hug '),
 Row(text="@LOLTrish hey  long time no see! Yes.. Rains a bit ,only a bit  LOL , I'm fine thanks , how's you ?"),
 Row(text="@Tatiana_K nope they didn't have it "),
 Row(text='@twittera que me muera ? '),
 Row(text="spring break in plain city... it's snowing "),
 Row(text='I just re-pierced my ears '),
 Row(text="@caregiving I couldn't bear to watch it.  And I thought the UA loss

In [10]:
df.filter(df.sentiment == 4).select("text").take(20)

[Row(text='I LOVE @Health4UandPets u guys r the best!! '),
 Row(text='im meeting up with one of my besties tonight! Cant wait!!  - GIRL TALK!!'),
 Row(text='@DaRealSunisaKim Thanks for the Twitter add, Sunisa! I got to meet you once at a HIN show here in the DC area and you were a sweetheart. '),
 Row(text='Being sick can be really cheap when it hurts too much to eat real food  Plus, your friends make you soup'),
 Row(text='@LovesBrooklyn2 he has that effect on everyone '),
 Row(text='@ProductOfFear You can tell him that I just burst out laughing really loud because of that  Thanks for making me come out of my sulk!'),
 Row(text='@r_keith_hill Thans for your response. Ihad already find this answer '),
 Row(text="@KeepinUpWKris I am so jealous, hope you had a great time in vegas! how did you like the ACM's?! LOVE YOUR SHOW!! "),
 Row(text='@tommcfly ah, congrats mr fletcher for finally joining twitter '),
 Row(text='@e4VoIP I RESPONDED  Stupid cat is helping me type. Forgive errors '),


In [11]:
from pyspark.sql.functions import udf
@udf
def len_row(text):
  return len(text.split())
df = df.withColumn("pre_clean_len", len_row(df.text))
df = df.withColumn("pre_clean_len", df.pre_clean_len.cast(IntegerType()))
df.show(5)

+---------+--------------------+-------------+
|sentiment|                text|pre_clean_len|
+---------+--------------------+-------------+
|        0|@switchfoot http:...|           19|
|        0|is upset that he ...|           21|
|        0|@Kenichan I dived...|           18|
|        0|my whole body fee...|           10|
|        0|@nationwideclass ...|           21|
+---------+--------------------+-------------+
only showing top 5 rows



In [12]:
# import plotly.express as px

# fig = px.box(df.toPandas(), y="pre_clean_len")
# fig.show()

## **Data Cleaning**

In [13]:
import nltk
import numpy as np
import re
import spacy
from nltk.corpus import stopwords
from numpy.core.defchararray import isnumeric
import string
from nltk.stem import WordNetLemmatizer
nlp = spacy.load("en_core_web_sm")
nltk.download('stopwords')
stop_words=list(stopwords.words('english'))
stop_words.remove('not')
nltk.download('wordnet')
nltk.download('omw-1.4')


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...


True

In [14]:
mydict = {"isn't":"is not", "aren't":"are not", "wasn't":"was not", "weren't":"were not",
                "haven't":"have not","hasn't":"has not","hadn't":"had not","won't":"will not",
                "wouldn't":"would not", "don't":"do not", "doesn't":"does not","didn't":"did not",
                "can't":"can not","couldn't":"could not","shouldn't":"should not","mightn't":"might not",
                "mustn't":"must not","im":"i am"}

In [15]:
def replace_dict(text):
  text2=""
  for word in text.lower().split():
    if word in mydict.keys():
      text2=text2+" "+mydict[word]
    else:
      text2=text2+" "+word
  return text2.split()

@udf
def clean(text):
  #Removing urls
  text=re.sub(r'http\S+', '', text)

  #Removing html elements
  text = re.sub(r'<.*?>', '', text)

  #Removing whitespaces
  text = re.sub(r'\s+', ' ', text).strip()

  #Removing punctuations
  text=text.translate(str.maketrans('', '', string.punctuation))

  #lemmatize
  lemmatizer = WordNetLemmatizer()
  lemmatized_text=[lemmatizer.lemmatize(word.lower()) for word in replace_dict(text) if word not in stop_words if not np.char.isnumeric(word)  ]
  
  return ' '.join(lemmatized_text)


In [16]:
df = df.withColumn("text", clean(df.text))
df = df.withColumn("pre_clean_len", len_row(df.text))
df.show(5)

+---------+--------------------+-------------+
|sentiment|                text|pre_clean_len|
+---------+--------------------+-------------+
|        0|switchfoot awww t...|           10|
|        0|upset cant update...|           12|
|        0|kenichan dived ma...|           10|
|        0|whole body feel i...|            6|
|        0|nationwideclass n...|            6|
+---------+--------------------+-------------+
only showing top 5 rows



In [17]:
# from pyspark.sql.functions import isnan, when, count, col

# df.select([count(when(isnan('text') | col('text').isNull() , True))]).show()

In [18]:
import pyspark.sql.functions as F
df2=df.where(df.pre_clean_len>0).select("*").orderBy(F.rand())

## **Processing**

In [19]:
(train_set, val_set, test_set) = df2.randomSplit([0.98, 0.01, 0.01], seed = 200)

In [20]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import  IDF, Tokenizer, CountVectorizer, StringIndexer
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="text", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)
label_stringIdx = StringIndexer(inputCol = "sentiment", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

pipelineFit = pipeline.fit(train_set)
predictions_train = pipelineFit.transform(train_set)
predictions_val = pipelineFit.transform(val_set)
predictions_test = pipelineFit.transform(test_set)

In [21]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

accuracy_train = predictions_train.filter(predictions_train.label == predictions_train.prediction).count() / float(train_set.count())
roc_auc_train = evaluator.evaluate(predictions_train)

accuracy_val = predictions_val.filter(predictions_val.label == predictions_val.prediction).count() / float(val_set.count())
roc_auc_val = evaluator.evaluate(predictions_val)

accuracy_test = predictions_test.filter(predictions_test.label == predictions_test.prediction).count() / float(test_set.count())
roc_auc_test = evaluator.evaluate(predictions_test)

print("train: Accuracy Score: {0:.4f}".format(accuracy_train))
print("train: ROC-AUC: {0:.4f}".format(roc_auc_train))

print("val: Accuracy Score: {0:.4f}".format(accuracy_val))
print("val: ROC-AUC: {0:.4f}".format(roc_auc_val))

print("test: Accuracy Score: {0:.4f}".format(accuracy_test))
print("test: ROC-AUC: {0:.4f}".format(roc_auc_test))

train: Accuracy Score: 0.8090
train: ROC-AUC: 0.8851
val: Accuracy Score: 0.7790
val: ROC-AUC: 0.8516
test: Accuracy Score: 0.7860
test: ROC-AUC: 0.8548
