In [3]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.0.2'
# spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"]=f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
            Hit:2 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.39)] [Co                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [1 InRelease gpgv 242 kB] [3 InRelease 14.2 kB/88.7 kB 16%] [Connecting to s                                                                               Hit:4 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
0% [1 InRelease gpgv 242 kB] [3 InRelease 36.9 kB/88.7 kB 42%] [Connecting to s0% [1 InRelease gpgv 242 kB] [Connecting to security.ubuntu.com (91.189.91.39)]                                                                               Hit:5 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
                                             

In [4]:
# Start Spark session
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("NaiveBayes").getOrCreate()

In [6]:
from pyspark.sql.types import StructField, StringType, StructType
columns=['label', 'text']
schema=[StructField(column_header, StringType(), True) for column_header in columns]
final=StructType(fields=schema)

In [7]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
# url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/yelp_reviews.csv"
# url='/content/training-1600000-processed-noemoticon.csv'
url='/content/sample.csv'

# url='/content/Airlines.csv'
spark.sparkContext.addFile(url)
# columns=['label', 'time', 'date', 'query', 'username', 'text']
df=spark.read.csv(SparkFiles.get("sample.csv"), sep=",", schema=final)#,) header=True)
# df = spark.read.csv(SparkFiles.get("Airlines.csv"), sep=",", header=True)#,) header=True)

# Show DataFrame
# df.show()

In [None]:
# df.limit(5).toPandas().to_csv('sample.csv')

In [8]:
# Create all the features to the data set
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer, VectorAssembler

pos_neg_to_num=StringIndexer(inputCol='target', outputCol='label')
tokenizer=Tokenizer(inputCol='text', outputCol='words')
stop=StopWordsRemover(inputCol='words', outputCol='clean')
hashingTF=HashingTF(inputCol='clean', outputCol='hash')
idf=IDF(inputCol='hash', outputCol='idf')

In [14]:

from pyspark.ml.feature import VectorAssembler
# from pyspark.ml.linalg import Vector

# Create feature vectors
# clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')
clean_up=VectorAssembler(inputCols=['idf'], outputCol='features')

In [15]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline

ml_pipeline=Pipeline(stages=[pos_neg_to_num, tokenizer, stop, hashingTF, idf, clean_up])

In [16]:
# Fit and transform the pipeline

df=df.withColumnRenamed('label', 'target')
processer=ml_pipeline.fit(df)
final_df=processer.transform(df)

In [17]:
final_df.show()

+------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|target|                text|label|               words|               clean|                hash|                 idf|            features|
+------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
| label|                text|  2.0|              [text]|              [text]|(262144,[143985],...|(262144,[143985],...|(262144,[143985],...|
|     0|@vivmondo Haha, i...|  0.0|[@vivmondo, haha,...|[@vivmondo, haha,...|(262144,[8804,210...|(262144,[8804,210...|(262144,[8804,210...|
|     0|again stuck with ...|  0.0|[again, stuck, wi...|[stuck, javascrip...|(262144,[12650,27...|(262144,[12650,27...|(262144,[12650,27...|
|     0|Exhausted...terri...|  0.0|[exhausted...terr...|[exhausted...terr...|(262144,[196946,2...|(262144,[196946,2...|(262144,[196946,2...|
|     0|@dsta

In [18]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = final_df.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [19]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|target|                text|label|               words|               clean|                hash|                 idf|            features|       rawPrediction|         probability|prediction|
+------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|     0|     &gt;( &gt;( ...|  0.0|[, , , , , &gt;(,...|[, , , , , &gt;(,...|(262144,[32055,64...|(262144,[32055,64...|(262144,[32055,64...|[-783.12830642394...|[1.0,1.0161174205...|       0.0|
|     0|     jb isnt show...|  0.0|[, , , , , jb, is...|[, , , , , jb, is...|(262144,[2284,283...|(262144,[2284,283...|(262144,[2284,283...|[-370.31788464988...|[0.99978904587422...|       0.0|
|     0|    Nobody likes ...| 

In [20]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting testing sentiment was: %f" % acc)

Accuracy of model at predicting sentiment was: 0.676430


In [22]:
# Tranform the model with the testing data
train_results = predictor.transform(training)
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(train_results)
print("Accuracy of model at predicting training sentiment was: %f" % acc)

+------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|target|                text|label|               words|               clean|                hash|                 idf|            features|       rawPrediction|         probability|prediction|
+------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|     0|                 ...|  0.0|[, , , , , , , , ...|[, , , , , , , , ...|(262144,[76764,23...|(262144,[76764,23...|(262144,[76764,23...|[-336.83801853752...|[0.99999999688240...|       0.0|
|     0|                 ...|  0.0|[, , , , , , , , ...|[, , , , , , , , ...|(262144,[23825,74...|(262144,[23825,74...|(262144,[23825,74...|[-393.67163090296...|[1.0,1.8043098800...|       0.0|
|     0|           FUCK YOU!| 

In [None]:
test_results.select(['target', 'text', 'label', 'prediction']).coalesce(1).write.csv("results.csv")