In [33]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnan, when, count, col, udf

In [2]:
DATA_PATH = "/home/ds/notebooks/reviews_Cell_Phones_and_Accessories_5.json.gz"
APP_NAME = "Amazon Reviews Random Forest"
SPARK_URL = "local[*]"


In [3]:
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()
df = spark.read.options(inferschema = "true").json(DATA_PATH)

In [4]:
df.show(5)

+----------+-------+-------+--------------------+-----------+--------------+----------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|    reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+----------------+--------------------+--------------+
|120401325X| [0, 0]|    4.0|They look good an...|05 21, 2014|A30TL5EWN6DFXT|       christina|          Looks Good|    1400630400|
|120401325X| [0, 0]|    5.0|These stickers wo...|01 14, 2014| ASY55RVNIL0UD|        emily l.|Really great prod...|    1389657600|
|120401325X| [0, 0]|    5.0|These are awesome...|06 26, 2014|A2TMXE2AFO7ONB|           Erica|      LOVE LOVE LOVE|    1403740800|
|120401325X| [4, 4]|    4.0|Item arrived in g...|10 21, 2013| AWJ0WZQYMYFQ4|              JM|               Cute!|    1382313600|
|120401325X| [2, 3]|    5.0|awesome! stays on...| 02 3, 2013| ATX7CZYFXI1KW|patrice m rogo

In [5]:
df.columns

['asin',
 'helpful',
 'overall',
 'reviewText',
 'reviewTime',
 'reviewerID',
 'reviewerName',
 'summary',
 'unixReviewTime']

In [6]:
print(f"Dataset shape is {df.count():d} rows by {len(df.columns):d} columns.")

Dataset shape is 194439 rows by 9 columns.


In [7]:
df = df.drop("helpful")


In [8]:
print(f"Dataset shape is {df.count():d} rows by {len(df.columns):d} columns.")

Dataset shape is 194439 rows by 8 columns.


In [9]:
from pyspark.sql import DataFrameNaFunctions as na


In [10]:
null_counts = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) 
                         for c in df.columns]).toPandas().to_dict(orient='records')

print(f"There are {sum(null_counts[0].values()):d} null values in the dataset.")

There are 3519 null values in the dataset.


In [11]:
df = df.na.drop()

In [12]:
print(f"Dataset shape is {df.count():d} rows by {len(df.columns):d} columns.")

Dataset shape is 190920 rows by 8 columns.


In [13]:
null_counts = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) 
                         for c in df.columns]).toPandas().to_dict(orient='records')

print(f"There are {sum(null_counts[0].values()):d} null values in the dataset.")

There are 0 null values in the dataset.


In [14]:

from pyspark.ml.feature import Tokenizer, Word2Vec

tokenizer = Tokenizer(inputCol="reviewText", outputCol="tokenized_text").transform(df)

word2Vec = Word2Vec(vectorSize=300, seed=42, inputCol="tokenized_text", outputCol="w2v_vector").fit(tokenizer)

w2vdf=word2Vec.transform(tokenizer)

In [15]:
w2vdf.select("w2v_vector").show()

+--------------------+
|          w2v_vector|
+--------------------+
|[0.04338343018615...|
|[0.03020610212115...|
|[0.00380904941395...|
|[0.05073430995096...|
|[0.01278761538560...|
|[-0.0293635625760...|
|[0.05099961345853...|
|[-0.0138569975737...|
|[0.00583547643724...|
|[0.00207917444141...|
|[0.04824984188945...|
|[-0.0167498880321...|
|[0.05679807888130...|
|[0.00722917910534...|
|[0.02919429005123...|
|[-0.0121844317599...|
|[0.02507152950958...|
|[0.04784079593734...|
|[0.00537914987288...|
|[-0.0309728097968...|
+--------------------+
only showing top 20 rows



In [16]:
w2vdf = w2vdf.withColumn("sentiment", when(col("overall") >= 4, 1).otherwise(0))



In [17]:
w2vdf.select("reviewText", "sentiment").show(10)

+--------------------+---------+
|          reviewText|sentiment|
+--------------------+---------+
|They look good an...|        1|
|These stickers wo...|        1|
|These are awesome...|        1|
|Item arrived in g...|        1|
|awesome! stays on...|        1|
|These make using ...|        0|
|Came just as desc...|        1|
|it worked for the...|        0|
|Good case, solid ...|        1|
|This is a fantast...|        1|
+--------------------+---------+
only showing top 10 rows



In [18]:
w2vdf.groupBy("sentiment").count().show()

+---------+------+
|sentiment| count|
+---------+------+
|        1|145878|
|        0| 45042|
+---------+------+



In [19]:
45042 / (145878 + 45042)

0.2359208045254557

In [31]:

def waste_count(text):
    if "waste" in text:
        return 1
    else:
        return 0

In [34]:
waste_udf = udf(waste_count)

In [35]:
w2vdf = w2vdf.withColumn("waste", waste_udf(df["reviewText"]))



In [36]:

def cheap_count(text):
    if "cheap" in text:
        return 1
    else:
        return 0

In [37]:
cheap_udf = udf(cheap_count)

In [38]:
w2vdf = w2vdf.withColumn("cheap", cheap_udf(df["reviewText"]))



In [39]:

def refund_count(text):
    if "refund" in text:
        return 1
    else:
        return 0

In [40]:
refund_udf = udf(refund_count)

In [41]:
w2vdf = w2vdf.withColumn("refund", refund_udf(df["reviewText"]))



In [43]:
w2vdf.dtypes

[('asin', 'string'),
 ('overall', 'double'),
 ('reviewText', 'string'),
 ('reviewTime', 'string'),
 ('reviewerID', 'string'),
 ('reviewerName', 'string'),
 ('summary', 'string'),
 ('unixReviewTime', 'bigint'),
 ('tokenized_text', 'array<string>'),
 ('w2v_vector', 'vector'),
 ('sentiment', 'int'),
 ('waste', 'string'),
 ('cheap', 'string'),
 ('refund', 'string')]

In [45]:
from pyspark.sql.types import IntegerType

w2vdf = w2vdf.withColumn("waste", w2vdf["waste"].cast(IntegerType()))
w2vdf = w2vdf.withColumn("refund", w2vdf["refund"].cast(IntegerType()))
w2vdf = w2vdf.withColumn("cheap", w2vdf["cheap"].cast(IntegerType()))


In [46]:
w2vdf.dtypes

[('asin', 'string'),
 ('overall', 'double'),
 ('reviewText', 'string'),
 ('reviewTime', 'string'),
 ('reviewerID', 'string'),
 ('reviewerName', 'string'),
 ('summary', 'string'),
 ('unixReviewTime', 'bigint'),
 ('tokenized_text', 'array<string>'),
 ('w2v_vector', 'vector'),
 ('sentiment', 'int'),
 ('waste', 'int'),
 ('cheap', 'int'),
 ('refund', 'int')]

In [47]:
w2vdf = VectorAssembler(inputCols=["refund", "cheap", "waste", "w2v_vector"], outputCol="features").transform(w2vdf)

In [48]:
TRAINING_DATA_RATIO = .8
RF_NUM_TREES = 20
RF_MAX_DEPTH = 10
RF_NUM_BINS = 32

In [49]:
labelIndexer = StringIndexer(inputCol="sentiment", outputCol="indexedLabel").fit(w2vdf)

featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(w2vdf)

(trainingData, testData) = w2vdf.randomSplit([TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO])

rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=RF_NUM_TREES)

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])

In [50]:
model = pipeline.fit(trainingData)

In [51]:
predictions = model.transform(testData)

In [52]:
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

print(f"Test Error = {(1.0 - accuracy):g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.236691
Accuracy = 0.763309


In [53]:

from pyspark.sql import DataFrame

predictions.crosstab("sentiment", "prediction").show()

+--------------------+-----+---+
|sentiment_prediction|  0.0|1.0|
+--------------------+-----+---+
|                   1|29257| 10|
|                   0| 9091| 93|
+--------------------+-----+---+

