In [1]:
# Install Java, Spark, and Findspark
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q -c http://www-us.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connecting to security.u0% [1 InRelease gpgv 21.3 kB] [Waiting for headers] [Connecting to security.ubu                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [1 InRelease gpgv 21.3 kB] [Waiting for headers] [Connecting to security.ubu                                                                               Hit:3 http://ppa.launchpad.net/marutter/c2d4u3.5/ubuntu bionic InRelease
0% [1 InRelease gpgv 21.3 kB] [Waiting for headers] [Connecting to security.ubu                                                                               Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [1 InRelease gpgv 21.3 kB] [4 InRelease 15.6 kB/88.7 kB 18%] [Connecting to 0% [1 InRelease gpgv 21.3 kB] [Waiting for headers

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-08-14 18:52:49--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.2’


2020-08-14 18:52:51 (1.43 MB/s) - ‘postgresql-42.2.9.jar.2’ saved [914037/914037]



In [3]:

# Start Spark session
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("DataFrameBasics").getOrCreate()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [4]:
# from pyspark import SparkFiles
# #url = "https://drive.google.com/file/d/1TYor4zGC30w8zGPXa9l0L0bc447wVMOX/view?usp=sharing"
# from google.colab import drive
# drive.mount('/gdrive',force_remount=True)
# import glob

# url = glob.glob("/gdrive/My Drive/")
#s3://data-boogtcamp-luminda/amazon_reviews_us_Watches_v1_00.tsv
from pyspark import SparkFiles
url="https://data-boogtcamp-luminda.s3.us-east-2.amazonaws.com/amazon_reviews_us_Watches_v1_00.tsv"
spark.sparkContext.addFile(url)
user_data_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Watches_v1_00.tsv"), sep="\t", header=True, inferSchema=True)

# Show DataFrame
user_data_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...|2015-08-31 00:00:00|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...| 

In [5]:
user_data_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [6]:
print(user_data_df.count())

960872


In [7]:
review_df = user_data_df.select(["review_body","star_rating"])
review_df.show()


+--------------------+-----------+
|         review_body|star_rating|
+--------------------+-----------+
|Absolutely love t...|          5|
|I love this watch...|          5|
|           Scratches|          2|
|It works well on ...|          5|
|Beautiful watch f...|          4|
|i love this watch...|          5|
|for my wife and s...|          5|
|I was about to bu...|          1|
|Watch is perfect....|          5|
|Great quality and...|          4|
|The watch was pre...|          4|
|I bought this wat...|          1|
|It is a cheap wat...|          3|
|Heavier than i th...|          5|
|Had it for severa...|          5|
|This one is diffe...|          5|
|The watch is attr...|          4|
|      said my wife..|          5|
|Nice watch, on ti...|          5|
|Looks great and l...|          4|
+--------------------+-----------+
only showing top 20 rows



In [8]:
from pyspark.sql.functions import regexp_extract, length
review_df = user_data_df.withColumnRenamed("star_rating", "label")\
      .select(["label", "review_date", "review_body"])
review_df = review_df.withColumn('review_length', length(review_df['review_body'])).dropna()
review_df.cache()
review_df.show()

+-----+-------------------+--------------------+-------------+
|label|        review_date|         review_body|review_length|
+-----+-------------------+--------------------+-------------+
|    5|2015-08-31 00:00:00|Absolutely love t...|           80|
|    5|2015-08-31 00:00:00|I love this watch...|           44|
|    2|2015-08-31 00:00:00|           Scratches|            9|
|    5|2015-08-31 00:00:00|It works well on ...|           94|
|    4|2015-08-31 00:00:00|Beautiful watch f...|          566|
|    5|2015-08-31 00:00:00|i love this watch...|          127|
|    5|2015-08-31 00:00:00|for my wife and s...|           60|
|    1|2015-08-31 00:00:00|I was about to bu...|          244|
|    5|2015-08-31 00:00:00|Watch is perfect....|          322|
|    4|2015-08-31 00:00:00|Great quality and...|          145|
|    4|2015-08-31 00:00:00|The watch was pre...|          550|
|    1|2015-08-31 00:00:00|I bought this wat...|          446|
|    3|2015-08-31 00:00:00|It is a cheap wat...|       

In [9]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
# Create all the features to the data set
tokenizer = Tokenizer(inputCol="review_body", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [10]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'review_length'], outputCol='features')

In [11]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[tokenizer, stopremove, hashingTF, idf, clean_up])




In [12]:
cleaner = data_prep_pipeline.fit(review_df)
cleaned = cleaner.transform(review_df)

In [13]:
# Show label of ham spame and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    5|(262145,[24417,35...|
|    5|(262145,[9090,244...|
|    2|(262145,[204538,2...|
|    5|(262145,[12888,24...|
|    4|(262145,[1998,963...|
|    5|(262145,[9639,163...|
|    5|(262145,[16332,33...|
|    1|(262145,[15889,16...|
|    5|(262145,[12888,15...|
|    4|(262145,[14,37852...|
|    4|(262145,[14,4870,...|
|    1|(262145,[5381,963...|
|    3|(262145,[12946,15...|
|    5|(262145,[24417,14...|
|    5|(262145,[9639,163...|
|    5|(262145,[3924,844...|
|    4|(262145,[15889,16...|
|    5|(262145,[37852,59...|
|    5|(262145,[22346,10...|
|    4|(262145,[14,1846,...|
+-----+--------------------+
only showing top 20 rows



In [14]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [15]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+-----+-------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|        review_date|         review_body|review_length|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+-----+-------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    1|2014-08-18 00:00:00|    Doesn't work !!!|           16|[doesn't, work, !!!]|         [work, !!!]|(262144,[34343,15...|(262144,[34343,15...|(262145,[34343,15...|[-124.85128513382...|[0.40173777979546...|       4.0|
|    1|2014-08-18 00:00:00|I did not like it...|           69|[i, did, not, lik...|[like, it., like,...|(262144,[14,

In [16]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.078470


In [17]:

# summary of the Ratings
num_cols = ['label']
cleaned.select(num_cols).describe().show()

+-------+------------------+
|summary|             label|
+-------+------------------+
|  count|            960680|
|   mean| 4.138244784943998|
| stddev|1.2932916461526518|
|    min|                 1|
|    max|                 5|
+-------+------------------+

