In [None]:
# Print the current working directory via an IPython shell escape.
!pwd

In [None]:
import pandas as pd
import os
import re

import multiprocessing as mp
# Report the CPU count that multiprocessing detects on this machine.
processor_count = mp.cpu_count()
print("Number of processors: ", processor_count)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
# Reuse the active Spark session if there is one; otherwise create one
# whose application name is 'amazon'.
session_builder = SparkSession.builder.appName('amazon')
spark = session_builder.getOrCreate()

# Explicit schema for the review files, in the order the columns are declared.
# Each entry is (column name, Spark type); every column is nullable.
review_columns = [
    ("marketplace",       StringType()),
    ("customer_id",       StringType()),
    ("review_id",         StringType()),
    ("product_id",        StringType()),
    ("product_parent",    StringType()),
    ("product_title",     StringType()),
    ("product_category",  StringType()),
    ("star_rating",       IntegerType()),
    ("helpful_votes",     IntegerType()),
    ("total_votes",       IntegerType()),
    ("vine",              StringType()),
    ("verified_purchase", StringType()),
    ("review_headline",   StringType()),
    ("review_body",       StringType()),
    ("review_date",       TimestampType()),
]

schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in review_columns
])

In [None]:
# Cloud Storage prefix that the review files are read from.
path = "gs://biodata_bucket/archive-5/"

In [None]:
# Load everything under `path` as tab-separated text with a header row, applying
# the explicit `schema`; rows Spark cannot parse are dropped (DROPMALFORMED).
#
# Quote handling is switched off with quote=''. Review text is free-form, and
# with Spark's default quote character (") a stray double quote makes the parser
# treat the following tabs/newlines as part of one field, so rows get merged,
# dropped, or end up with values in the wrong column.
# NOTE(review): this assumes the files are plain TSV without quoted fields —
# confirm for this bucket.
data = spark.read.csv(
    path,
    schema=schema,
    header=True,
    sep='\t',
    quote='',
    mode='DROPMALFORMED',
)

In [None]:
# Number of rows in the loaded DataFrame (a Spark action; shown as cell output).
loaded_row_count = data.count()
loaded_row_count

In [None]:
# Keep only reviews that have both a body and a headline, then narrow the
# DataFrame to the columns used further down.
has_review_text = data.review_body.isNotNull() & data.review_headline.isNotNull()
kept_columns = [
    'product_title', 'star_rating',
    'helpful_votes', 'total_votes', 'verified_purchase',
    'review_headline', 'review_body', 'product_category',
]
data = data.filter(has_review_text).select(*kept_columns)

# Show how many rows survived and which columns remain.
print(data.count())
print(data.columns)

In [None]:
# Derive the numeric `ver_purch` feature that the modeling cell's VectorAssembler
# lists among its input columns.
#
# `verified_purchase` is read as a string flag. Casting non-numeric text to
# IntegerType yields NULL, so the flag is mapped explicitly instead: "Y" -> 1,
# anything else (including NULL) -> 0.
# NOTE(review): assumes verified purchases are marked exactly "Y" — confirm
# against the data.
#
# The raw `verified_purchase` column is left untouched, so re-running this cell
# produces the same result.
from pyspark.sql.functions import col, when

data = data.withColumn(
    "ver_purch",
    when(col("verified_purchase") == "Y", 1).otherwise(0),
)

In [None]:
# Row count per product_category value, collected to the driver as a list of Rows.
category_counts_before_filter = data.groupBy('product_category').count()
category_counts_before_filter.collect()

In [None]:
# Whitelist of product_category values; rows with any other value are dropped.
categories = [
    'multilingual',
    'Apparel',
    'Automotive',
    'Baby',
    'Beauty',
    'Books',
    'Camera',
    'Digital_Ebook_Purchase',
    'Digital_Music_Purchase',
    'Digital_Software',
    'Digital_Video_Download',
    'Digital_Video_Games',
    'Electronics',
    'Furniture',
    'Gift_Card',
    'Grocery',
    'Health_Personal_Care',
    'Major_Appliances',
    'Mobile_Apps',
    'Mobile_Electronics',
    'Music',
    'Musical_Instruments',
    'Office_Products',
    'Outdoors',
    'PC',
    'Personal_Care_Appliances',
    'Pet_Products',
    'Shoes',
    'Software',
    'Sports',
    'Tools',
    'Toys',
    'Video_DVD',
    'Video_Games',
    'Video',
    'Watches',
    'Wireless',
]

in_known_category = data.product_category.isin(categories)
data = data.filter(in_known_category)
# Row count after the category filter (shown as cell output).
data.count()

In [None]:
# Row count per product_category value, collected to the driver as a list of Rows.
category_counts_after_filter = data.groupBy('product_category').count()
category_counts_after_filter.collect()

In [None]:
# Preview the first five rows without truncating long text columns.
data.show(n=5, truncate=False)

## Modeling

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec, StringIndexer, VectorAssembler, Normalizer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier

# Seed shared by the stochastic stages below (Word2Vec and the tree-based
# classifiers) so that repeated runs are comparable.
SEED = 42

# --- Text columns: tokenize -> remove stop words -> embed with Word2Vec --------
# Suffixes: pt = product_title, rh = review_headline, rb = review_body,
# cat = product_category (tokenized and embedded, but with no stop-word removal).
tokenizer_pt = Tokenizer(inputCol='product_title', outputCol='pt_token')
tokenizer_rh = Tokenizer(inputCol='review_headline', outputCol='rh_token')
tokenizer_rb = Tokenizer(inputCol='review_body', outputCol='rb_token')
tokenizer_cat = Tokenizer(inputCol='product_category', outputCol='cat_token')

remover_pt = StopWordsRemover(inputCol='pt_token', outputCol='pt_stop')
remover_rh = StopWordsRemover(inputCol='rh_token', outputCol='rh_stop')
remover_rb = StopWordsRemover(inputCol='rb_token', outputCol='rb_stop')

# minCount=0 keeps every token in the vocabulary, however rare.
w2v_pt = Word2Vec(vectorSize=3, minCount=0, seed=SEED, inputCol="pt_stop", outputCol="pt_vec")
w2v_rh = Word2Vec(vectorSize=3, minCount=0, seed=SEED, inputCol="rh_stop", outputCol="rh_vec")
w2v_rb = Word2Vec(vectorSize=5, minCount=0, seed=SEED, inputCol="rb_stop", outputCol="rb_vec")
w2v_cat = Word2Vec(vectorSize=1, minCount=0, seed=SEED, inputCol="cat_token", outputCol="cat_vec")

# --- Assemble and normalize the feature vector ---------------------------------
# `ver_purch` is not produced by any stage in this pipeline; it must already be
# a numeric column on the DataFrame the pipeline is fitted on.
assembler = VectorAssembler(
    inputCols=['helpful_votes', 'total_votes', 'ver_purch',
               'pt_vec', 'rh_vec', 'rb_vec', 'cat_vec'],
    outputCol='features',
)
normalizer = Normalizer(inputCol='features', outputCol='norm_features')

# --- Classifiers ----------------------------------------------------------------
# These are not pipeline stages; they are fitted separately on the pipeline
# output, using `star_rating` directly as the class label.
lr = LogisticRegression(featuresCol='norm_features', labelCol='star_rating')
dtc = DecisionTreeClassifier(featuresCol='norm_features', labelCol='star_rating', seed=SEED)
rfc = RandomForestClassifier(featuresCol='norm_features', labelCol='star_rating', seed=SEED)

# --- Feature pipeline -----------------------------------------------------------
pipeline = Pipeline(stages=[
    tokenizer_pt, tokenizer_rh, tokenizer_rb, tokenizer_cat,
    remover_pt, remover_rh, remover_rb,
    w2v_pt, w2v_rh, w2v_rb, w2v_cat,
    assembler, normalizer,
])

In [None]:
# Fit every pipeline stage on `data`, apply the fitted pipeline to the same
# DataFrame, and keep only what the classifiers need: the normalized feature
# vector and the label.
pipeline_model = pipeline.fit(data)
final_data = pipeline_model.transform(data).select('norm_features', 'star_rating')

# Mark the result for caching. It is the common ancestor of the train/test
# split, so without this each later fit/evaluate action would re-run the whole
# feature pipeline from the raw input.
final_data = final_data.cache()

In [None]:
# Randomly split into roughly 70% train / 30% test. The fixed seed makes the
# split repeatable, so accuracy figures from different runs are comparable.
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)

In [None]:
# Fit the three classifiers on the training split, one after another
# (logistic regression, decision tree, random forest). This may take a while.
lr_model = lr.fit(dataset=train_data)
dtc_model = dtc.fit(dataset=train_data)
rfc_model = rfc.fit(dataset=train_data)

In [None]:
# Score the held-out split with each fitted model, in the order
# logistic regression, decision tree, random forest.
lr_predictions, dtc_predictions, rfc_predictions = [
    fitted_model.transform(test_data)
    for fitted_model in (lr_model, dtc_model, rfc_model)
]

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Compare the `prediction` column with the true `star_rating` label and report
# plain accuracy (the fraction of correct predictions) for each model.
acc_evaluator = MulticlassClassificationEvaluator(
    labelCol="star_rating",
    predictionCol="prediction",
    metricName="accuracy",
)
lr_acc = acc_evaluator.evaluate(dataset=lr_predictions)
dtc_acc = acc_evaluator.evaluate(dataset=dtc_predictions)
rfc_acc = acc_evaluator.evaluate(dataset=rfc_predictions)

In [None]:
# Print a banner, then one accuracy line per model (as a percentage with two
# decimals), each preceded by an 80-character dashed rule.
separator = '-'*80
report_lines = (
    ('A logistic regression classifier had an accuracy of: {0:2.2f}%', lr_acc),
    ('A single decision tree had an accuracy of: {0:2.2f}%', dtc_acc),
    ('A random forest ensemble had an accuracy of: {0:2.2f}%', rfc_acc),
)

print("Here are the results!")
for template, accuracy in report_lines:
    print(separator)
    print(template.format(accuracy*100))