In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.1.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [1 InRelease 14.2 kB/88.70% [Connected to archive.ubuntu.com (91.189.88.152)] [Connected to cloud.r-proj                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Waiting f0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
         

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()

In [7]:
import pandas as pd
url = 'https://raw.githubusercontent.com/lisb020/scifi_book_game/main/BookList.csv'
pd_df = pd.read_csv(url)

pd_df.head()

Unnamed: 0.1,Unnamed: 0,Book_Title,Rating_score,Rating_votes,Book_Description,subgenre
0,0,Obsidian,4.17,236780,Starting over sucks.When we moved to West Virg...,aliens
1,1,Onyx,4.27,153429,BEING CONNECTED TO DAEMON BLACK SUCKS… Thanks ...,aliens
2,2,The 5th Wave,4.03,400600,"After the 1st wave, only darkness remains. Aft...",aliens
3,3,The Host,3.84,915026,Melanie Stryder refuses to fade away. The eart...,aliens
4,4,Opal,4.27,129006,No one is like Daemon Black.When he set out to...,aliens


In [8]:
pd_df.drop(columns="Unnamed: 0", inplace=True)
pd_df.head()

Unnamed: 0,Book_Title,Rating_score,Rating_votes,Book_Description,subgenre
0,Obsidian,4.17,236780,Starting over sucks.When we moved to West Virg...,aliens
1,Onyx,4.27,153429,BEING CONNECTED TO DAEMON BLACK SUCKS… Thanks ...,aliens
2,The 5th Wave,4.03,400600,"After the 1st wave, only darkness remains. Aft...",aliens
3,The Host,3.84,915026,Melanie Stryder refuses to fade away. The eart...,aliens
4,Opal,4.27,129006,No one is like Daemon Black.When he set out to...,aliens


In [22]:
pd_df['subgenre'].unique()

array(['aliens', 'alternate', 'apocalyptic', 'cyberpunk', 'dystopia',
       'hard', 'military', 'robots', 'space', 'steampunk', 'time'],
      dtype=object)

In [9]:
from pyspark.sql.types import *
mySchema = StructType([ StructField("Book_Title", StringType(), True)\
                       ,StructField("Rating_score", FloatType(), True)\
                       ,StructField("Rating_votes", LongType(), True)\
                       ,StructField("Book_Description", StringType(), True)\
                       ,StructField("subgenre", StringType(), True)])

In [10]:
spark_df = spark.createDataFrame(pd_df, schema=mySchema)
spark_df.show()

+--------------------+------------+------------+--------------------+--------+
|          Book_Title|Rating_score|Rating_votes|    Book_Description|subgenre|
+--------------------+------------+------------+--------------------+--------+
|            Obsidian|        4.17|      236780|Starting over suc...|  aliens|
|                Onyx|        4.27|      153429|BEING CONNECTED T...|  aliens|
|        The 5th Wave|        4.03|      400600|After the 1st wav...|  aliens|
|            The Host|        3.84|      915026|Melanie Stryder r...|  aliens|
|                Opal|        4.27|      129006|No one is like Da...|  aliens|
|              Origin|        4.35|       93979|Daemon will do an...|  aliens|
|          Opposition|        4.37|       67740|Katy knows the wo...|  aliens|
|    I Am Number Four|        3.94|      319092|Nine of us came h...|  aliens|
|    The Infinite Sea|        3.87|      123001|How do you rid th...|  aliens|
|             Shadows|        4.12|       36224|The 

In [11]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = spark_df.withColumn('length', length(spark_df['Book_Description']))
data_df.show()

+--------------------+------------+------------+--------------------+--------+------+
|          Book_Title|Rating_score|Rating_votes|    Book_Description|subgenre|length|
+--------------------+------------+------------+--------------------+--------+------+
|            Obsidian|        4.17|      236780|Starting over suc...|  aliens|   856|
|                Onyx|        4.27|      153429|BEING CONNECTED T...|  aliens|  1026|
|        The 5th Wave|        4.03|      400600|After the 1st wav...|  aliens|   724|
|            The Host|        3.84|      915026|Melanie Stryder r...|  aliens|   610|
|                Opal|        4.27|      129006|No one is like Da...|  aliens|   901|
|              Origin|        4.35|       93979|Daemon will do an...|  aliens|   884|
|          Opposition|        4.37|       67740|Katy knows the wo...|  aliens|   842|
|    I Am Number Four|        3.94|      319092|Nine of us came h...|  aliens|   736|
|    The Infinite Sea|        3.87|      123001|How do

In [13]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
sub_genre = StringIndexer(inputCol='subgenre',outputCol='label')
tokenizer = Tokenizer(inputCol="Book_Description", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [14]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [15]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[sub_genre, tokenizer, stopremove, hashingTF, idf, clean_up])

In [18]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

DataFrame[Book_Title: string, Rating_score: float, Rating_votes: bigint, Book_Description: string, subgenre: string, length: int, label: double, token_text: array<string>, stop_tokens: array<string>, hash_token: vector, idf_token: vector, features: vector]

In [19]:
# Show label and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(262145,[3524,828...|
|  1.0|(262145,[6446,912...|
|  1.0|(262145,[4174,420...|
|  1.0|(262145,[5968,233...|
|  1.0|(262145,[134,4254...|
|  1.0|(262145,[1371,232...|
|  1.0|(262145,[3440,538...|
|  1.0|(262145,[4136,193...|
|  1.0|(262145,[1371,395...|
|  1.0|(262145,[3530,538...|
|  1.0|(262145,[2437,343...|
|  1.0|(262145,[6034,912...|
|  1.0|(262145,[453,8121...|
|  1.0|(262145,[32705,38...|
|  1.0|(262145,[4136,691...|
|  1.0|(262145,[3617,390...|
|  1.0|(262145,[288,4504...|
|  1.0|(262145,[5381,552...|
|  1.0|(262145,[2705,392...|
|  1.0|(262145,[10524,11...|
+-----+--------------------+
only showing top 20 rows



In [23]:
from pyspark.sql.functions import col, asc,desc
cleaned.orderBy(col("label").asc()).select(['label', 'features']).show(truncate=False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [24]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [25]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+--------------------+------------+------------+--------------------+-----------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|          Book_Title|Rating_score|Rating_votes|    Book_Description|   subgenre|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------------------+------------+------------+--------------------+-----------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                  14|        3.95|       31046|Padlocked doors. ...|apocalyptic|   860|  3.0|[padlocked, doors...|[padlocked, doors...|(262144,[578,7987...|(262144,[578,7987...|(262145,[578,7987...|[-5436.7621983396...|[1.0,3.3006792634...|

In [26]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting subgenre was: %f" % acc)

Accuracy of model at predicting subgenre was: 0.508086
