In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETLProject").getOrCreate()

In [0]:
from pyspark import SparkFiles
# Load in user_data.csv from S3 into a DataFrame
url = "https://s3.amazonaws.com//dataviz-curriculum/day_3/ratings_and_sentiments.csv"
spark.sparkContext.addFile(url)

df = spark.read.option('header', 'true').csv(SparkFiles.get("ratings_and_sentiments.csv"), inferSchema=True, sep=',', timestampFormat="mm/dd/yy")
df.show(10)

+--------------------+--------------------+-----------------+----------+----------+---------+------------+---------+--------+------------+------------+----------+------------+-------------+------------+-----------+---------+----------+-------------+----------+
|    coffee_shop_name|         review_text|           rating|num_rating|cat_rating|bool_HIGH|overall_sent|vibe_sent|tea_sent|service_sent|seating_sent|price_sent|parking_sent|location_sent|alcohol_sent|coffee_sent|food_sent|hours_sent|internet_sent|local_sent|
+--------------------+--------------------+-----------------+----------+----------+---------+------------+---------+--------+------------+------------+----------+------------+-------------+------------+-----------+---------+----------+-------------+----------+
|The Factory - Caf...|11/25/2016 1 chec...| 5.0 star rating |         5|      HIGH|        1|           4|        3|       0|           0|           0|         0|           0|            0|           1|          3|   

## Transform DataFrame to fit review_rating table

In [0]:
review_df = df.select(["review_text","num_rating"])
review_df.show()

+--------------------+----------+
|         review_text|num_rating|
+--------------------+----------+
|11/25/2016 1 chec...|         5|
|12/2/2016 Listed ...|         4|
|11/30/2016 1 chec...|         4|
|11/25/2016 Very c...|         2|
|12/3/2016 1 check...|         4|
|11/20/2016 1 chec...|         4|
|"10/27/2016 2 che...|         4|
|"11/2/2016 2 chec...|         5|
|"10/25/2016 1 che...|         3|
|11/10/2016 3 chec...|         5|
|"10/22/2016 1 che...|         4|
|11/20/2016 The st...|         3|
|11/17/2016 1 chec...|         3|
|12/5/2016 This is...|         5|
|11/13/2016 Beauti...|         5|
|11/9/2016 1 check...|         5|
|11/6/2016 Really ...|         5|
|10/25/2016 1 chec...|         4|
|10/15/2016 1 chec...|         4|
|12/1/2016 So much...|         4|
+--------------------+----------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import regexp_extract, length
review_df = df.withColumn("date", regexp_extract("review_text", "\d+/\d+/\d+", 0))\
      .withColumn("review_text", regexp_extract("review_text", "\d+/\d+/\d+(?:\s)(.*)", 1))\
      .withColumnRenamed("num_rating", "label")\
      .select(["label", "date", "review_text"])
review_df = review_df.withColumn('review_length', length(review_df['review_text'])).dropna()
review_df.cache()
review_df.show()

+-----+----------+--------------------+-------------+
|label|      date|         review_text|review_length|
+-----+----------+--------------------+-------------+
|    5|11/25/2016|1 check-in Love l...|          542|
|    4| 12/2/2016|Listed in Date Ni...|          279|
|    4|11/30/2016|1 check-in Listed...|         1240|
|    2|11/25/2016|Very cool vibe! G...|          364|
|    4| 12/3/2016|1 check-in They a...|          629|
|    4|11/20/2016|1 check-in Very c...|          999|
|    4|10/27/2016|2 check-ins Liste...|         1326|
|    5| 11/2/2016|2 check-ins Love ...|         1780|
|    3|10/25/2016|1 check-in Ok let...|         1805|
|    5|11/10/2016|3 check-ins This ...|          725|
|    4|10/22/2016|1 check-in Listed...|         1669|
|    3|11/20/2016|The store has A+ ...|          509|
|    3|11/17/2016|1 check-in Listed...|          835|
|    5| 12/5/2016|This is such a cu...|          152|
|    5|11/13/2016|Beautiful eccentr...|          378|
|    5| 11/9/2016|1 check-in

## Create Data Pipeline

In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
# Create all the features to the data set
tokenizer = Tokenizer(inputCol="review_text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'review_length'], outputCol='features')

In [0]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[tokenizer, stopremove, hashingTF, idf, clean_up])

## Transform DataFrame

In [0]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(review_df)
cleaned = cleaner.transform(review_df)

In [0]:
# Show label of ham spame and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    5|(262145,[9639,991...|
|    4|(262145,[512,1588...|
|    4|(262145,[3578,963...|
|    2|(262145,[9639,157...|
|    4|(262145,[3294,736...|
|    4|(262145,[14,8443,...|
|    4|(262145,[14,604,3...|
|    5|(262145,[14,4543,...|
|    3|(262145,[3890,392...|
|    5|(262145,[991,2437...|
|    4|(262145,[14,326,3...|
|    3|(262145,[6922,736...|
|    3|(262145,[6922,963...|
|    5|(262145,[4081,158...|
|    5|(262145,[1076,199...|
|    5|(262145,[14,329,1...|
|    5|(262145,[14,1998,...|
|    4|(262145,[14,5281,...|
|    4|(262145,[7388,963...|
|    4|(262145,[9639,158...|
+-----+--------------------+
only showing top 20 rows



## Run NaiveBayes

In [0]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [0]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+-----+---------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|     date|         review_text|review_length|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+-----+---------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    1|1/10/2015|1 check-in I just...|         1444|[1, check-in, i, ...|[1, check-in, edi...|(262144,[14,2326,...|(262144,[14,2326,...|(262145,[14,2326,...|[-8530.8523111699...|[5.18392012015807...|       4.0|
|    1|1/14/2016|Worst brownie I h...|          103|[worst, brownie, ...|[worst, brownie, ...|(262144,[9639,244...|(262144,[9639,244...|(262145,[9639,244...

## Predict accuracy of the model

In [0]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.141092
