In [4]:
import os
import shutil
import zipfile

In [5]:
base_folder = os.getcwd()
temporary_folder = os.path.join(os.getcwd(), "tmp")

In [6]:
def unzip_files():
# Unzip file on a temporary folder
    if os.path.exists(temporary_folder):
        shutil.rmtree(temporary_folder)
        
    if not os.path.exists(temporary_folder):
        os.makedirs(temporary_folder)
        
    local_file_name = os.path.join(base_folder, "training_dataset", "trainingandtestdata.zip")
    with zipfile.ZipFile(local_file_name, 'r') as zip_ref:
        zip_ref.extractall(temporary_folder)

In [7]:
def cleansing_and_tokenizing(tweet):
# Cleansing and tokenizing tweet
    from nltk.tokenize import word_tokenize
    from nltk.corpus import stopwords 
    from string import punctuation 
    from bs4 import BeautifulSoup
    import re
    
    terms_to_remove = set(stopwords.words("english") + ["USERTAGGING","URL"])
    tweet = BeautifulSoup(tweet, 'html.parser').get_text() # Extracts text from HTML (just in case!)
    tweet = tweet.lower() # Converts text to lower-case
    tweet = re.sub("((www\.[^\s]+)|(https?://[^\s]+))", "URL", tweet) # Replces URLs by URL constan
    tweet = re.sub("@[^\s]+", "USERTAGGING", tweet) # Replaces usernames by USERTAGGING constant 
    tweet = re.sub(r"#([^\s]+)", r"\1", tweet) # Removes the # in #hashtag
    for p in punctuation: 
        tweet = tweet.replace(p, "") # Removes punctiation
    tweet = word_tokenize(tweet) # Creates a list of words
    words = ""
    for each_word in tweet:
        if each_word not in terms_to_remove:
            words = words + " " + each_word
    # return [word for word in tweet if word not in terms_to_remove]
    return words[1:]

In [8]:
unzip_files()

In [20]:
from pyspark.sql import SparkSession, functions


spark = SparkSession.builder.master("local").appName("Training Twitter Sentiment Analysis").getOrCreate()
test_data = spark.read.load(
    "tmp/testdata.manual.2009.06.14.csv",
    format="csv")
test_data = test_data.withColumnRenamed("_c0", "label") \
    .withColumnRenamed("_c1", "tweet_id") \
    .withColumnRenamed("_c2", "date") \
    .withColumnRenamed("_c3", "query") \
    .withColumnRenamed("_c4", "user") \
    .withColumnRenamed("_c5", "tweet")
test_data = test_data.withColumn("label", functions.col("label").cast("integer"))

In [21]:
udf_cleansing_and_tokenizing = functions.udf(cleansing_and_tokenizing)
test_data = test_data.withColumn("tweet_cleansed", udf_cleansing_and_tokenizing(functions.col("tweet")))

In [22]:
test_data.show(5)

+-----+--------+--------------------+-------+--------+--------------------+--------------------+
|label|tweet_id|                date|  query|    user|               tweet|      tweet_cleansed|
+-----+--------+--------------------+-------+--------+--------------------+--------------------+
|    4|       3|Mon May 11 03:17:...|kindle2|  tpryan|@stellargirl I lo...|loooooooovvvvvvee...|
|    4|       4|Mon May 11 03:18:...|kindle2|  vcu451|Reading my kindle...|reading kindle2 l...|
|    4|       5|Mon May 11 03:18:...|kindle2|  chadfu|Ok, first assesme...|ok first assesmen...|
|    4|       6|Mon May 11 03:19:...|kindle2|   SIX15|@kenburbary You'l...|youll love kindle...|
|    4|       7|Mon May 11 03:21:...|kindle2|yamarama|@mikefish  Fair e...|fair enough kindl...|
+-----+--------+--------------------+-------+--------+--------------------+--------------------+
only showing top 5 rows



In [23]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="tweet_cleansed", outputCol="words")
test_data = tokenizer.transform(test_data)

In [24]:
from pyspark.ml.feature import HashingTF
hashingTF = HashingTF(inputCol="words", outputCol="term_freq")
test_data = hashingTF.transform(test_data)
test_data.show(5)

+-----+--------+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+
|label|tweet_id|                date|  query|    user|               tweet|      tweet_cleansed|               words|           term_freq|
+-----+--------+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+
|    4|       3|Mon May 11 03:17:...|kindle2|  tpryan|@stellargirl I lo...|loooooooovvvvvvee...|[loooooooovvvvvve...|(262144,[12524,83...|
|    4|       4|Mon May 11 03:18:...|kindle2|  vcu451|Reading my kindle...|reading kindle2 l...|[reading, kindle2...|(262144,[53570,73...|
|    4|       5|Mon May 11 03:18:...|kindle2|  chadfu|Ok, first assesme...|ok first assesmen...|[ok, first, asses...|(262144,[41748,12...|
|    4|       6|Mon May 11 03:19:...|kindle2|   SIX15|@kenburbary You'l...|youll love kindle...|[youll, love, kin...|(262144,[1546,218...|
|    4|       7|Mon May 11 

In [25]:
from pyspark.ml.feature import IDF 
idf = IDF(inputCol="term_freq", outputCol="tfidf")
idfModel = idf.fit(test_data)
test_data = idfModel.transform(test_data)
test_data.show(5)

+-----+--------+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|tweet_id|                date|  query|    user|               tweet|      tweet_cleansed|               words|           term_freq|               tfidf|
+-----+--------+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    4|       3|Mon May 11 03:17:...|kindle2|  tpryan|@stellargirl I lo...|loooooooovvvvvvee...|[loooooooovvvvvve...|(262144,[12524,83...|(262144,[12524,83...|
|    4|       4|Mon May 11 03:18:...|kindle2|  vcu451|Reading my kindle...|reading kindle2 l...|[reading, kindle2...|(262144,[53570,73...|(262144,[53570,73...|
|    4|       5|Mon May 11 03:18:...|kindle2|  chadfu|Ok, first assesme...|ok first assesmen...|[ok, first, asses...|(262144,[41748,12...|(262144,[41748,12...|
|    4|       6|Mon May 11 03:19:...|kin

In [26]:
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="label", outputCol="labelIndex")
model = stringIndexer.fit(test_data)
test_data = model.transform(test_data)
test_data.show(5)

+-----+--------+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|tweet_id|                date|  query|    user|               tweet|      tweet_cleansed|               words|           term_freq|               tfidf|labelIndex|
+-----+--------+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    4|       3|Mon May 11 03:17:...|kindle2|  tpryan|@stellargirl I lo...|loooooooovvvvvvee...|[loooooooovvvvvve...|(262144,[12524,83...|(262144,[12524,83...|       0.0|
|    4|       4|Mon May 11 03:18:...|kindle2|  vcu451|Reading my kindle...|reading kindle2 l...|[reading, kindle2...|(262144,[53570,73...|(262144,[53570,73...|       0.0|
|    4|       5|Mon May 11 03:18:...|kindle2|  chadfu|Ok, first assesme...|ok first assesmen...|[ok, first, asses...|(262144,[41748,12...|(262144

In [27]:
predicted = test_data.select("tfidf", "label")
predicted.show(5)

+--------------------+-----+
|               tfidf|label|
+--------------------+-----+
|(262144,[12524,83...|    4|
|(262144,[53570,73...|    4|
|(262144,[41748,12...|    4|
|(262144,[1546,218...|    4|
|(262144,[32392,11...|    4|
+--------------------+-----+
only showing top 5 rows



In [36]:
model_folder = os.path.join(os.getcwd(), 'saved_models')
model_full_path = os.path.join(model_folder, "twitter_sentiment_spark")
if not os.path.exists(model_folder):
    print("model does not exists")

from pyspark.ml.classification import NaiveBayes
loadModel = NaiveBayes.load(model_full_path)
predicted = loadModel.transform(predicted)

Py4JJavaError: An error occurred while calling o401.load.
: java.lang.NoSuchMethodException: org.apache.spark.ml.classification.NaiveBayesModel.<init>(java.lang.String)
	at java.base/java.lang.Class.getConstructor0(Class.java:3349)
	at java.base/java.lang.Class.getConstructor(Class.java:2151)
	at org.apache.spark.ml.util.DefaultParamsReader.load(ReadWrite.scala:468)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [31]:
model_folder = os.path.join(os.getcwd(), 'saved_models')
model_full_path = os.path.join(model_folder, "twitter_sentiment_spark")
if not os.path.exists(model_folder):
    print("model does not exists")
from pyspark.ml.pipeline import PipelineModel
persistedModel = PipelineModel.load(model_full_path)
predicted = persistedModel.transform(predicted)

IllegalArgumentException: requirement failed: Error loading metadata: Expected class name org.apache.spark.ml.PipelineModel but found class name org.apache.spark.ml.classification.NaiveBayes

In [15]:
from pyspark.ml.classification import NaiveBayes
#Naive bayes
nb = NaiveBayes(featuresCol="tfidf", labelCol="labelIndex", predictionCol="NB_pred",
                probabilityCol="NB_prob", rawPredictionCol="NB_rawPred")
nbModel = nb.fit(training)
cv = nbModel.transform(test)
total = cv.count()
correct = cv.where(test['labelIndex'] == cv['NB_pred']).count()
accuracy = correct/total

In [16]:
print(
    "\nTotal:", total, 
    "\nCorrect:", correct, 
    "\nAccuracy:", accuracy)


Total: 10105 
Correct: 6875 
Accuracy: 0.6803562592775854


In [24]:
model_folder = os.path.join(os.getcwd(), 'saved_models')

if not os.path.exists(model_folder):

    os.makedirs(model_folder)

In [27]:
model_full_path = os.path.join(model_folder, "twitter_sentiment_spark.pk")
import shutil
shutil.rmtree(model_full_path) 

In [28]:
nb.save(model_full_path)

In [31]:
cv.show()

+--------------------+----------+--------------------+--------------------+-------+
|               tfidf|labelIndex|          NB_rawPred|             NB_prob|NB_pred|
+--------------------+----------+--------------------+--------------------+-------+
|(262144,[18,42059...|       1.0|[-252.72281533249...|[0.13013379281339...|    1.0|
|(262144,[42,3386,...|       1.0|[-375.86466509495...|[0.03119412210768...|    1.0|
|(262144,[81,39216...|       1.0|[-440.84634663110...|[0.99999942585461...|    0.0|
|(262144,[101,3437...|       1.0|[-332.86250322387...|[0.59229784306963...|    0.0|
|(262144,[104,1758...|       1.0|[-591.18590544612...|[0.00701729009458...|    1.0|
|(262144,[104,2436...|       1.0|[-453.58083478952...|[5.44060865770748...|    1.0|
|(262144,[119,8804...|       1.0|[-552.86652919912...|[0.28571853232588...|    1.0|
|(262144,[145,8929...|       1.0|[-658.59185497054...|[0.99999999999900...|    0.0|
|(262144,[150,1002...|       1.0|[-529.54002759273...|[0.00407730535488...| 

In [32]:
test.show()

+--------------------+----------+
|               tfidf|labelIndex|
+--------------------+----------+
|(262144,[18,42059...|       1.0|
|(262144,[42,3386,...|       1.0|
|(262144,[81,39216...|       1.0|
|(262144,[101,3437...|       1.0|
|(262144,[104,1758...|       1.0|
|(262144,[104,2436...|       1.0|
|(262144,[119,8804...|       1.0|
|(262144,[145,8929...|       1.0|
|(262144,[150,1002...|       1.0|
|(262144,[161,822,...|       1.0|
|(262144,[161,6533...|       0.0|
|(262144,[161,1072...|       1.0|
|(262144,[161,2430...|       1.0|
|(262144,[161,3101...|       1.0|
|(262144,[161,3434...|       1.0|
|(262144,[161,9265...|       1.0|
|(262144,[220,1512...|       1.0|
|(262144,[281,2032...|       1.0|
|(262144,[285,7096...|       1.0|
|(262144,[298,5619...|       1.0|
+--------------------+----------+
only showing top 20 rows



In [53]:
from pyspark.sql.functions import monotonically_increasing_id 
cv_indexed = cv.select("*").withColumn("id_b", monotonically_increasing_id())

cv_indexed.show()

+--------------------+----------+--------------------+--------------------+-------+----+
|               tfidf|labelIndex|          NB_rawPred|             NB_prob|NB_pred|id_b|
+--------------------+----------+--------------------+--------------------+-------+----+
|(262144,[18,42059...|       1.0|[-252.72281533249...|[0.13013379281339...|    1.0|   0|
|(262144,[42,3386,...|       1.0|[-375.86466509495...|[0.03119412210768...|    1.0|   1|
|(262144,[81,39216...|       1.0|[-440.84634663110...|[0.99999942585461...|    0.0|   2|
|(262144,[101,3437...|       1.0|[-332.86250322387...|[0.59229784306963...|    0.0|   3|
|(262144,[104,1758...|       1.0|[-591.18590544612...|[0.00701729009458...|    1.0|   4|
|(262144,[104,2436...|       1.0|[-453.58083478952...|[5.44060865770748...|    1.0|   5|
|(262144,[119,8804...|       1.0|[-552.86652919912...|[0.28571853232588...|    1.0|   6|
|(262144,[145,8929...|       1.0|[-658.59185497054...|[0.99999999999900...|    0.0|   7|
|(262144,[150,1002...

In [54]:
from pyspark.sql.functions import monotonically_increasing_id 
test_indexed = test.select("*").withColumn("id_b", monotonically_increasing_id())

test_indexed.show()

+--------------------+----------+----+
|               tfidf|labelIndex|id_b|
+--------------------+----------+----+
|(262144,[18,42059...|       1.0|   0|
|(262144,[42,3386,...|       1.0|   1|
|(262144,[81,39216...|       1.0|   2|
|(262144,[101,3437...|       1.0|   3|
|(262144,[104,1758...|       1.0|   4|
|(262144,[104,2436...|       1.0|   5|
|(262144,[119,8804...|       1.0|   6|
|(262144,[145,8929...|       1.0|   7|
|(262144,[150,1002...|       1.0|   8|
|(262144,[161,822,...|       1.0|   9|
|(262144,[161,6533...|       0.0|  10|
|(262144,[161,1072...|       1.0|  11|
|(262144,[161,2430...|       1.0|  12|
|(262144,[161,3101...|       1.0|  13|
|(262144,[161,3434...|       1.0|  14|
|(262144,[161,9265...|       1.0|  15|
|(262144,[220,1512...|       1.0|  16|
|(262144,[281,2032...|       1.0|  17|
|(262144,[285,7096...|       1.0|  18|
|(262144,[298,5619...|       1.0|  19|
+--------------------+----------+----+
only showing top 20 rows



In [61]:
from pyspark.sql.functions import col
# merged = test_indexed.join(cv_indexed, on=col("columnindex"), how='inner') #.drop("id") 
merged = test_indexed.join(cv_indexed, test_indexed.id_b == cv_indexed.id_b)
merged.show()

+--------------------+----------+----+--------------------+----------+--------------------+--------------------+-------+----+
|               tfidf|labelIndex|id_b|               tfidf|labelIndex|          NB_rawPred|             NB_prob|NB_pred|id_b|
+--------------------+----------+----+--------------------+----------+--------------------+--------------------+-------+----+
|(262144,[18,42059...|       1.0|   0|(262144,[18,42059...|       1.0|[-252.72281533249...|[0.13013379281339...|    1.0|   0|
|(262144,[42,3386,...|       1.0|   1|(262144,[42,3386,...|       1.0|[-375.86466509495...|[0.03119412210768...|    1.0|   1|
|(262144,[81,39216...|       1.0|   2|(262144,[81,39216...|       1.0|[-440.84634663110...|[0.99999942585461...|    0.0|   2|
|(262144,[101,3437...|       1.0|   3|(262144,[101,3437...|       1.0|[-332.86250322387...|[0.59229784306963...|    0.0|   3|
|(262144,[104,1758...|       1.0|   4|(262144,[104,1758...|       1.0|[-591.18590544612...|[0.00701729009458...|    1.