In [2]:
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark import SparkContext as sc
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf,col
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

import random
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [3]:
# Create the SparkSession shared by every cell below.

def init_spark():
    """Return the SparkSession for this notebook.

    Uses ``SparkSession.builder.getOrCreate()``, so an already-running session
    is reused rather than a second one being started.

    Returns:
        SparkSession with app name "Python Spark Naive Bayes CountVectorizer".
    """
    session_builder = SparkSession.builder.appName("Python Spark Naive Bayes CountVectorizer")
    return session_builder.getOrCreate()


spark = init_spark()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/27 22:26:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/27 22:26:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# Read the lemmatized dataset created by WallStreetBets-CreateLemmas.ipynb.
# Columns used here: id, label and text. The text column holds the post's
# lemmas joined by "|", e.g.
#   ks1tzw,1,all|right|artist|.....
# With header=True and no inferSchema every column is read as a string.
data = spark.read.csv("../data/lemma.csv", header=True)

# Native column functions replace the former Python UDFs. They run inside the
# JVM (no Python worker round-trip), and they yield null for a null cell
# instead of raising inside a UDF.
# F.split takes a Java regex, so the literal pipe separator must be escaped.
text_lemmas = (
    data
    .withColumn('finished_lemmas', F.split(F.col('text'), r'\|'))
    .drop('text')
    .withColumn('label', F.col('label').cast(IntegerType()))
)
print("Number of rows: ",text_lemmas.count())

Number of rows:  14827


In [5]:
# Build the corpus: strip stop words (StopWordsRemover's default list) from
# each row's lemma array. The cleaned tokens are written to a new "text"
# column; the original raw "text" column was dropped by the previous cell.

remover = StopWordsRemover()
remover.setInputCol("finished_lemmas").setOutputCol("text")
filtered_df = remover.transform(text_lemmas)

In [6]:
# Document-term matrix. CountVectorizer learns a vocabulary from the filtered
# tokens and encodes each row as a sparse count vector in "features". The
# vector prints as (vocabulary size, [term indices], [term counts]).
# NOTE(review): the vocabulary is fitted on every row, before NAIVEBAYES_CV
# makes its train/test split, so it can include terms seen only in test rows.

to_vectorize = filtered_df.select("id", "label", "text")
cv = CountVectorizer(inputCol="text", outputCol="features")
model_vec = cv.fit(to_vectorize)
print("Total count of vocabulary:", len(model_vec.vocabulary))

# transform() is lazy, so building the result after printing is equivalent.
result_vec = model_vec.transform(to_vectorize)
selectedData = result_vec.select("id", "label", "features", "text")

                                                                                

Total count of vocabulary: 53067


In [7]:
"""
Define TruePositive, FalsePositive and FalseNegative
x = prediction, y = label
"""
TP = udf(lambda x,y: int(x==1 and y==1))
FP = udf(lambda x,y: int(x==1 and y==0))
FN = udf(lambda x,y: int(x==0 and y==1))

In [8]:
# Naive-Bayes following from CountVectorizer

def NAIVEBAYES_CV(smooth=1, model_type="multinomial", seed=None):
  """Train and evaluate a Naive Bayes classifier on the CountVectorizer features.

  Splits the global ``selectedData`` 70/30 into train/test separately within
  label 0 and label 1, so both splits keep roughly the original class ratio.
  It then fits ``NaiveBayes`` on the training part and scores the test part.

  Args:
    smooth: additive smoothing passed to ``NaiveBayes(smoothing=...)``.
    model_type: ``NaiveBayes`` model type, e.g. "multinomial" or "complement".
    seed: optional seed for ``randomSplit``. Pass an int for a reproducible
      split; ``None`` (the default) keeps the previous unseeded behaviour.

  Returns:
    Tuple ``(accuracy, precision, recall, F1, model_NB)``.
      - accuracy: computed by ``MulticlassClassificationEvaluator`` on the
        test split.
      - precision, recall, F1: for the positive class (label 1). Each is 0.0
        when its denominator is zero, instead of raising ZeroDivisionError.
  """
  # Stratified split: split each class on its own, then recombine.
  training_zero, test_zero = selectedData.where(selectedData.label == 0).randomSplit([0.7, 0.3], seed=seed)
  training_one, test_one = selectedData.where(selectedData.label == 1).randomSplit([0.7, 0.3], seed=seed)

  training = training_zero.union(training_one)
  test = test_zero.union(test_one)

  # create trainer with parameters then train
  # smoothing: additive smoothing so unseen terms don't get probability 0
  nb = NaiveBayes(smoothing=smooth, modelType=model_type)
  model_NB = nb.fit(training)

  # score the test set: appends rawPrediction/probability/prediction columns
  predictions = model_NB.transform(test)

  # Diagnostic counts in a single aggregation pass (previously three separate
  # count() jobs over string-typed Python UDF columns). A null comparison falls
  # through to otherwise(0), matching the old UDFs' 0 for missing values.
  pred_pos = F.col("prediction") == 1
  pred_neg = F.col("prediction") == 0
  label_pos = F.col("label") == 1
  label_neg = F.col("label") == 0
  counts = predictions.agg(
      F.sum(F.when(pred_pos & label_pos, 1).otherwise(0)).alias("TP"),
      F.sum(F.when(pred_pos & label_neg, 1).otherwise(0)).alias("FP"),
      F.sum(F.when(pred_neg & label_pos, 1).otherwise(0)).alias("FN"),
  ).first()

  # sum() over an empty test set is null -> treat as 0
  TP_ = counts["TP"] or 0
  FP_ = counts["FP"] or 0
  FN_ = counts["FN"] or 0

  precision = TP_ / (TP_ + FP_) if (TP_ + FP_) else 0.0
  recall = TP_ / (TP_ + FN_) if (TP_ + FN_) else 0.0
  F1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) else 0.0

  # accuracy on the test set: compares labelCol and predictionCol
  evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
  accuracy = evaluator.evaluate(predictions)

  # return test results and model object
  return (accuracy, precision, recall, F1, model_NB)

In [9]:
# Examples: evaluate both Naive Bayes variants at the same smoothing value.
# NOTE(review): this smoothing value presumably came from the commented-out
# random smoothing search at the end of the notebook; confirm.
EXAMPLE_SMOOTHING = 0.2684835187532758


def print_metrics(accuracy, precision_, recall_, f1_):
    """Print the four test-set metrics returned by NAIVEBAYES_CV, one per line."""
    rows = (
        ("Accuracy: ", accuracy),
        ("Precision: ", precision_),
        ("Recall: ", recall_),
        ("F1 Score: ", f1_),
    )
    for caption, value in rows:
        print(caption, value)


acc, precision, recall, F1, modelNB = NAIVEBAYES_CV(EXAMPLE_SMOOTHING, "complement")
print_metrics(acc, precision, recall, F1)
print()
acc, precision, recall, F1, modelNB2 = NAIVEBAYES_CV(EXAMPLE_SMOOTHING, "multinomial")
print_metrics(acc, precision, recall, F1)

                                                                                

22/08/27 22:26:41 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/08/27 22:26:41 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/08/27 22:26:41 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
22/08/27 22:26:42 WARN DAGScheduler: Broadcasting large task binary with size 1519.3 KiB


                                                                                

22/08/27 22:26:47 WARN DAGScheduler: Broadcasting large task binary with size 1519.3 KiB


                                                                                

22/08/27 22:26:52 WARN DAGScheduler: Broadcasting large task binary with size 1519.3 KiB


                                                                                

22/08/27 22:26:56 WARN DAGScheduler: Broadcasting large task binary with size 1534.2 KiB


                                                                                

Accuracy:  0.6245503597122302
Precision:  0.15915414579855314
Recall:  0.6426966292134831
F1 Score:  0.2551293487957181



                                                                                

22/08/27 22:27:05 WARN DAGScheduler: Broadcasting large task binary with size 1519.4 KiB


                                                                                

22/08/27 22:27:10 WARN DAGScheduler: Broadcasting large task binary with size 1519.4 KiB


                                                                                

22/08/27 22:27:17 WARN DAGScheduler: Broadcasting large task binary with size 1519.4 KiB


                                                                                

22/08/27 22:27:22 WARN DAGScheduler: Broadcasting large task binary with size 1534.2 KiB




Accuracy:  0.7523122039251071
Precision:  0.20368663594470046
Recall:  0.4857142857142857
F1 Score:  0.28701298701298705


                                                                                

In [10]:

# Score every row of selectedData with the complement model (modelNB).
# selectedData also contains the rows NAIVEBAYES_CV trained on, so these are
# not purely held-out predictions. show() prints the first 500 rows.
result = modelNB.transform(dataset=selectedData)
result.show(n=500)

22/08/27 22:27:27 WARN DAGScheduler: Broadcasting large task binary with size 1470.7 KiB


[Stage 36:>                                                         (0 + 1) / 1]

22/08/27 22:27:31 WARN PythonUDFRunner: Detected deadlock while completing task 0.0 in stage 36 (TID 94): Attempting to kill Python Worker


                                                                                

+--------------------+-----+--------------------+---------------------------+--------------------+--------------------+----------+
|                  id|label|            features|                       text|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+---------------------------+--------------------+--------------------+----------+
|RecognitionBasic9521|    0|(53067,[17,18,318...|       [Chart, look, goo...|[-3.2148496826725...|[0.04016137031976...|       1.0|
|            whttevrr|    0|(53067,[5,12,19,2...|       [Low, float, beco...|[-2.7207162770537...|[0.99972796538045...|       0.0|
|      CuriousDev1012|    0|(53067,[1,5,8,9,1...|       [Hey, Ive, code, ...|[-0.1816653146671...|[0.83388038125061...|       0.0|
|         Marketspike|    0|(53067,[1,18,20,2...|       [Solar, Integrate...|[-10.887806680468...|[1.86846788927847...|       1.0|
|    bananahammock699|    0|(53067,[2,3,5,8,1...|       [Positive, earnin...|[-1.89

In [11]:
# Score every row of selectedData with the multinomial model (modelNB2).
# As above, this includes the rows the model was trained on.
result2 = modelNB2.transform(dataset=selectedData)
result2.show(n=500)

22/08/27 22:27:32 WARN DAGScheduler: Broadcasting large task binary with size 1470.7 KiB


[Stage 37:>                                                         (0 + 1) / 1]

22/08/27 22:27:36 WARN PythonUDFRunner: Detected deadlock while completing task 0.0 in stage 37 (TID 95): Attempting to kill Python Worker


                                                                                

+--------------------+-----+--------------------+---------------------------+--------------------+--------------------+----------+
|                  id|label|            features|                       text|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+---------------------------+--------------------+--------------------+----------+
|RecognitionBasic9521|    0|(53067,[17,18,318...|       [Chart, look, goo...|[-62.845355487126...|[0.54731048683157...|       0.0|
|            whttevrr|    0|(53067,[5,12,19,2...|       [Low, float, beco...|[-321.53817035756...|[0.99993102519642...|       0.0|
|      CuriousDev1012|    0|(53067,[1,5,8,9,1...|       [Hey, Ive, code, ...|[-636.15795384095...|[0.99884581475593...|       0.0|
|         Marketspike|    0|(53067,[1,18,20,2...|       [Solar, Integrate...|[-541.04001021023...|[4.21020659563539...|       1.0|
|    bananahammock699|    0|(53067,[2,3,5,8,1...|       [Positive, earnin...|[-452.

In [12]:
# Collect the complement-model predictions to the driver as a pandas
# DataFrame and save them to CSV.
# A new name keeps `result` a Spark DataFrame, so this cell can be re-run.
# Re-assigning `result` made a second run fail: pandas has no .toPandas().
result_pd = result.toPandas()

# save to csv
result_pd.to_csv('../data/naive_bayes.csv', index=False)

22/08/27 22:27:38 WARN DAGScheduler: Broadcasting large task binary with size 1479.1 KiB


                                                                                

In [14]:
# Collect the multinomial-model predictions to the driver as a pandas
# DataFrame and save them to CSV.
# A new name keeps `result2` a Spark DataFrame, so this cell can be re-run.
result2_pd = result2.toPandas()

# save to csv
result2_pd.to_csv('../data/naive_bayes2.csv', index=False)

22/08/27 22:30:06 WARN DAGScheduler: Broadcasting large task binary with size 1479.1 KiB


                                                                                

In [15]:

# '''
# Iteration tests on Naive-Bayes

# iter_total: number of different random smoothing values tried per model type
# iter_each: number of repeated train/test runs for each smoothing value
# '''
# import statistics


# extract_method = "CountVectorizer"
# iter_each = 10
# iter_total = 50
# m_types = ["complement", "multinomial"]
# means = []
# for model_type in m_types:
#   for k in range(iter_total):
#     # reset per smoothing value so each mean covers only its own iter_each runs
#     accs = []
#     f1s = []
#     smoothing = random.uniform(0.01, 0.8)
#     for i in range(iter_each):
#       # unpack the model into `_` so the modelNB used by earlier cells is not clobbered
#       acc,precision,recall,F1,_ = NAIVEBAYES_CV(smoothing, model_type)
#       accs.append(acc)
#       f1s.append(F1)
#     mean_acc = statistics.mean(accs)
#     mean_f1 = statistics.mean(f1s)
#     print("=> Mean_acc: ", mean_acc," => Mean_f1: ",mean_f1, "- Smoothing:", smoothing, "- Model:", model_type)
#     means.append((mean_acc,mean_f1, smoothing, model_type, extract_method))
    