In [32]:
from pyspark.sql import SparkSession

In [33]:
spark = SparkSession.builder.appName('original_NB').getOrCreate()

In [35]:
start_data = spark.read.format("csv").option("header", "true").load("Data/Happiness_tweets.csv")
start_data.show()

+--------------------+--------------------+-------+-----+
|               index|                text|Country|Class|
+--------------------+--------------------+-------+-----+
|                   0|RT @PeachWaddle: ...| Norway|Happy|
|                   1|These two with su...|   null| null|
|#family #love #ba...|              Norway|  Happy| null|
|                   2|RT @PeachWaddle: ...| Norway|Happy|
|                   3|#dotter och #mamm...| Norway|Happy|
|                   4|Thought of the da...| Norway|Happy|
|                   5|Nothing like ‚òÄÔ...| Norway|Happy|
|                   6|Today was an awes...| Norway|Happy|
|                   7|Thank you ‚ù§Ô∏è ...|   null| null|
|#solpmu #family #...|              Norway|  Happy| null|
|                   8|Dotter och mor. ‚...|   null| null|
|Daughter and Moth...|                null|   null| null|
|#familj #family @...|              Norway|  Happy| null|
|                   9|I helgen spelas s...| Norway|Happy|
|             

In [19]:
# Create a length column to be used as a future feature 
from pyspark.sql.functions import length
data = start_data.withColumn('length', length(start_data['Snippets']))
data.show()

+---+-------+--------------------+-----+------+
|_c0|Country|            Snippets|class|length|
+---+-------+--------------------+-----+------+
|  0| norway|A young indigenou...|happy|   183|
|  1| norway|Charming villages...|happy|   112|
|  2| norway|Animal advocacy g...|happy|   101|
|  3| norway|New rules to comb...|happy|   132|
|  4| norway|The politician, C...|happy|   113|
|  5| norway|A writer finds em...|happy|   128|
|  6| norway|The move signals ...|happy|   104|
|  7| norway|The herder, from ...|happy|   122|
|  8| norway|Berries clothe se...|happy|   125|
|  9| norway|A race that was c...|happy|   142|
| 10| norway|A young indigenou...|happy|   183|
| 11| norway|Charming villages...|happy|   112|
| 12| norway|Animal advocacy g...|happy|   101|
| 13| norway|New rules to comb...|happy|   132|
| 14| norway|The politician, C...|happy|   113|
| 15| norway|A writer finds em...|happy|   128|
| 16| norway|The move signals ...|happy|   104|
| 17| norway|The herder, from ...|happy|

In [20]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

# Create all the features to the data set
happy_unhappy_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="Snippets", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')


In [21]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [22]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[happy_unhappy_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [23]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data)
cleaned = cleaner.transform(data)

In [24]:
# Show label of happy unhappy and resulting features
cleaned.select(['label', 'features']).show(truncate=False)

+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+-----+---------------------------------------------------------------------------------------

In [25]:
# Break data down into a training set and a testing set
(training, testing) = cleaned.randomSplit([0.7, 0.3])

In [26]:
from pyspark.ml.classification import NaiveBayes

# Create a Naive Bayes model and fit training data
nb = NaiveBayes(smoothing=1.0, modelType='multinomial')
unhappy_predictor = nb.fit(training)

In [27]:
# Tranform the model with the testing data
test_results = unhappy_predictor.transform(testing)
test_results.show(1)

+---+-------+--------------------+-----+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|_c0|Country|            Snippets|class|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+---+-------+--------------------+-----+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  0|denmark|This small and sc...|happy|   137|  1.0|[this, small, and...|[small, scholarly...|(262144,[17371,49...|(262144,[17371,49...|(262145,[17371,49...|[-1036.522151551,...|[3.09497230662662...|       1.0|
+---+-------+--------------------+-----+------+-----+--------------------+--------------------+--------------------+--------------------+---------------

In [28]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print(f"Accuracy of model at predicting unahhpiness was: {acc}")

Accuracy of model at predicting unahhpiness was: 0.9863780915723299


In [29]:
test_results

DataFrame[_c0: string, Country: string, Snippets: string, class: string, length: int, label: double, token_text: array<string>, stop_tokens: array<string>, hash_token: vector, idf_token: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [15]:
spark.stop()