In [None]:
# Spark did not perform better than scikit for this round of data.
# If spark could model all 1200 stores, I feel it would beat scikit.

In [27]:
# PyData
import pandas
import numpy
from pandas import DataFrame, Series
from sklearn import datasets, linear_model, preprocessing, cross_validation
from sklearn.ensemble import RandomForestRegressor, ExtraTreesRegressor
from sklearn.linear_model import LinearRegression, Ridge

# System
import datetime
import os
import math

# Graphing
#%matplotlib inline # Only works on Python 3 in the docker container
#import seaborn # Only works on Python 3 in the docker container

#os.environ['PYSPARK_PYTHON'] = 'python2'


%matplotlib inline

# Spark
import pyspark
from pyspark.sql import SQLContext

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint

pyspark.SparkContext.setSystemProperty('spark.executor.memory', '30g')

sc = pyspark.SparkContext('local[2]')
sqlContext = SQLContext(sc)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[2]) created by __init__ at <ipython-input-1-f72c1e1c4b39>:36 

In [None]:
df_basic = DataFrame.from_csv("train_features_basic.csv", index_col=['Date', 'Store'])
df_means = DataFrame.from_csv("train-features-predicted_mean.csv", index_col=['Date', 'Store'])
del df_means['Sales-prediction_mean_error'] # Not supported

In [None]:
df_sales = DataFrame.from_csv("train.csv", index_col=['Date', 'Store'])[['Sales']]

In [None]:
df_features = df_means.join(df_basic, how='outer')
train_df = df_sales.join(df_features)
train_df['Sales_predicted'] = train_df['Sales_predicted'].fillna(0) # when sales is 0, sales_predicted is nan.
train_df.head()

In [None]:
del df_features
del df_basic
del df_means
del df_sales

In [None]:
train_df['Sales_predicted'] = train_df['Sales_predicted'].fillna(train_df['Sales_predicted'].mean())

In [None]:
train_df['Sales_predicted'] = train_df['Sales_predicted'].map(lambda x: int(x))

In [None]:
train_df.reset_index(inplace=True)

In [None]:
del train_df['Date']

In [None]:
feature_columns = list(train_df.columns)
feature_columns.remove("Sales")
feature_columns = ['Sales'] + feature_columns
train_df = train_df[feature_columns]

In [None]:

train_df.head()

# Switch to Spark

In [None]:
df = sqlContext.createDataFrame(train_df)
# This relies on correct order: [0] is sales, [1:] are all features
df = df.map(lambda row: LabeledPoint(row[0], row[1:])).toDF()


In [None]:
%%time
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=31).fit(df)


In [28]:
%%time
(trainingData, testData) = df.randomSplit([0.8, 0.2])


# Train a RandomForest model.
# rf = RandomForestRegressor(numTrees=10, maxDepth=7, maxBins=31, featuresCol="indexedFeatures")
# OOMS with numTrees=10, maxDepth=7, maxBins=1200 on 30gb r.2xlarge
# Works with 30gb r.2xlarge: numTrees=12, maxDepth=7, maxBins=31
# OOMS with numTrees=12, maxDepth=10, maxBins=31 on 30gb r.2xlarge
# QUESTION: Does limiting the number of processes improve this by reducing memory contention of the processes?
# - YES! The above was using all 8 processes, by limiting to 2 processes:
# - 2procs: Works with 30gb r.2xlarge: numTrees=12, maxDepth=10, maxBins=31
# - OOMS: Works with 30gb r.2xlarge: numTrees=10, maxDepth=7, maxBins=1200
# - Does not finish: numTrees=30, maxDepth=20, maxBins=31
# - Takes 5 min to fit: numTrees=15, maxDepth=10, maxBins=31
rf = RandomForestRegressor(numTrees=3, maxDepth=10, maxBins=31, featuresCol="indexedFeatures")


# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])
#pipeline = Pipeline(stages=[rf])


CPU times: user 0 ns, sys: 8 ms, total: 8 ms
Wall time: 9.85 ms


In [29]:
%%time
# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

 

CPU times: user 28 ms, sys: 0 ns, total: 28 ms
Wall time: 2min 13s


In [30]:

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
# predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
# print "Root Mean Squared Error (RMSE) on test data = %g" % rmse

# Compute RMPSE
squares = predictions.rdd.filter(lambda x: x.label != 0).map(lambda x: ((x.label - x.prediction) / x.label) *  ((x.label - x.prediction) / x.label))

mean = squares.mean()
import math
math.sqrt(mean)

0.1931746743111487

In [31]:
testData.head()

Row(features=DenseVector([14.0, 6349.0, 5.0, 1.0, 1.0, 0.0, 1.0, 31.0, 4.0, 212.0, 7.0, 3.0, 31.0, 2015.0]), label=6544.0)

In [32]:
def score(prediction, actual):
    pcts = (actual - prediction) / actual
    return math.sqrt( (pcts * pcts).mean() )


In [33]:
predictions.head()

Row(features=DenseVector([14.0, 6349.0, 5.0, 1.0, 1.0, 0.0, 1.0, 31.0, 4.0, 212.0, 7.0, 3.0, 31.0, 2015.0]), label=6544.0, indexedFeatures=DenseVector([14.0, 6349.0, 4.0, 1.0, 1.0, 0.0, 1.0, 30.0, 4.0, 212.0, 6.0, 2.0, 31.0, 2.0]), prediction=6619.00634858855)

In [34]:
predictions_df = predictions.map(lambda x: [x.label, x.features[1], x.prediction]).collect()

In [35]:
predictions_df = DataFrame.from_records(predictions_df, columns=['Actual', 'Prediction-Mean', 'Prediction-RF'])
predictions_df.head()

Unnamed: 0,Actual,Prediction-Mean,Prediction-RF
0,6544,6349,6619.006349
1,7248,6764,7809.727993
2,6395,4764,5538.316913
3,5464,5037,5538.316913
4,11946,11105,11666.842641


In [36]:
predictions_df = predictions_df[predictions_df.Actual != 0]

In [37]:
score(predictions_df['Prediction-Mean'], predictions_df.Actual)

0.17763765947346152

In [38]:
score(predictions_df['Prediction-RF'], predictions_df.Actual)

0.19317467431114874

# Eval

In [None]:
df_basic = DataFrame.from_csv("test_features_basic.csv", index_col=['Date', 'Store'])
df_means = DataFrame.from_csv("test-features-predicted_mean.csv", index_col=['Date', 'Store'])

In [None]:
df_test_features = df_means.join(df_basic)

In [None]:
df_test_features.fillna(0, inplace=True)
df_test_features['Sales_predicted'] = df_test_features['Sales_predicted'].map(lambda x: int(x))

In [None]:
df_test_features.reset_index(inplace=True)
del df_test_features['Date']

In [None]:
df_test_features.set_index('Id', inplace=True)
df_test_features.head()

In [None]:

df_test_features.head()

In [None]:
len(df_test_features.columns)

In [None]:
df_test = sqlContext.createDataFrame(df_test_features[0:10])
df_test = df_test.map(lambda row: LabeledPoint(0, features=row[0:])).toDF()

In [None]:
model.transform(df_test).take(1)

In [None]:
prediction = df_test_features.index

In [None]:
model.transform(df_test).map(lambda x: x.prediction).take(5)

In [None]:
1+1

In [None]:
prediction['Sales'] = model.transform(df_test).map(lambda x: x.prediction).collect()

In [None]:
prediction.head()

In [None]:

prediction.to_csv( "spark-v1.csv", index = False )

In [None]:
model.transform(trainingData).map(lambda x: x.prediction).take(5)

In [None]:
 trainingData.take(5)