In [44]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.2'
spark_version = 'spark-3.1.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connected to cloud.r-pro0% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:10

In [45]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()

In [46]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://data-bootcamp-x399.s3.us-east-2.amazonaws.com/Reviews.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("Reviews.csv"), sep=",", header=True)

# Show DataFrame
df.show(truncate=False)

+---+----------+--------------+-----------------------------------+--------------------+----------------------+-----+----------+----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Id |ProductId |UserId    

## Transform DataFrame to fit Review-Score Table

In [47]:
df = df.select(['Score', 'Text'])

df.show()

+-----+--------------------+
|Score|                Text|
+-----+--------------------+
|    5|I have bought sev...|
|    1|"Product arrived ...|
|    4|"This is a confec...|
|    2|If you are lookin...|
|    5|Great taffy at a ...|
|    4|I got a wild hair...|
|    5|This saltwater ta...|
|    5|This taffy is so ...|
|    5|Right now I'm mos...|
|    5|This is a very he...|
|    5|I don't know if i...|
|    5|One of my boys ne...|
|    1|My cats have been...|
|    4|good flavor! thes...|
|    5|The Strawberry Tw...|
|    5|My daughter loves...|
|    2|I love eating the...|
|    5|I am very satisfi...|
|    5|Twizzlers, Strawb...|
|    5|Candy was deliver...|
+-----+--------------------+
only showing top 20 rows



In [48]:
review_df = df.withColumnRenamed('Score', 'score').withColumnRenamed('Text', 'review_text')
review_df.show()


+-----+--------------------+
|score|         review_text|
+-----+--------------------+
|    5|I have bought sev...|
|    1|"Product arrived ...|
|    4|"This is a confec...|
|    2|If you are lookin...|
|    5|Great taffy at a ...|
|    4|I got a wild hair...|
|    5|This saltwater ta...|
|    5|This taffy is so ...|
|    5|Right now I'm mos...|
|    5|This is a very he...|
|    5|I don't know if i...|
|    5|One of my boys ne...|
|    1|My cats have been...|
|    4|good flavor! thes...|
|    5|The Strawberry Tw...|
|    5|My daughter loves...|
|    2|I love eating the...|
|    5|I am very satisfi...|
|    5|Twizzlers, Strawb...|
|    5|Candy was deliver...|
+-----+--------------------+
only showing top 20 rows



In [50]:
from pyspark.sql.functions import length
review_df = review_df.withColumn('review_length', length(review_df['review_text'])).dropna()
review_df.cache()
review_df.show()

+-----+--------------------+-------------+
|score|         review_text|review_length|
+-----+--------------------+-------------+
|    5|I have bought sev...|          263|
|    1|"Product arrived ...|          194|
|    4|"This is a confec...|          386|
|    2|If you are lookin...|          219|
|    5|Great taffy at a ...|          140|
|    4|I got a wild hair...|          416|
|    5|This saltwater ta...|          304|
|    5|This taffy is so ...|          140|
|    5|Right now I'm mos...|          131|
|    5|This is a very he...|          137|
|    5|I don't know if i...|          779|
|    5|One of my boys ne...|          328|
|    1|My cats have been...|          399|
|    4|good flavor! thes...|           97|
|    5|The Strawberry Tw...|          113|
|    5|My daughter loves...|          155|
|    2|I love eating the...|          195|
|    5|I am very satisfi...|          141|
|    5|Twizzlers, Strawb...|          746|
|    5|Candy was deliver...|          145|
+-----+----

## Create Data Pipeline

In [55]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
# Create all the features to the data set
tokenizer = Tokenizer(inputCol="review_text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [56]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'review_length'], outputCol='features')

In [57]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[tokenizer, stopremove, hashingTF, idf, clean_up])

## Transform Data

In [58]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(review_df)
cleaned = cleaner.transform(review_df)

In [60]:
# Show label of ham spame and resulting features
cleaned.select(['label', 'features']).show()