In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnan, when, count, col
import pandas as pd
import numpy as np
from pyspark.sql.functions import UserDefinedFunction

In [2]:
DATA_NAME = "/home/ds/notebooks/datasets/amazonreviews/AmznInstantVideo.json"
APP_NAME = "Amazon Video Reviews"
SPARK_URL = "local[*]"
RANDOM_SEED = 141107
TRAINING_DATA_RATIO = 0.8
RF_NUM_TREES = 10
RF_MAX_DEPTH = 4
RF_NUM_BINS = 32

In [3]:
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()
sqlContext = SQLContext(spark)


In [4]:
df = sqlContext.read.json(DATA_NAME)

In [5]:
#check shape of data
print('Columns:')
print(len(df.columns))
print('Rows:')
df.count()

Columns:
9
Rows:


37121

In [6]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [7]:
df.select("reviewText", "summary").show()

+--------------------+--------------------+
|          reviewText|             summary|
+--------------------+--------------------+
|I had big expecta...|A little bit bori...|
|I highly recommen...|Excellent Grown U...|
|This one is a rea...|Way too boring fo...|
|Mysteries are int...|Robson Green is m...|
|This show always ...|Robson green and ...|
|I discovered this...|I purchased the s...|
|It beats watching...|It takes up your ...|
|There are many ep...|A reasonable way ...|
|This is the best ...|           kansas001|
|Not bad.  Didn't ...| Entertaining Comedy|
|Funny, interestin...|     Worth watching!|
|I love the variet...|comedy club quali...|
|comedy is a matte...|                  ok|
|if this had to do...|not sure who this...|
|Watched it for Ke...|           Loved it!|
|he's OK. His humo...|same routine he d...|
|some comedians ar...|           it was ok|
|I only watched th...|I Only Watched Wa...|
|Enjoyed some of t...|     Some were funny|
|All the comedians...|          

In [8]:
df.registerTempTable('reviews')

In [9]:
#Look at review stars coding
sqlContext.sql("select overall, count(overall) as reviewCount from reviews group by overall order by overall desc").show()

+-------+-----------+
|overall|reviewCount|
+-------+-----------+
|    5.0|      20888|
|    4.0|       8445|
|    3.0|       4185|
|    2.0|       1885|
|    1.0|       1718|
+-------+-----------+



In [10]:
print('Columns:')
print(len(df.columns))
print('Rows:')
df.count()

Columns:
9
Rows:


37121

In [11]:
#create function to make over 3 stars positive
udf = UserDefinedFunction (lambda x: 1 if x > 3 else 0, IntegerType())

In [12]:
#create new dataframe with binary coding for pos/neg opinions
df2 = df.withColumn("recode",udf(df.overall))

In [13]:
# Tokenize and vectorize reviewText column using tf-idf
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="reviewText", outputCol="tokenized_text").transform(df2)
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="tokenized_text", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(tokenizer)

idf = IDF(inputCol="rawFeatures", outputCol="feature")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("recode", "feature").show()


+------+--------------------+
|recode|             feature|
+------+--------------------+
|     0|(20,[0,1,3,5,6,7,...|
|     1|(20,[1,2,3,4,5,8,...|
|     0|(20,[0,1,4,5,6,7,...|
|     1|(20,[0,1,2,3,8,9,...|
|     1|(20,[0,1,2,3,4,5,...|
|     1|(20,[0,1,2,3,4,5,...|
|     0|(20,[0,3,5,6,7,8,...|
|     0|(20,[1,3,4,5,6,7,...|
|     1|(20,[0,1,2,3,4,5,...|
|     0|(20,[0,2,3,7,8,10...|
|     1|(20,[0,1,8,9,10,1...|
|     1|(20,[0,1,2,3,5,6,...|
|     0|(20,[0,1,3,7,10,1...|
|     0|(20,[0,1,2,3,5,7,...|
|     1|(20,[0,1,2,3,6,9,...|
|     0|(20,[0,1,2,3,5,9,...|
|     0|(20,[0,1,2,3,4,5,...|
|     0|(20,[0,1,2,3,4,5,...|
|     1|(20,[0,3,5,8,9,10...|
|     1|(20,[0,1,2,3,4,5,...|
+------+--------------------+
only showing top 20 rows



In [14]:
# Tokenize and vectorize summary column using tf-idf
tokenizer = Tokenizer(inputCol="summary", outputCol="tokenized_text2").transform(rescaledData)
hashingTF = HashingTF(inputCol="tokenized_text2", outputCol="rawFeatures2", numFeatures=20)
featurizedData = hashingTF.transform(tokenizer)

idf = IDF(inputCol="rawFeatures2", outputCol="feature2")
idfModel = idf.fit(featurizedData)
rescaledData2 = idfModel.transform(featurizedData)

rescaledData2.select("recode", "feature","feature2").show()

+------+--------------------+--------------------+
|recode|             feature|            feature2|
+------+--------------------+--------------------+
|     0|(20,[0,1,3,5,6,7,...|(20,[4,6,10,11,16...|
|     1|(20,[1,2,3,4,5,8,...|(20,[1,6,8,10],[1...|
|     0|(20,[0,1,4,5,6,7,...|(20,[4,11,16,19],...|
|     1|(20,[0,1,2,3,8,9,...|(20,[1,5,15,18],[...|
|     1|(20,[0,1,2,3,4,5,...|(20,[0,5,13,17,18...|
|     1|(20,[0,1,2,3,4,5,...|(20,[3,8,9,10,11,...|
|     0|(20,[0,3,5,6,7,8,...|(20,[3,8,14,15],[...|
|     0|(20,[1,3,4,5,6,7,...|(20,[0,6,8,10,15,...|
|     1|(20,[0,1,2,3,4,5,...|(20,[3],[1.444927...|
|     0|(20,[0,2,3,7,8,10...|(20,[13,19],[1.39...|
|     1|(20,[0,1,8,9,10,1...|(20,[3,12],[1.444...|
|     1|(20,[0,1,2,3,5,6,...|(20,[3,4,5,7,13,1...|
|     0|(20,[0,1,3,7,10,1...|(20,[4],[1.778752...|
|     0|(20,[0,1,2,3,5,7,...|(20,[1,13,14,16,1...|
|     1|(20,[0,1,2,3,6,9,...|(20,[11,17],[1.75...|
|     0|(20,[0,1,2,3,5,9,...|(20,[2,3,6,7,9,16...|
|     0|(20,[0,1,2,3,4,5,...|(2

In [15]:
#Put review text and summary vectors into one vector
df3 = VectorAssembler(inputCols=['feature','feature2'], outputCol="features").transform(rescaledData2)


In [16]:
df3.select("recode", "features").show()

+------+--------------------+
|recode|            features|
+------+--------------------+
|     0|(40,[0,1,3,5,6,7,...|
|     1|(40,[1,2,3,4,5,8,...|
|     0|(40,[0,1,4,5,6,7,...|
|     1|(40,[0,1,2,3,8,9,...|
|     1|(40,[0,1,2,3,4,5,...|
|     1|[2.28483993183917...|
|     0|(40,[0,3,5,6,7,8,...|
|     0|(40,[1,3,4,5,6,7,...|
|     1|(40,[0,1,2,3,4,5,...|
|     0|(40,[0,2,3,7,8,10...|
|     1|(40,[0,1,8,9,10,1...|
|     1|(40,[0,1,2,3,5,6,...|
|     0|(40,[0,1,3,7,10,1...|
|     0|(40,[0,1,2,3,5,7,...|
|     1|(40,[0,1,2,3,6,9,...|
|     0|(40,[0,1,2,3,5,9,...|
|     0|(40,[0,1,2,3,4,5,...|
|     0|(40,[0,1,2,3,4,5,...|
|     1|(40,[0,3,5,8,9,10...|
|     1|(40,[0,1,2,3,4,5,...|
+------+--------------------+
only showing top 20 rows



In [17]:
# Build the training indexers, split data (set aside 30% for testing)

labelIndexer = StringIndexer(inputCol="recode", outputCol="indexedLabel").fit(df3)

featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df3)
    
(trainingData, testData) = df3.randomSplit([TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=RF_NUM_TREES)

# Make pipeline of indexers and RFC
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])

In [18]:
# Train model  
model = pipeline.fit(trainingData)

In [19]:
# Make predictions
predictions = model.transform(testData)

In [20]:
# test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Test Error:")
print(1-accuracy)
print("Accuracy:")
print(accuracy)

Test Error:
0.20203548342731403
Accuracy:
0.797964516572686
