In [1]:
import pandas as pd
import numpy as np
import sys
#import sources.endomondolib as endo
#import sources.pysparkconvenience as ps
from numpy import array
from math import sqrt
from pyspark.sql.functions import *
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.sql import DataFrameReader
from pyspark.sql import SQLContext
from IPython.display import display, HTML
from pyspark.sql.functions import col, mean, min, max
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, GBTRegressor, RandomForestRegressor
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator

from pyspark.ml.feature import VectorAssembler

# Disable warnings, set Matplotlib inline plotting and load Pandas package
import warnings
warnings.filterwarnings('ignore')

%matplotlib inline
pd.options.display.mpl_style = 'default'

%load_ext autotime

In [2]:
!pwd

/home/ubuntu/Regression 2nd Iteration
time: 109 ms


In [4]:
#sc = SQLContext()
#create pyspark dataframe from csv
def df_from_csv(csv_file):
    text = sc.textFile(csv_file)\
       .map(lambda line: line.split(','))
   #didn’t work with take(1). believe returns
   #different object then first()
    schema = text.first()
    data = text.filter(lambda x: x != schema)
    df = sqlContext.createDataFrame(data, schema)
    return df

#here’s the new vectorizer function:

def vectorizeData(data):
        return data.rdd.map(lambda r: [r[0], r[1], r[2], r[3], Vectors.dense(r[4:-1]),float(r[-1])])\
            .toDF(['route cluster', 'performance cluster', 'userid', 'workoutid', 'features', 'label'])

#load cluster csv
pandas_df = pd.read_csv('dave.csv')
df = sqlContext.createDataFrame(pandas_df)

time: 2.58 s


In [5]:
pandas_df.columns

Index([u'index', u'workoutid', u'route_prediction', u'perf_prediction',
       u'userid', u'diff_altitude', u'geo_distance', u'heart_rate_avg',
       u'speed_avg', u'elapsed_time', u'user_avg_speed', u'user_avg_dist'],
      dtype='object')

time: 4.13 ms


In [6]:
df.show(5)

+-----+---------+----------------+---------------+-------+---------------+--------------------+--------------+-------------+------------+------------------+--------------------+
|index|workoutid|route_prediction|perf_prediction| userid|  diff_altitude|        geo_distance|heart_rate_avg|    speed_avg|elapsed_time|    user_avg_speed|       user_avg_dist|
+-----+---------+----------------+---------------+-------+---------------+--------------------+--------------+-------------+------------+------------------+--------------------+
|    0|323338446|               1|              2|4133458|-0.157952472568|-0.22519654035599998|       160.469|13.3074129583|        2132|     13.3074129583|-0.22519654035599998|
|    1|380349663|               1|              0|4457345|-0.212179556489|-0.26389169693000003|         152.0| 8.6422788248|        3180|      8.6422788248|-0.26389169693000003|
|    2|460809081|               1|              2|9525377|-0.212179556489|     -0.415958881378|         162.0|

In [7]:
df.schema

StructType(List(StructField(index,LongType,true),StructField(workoutid,LongType,true),StructField(route_prediction,LongType,true),StructField(perf_prediction,LongType,true),StructField(userid,LongType,true),StructField(diff_altitude,DoubleType,true),StructField(geo_distance,DoubleType,true),StructField(heart_rate_avg,DoubleType,true),StructField(speed_avg,DoubleType,true),StructField(elapsed_time,LongType,true),StructField(user_avg_speed,DoubleType,true),StructField(user_avg_dist,DoubleType,true)))

time: 2.31 ms


In [9]:
select_columns = ['route_prediction', 'perf_prediction','userid', 'workoutid', 'geo_distance', 'diff_altitude', \
                'speed_avg', 'heart_rate_avg','user_avg_dist', 'user_avg_speed', 'elapsed_time']

reg_df = vectorizeData(df.select(select_columns))

time: 191 ms


In [10]:
route_clusters = reg_df.select('route cluster').distinct().collect()
perf_clusters = reg_df.select('performance cluster').distinct().collect()
route_cluster_numbers = [int(route_clusters[i][0]) for i in range(len(route_clusters))]
perf_cluster_numbers = [int(perf_clusters[i][0]) for i in range(len(perf_clusters))]

time: 2.97 s


In [31]:
#model_dictionary
model_dict = {}
model_dict['lr'] = {'model': LinearRegression(featuresCol="features", labelCol="label"),\
                    'param': "ParamGridBuilder() \
                    .addGrid(temp_lr.maxIter, [5, 10, 100]) \
                    .addGrid(temp_lr.regParam, [0, 0.1, 0.01]) \
                    .build()"}

model_dict['dt'] = {'model': DecisionTreeRegressor(featuresCol="features", labelCol="label",maxMemoryInMB=1028),\
                    'param': "ParamGridBuilder() \
                    .addGrid(temp_lr.maxDepth, [3, 5]) \
                    .addGrid(temp_lr.minInfoGain, [0, 0.1, 1]) \
                    .build()"}

model_dict['gbt'] = {'model': GBTRegressor(featuresCol="features", labelCol="label", maxMemoryInMB=2056),\
                    'param': "ParamGridBuilder() \
                    .addGrid(temp_lr.maxDepth, [3, 5]) \
                    .addGrid(temp_lr.maxIter, [10,20,40]) \
                    .build()"}

model_dict['rfr'] = {'model': RandomForestRegressor(featuresCol="features", labelCol="label", maxMemoryInMB = 2056),\
                    'param': "ParamGridBuilder() \
                    .addGrid(temp_lr.maxDepth, [3, 5]) \
                    .addGrid(temp_lr.numTrees, [10,20,40]) \
                    .build()"}

time: 18.5 ms


In [33]:
for m in model_dict.keys():
    for n in perf_cluster_numbers:
        for i in route_cluster_numbers:
            temp_df = reg_df[(reg_df['route cluster'] == i) & (reg_df['performance cluster'] == n)]
            #temp_df1 = reg_df[reg_df['route cluster']==i]
            #temp_df = temp_df1[reg_df['performance cluster']==n]
            temp_df_cv = temp_df

            if temp_df.count() == 0:
                #print "Cluster pair dropped"
                pass

            else:
                temp_lr = model_dict[m]['model']
                temp_lrModel = temp_lr.fit(temp_df['label','features'])
                temp_df = temp_lrModel.transform(temp_df)

                paramGrid = eval(model_dict[m]['param'])
                #paramGrid = ParamGridBuilder() \
                #.addGrid(temp_lr.maxDepth, [3, 5]) \
                #.addGrid(temp_lr.numTrees, [10,20,40]) \
                #.build()


                evaluator = RegressionEvaluator(
                    labelCol="label", predictionCol="prediction", metricName="mae")

                crossval = CrossValidator(estimator=temp_lr,
                                          estimatorParamMaps=paramGrid,
                                          evaluator=evaluator,
                                          numFolds=10)  # use 3+ folds in practice

                # Run cross-validation, and choose the best set of parameters.
                cvModel = crossval.fit(temp_df_cv)

                pred = cvModel.transform(temp_df_cv)


                #print("For " + str(type(temp_lr)) + " and cluster " + str(i))
                #print(cvModel.explainParams())

                #Print the coefficients and intercept for linear regression
                #print("Coefficients: " + str(temp_lrModel.coefficients))
                #print("Intercept: " + str(temp_lrModel.intercept))


                
                mae = evaluator.evaluate(pred)
                rmse = evaluator.evaluate(pred,{evaluator.metricName: "rmse"})
                r2 = evaluator.evaluate(pred, {evaluator.metricName: "r2"})

                model_dict[m]['Route%d_Perf%d_%s" % (n,i,m)']={}
                model_dict[m]['Route%d_Perf%d_%s" % (n,i,m)']['mae'] = mae
                model_dict[m]['Route%d_Perf%d_%s" % (n,i,m)']['rmse'] = rmse
                model_dict[m]['Route%d_Perf%d_%s" % (n,i,m)']['r2'] = r2
                

                
                print("(Route, Perf, Model): " + str((n,i,m)) +": " + str(rmse))
                print(cvModel.bestModel)
                #print(cvModel.bestModel.weights)
                #print(cvModel.bestModel.coefficients)
                basePath = "/home/ubuntu/Regression\ 2nd\ Iteration/Models/"
                cvModel.bestModel.save(basePath +"Route%d_Perf%d_%s" % (n,i,m))
                
                model_dict[m]['Route%d_Perf%d_%s" % (n,i,m)']["path"] = basePath +"Route%d_Perf%d_%s" % (n,i,m)
 

(Route, Perf, Model): (0, 0, 'rfr'): 597.352163189
RandomForestRegressionModel (uid=rfr_2e57092fb85d) with 10 trees


AnalysisException: u'Path does not exist: file:/home/ubuntu/Regression\\ 2nd\\ Iteration/models/Route0_Perf0_rfr/treesMetadata;'

time: 1min 30s


In [35]:
!pwd

/home/ubuntu/Regression 2nd Iteration
time: 128 ms


In [37]:
path = basePath+"Route%d_Perf%d_%s" % (n,i,m)

time: 1.24 ms


In [48]:
basePath

'/home/ubuntu/Regression\\ 2nd\\ Iteration/Models/'

time: 2.36 ms


In [41]:
path

'/home/ubuntu/Regression\\ 2nd\\ Iteration/Models/Route0_Perf0_rfr'

time: 2.3 ms


In [36]:
!mkdir basePath+"Route%d_Perf%d_%s" % (n,i,m)

/bin/sh: 1: Syntax error: "(" unexpected
time: 125 ms


In [47]:
basePath = "/home/ubuntu/Regression\ 2nd\ Iteration/Models/"
cvModel.bestModel.save(basePath +"Route%d_Perf%d_%s" % (n,i,m))
                

Py4JJavaError: An error occurred while calling o22162.save.
: java.io.IOException: Path /home/ubuntu/Regression\ 2nd\ Iteration/Models/Route0_Perf0_rfr already exists. Please use write.overwrite().save(path) to overwrite it.
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:107)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


time: 25.2 ms
