In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler, Bucketizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, Word2Vec
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnan, when, count, col


## Start Spark Server

In [3]:
APP_NAME = "Amazon review sentiment analysis"
SPARK_URL = "local[*]"
TRAINING_DATA_RATIO = .75

In [4]:
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

## Process Data

### Load Data

In [5]:
path = "reviews_Home_and_Kitchen_5.json"
reviewsDF = spark.read.json(path)

In [6]:
reviewsDF_1 = reviewsDF.withColumn("total_votes", reviewsDF.helpful[1])
reviewsDF_2 = reviewsDF_1.withColumn("perc_helpful", reviewsDF_1.helpful[0]/reviewsDF_1.helpful[1])

reviewsDF_2 = reviewsDF_2.fillna(0, subset=['perc_helpful'])

In [13]:
reviewsDF_2.show(1)

+----------+-------+-------+--------------------+-----------+-------------+---------------+----------+--------------+-----------+------------+
|      asin|helpful|overall|          reviewText| reviewTime|   reviewerID|   reviewerName|   summary|unixReviewTime|total_votes|perc_helpful|
+----------+-------+-------+--------------------+-----------+-------------+---------------+----------+--------------+-----------+------------+
|0615391206| [0, 0]|    5.0|My daughter wante...|10 19, 2013|APYOBQE6M18AA|Martin Schwartz|Best Price|    1382140800|          0|         0.0|
+----------+-------+-------+--------------------+-----------+-------------+---------------+----------+--------------+-----------+------------+
only showing top 1 row



### Check Data

In [17]:
# check for null values
reviewsDF_2.select([count(when(isnan(c), c)).alias(c) for c in reviewsDF_2.columns if c not in ['helpful']]).show()

+----+-------+----------+----------+----------+------------+-------+--------------+-----------+------------+
|asin|overall|reviewText|reviewTime|reviewerID|reviewerName|summary|unixReviewTime|total_votes|perc_helpful|
+----+-------+----------+----------+----------+------------+-------+--------------+-----------+------------+
|   0|      0|         0|         0|         0|           0|      0|             0|          0|           0|
+----+-------+----------+----------+----------+------------+-------+--------------+-----------+------------+



### Create labels

In [7]:
splits = [-float("inf"), 3.5, float("inf")]
bucketizer = Bucketizer(splits=splits, inputCol="overall", outputCol="label")

# Transform original data into its bucket index.
reviewsDF_3 = bucketizer.transform(reviewsDF_2)

# label_stringIdx = StringIndexer(inputCol = "binned_overall", outputCol = "label")

### Create test and training sets

In [8]:
(trainingData, testData) = reviewsDF_3.randomSplit([TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO])

## Create Modeling Pipeline with Logistic Regression

### Feature Creation

In [9]:
tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")

In [58]:
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)

# featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4)

In [16]:
word2Vec = Word2Vec(vectorSize=200, seed=42, inputCol="words", outputCol="w2v_vector")

featureIndexer = VectorIndexer(inputCol="w2v_vector", outputCol="features", maxCategories=4)

### Initialize model

In [11]:
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=100)

### Build Pipelines

#### TF-IDF & Log. Reg

In [69]:
# Build the pipeline.
tfidf_pipeline = Pipeline(stages=[tokenizer, hashtf, idf, lr])

# Train model.
model_tfidf_lr = tfidf_pipeline.fit(trainingData)

# Make predictions.
predictions_tfidf_lr = model_tfidf_lr.transform(testData)

In [70]:
accuracy = predictions_tfidf_lr.filter(predictions_tfidf_lr.label == predictions_tfidf_lr.prediction).count() / float(test_df.count())
blind_guess = predictions_tfidf_lr.filter(predictions_tfidf_lr.label == 1).count()/float(test_df.count())
print('accuracy:', accuracy, 'blind guess percentage:', blind_guess) 

accuracy: 0.8446755776513086 blind guess percentage: 0.824956571669465


This suggests that the model isn't guessing one class exclusively. 

#### Word2Vec and Log Reg

In [None]:
# Build the pipeline
w2v_pipeline = Pipeline(stages=[tokenizer, word2Vec, featureIndexer, lr])

# Train model.
model_d2v_lr = w2v_pipeline.fit(trainingData)

# Make predictions.
predictions_d2v_lr = model_d2v_lr.transform(testData)

In [None]:
accuracy = predictions_d2v_lr.filter(predictions_d2v_lr.label == predictions_d2v_lr.prediction).count() / float(test_df.count())
blind_guess = predictions_d2v_lr.filter(predictions_d2v_lr.label == 1).count()/float(test_df.count())
print('accuracy:', accuracy, 'blind guess percentage:', blind_guess) 

In [None]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions_d2v_lr)

print(f"Test Error = {(1.0 - accuracy):g}")
print(f"Accuracy = {accuracy:g}")