In [1]:
#Initialize
from pyspark import SQLContext, SparkContext, SparkConf
from pyspark.sql.functions import col
from pyspark.conf import SparkConf
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
import matplotlib
sqlContext = SQLContext(sc)


Creating SparkContext as 'sc'


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
16,,pyspark,idle,,,✔


Creating HiveContext as 'sqlContext'
SparkContext and HiveContext created. Executing user code ...


In [3]:
#Print Schema
df = sqlContext.read.json("In/reviews.json")
df.printSchema()

root
 |-- helpfuless_count: long (nullable = true)
 |-- helpfuless_score: long (nullable = true)
 |-- price: string (nullable = true)
 |-- productId: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- score: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- text: string (nullable = true)
 |-- time: string (nullable = true)
 |-- title: string (nullable = true)
 |-- userId: string (nullable = true)

In [4]:
#Show Sample of Initial Values
df1 = df.select('helpfuless_score', 'helpfuless_count','price','score')
df1.show(5)

+----------------+----------------+--------+-----+
|helpfuless_score|helpfuless_count|   price|score|
+----------------+----------------+--------+-----+
|               7|               7| unknown|  4.0|
|               0|               0|   17.99|  5.0|
|               0|               1|   17.99|  3.0|
|               7|               7| unknown|  4.0|
|               3|               4|   15.99|  5.0|
+----------------+----------------+--------+-----+
only showing top 5 rows

In [5]:
#Remove 0s and unknowns
df1 = df1[df1.helpfuless_score > 0]
df1 = df1[df1.helpfuless_count > 0]
df1 = df1[df1.price > 0]
df1 = df1[df1.score > 0]
df1.show()

+----------------+----------------+------+-----+
|helpfuless_score|helpfuless_count| price|score|
+----------------+----------------+------+-----+
|               3|               4| 15.99|  5.0|
|               8|              10| 19.40|  5.0|
|               1|               1| 19.40|  5.0|
|               1|               1| 19.40|  5.0|
|               1|               1| 19.40|  5.0|
|               4|               4| 10.26|  5.0|
|               1|               1| 10.26|  5.0|
|               7|              11| 10.95|  1.0|
|               1|               2| 10.95|  4.0|
|               1|               2| 10.95|  1.0|
|               2|               4| 10.95|  5.0|
|               5|               9| 10.95|  5.0|
|               1|               3| 10.95|  5.0|
|               1|               4| 10.95|  5.0|
|               1|               4| 10.95|  5.0|
|               4|               6| 10.95|  5.0|
|               2|               3| 10.95|  5.0|
|               1|  

In [6]:
#Check out the 'cleansed' dataset and cache (if helpful)
df1 = df1.select('score', 'price','helpfuless_count','helpfuless_score')
df1.show(5)
df1.cache()

+-----+------+----------------+----------------+
|score| price|helpfuless_count|helpfuless_score|
+-----+------+----------------+----------------+
|  5.0| 15.99|               4|               3|
|  5.0| 19.40|              10|               8|
|  5.0| 19.40|               1|               1|
|  5.0| 19.40|               1|               1|
|  5.0| 19.40|               1|               1|
+-----+------+----------------+----------------+
only showing top 5 rows

DataFrame[score: string, price: string, helpfuless_count: bigint, helpfuless_score: bigint]

In [7]:
#Define mapping function
def get_mapping(rdd, idx):
    return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()

In [7]:
#Print mapping of Score variable
print "Mapping of first categorical feature column: %s" % get_mapping(df1, 0)

Mapping of first categorical feature column: {u' 3.0': 3, u' 4.0': 4, u' 1.0': 2, u' 2.0': 0, u' 5.0': 1}

In [8]:
#Perform mapping - find lengths of categorical and numerical vectors
#Score will be flattened out into categorical
#Price and Helpfulness Count will be numerical
mappings = [get_mapping(df1, i) for i in range(0,1)]
cat_len = sum(map(len, mappings))
num_len = len(df1.first()[1:3])
total_len = num_len + cat_len

print "Feature vector length for categorical features: %d" % cat_len
print "Feature vector length for numerical features: %d" % num_len
print "Total feature vector length: %d" % total_len


Feature vector length for categorical features: 5
Feature vector length for numerical features: 2
Total feature vector length: 7

In [9]:
#Define extract_features and extract_label functions
def extract_features(record):
    cat_vec = np.zeros(cat_len)
    i = 0
    step = 0
    for field in record[0:1]:
        m = mappings[i]
        idx = m[field]
        cat_vec[idx + step] = 1
        i = i + 1
        step = step + len(m)
    num_vec = np.array([float(field) for field in record[1:3]])
    return np.concatenate((cat_vec, num_vec))
def extract_label(record):
    return float(record[-1])

In [11]:
#Create LabeledPoint RDD object based on the above functions
data = df1.map(lambda r: LabeledPoint(extract_label(r), extract_features(r)))

In [12]:
#Check out the data
first = df1.first()
first_point = data.first()
print "Raw data: " + str(first[0:])
print "Label: " + str(first_point.label)
print "Linear Model feature vector:\n" + str(first_point.features)
print "Linear Model feature vector length: " + str(len(first_point.features))

Raw data: (u' 5.0', u' 15.99', 4, 3)
Label: 3.0
Linear Model feature vector:
[0.0,1.0,0.0,0.0,0.0,15.99,4.0]
Linear Model feature vector length: 7

In [13]:
#Grab features from the RDD object for scaling
features = data.map(lambda x: x.features)
features.take(5)

[DenseVector([0.0, 1.0, 0.0, 0.0, 0.0, 15.99, 4.0]), DenseVector([0.0, 1.0, 0.0, 0.0, 0.0, 19.4, 10.0]), DenseVector([0.0, 1.0, 0.0, 0.0, 0.0, 19.4, 1.0]), DenseVector([0.0, 1.0, 0.0, 0.0, 0.0, 19.4, 1.0]), DenseVector([0.0, 1.0, 0.0, 0.0, 0.0, 19.4, 1.0])]

In [14]:
#Load scaling libraries and perform scaling on feature vectors
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

standardizer = StandardScaler()
model = standardizer.fit(features)
features_transform = model.transform(features)
features_transform.take(5)

[DenseVector([0.0, 2.0134, 0.0, 0.0, 0.0, 0.3033, 0.1636]), DenseVector([0.0, 2.0134, 0.0, 0.0, 0.0, 0.368, 0.409]), DenseVector([0.0, 2.0134, 0.0, 0.0, 0.0, 0.368, 0.0409]), DenseVector([0.0, 2.0134, 0.0, 0.0, 0.0, 0.368, 0.0409]), DenseVector([0.0, 2.0134, 0.0, 0.0, 0.0, 0.368, 0.0409])]

In [15]:
#Get label vector (Helpfulness Score values)
lab = df1.map(lambda row: row[3])
lab.take(5)

[3, 8, 1, 1, 1]

In [16]:
#Zip (combine) the scaled features data with the Helpfulness Score data
transformedData = lab.zip(features_transform)
transformedData.take(5)

[(3, DenseVector([0.0, 2.0134, 0.0, 0.0, 0.0, 0.3033, 0.1636])), (8, DenseVector([0.0, 2.0134, 0.0, 0.0, 0.0, 0.368, 0.409])), (1, DenseVector([0.0, 2.0134, 0.0, 0.0, 0.0, 0.368, 0.0409])), (1, DenseVector([0.0, 2.0134, 0.0, 0.0, 0.0, 0.368, 0.0409])), (1, DenseVector([0.0, 2.0134, 0.0, 0.0, 0.0, 0.368, 0.0409]))]

In [18]:
#Get the RDD back into LabeledPoint format for the linear regression
transformedData = transformedData.map(lambda row: LabeledPoint(row[0],[row[1]]))
transformedData.take(5)

[LabeledPoint(3.0, [0.0,2.01341972154,0.0,0.0,0.0,0.303282603018,0.163614079319]), LabeledPoint(8.0, [0.0,2.01341972154,0.0,0.0,0.0,0.367960131241,0.409035198297]), LabeledPoint(1.0, [0.0,2.01341972154,0.0,0.0,0.0,0.367960131241,0.0409035198297]), LabeledPoint(1.0, [0.0,2.01341972154,0.0,0.0,0.0,0.367960131241,0.0409035198297]), LabeledPoint(1.0, [0.0,2.01341972154,0.0,0.0,0.0,0.367960131241,0.0409035198297])]

In [19]:
#Run linear regression model
linear_model = LinearRegressionWithSGD.train(transformedData, iterations=10, step=0.1, intercept=False)

In [20]:
#Check out Actual vs Predicted values
true_vs_predicted = data.map(lambda p: (p.label, linear_model.predict(p.features)))
print "Linear Model predictions: " + str(true_vs_predicted.take(5))

Linear Model predictions: [(3.0, 47.125492833594983), (8.0, 101.22626963863219), (1.0, 23.6482800840792), (1.0, 23.6482800840792), (1.0, 23.6482800840792)]

In [21]:
#Define error testing functions
def squared_error(actual, pred):
    return (pred - actual)**2

def abs_error(actual, pred):
    return np.abs(pred - actual)

def squared_log_error(pred, actual):
    return (np.log(pred + 1) - np.log(actual + 1))**2

In [24]:
#Calculate and display error values of the model
mse = true_vs_predicted.map(lambda (t, p): squared_error(t, p)).mean()
mae = true_vs_predicted.map(lambda (t, p): abs_error(t, p)).mean()
rmsle = np.sqrt(true_vs_predicted.map(lambda (t, p): squared_log_error(t, p)).mean())
print "Linear Model - Mean Squared Error: %2.4f" % mse
print "Linear Model - Mean Absolute Error: %2.4f" % mae
print "Linear Model - Root Mean Squared Log Error: %2.4f" % rmsle

Linear Model - Mean Squared Error: 45293.8213
Linear Model - Mean Absolute Error: 86.6048
Linear Model - Root Mean Squared Log Error: 2.6456