<a href="https://colab.research.google.com/github/laurahallaman1/final_project/blob/master/reviews.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [0]:
 # Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()

In [0]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://raw.githubusercontent.com/laurahallaman1/final_project/master/Reviewer_Combined_Amazon_Consumer_Reviews.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("Reviewer_Combined_Amazon_Consumer_Reviews.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+--------------------+--------------------+--------------------+--------------------+-----------+-----------+------+--------------------+-------------------+--------------------+----------+--------+
|                  id|                name|               asins|   primaryCategories|doRecommend| numHelpful|rating|                text|              title|            username|      _c10|reviewer|
+--------------------+--------------------+--------------------+--------------------+-----------+-----------+------+--------------------+-------------------+--------------------+----------+--------+
|AVpgdkC8ilAPnD_xsvyi|Fire Tablet, 7 Di...|          B018Y22BI4|         Electronics|       TRUE|          0|     3|Serves the purpos...|         Good price|                 712|      null|     Bot|
|AVqkIdZiv8e3D1O-leaJ|Fire Tablet with ...|          B018Y224PY|         Electronics|       TRUE|          0|     3|Serves the purpos...|         Good price|                 712|      null|     Bot|
|AVpj

In [0]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = df.withColumn('length', length(df['text']))
data_df.show()

+--------------------+--------------------+--------------------+--------------------+-----------+-----------+------+--------------------+-------------------+--------------------+----------+--------+------+
|                  id|                name|               asins|   primaryCategories|doRecommend| numHelpful|rating|                text|              title|            username|      _c10|reviewer|length|
+--------------------+--------------------+--------------------+--------------------+-----------+-----------+------+--------------------+-------------------+--------------------+----------+--------+------+
|AVpgdkC8ilAPnD_xsvyi|Fire Tablet, 7 Di...|          B018Y22BI4|         Electronics|       TRUE|          0|     3|Serves the purpos...|         Good price|                 712|      null|     Bot|    59|
|AVqkIdZiv8e3D1O-leaJ|Fire Tablet with ...|          B018Y224PY|         Electronics|       TRUE|          0|     3|Serves the purpos...|         Good price|                 71

In [0]:
 from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='rating',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [0]:
 from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [0]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [0]:
 # Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [0]:
cleaned.select(['label', 'features']).show()

+------+--------------------+
| label|            features|
+------+--------------------+
|1774.0|(262145,[16332,25...|
|1774.0|(262145,[16332,25...|
|1774.0|(262145,[16332,25...|
|7061.0|(262145,[15889,16...|
|7061.0|(262145,[15889,16...|
| 480.0|(262145,[15889,16...|
| 480.0|(262145,[15889,16...|
| 480.0|(262145,[15889,16...|
| 480.0|(262145,[15889,16...|
| 273.0|(262145,[12888,16...|
| 273.0|(262145,[12888,16...|
| 273.0|(262145,[12888,16...|
| 273.0|(262145,[12888,16...|
| 273.0|(262145,[12888,16...|
|  58.0|(262145,[212053,2...|
|3024.0|(262145,[14,2711,...|
|3024.0|(262145,[14,2711,...|
| 417.0|(262145,[15889,27...|
| 417.0|(262145,[15889,27...|
| 417.0|(262145,[15889,27...|
+------+--------------------+
only showing top 20 rows



In [0]:
test_results = predictor.transform(testing)
test_results.show(5)

+--------------------+--------------------+--------------------+-----------------+-----------+----------+------+--------------------+--------------------+-----------------+----+--------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                  id|                name|               asins|primaryCategories|doRecommend|numHelpful|rating|                text|               title|         username|_c10|reviewer|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+-----------------+-----------+----------+------+--------------------+--------------------+-----------------+----+--------+------+-----+--------------------+--------------------+--------------------+--------------------+-

In [0]:
 from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [0]:
 # Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.701945
