In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import length
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark import SparkContext
from pyspark import SparkFiles

import pandas as pd

import os
os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home/'


In [2]:
spark = SparkSession.builder.appName('twitter').getOrCreate()

In [3]:
# # Load in data
# from pyspark import SparkFiles
# url ="https://s3.amazonaws.com/zepl-trilogy-test/yelp_reviews_quarter.csv"
# spark.sparkContext.addFile(url)
# df = spark.read.csv(SparkFiles.get("yelp_reviews_quarter.csv"), sep=",", header=True)
# df = df.withColumn('original', df['class'])
# df = df.withColumn('tweet', df['text'])
# df.show()

In [4]:
url ="CSV_cleaned/tweets_sample2.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("tweets_sample2.csv"), sep=",", header=True)
df.show()

# url ="CSV_cleaned/tweets_sample.csv"
# spark.sparkContext.addFile(url)
# df = spark.read.csv(SparkFiles.get("tweets_sample.csv"), sep=",", header=True)
# df.show()

+--------------------+---------+--------+
|                text|sentiment|original|
+--------------------+---------+--------+
|Why am I still ho...|        0|negative|
|auch my wrist! it...|        0|negative|
|stoped broadcasti...|        0|negative|
|@Iyarchuleta I'm ...|        4|positive|
|Oh My Word. It sl...|        4|positive|
|@leemtwittin welc...|        4|positive|
|in office...learn...|        0|negative|
|@djdsf really? oh...|        0|negative|
|    Watching up  lol|        4|positive|
|@jakebells I don'...|        0|negative|
|After waiting nea...|        4|positive|
|@ep0pe86 u suppos...|        0|negative|
|Watching 7 Pounds...|        0|negative|
|lost my camera la...|        0|negative|
|@LVM5 morning - n...|        0|negative|
|I hope I don't sh...|        0|negative|
|Oh god help me!!!...|        0|negative|
|@snarkattack kick...|        4|positive|
|@BlueSpirit3 i di...|        0|negative|
|@theboatissinkin ...|        0|negative|
+--------------------+---------+--

In [5]:
# Create a length column to be used as a future feature 
from pyspark.sql.functions import length
# data = df.withColumn('length', length(df['tweet']))
data = df.withColumn('length', length(df['text']))
data.show()

+--------------------+---------+--------+------+
|                text|sentiment|original|length|
+--------------------+---------+--------+------+
|Why am I still ho...|        0|negative|    57|
|auch my wrist! it...|        0|negative|    37|
|stoped broadcasti...|        0|negative|    94|
|@Iyarchuleta I'm ...|        4|positive|    56|
|Oh My Word. It sl...|        4|positive|   106|
|@leemtwittin welc...|        4|positive|    39|
|in office...learn...|        0|negative|    30|
|@djdsf really? oh...|        0|negative|   138|
|    Watching up  lol|        4|positive|    16|
|@jakebells I don'...|        0|negative|    24|
|After waiting nea...|        4|positive|   114|
|@ep0pe86 u suppos...|        0|negative|    39|
|Watching 7 Pounds...|        0|negative|    29|
|lost my camera la...|        0|negative|    61|
|@LVM5 morning - n...|        0|negative|   127|
|I hope I don't sh...|        0|negative|    48|
|Oh god help me!!!...|        0|negative|    79|
|@snarkattack kick..

In [6]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

# Create all the features to the data set
# pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
pos_neg_to_num = StringIndexer(inputCol='original',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
# tokenizer = Tokenizer(inputCol="tweet", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [7]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [8]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num,tokenizer, stopremove, hashingTF, idf, clean_up])

In [9]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data)
cleaned = cleaner.transform(data)

In [10]:
cleaned.show()

+--------------------+---------+--------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|sentiment|original|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+--------------------+---------+--------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|Why am I still ho...|        0|negative|    57|  0.0|[why, am, i, stil...|[still, hopeful, ...|(262144,[36200,81...|(262144,[36200,81...|(262145,[36200,81...|
|auch my wrist! it...|        0|negative|    37|  0.0|[auch, my, wrist!...|[auch, wrist!, hu...|(262144,[8679,265...|(262144,[8679,265...|(262145,[8679,265...|
|stoped broadcasti...|        0|negative|    94|  0.0|[stoped, broadcas...|[stoped, broadcas...|(262144,[65844,73...|(262144,[65844,73...|(262145,[65844,73...|
|@Iyarchuleta I'm ...|        4|positive

In [11]:
# Show label of ham spame and resulting features
cleaned = cleaned.select(['label', 'features'])
cleaned.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[36200,81...|
|  0.0|(262145,[8679,265...|
|  0.0|(262145,[65844,73...|
|  1.0|(262145,[62166,11...|
|  1.0|(262145,[105551,1...|
|  1.0|(262145,[30448,81...|
|  0.0|(262145,[135105,1...|
|  0.0|(262145,[5381,912...|
|  1.0|(262145,[31950,63...|
|  0.0|(262145,[568,1409...|
|  1.0|(262145,[4054,326...|
|  0.0|(262145,[92424,99...|
|  0.0|(262145,[63139,77...|
|  0.0|(262145,[5381,191...|
|  0.0|(262145,[10562,11...|
|  0.0|(262145,[34343,68...|
|  0.0|(262145,[57304,88...|
|  1.0|(262145,[47767,52...|
|  0.0|(262145,[70376,91...|
|  0.0|(262145,[53261,64...|
+-----+--------------------+
only showing top 20 rows



In [12]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])


In [13]:
# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [14]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(262145,[14,925,2...|[-953.91754589477...|[0.99999893448254...|       0.0|
|  0.0|(262145,[14,2437,...|[-792.39284337774...|[1.95020127735542...|       1.0|
|  0.0|(262145,[14,4200,...|[-615.56084582777...|[0.08890824553060...|       1.0|
|  0.0|(262145,[14,5381,...|[-910.17245083104...|[1.0,1.1113091698...|       0.0|
|  0.0|(262145,[14,6981,...|[-1270.0105351853...|[1.67904095239364...|       1.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [15]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print(f"Accuracy of model at predicting reviews was: {acc}")

Accuracy of model at predicting reviews was: 0.630461761049206
