In [4]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnan, when, count, col, monotonically_increasing_id

In [5]:
JSON_PATH = "reviews_Video_Games_5.json"
APP_NAME = "AMAZON VIDEO GAME ANALYSIS"
SPARK_URL = "local[*]"
RANDOM_SEED = 141107
TRAINING_DATA_RATIO = 0.8

In [6]:
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

In [7]:
df = spark.read.json(JSON_PATH)

In [8]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [9]:
df.select(['reviewText', 'summary', 'overall']).show()

+--------------------+--------------------+-------+
|          reviewText|             summary|overall|
+--------------------+--------------------+-------+
|Installing the ga...|Pay to unlock con...|    1.0|
|If you like rally...|     Good rally game|    4.0|
|1st shipment rece...|           Wrong key|    1.0|
|I got this versio...|awesome game, if ...|    3.0|
|I had Dirt 2 on X...|              DIRT 3|    4.0|
|Overall this is a...|Good racing game,...|    4.0|
|Loved playing Dir...|A step up from Di...|    5.0|
|I can't tell you ...|Crash 3 is correc...|    1.0|
|I initially gave ...|A great game ruin...|    4.0|
|I still haven't f...|Couldn't get this...|    2.0|
|I'm not quite fin...| Best in the series!|    5.0|
|I have been playi...|   A 5 stars winner!|    5.0|
|Dirt 3 on DVDi co...|                Cars|    5.0|
|I bought this and...|It might have bee...|    1.0|
|Crashed in Vista....|Don't waste your ...|    1.0|
|This game was a r...|Not as good as Di...|    1.0|
|In today's 

In [10]:
df2 = df.withColumn('target', when(df.overall < 3, 0).otherwise(1))
df2 = df2.withColumn('id', monotonically_increasing_id())

In [18]:
df2 = df2.select(['id', 'reviewText', 'target'])
df2 = df2.selectExpr("id as id", "reviewText as reviewText", "target as label")
df2.printSchema()

root
 |-- id: long (nullable = false)
 |-- reviewText: string (nullable = true)
 |-- label: integer (nullable = false)



In [19]:
df2.show(5)

+---+--------------------+-----+
| id|          reviewText|label|
+---+--------------------+-----+
|  0|Installing the ga...|    0|
|  1|If you like rally...|    1|
|  2|1st shipment rece...|    0|
|  3|I got this versio...|    1|
|  4|I had Dirt 2 on X...|    1|
+---+--------------------+-----+
only showing top 5 rows



In [20]:
df2.describe()

DataFrame[summary: string, id: string, reviewText: string, label: string]

In [21]:
(trainingData, valdata, testData) = df2.randomSplit([0.75, 0.05, 0.20])

In [22]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol="tf")
idf = IDF(inputCol="tf", outputCol="features", minDocFreq=5)
#label_stringIdx = StringIndexer(inputCol="target", outputCol="label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf])

pipelineFit = pipeline.fit(trainingData)
train_df = pipelineFit.transform(trainingData)
val_df = pipelineFit.transform(valdata)
train_df.show(20)

+---+--------------------+-----+--------------------+--------------------+--------------------+
| id|          reviewText|label|               words|                  tf|            features|
+---+--------------------+-----+--------------------+--------------------+--------------------+
|  0|Installing the ga...|    0|[installing, the,...|(65536,[14,680,16...|(65536,[14,680,16...|
|  1|If you like rally...|    1|[if, you, like, r...|(65536,[2410,3092...|(65536,[2410,3092...|
|  2|1st shipment rece...|    0|[1st, shipment, r...|(65536,[568,6534,...|(65536,[568,6534,...|
|  3|I got this versio...|    1|[i, got, this, ve...|(65536,[14,672,73...|(65536,[14,672,73...|
|  6|Loved playing Dir...|    1|[loved, playing, ...|(65536,[4461,4488...|(65536,[4461,4488...|
|  7|I can't tell you ...|    0|[i, can't, tell, ...|(65536,[1903,2026...|(65536,[1903,2026...|
|  9|I still haven't f...|    0|[i, still, haven'...|(65536,[3053,3149...|(65536,[3053,3149...|
| 12|Dirt 3 on DVDi co...|    1|[dirt, 3

In [23]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.7731273946108982

In [25]:
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(valdata.count())
accuracy

0.8601098523858565