# Bike Sharing Demand 

In [None]:
# Initializing PySpark
# findspark.init() locates the local Spark installation and adds it to
# sys.path, so it has to run before `import pyspark`.
import findspark
findspark.init()

import pyspark
# getOrCreate() reuses an already-running context, so re-executing this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate()

from pyspark.mllib.regression import LabeledPoint
import numpy as np

from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.tree import DecisionTree

## Loading and inspecting the data 

In [None]:
# Location of the hourly bike-sharing CSV (the file name suggests the header
# row has already been stripped -- TODO confirm).
path = "input/hour_noheader.csv"
# RDD of text lines, one element per line of the file.
raw_data = sc.textFile(path)
# count() is an action: it reads the file and returns the number of lines.
num_data = raw_data.count()

In [None]:
# Split every line on commas into a list of string fields.
records = raw_data.map(lambda x: x.split(","))
# Look at the first parsed record to inspect the column layout.
first = records.first()
print (first)

In [None]:
# Number of lines read from the input file.
print (num_data)

In [None]:
# Mark the parsed records for caching: they are re-read by every mapping and
# feature-extraction pass below.
records.cache()

In [None]:
def get_mapping(rdd, idx):
    """Build a value -> index dictionary for one column of ``rdd``.

    Args:
        rdd: RDD whose elements are indexable records (e.g. lists of fields).
        idx: Position of the column to index.

    Returns:
        dict mapping each distinct value found at position ``idx`` to the
        integer index assigned by ``zipWithIndex``, collected to the driver.
    """
    column = rdd.map(lambda fields: fields[idx])
    indexed_values = column.distinct().zipWithIndex()
    return indexed_values.collectAsMap()


In [None]:
# Example: the value -> index mapping built for column 2.
print ("Mapping of first categorical feature column: %s" % get_mapping(records, 2))

In [None]:
# One value -> index mapping per categorical column (columns 2 through 9).
mappings = [get_mapping(records, i) for i in range(2, 10)]
# Width of the one-hot part: one slot per distinct value of every column.
cat_len = sum(len(mapping) for mapping in mappings)
# Count the numerical columns on slice 10:14, the same slice that
# extract_features converts to floats, so the two cannot drift apart
# (a 11:15 slice would only agree by length).
num_len = len(records.first()[10:14])
total_len = num_len + cat_len

In [None]:
# Report the sizes of the one-hot part, the numerical part and the full vector.
print ("Feature vector length for categorical features: %d" % cat_len)
print ("Feature vector length for numerical features: %d" % num_len)
print ("Total feature vector length: %d" % total_len)

## Creating feature vectors for the linear model 

In [None]:
def extract_features(record):
    """Build the linear-model feature vector for one parsed record.

    Each categorical column (starting at column 2, one column per entry of
    the module-level ``mappings``) is one-hot encoded into its own segment
    of a ``cat_len``-wide vector; columns 10-13 are appended as floats.

    Args:
        record: Sequence of string fields for one row of the dataset.

    Returns:
        numpy.ndarray: the one-hot segments followed by the float values of
        columns 10-13.

    Raises:
        KeyError: If a categorical value is missing from its mapping.
        ValueError: If a numerical field cannot be parsed as a float.
    """
    cat_vec = np.zeros(cat_len)
    # Slice by len(mappings) rather than a hard-coded end so every mapped
    # column is encoded; a fixed record[2:9] covers only seven of the eight
    # mapped columns and leaves the last segment all zeros.
    cat_fields = record[2:2 + len(mappings)]
    offset = 0
    for mapping, field in zip(mappings, cat_fields):
        cat_vec[mapping[field] + offset] = 1
        # The next column's segment starts after this column's slots.
        offset += len(mapping)
    num_vec = np.array([float(field) for field in record[10:14]])
    return np.concatenate((cat_vec, num_vec))

In [None]:
def extract_label(record):
    """Return the regression target: the record's last field as a float."""
    target_field = record[-1]
    return float(target_field)

In [None]:
# Linear-model dataset: each record becomes a LabeledPoint of
# (label, one-hot + numerical feature vector).
data = records.map(lambda r: LabeledPoint(extract_label(r), extract_features(r)))

In [None]:
# Sanity check: show the first record's raw fields (from column 2 on) next to
# the label and feature vector derived from it.
first_point = data.first()
print ("Raw data: " + str(first[2:]))
print ("Label: " + str(first_point.label))
print ("Linear Model feature vector:\n" + str(first_point.features))
print ("Linear Model feature vector length: " + str(len(first_point.features)))

## Creating feature vectors for the decision tree

In [None]:
def extract_features_dt(record):
    """Build the decision-tree feature vector for one parsed record.

    Columns 2-13 are converted to floats as they are, with no one-hot
    encoding.

    Args:
        record: Sequence of string fields for one row of the dataset.

    Returns:
        numpy.ndarray: 1-D float array holding the values of columns 2-13.

    Raises:
        ValueError: If one of the fields cannot be parsed as a float.
    """
    # A list comprehension rather than map(): on Python 3 map() returns an
    # iterator, which np.array would wrap as a 0-d object array instead of a
    # float vector.
    return np.array([float(field) for field in record[2:14]])

In [None]:
# Decision-tree dataset: same labels as the linear model, but with the raw
# (non one-hot) feature vector.
data_dt = records.map(lambda r: LabeledPoint(extract_label(r),extract_features_dt(r)))
# Sanity check on the first point.
first_point_dt = data_dt.first()
print ("Decision Tree feature vector: " + str(first_point_dt.features))
print ("Decision Tree feature vector length: " + str(len(first_point_dt.features)))

## Training a regression model on the bike sharing dataset

In [None]:
# Fit a linear regression with SGD: 10 iterations, step size 0.1, no intercept.
# NOTE(review): LinearRegressionWithSGD appears to be deprecated/removed in
# recent Spark releases -- confirm against the target Spark version.
linear_model = LinearRegressionWithSGD.train(data, iterations=10,step=0.1, intercept=False)

In [None]:
# Pair every true label with the linear model's prediction for the same point
# (evaluated on the training data itself).
true_vs_predicted = data.map(lambda p: (p.label, linear_model.predict(p.features)))
print ("Linear Model predictions: " + str(true_vs_predicted.take(5)))

In [None]:
# Train a regression tree. The empty dict is categoricalFeaturesInfo: no
# feature is declared categorical, so all of them are treated as continuous.
dt_model = DecisionTree.trainRegressor(data_dt, {})
# Predict on the whole RDD of feature vectors in one call (the MLlib docs
# state that tree predict() cannot be used inside an RDD transformation).
preds = dt_model.predict(data_dt.map(lambda p: p.features))
# Take the labels from data_dt, the same RDD the predictions come from, so
# the zip below pairs elements of one dataset instead of relying on the
# linear-model RDD having the same layout.
actual = data_dt.map(lambda p: p.label)
true_vs_predicted_dt = actual.zip(preds)
print ("Decision Tree predictions: " + str(true_vs_predicted_dt.take(5)))
print ("Decision Tree depth: " + str(dt_model.depth()))
print ("Decision Tree number of nodes: " + str(dt_model.numNodes()))

## Evaluating the performance of regression models

In [None]:
def squared_error(actual, pred):
    """Return the squared difference between a prediction and the true value.

    Args:
        actual: True target value.
        pred: Predicted value.

    Returns:
        ``(pred - actual)`` squared.
    """
    residual = pred - actual
    return residual ** 2

In [None]:
def abs_error(actual, pred):
    """Return the absolute difference between a prediction and the true value.

    Args:
        actual: True target value.
        pred: Predicted value.

    Returns:
        ``|pred - actual|`` as computed by numpy.
    """
    residual = pred - actual
    return np.absolute(residual)

In [None]:
def squared_log_error(pred, actual):
    """Return the squared difference of ``log(x + 1)`` for the two values.

    The parameter order is (pred, actual), the reverse of the other error
    helpers; because the difference is squared, swapping the two arguments
    gives the same result.

    Args:
        pred: Predicted value.
        actual: True target value.

    Returns:
        ``(log(pred + 1) - log(actual + 1))`` squared.
    """
    log_pred = np.log(pred + 1)
    log_actual = np.log(actual + 1)
    return (log_pred - log_actual) ** 2

In [None]:
# Error metrics of the linear model over all (true, predicted) pairs.
# Each pair is unpacked with *tp: tuple-parameter lambdas such as
# ``lambda (t, p): ...`` are a syntax error on Python 3 (PEP 3113).
mse = true_vs_predicted.map(lambda tp: squared_error(*tp)).mean()
mae = true_vs_predicted.map(lambda tp: abs_error(*tp)).mean()
# RMSLE is the square root of the mean squared log error.
rmsle = np.sqrt(true_vs_predicted.map(lambda tp: squared_log_error(*tp)).mean())

In [None]:
# Report the linear model's error metrics, four decimal places each.
print ("Linear Model - Mean Squared Error: %2.4f" % mse)
print ("Linear Model - Mean Absolute Error: %2.4f" % mae)
print ("Linear Model - Root Mean Squared Log Error: %2.4f" % rmsle)

In [None]:
# Error metrics of the decision tree over all (true, predicted) pairs.
# Each pair is unpacked with *tp: tuple-parameter lambdas such as
# ``lambda (t, p): ...`` are a syntax error on Python 3 (PEP 3113).
mse_dt = true_vs_predicted_dt.map(lambda tp: squared_error(*tp)).mean()
mae_dt = true_vs_predicted_dt.map(lambda tp: abs_error(*tp)).mean()
# RMSLE is the square root of the mean squared log error.
rmsle_dt = np.sqrt(true_vs_predicted_dt.map(lambda tp: squared_log_error(*tp)).mean())
print ("Decision Tree - Mean Squared Error: %2.4f" % mse_dt)
print ("Decision Tree - Mean Absolute Error: %2.4f" % mae_dt)
print ("Decision Tree - Root Mean Squared Log Error: %2.4f" % rmsle_dt)