In [1]:
path = "/home/stan/ApacheSpark/MLwithSpark/Regression/hour_noheader.csv"

In [2]:
raw_data = sc.textFile(path)

In [3]:
num_data=raw_data.count()

In [4]:
records = raw_data.map(lambda x: x.split(","))

In [5]:
first = records.first()

In [6]:
print first

[u'1', u'2011-01-01', u'1', u'0', u'1', u'0', u'0', u'6', u'0', u'1', u'0.24', u'0.2879', u'0.81', u'0', u'3', u'13', u'16']


In [7]:
print num_data

17379


# Binary Encoding

In [8]:
def get_mapping(rdd,idx):
    return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()

In [9]:
print "Mapping of first categorical feature column: %s" %get_mapping(records,2)

Mapping of first categorical feature column: {u'1': 0, u'3': 1, u'2': 2, u'4': 3}


In [10]:
mappings = [get_mapping(records,i) for i in range(2,10)]

In [11]:
cat_len = sum(map(len,mappings))

In [12]:
cat_len

57

In [13]:
num_len = len(records.first()[11:15])

In [14]:
records.first()[11:15]

[u'0.2879', u'0.81', u'0', u'3']

In [15]:
total_len = num_len+cat_len

In [18]:
print "Feature vector length for categorical features: %s" %cat_len

Feature vector length for categorical features: 57


In [19]:
from pyspark.mllib.regression import LabeledPoint

In [20]:
import numpy as np

In [23]:
def extract_features(record):
    cat_vec = np.zeros(cat_len)
    i = 0
    step = 0
    for field in record[2:9]:
        m = mappings[i]
        idx = m[field]
        cat_vec[idx+step] =1
        i = i+1
        step =step + len(m)
    num_vec = np.array([float(field) for field in record[10:14]])
    return np.concatenate((cat_vec,num_vec))

        

In [24]:
def extract_label(record):
    return float(record[-1])

In [28]:
data = records.map(lambda r: LabeledPoint(extract_label(r),extract_features(r)))

In [29]:
first_point = data.first()

In [30]:
print "Raw data:" + str(first[2:])

Raw data:[u'1', u'0', u'1', u'0', u'0', u'6', u'0', u'1', u'0.24', u'0.2879', u'0.81', u'0', u'3', u'13', u'16']


In [31]:
print "Label:" + str(first_point.label)

Label:16.0


In [32]:
print "Linear Model feature vector: \n" + str(first_point.features)

Linear Model feature vector: 
[1.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.24,0.2879,0.81,0.0]


In [33]:
def extract_features_dt(record):
    return np.array(map(float,record[2:14]))


In [34]:
data_dt = records.map(lambda r: LabeledPoint(extract_label(r),extract_features_dt(r)))

In [35]:
first_point_dt = data_dt.first()

In [36]:
str(first_point_dt.features)

'[1.0,0.0,1.0,0.0,0.0,6.0,0.0,1.0,0.24,0.2879,0.81,0.0]'

In [37]:
from pyspark.mllib.regression import LinearRegressionWithSGD

In [38]:
from pyspark.mllib.tree import DecisionTree

In [39]:
help(LinearRegressionWithSGD.train)

Help on method train in module pyspark.mllib.regression:

train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None, regParam=0.0, regType=None, intercept=False, validateData=True) method of __builtin__.type instance
    Train a linear regression model using Stochastic Gradient
    Descent (SGD).
    This solves the least squares regression formulation
    
        f(weights) = 1/(2n) ||A weights - y||^2,
    
    which is the mean squared error.
    Here the data matrix has n rows, and the input RDD holds the
    set of rows of A, each with its corresponding right hand side
    label y. See also the documentation for the precise formulation.
    
    :param data:              The training data, an RDD of
                              LabeledPoint.
    :param iterations:        The number of iterations
                              (default: 100).
    :param step:              The step parameter used in SGD
                              (default: 1.0).
    :

In [40]:
help(DecisionTree.trainRegressor)

Help on method trainRegressor in module pyspark.mllib.tree:

trainRegressor(cls, data, categoricalFeaturesInfo, impurity='variance', maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0) method of __builtin__.type instance
    Train a DecisionTreeModel for regression.
    
    :param data: Training data: RDD of LabeledPoint.
                 Labels are real numbers.
    :param categoricalFeaturesInfo: Map from categorical feature
             index to number of categories.
             Any feature not in this map is treated as continuous.
    :param impurity: Supported values: "variance"
    :param maxDepth: Max depth of tree.
             E.g., depth 0 means 1 leaf node.
             Depth 1 means 1 internal node + 2 leaf nodes.
    :param maxBins: Number of bins used for finding splits at each
             node.
    :param minInstancesPerNode: Min number of instances required at
             child nodes to create the parent split
    :param minInfoGain: Min info gain requir

In [41]:
linear_model = LinearRegressionWithSGD.train(data,iterations=10,step=0.1,intercept=False)

In [42]:
true_vs_predicted = data.map(lambda p: (p.label,linear_model.predict(p.features)))

In [43]:
print "Linear Model predictions: "+ str(true_vs_predicted.take(5)) 

Linear Model predictions: [(16.0, 117.89250386724845), (40.0, 116.2249612319211), (32.0, 116.02369145779235), (13.0, 115.67088016754433), (1.0, 115.56315650834316)]


In [44]:
dt_model = DecisionTree.trainRegressor(data_dt,{})

In [45]:
preds = dt_model.predict(data_dt.map(lambda p: p.features))

In [46]:
actual = data.map(lambda p: p.label)


In [47]:
true_vs_predicted_dt = actual.zip(preds)

In [48]:
print "Decision Tree predictions: " + str(true_vs_predicted_dt.take(5))

Decision Tree predictions: [(16.0, 54.913223140495866), (40.0, 54.913223140495866), (32.0, 53.171052631578945), (13.0, 14.284023668639053), (1.0, 14.284023668639053)]


In [49]:
print "Decision Tree depth: "+ str(dt_model.depth())

Decision Tree depth: 5


In [50]:
print "Decision Tree number of nodes: "+ str(dt_model.numNodes())

Decision Tree number of nodes: 63


# Evaluating the performance of regression models

In [51]:
def squared_error(actual, pred):
    return (pred-actual)**2

In [52]:
def abs_error(actual, pred):
    return np.abs(pred-actual)

In [53]:
def squared_log_error(pred,actual):
    return (np.log(pred+1)-np.log(actual+1))**2

In [54]:
mse = true_vs_predicted.map(lambda(t,p):squared_error(t,p)).mean()

In [55]:
mae = true_vs_predicted.map(lambda(t,p):abs_error(t,p)).mean()

In [56]:
rmsle = np.sqrt(true_vs_predicted.map(lambda (t,p):squared_log_error(t,p)).mean()) ## root mean squared log error

In [58]:
print "Linear Model- Mean squared error, mean absolute error, root mean squared log error: %2.4f, %2.4f, %2.4f" %(mse,mae,rmsle)

Linear Model- Mean squared error, mean absolute error, root mean squared log error: 30679.4539, 130.6429, 1.4653


In [59]:
mse_dt = true_vs_predicted_dt.map(lambda(t,p):squared_error(t,p)).mean()
mae_dt = true_vs_predicted_dt.map(lambda(t,p):abs_error(t,p)).mean()
rmsle_dt = np.sqrt(true_vs_predicted_dt.map(lambda (t,p):squared_log_error(t,p)).mean())

print "Decision Tree model- Mean squared error, mean absolute error, root mean squared log error: %2.4f, %2.4f, %2.4f" %(mse_dt,mae_dt,rmsle_dt)


Decision Tree model- Mean squared error, mean absolute error, root mean squared log error: 11560.7978, 71.0969, 0.6259
