In [1]:
# Basic PySpark setup: pin the Python interpreter, then open a local session.
import os

# NOTE(review): hard-coded interpreter path - confirm it exists on the machine
# running this notebook. Presumably set before the session is created so that
# Spark's Python workers pick it up.
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'

import pyspark
from pyspark.sql import SparkSession

# Build the session with the app name and the "local[*]" master URL.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .master("local[*]")
    .getOrCreate()
)

In [2]:
# Load the gzipped JSON review file into a Spark DataFrame
reviews_path = "reviews_Musical_Instruments_5.json.gz"
df = spark.read.json(reviews_path)

In [3]:
# Sanity check dataset size: the row count of the loaded DataFrame is the
# displayed output of this cell.
df.count()

10261

In [4]:
#Set up features
from pyspark.sql.functions import udf

def _countWords(string):
    if string is None:
        return 0
    return len(string.split())

# Declare the return type explicitly: udf() without one produces a string
# column, so the word count would be stored as text instead of an integer.
from pyspark.sql.types import IntegerType

countWords = udf(_countWords, IntegerType())

def _avgWordLength(string):
    if string is None:
        return 0
    words = string.split()
    if len(words) == 0:
        return 0
    return sum(len(word) for word in words) / len(words)
            
# Give every UDF an explicit return type. udf() without one produces a string
# column, which would leave all of these numeric features stored as text.
from pyspark.sql.types import DoubleType, IntegerType


def _pctUpper(text):
    """Return the fraction of characters in `text` that are uppercase.

    Returns 0.0 (a float, to match the declared double type) when `text` is
    None or empty.
    """
    if not text:
        return 0.0
    return sum(1 for ch in text if ch.isupper()) / len(text)


def _helpfulPct(votes):
    """Return votes[0] / votes[1], or None when that ratio is undefined.

    Presumably `votes` is [helpful reactions, total reactions] - verify
    against the dataset schema. None is returned for a missing or short
    array, a missing numerator, or a zero/missing denominator.
    """
    if votes is None or len(votes) < 2:
        return None
    helpful, total = votes[0], votes[1]
    if helpful is None or not total:
        return None
    return helpful / total


# float(...) guarantees a Python float even if the helper returns an int zero;
# a double-typed UDF needs a float value.
avgWordLength = udf(lambda text: float(_avgWordLength(text)), DoubleType())
pctUpper = udf(_pctUpper, DoubleType())
strLen = udf(lambda text: 0 if text is None else len(text), IntegerType())

helpfulPct = udf(_helpfulPct, DoubleType())

# Attach each engineered feature to the DataFrame as its own column.
df = (
    df
    .withColumn('reviewLen', countWords(df['reviewText']))
    .withColumn('reviewWordAvg', avgWordLength(df['reviewText']))
    .withColumn('pctUpper', pctUpper(df['reviewText']))
    .withColumn('nameLen', strLen(df['reviewerName']))
    .withColumn('helpfulPct', helpfulPct(df['helpful']))
)


In [5]:
#If the review has been reacted to, try to predict the % 'helpful' reactions

#Build our dataset, filtering out reviews with no reactions
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

def _to_labeled_point(row):
    """Convert one review row into a LabeledPoint.

    The label is the row's helpfulPct. The feature vector holds, in order:
    reviewLen, reviewWordAvg, pctUpper, helpful[1], nameLen, overall.
    """
    features = Vectors.dense(row['reviewLen'],
                             row['reviewWordAvg'],
                             row['pctUpper'],
                             row['helpful'][1],
                             row['nameLen'],
                             row['overall'])
    return LabeledPoint(row['helpfulPct'], features)


# Keep only rows that have a helpfulPct value, then convert each to a LabeledPoint.
dataset = (
    df.rdd
    .filter(lambda row: row['helpfulPct'] is not None)
    .map(_to_labeled_point)
)

print("Data Point Examples:")
print(dataset.take(2))

Data Point Examples:
[LabeledPoint(0.9285714285714286, [104.0,4.240384615384615,0.016544117647058824,14.0,4.0,5.0]), LabeledPoint(1.0, [77.0,4.675324675324675,0.009174311926605505,1.0,29.0,5.0])]


In [6]:
# Split proportion and seed, fixed so the split is repeatable
TRAINING_DATA_RATIO = 0.67
RANDOM_SEED = 0xdeadface

print("Dataset Size: " + str(dataset.count()))

# Create training and test sets; the test share is whatever the training ratio leaves
splits = [TRAINING_DATA_RATIO, 1.0 - TRAINING_DATA_RATIO]
training_data, test_data = dataset.randomSplit(weights=splits, seed=RANDOM_SEED)

training_count = training_data.count()
print("Training Data Size: " + str(training_count))
test_count = test_data.count()
print("Test Data Size: " + str(test_count))

Dataset Size: 3465
Training Data Size: 2345
Test Data Size: 1120


In [7]:
#Train Model

from pyspark.mllib.tree import RandomForest
# Import only the clock function used below; a wildcard import would dump every
# public name of the time module into the notebook namespace.
from time import time

# Random forest hyperparameters
RF_NUM_TREES = 3
RF_MAX_DEPTH = 4
RF_MAX_BINS = 8

# Wall-clock timing around the training call
start_time = time()

model = RandomForest.trainRegressor(
    training_data,
    categoricalFeaturesInfo={},  # no features declared as categorical
    numTrees=RF_NUM_TREES,
    featureSubsetStrategy="auto",
    impurity="variance",
    maxDepth=RF_MAX_DEPTH,
    maxBins=RF_MAX_BINS,
    seed=RANDOM_SEED,  # same seed as the train/test split
)

end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)


Time to train model: 2.083 seconds


In [8]:
def _squaredDiff(actual, predicted):
    """Return the squared difference between an actual and a predicted value."""
    diff = actual - predicted
    return diff * diff


# Baseline: predict the mean training label for every test point
print(training_data.take(2))
avgHelpfulPct = training_data.map(lambda point: point.label).mean()
print("Average % Helpful Reaction: " + str(avgHelpfulPct))

# Both mean squared errors below are averaged over the same test-set size
testCount = float(test_data.count())

baselineSquaredErrors = test_data.map(lambda point: _squaredDiff(point.label, avgHelpfulPct))
baselineMSE = baselineSquaredErrors.sum() / testCount
print("Baseline Mean Squared Error: " + str(baselineMSE))

# Compare our model's performance with the baseline
testFeatures = test_data.map(lambda point: point.features)
predictions = model.predict(testFeatures)
testLabels = test_data.map(lambda point: point.label)
labelsAndPredictions = testLabels.zip(predictions)
modelSquaredErrors = labelsAndPredictions.map(lambda pair: _squaredDiff(pair[0], pair[1]))
testMSE = modelSquaredErrors.sum() / testCount
print('Test Mean Squared Error = ' + str(testMSE))
print()
print()

# Check out the model that was learned
print('Learned regression forest model:')
print(model.toDebugString())

[LabeledPoint(1.0, [77.0,4.675324675324675,0.009174311926605505,1.0,29.0,5.0]), LabeledPoint(1.0, [190.0,4.663157894736842,0.02137546468401487,6.0,27.0,5.0])]
Average % Helpful Reaction: 0.7831364052428323
Baseline Mean Squared Error: 0.12080010319567204
Test Mean Squared Error = 0.10893388916420153


Learned regression forest model:
TreeEnsembleModel regressor with 3 trees

  Tree 0:
    If (feature 3 <= 4.0)
     If (feature 5 <= 3.0)
      If (feature 1 <= 4.133333333333334)
       If (feature 4 <= 24.0)
        Predict: 0.3694968553459119
       Else (feature 4 > 24.0)
        Predict: 0.95
      Else (feature 1 > 4.133333333333334)
       If (feature 5 <= 2.0)
        Predict: 0.47756410256410253
       Else (feature 5 > 2.0)
        Predict: 0.6576576576576577
     Else (feature 5 > 3.0)
      If (feature 1 <= 4.133333333333334)
       If (feature 4 <= 17.0)
        Predict: 0.7625
       Else (feature 4 > 17.0)
        Predict: 0.8555555555555555
      Else (feature 1 > 4.133333

In [None]:
#####################################################
# Challenge! 
#
# Can you improve the performance of the regression model?
# Perhaps consider using some of the features you 
# came up with in a previous lab exercise. Alternatively,
# you could make up some new features that you feel are
# particularly suited to this problem. Also, you could
# try adjusting the parameters of the learning algorithm.
# If you like, you could also come up with an alternative
# target that you want to try to learn to predict.
#####################################################