In [None]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.0.1'
#spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connecting to security.u                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Waiting for headers] [Co                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
0% [Waiting for headers] [Waiting for headers] [Connecting to cloud.r-project.o0% [Release.gpg gpgv 697 B] [Waiting for headers] [Waiting for headers] [Connec                                                                               Get:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release [564 B]
0% [Release.g

In [None]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()

In [None]:
train_df = spark.read.csv("./train.csv", inferSchema= True, header = True)
test_df = spark.read.csv("./test.csv", inferSchema= True, header = True)

In [None]:
train_df.show()

+---+-------+--------+--------------------+------+
| id|keyword|location|                text|target|
+---+-------+--------+--------------------+------+
|  1|   null|    null|Our Deeds are the...|     1|
|  4|   null|    null|Forest fire near ...|     1|
|  5|   null|    null|All residents ask...|     1|
|  6|   null|    null|13,000 people rec...|     1|
|  7|   null|    null|Just got sent thi...|     1|
|  8|   null|    null|#RockyFire Update...|     1|
| 10|   null|    null|#flood #disaster ...|     1|
| 13|   null|    null|I'm on top of the...|     1|
| 14|   null|    null|There's an emerge...|     1|
| 15|   null|    null|I'm afraid that t...|     1|
| 16|   null|    null|Three people died...|     1|
| 17|   null|    null|Haha South Tampa ...|     1|
| 18|   null|    null|#raining #floodin...|     1|
| 19|   null|    null|#Flood in Bago My...|     1|
| 20|   null|    null|Damage to school ...|     1|
| 23|   null|    null|      What's up man?|     0|
| 24|   null|    null|       I 

In [None]:
train_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- keyword: string (nullable = true)
 |-- location: string (nullable = true)
 |-- text: string (nullable = true)
 |-- target: integer (nullable = true)



In [None]:
train_df.describe()

DataFrame[summary: string, id: string, keyword: string, location: string, text: string, target: string]

In [None]:
#Remove extraneous columns

trainer_df = train_df.select("id", "text", "target")

trainer_df.show()

+---+--------------------+------+
| id|                text|target|
+---+--------------------+------+
|  1|Our Deeds are the...|     1|
|  4|Forest fire near ...|     1|
|  5|All residents ask...|     1|
|  6|13,000 people rec...|     1|
|  7|Just got sent thi...|     1|
|  8|#RockyFire Update...|     1|
| 10|#flood #disaster ...|     1|
| 13|I'm on top of the...|     1|
| 14|There's an emerge...|     1|
| 15|I'm afraid that t...|     1|
| 16|Three people died...|     1|
| 17|Haha South Tampa ...|     1|
| 18|#raining #floodin...|     1|
| 19|#Flood in Bago My...|     1|
| 20|Damage to school ...|     1|
| 23|      What's up man?|     0|
| 24|       I love fruits|     0|
| 25|    Summer is lovely|     0|
| 26|   My car is so fast|     0|
| 28|What a goooooooaa...|     0|
+---+--------------------+------+
only showing top 20 rows



In [None]:
trainer_df = trainer_df.dropna()

In [None]:
tester_df = test_df.dropna()

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
#pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token'], outputCol='features')

In [None]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[tokenizer, stopremove, hashingTF, idf, clean_up])

In [None]:
# Fit and transform the pipeline
cleaner_train = data_prep_pipeline.fit(trainer_df)
cleaned_train = cleaner.transform(trainer_df)

In [None]:
# Fit and transform the pipeline
cleaner_test = data_prep_pipeline.fit(tester_df)
cleaned_test = cleaner.transform(tester_df)

In [None]:
from pyspark.sql.functions import col

cleaned_train = cleaned_train.withColumn("label",col('target'))
cleaned_test = cleaned_train.withColumn("label",col('target'))

In [None]:
# Show label and resulting features
cleaned_train.show()

+---+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
| id|                text|target|          token_text|         stop_tokens|          hash_token|           idf_token|            features|label|
+---+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|  1|Our Deeds are the...|     1|[our, deeds, are,...|[deeds, reason, #...|(262144,[24370,35...|(262144,[24370,35...|(262144,[24370,35...|    1|
|  4|Forest fire near ...|     1|[forest, fire, ne...|[forest, fire, ne...|(262144,[55310,72...|(262144,[55310,72...|(262144,[55310,72...|    1|
|  5|All residents ask...|     1|[all, residents, ...|[residents, asked...|(262144,[38983,70...|(262144,[38983,70...|(262144,[38983,70...|    1|
|  6|13,000 people rec...|     1|[13,000, people, ...|[13,000, people, ...|(262144,[38983,11...|(262144,[38983,11...|(262144,[3898

In [None]:
cleaned_test.show()

+---+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
| id|                text|target|          token_text|         stop_tokens|          hash_token|           idf_token|            features|label|
+---+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|  1|Our Deeds are the...|     1|[our, deeds, are,...|[deeds, reason, #...|(262144,[24370,35...|(262144,[24370,35...|(262144,[24370,35...|    1|
|  4|Forest fire near ...|     1|[forest, fire, ne...|[forest, fire, ne...|(262144,[55310,72...|(262144,[55310,72...|(262144,[55310,72...|    1|
|  5|All residents ask...|     1|[all, residents, ...|[residents, asked...|(262144,[38983,70...|(262144,[38983,70...|(262144,[38983,70...|    1|
|  6|13,000 people rec...|     1|[13,000, people, ...|[13,000, people, ...|(262144,[38983,11...|(262144,[38983,11...|(262144,[3898

In [None]:
from pyspark.ml.classification import NaiveBayes

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(cleaned_train)

In [None]:
# Tranform the model with the testing data
test_results = predictor.transform(cleaned_test)
test_results.show(5)

+---+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
| id|                text|target|          token_text|         stop_tokens|          hash_token|           idf_token|            features|label|       rawPrediction|         probability|prediction|
+---+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|  1|Our Deeds are the...|     1|[our, deeds, are,...|[deeds, reason, #...|(262144,[24370,35...|(262144,[24370,35...|(262144,[24370,35...|    1|[-493.56460695090...|[2.27999902863278...|       1.0|
|  4|Forest fire near ...|     1|[forest, fire, ne...|[forest, fire, ne...|(262144,[55310,72...|(262144,[55310,72...|(262144,[55310,72...|    1|[-473.86088609548...|[5.29362798873338...|       1.0|
|  5|All r

In [47]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting disaster tweets was: %f" % acc)

Accuracy of model at predicting disaster tweets was: 0.975716
