In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import abs, avg 

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor

from pyspark.ml.evaluation import RegressionEvaluator

# n-fold validation and the results.
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder


In [None]:
# spark.conf.set("spark.executor.memory", "2g")

In [2]:
%%configure -f
{"driverMemory": "4G", "executorCores": 4, "executorMemory": "4G"}

UsageError: Cell magic `%%configure` not found.


In [2]:
import numpy as np
import pandas as pd
import os
# from scipy.signal import hilbert, hann, convolve
# import scipy.stats as stats

pyspark_submit_args = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
from pyspark.sql import SparkSession

spark = SparkSession \
 .builder \
 .appName("myApp") \
 .config("spark.mongodb.input.uri", "mongodb://3.17.193.180/project.features2")\
 .config("spark.mongodb.output.uri", "mongodb://3.17.193.180/project.features2")\
 .getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()


In [3]:
# 1000         !! only run one per time, 1st
df_train, df_test = df.drop('_id','index').randomSplit([0.8,0.2])

## 5000           !! second 
# df_train, df_test = df.filter(df.index%5==0).drop('_id','index').randomSplit([0.8,0.2])

### 10000          !! third
# df_train, df_test = df.filter(df.index%10==0).drop('_id','index').randomSplit([0.8,0.2])

In [4]:
df_train.show(1)
# df_train.count()

+-----------------+-----------------+-----------------------+-------+-------+---------+-----------------+-----------------+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+---------------------+---------+---+-----------------+-----------------+-----+---------------+---------------+--------------+--------------+-----------------+------------------+------------------+-----------------+---------------+-----------------+-----------------+-----------------+----------------------------+----------------------------+---------------------------+---------------------------+---+------+---------------+---------------+--------------+--------------+------------------+------------------+-----------------+------+------------------+------------------+-----------------+------------------+-----------------+------------------+---+---+---+---+---+---+----+----+------------------+------------------+------------------+----+------------------+-------

In [5]:
feature_names = df_train.columns[0:-1]
# feature_names

## VectorAssemble features to build df for modeling

In [6]:
va = VectorAssembler(outputCol="features", inputCols=df_train.columns[0:-1])
df_train = va.transform(df_train).select("features", "y_target").persist()
df_test = va.transform(df_test).select("features", "y_target").persist()


## Set one evaluator for all algrithms, metric: RMSE on validation set

In [7]:
evaluator=RegressionEvaluator(labelCol="y_target",predictionCol="prediction", metricName="rmse")


## linear regression

In [8]:
lr = LinearRegression(featuresCol = 'features', labelCol='y_target',
                      maxIter=1, regParam=0.3, elasticNetParam=0.8)  # tune later

In [None]:
lrModel = lr.fit(df_test)


In [55]:
# feature_names x coeffecients
beta0 = [('intercept',round(lrModel.intercept,4))]
beta1 = [(x[0],round(x[1],4)) for x in zip(feature_names,lrModel.coefficients) if abs(x[1])>0.001]
beta1.sort(key = lambda x: abs(x[1]),reverse=True)
beta0+beta1

[('intercept', 24.9215),
 ('q80', -3.2064),
 ('mean', 1.8754),
 ('q85', -0.9219),
 ('avg_last_50000', 0.9087),
 ('q75', -0.6713),
 ('q90', -0.6122),
 ('avg_first_50000', 0.5092),
 ('count_big', 0.0084),
 ('std_first_50000', 0.0072),
 ('std_last_50000', -0.0016),
 ('max_last_10000', -0.0012),
 ('min_last_10000', 0.0008),
 ('max_first_50000', 0.0002),
 ('min_first_50000', -0.0002),
 ('min_last_50000', 0.0002),
 ('max1', 0.0001)]

In [27]:
# model performance on training data
lr_summary = lrModel.summary
lr_RMSE = lr_summary.rootMeanSquaredError
lr_R_2_adj = lr_summary.r2adj
print("linear model RMSE on train set = "+ '{:.4f}'.format(lr_RMSE))
print("linear model adjusted R square on train set = "+ '{:.4f}'.format(lr_R_2_adj))

# model performance on valid/test set
lr_preds = lrModel.transform(df_test)
lr_preds= lr_preds.withColumn('abs_error', abs(lr_preds.prediction-lr_preds.y_target))\
    .withColumn('APE', abs((lr_preds.prediction-lr_preds.y_target)/lr_preds.y_target))

lr_RMSE_test = evaluator.evaluate(lr_preds)
lr_MAPE_test = lr_preds.agg(avg('APE')).collect()[0][0]

print("linear model RMSE on test set = "+ '{:.4f}'.format(lr_RMSE_test))
print("linear model MAPE on test set = "+ '{:.2%}'.format(lr_MAPE_test))

linear model RMSE = 2.7556
linear model adjusted R square = 0.5728
linear model RMSE on test set = 2.7546
linear model MAPE on test set = 323.93%


In [28]:
print("linear model MAPE of prediction on signal within 1s before an actual fail = "\
      + '{:.2%}'.format(lr_preds.filter(lr_preds.y_target<=1).agg(avg('APE')).collect()[0][0]))
print("linear model MAPE of prediction on signal 1~5s before an actual fail = "\
      + '{:.2%}'.format(lr_preds.filter(lr_preds.y_target>1).filter(lr_preds.y_target<=5).agg(avg('APE')).collect()[0][0]))
print("linear model MAPE of prediction on signal 5~10s before an actual fail = "\
      + '{:.2%}'.format(lr_preds.filter(lr_preds.y_target>5).filter(lr_preds.y_target<=10).agg(avg('APE')).collect()[0][0]))
print("linear model MAPE of prediction on signal over 10s before an actual fail = "\
      + '{:.2%}'.format(lr_preds.filter(lr_preds.y_target>10).agg(avg('APE')).collect()[0][0]))


linear model MAPE of prediction on signal within 1s before an actual fail = 2394.58%
linear model MAPE of prediction on signal 1~5s before an actual fail = 99.76%
linear model MAPE of prediction on signal 5~10s before an actual fail = 13.05%
linear model MAPE of prediction on signal over 10s before an actual fail = 16.23%


### grid search for linear reg

In [55]:
#nfold cv
lr_cv = CrossValidator().setEstimator(lr).setEvaluator(evaluator).setNumFolds(5) # set
#ParamGridBuilder() – combinations of parameters and their values.

lr_paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [ x*0.01 for x in range(0,50,5)]) \
    .addGrid(lr.elasticNetParam, [ x*0.01 for x in range(0,100,5)])\
    .addGrid(lr.maxIter,[10,15,20,30,50])\
    .addGrid(lr.standardization, [True,False])
    .build()
#narrow down the hyper-para's range after first search 

lr_cv.setEstimatorParamMaps(lr_paramGrid)
lr_cvmodel = cv.fit(df_train)


In [71]:
lr_cvmodel.bestModel._java_obj.getElasticNetParam(),\
lr_cvmodel.bestModel._java_obj.getMaxIter(),\
lr_cvmodel.bestModel._java_obj.getElasticNetParam(),\
lr_cvmodel.bestModel._java_obj.getStandardization()
# to narrow down the grid search range, repeate the step above

(0.0, 10, 0.0)

In [68]:
lr_grid_test_rmse = evaluator.evaluate(lr_cvmodel.bestModel.transform(df_test))
lr_grid_test_rmse

0.6185593083463993

## DecisionTree Regression

In [29]:
## DT-reg
dt_reg = DecisionTreeRegressor(featuresCol = 'features', labelCol='y_target') # default hyper paras
dt_regModel = dt_reg.fit(df_train)


In [30]:
# Feature Importances:
dt_reg_feature_importance = list(zip([feature_names[i] for i in dt_regModel.featureImportances.indices],\
                                dt_regModel.featureImportances.values))
dt_reg_feature_importance.sort(key = lambda x: np.abs(x[1]),reverse=True)
[(x[0],np.round(x[1],4))for x in dt_reg_feature_importance]


[('q05_roll_std_10', 0.7377),
 ('q05_roll_std_100', 0.0628),
 ('skew', 0.0258),
 ('Moving_average_700_mean', 0.0235),
 ('Hilbert_mean', 0.0217),
 ('avg_first_50000', 0.0174),
 ('classic_sta_lta5_mean', 0.0169),
 ('q01_roll_std_100', 0.0146),
 ('avg_first_10000', 0.0132),
 ('q95_roll_std_1000', 0.0111),
 ('Hann_window_mean', 0.0087),
 ('q90', 0.0084),
 ('abs_max', 0.0067),
 ('q05_roll_std_1000', 0.0067),
 ('min_roll_std_10', 0.0066),
 ('mean_change_rate_last_50000', 0.0061),
 ('max_first_10000', 0.004),
 ('q01_roll_std_10', 0.0039),
 ('min_roll_std_1000', 0.0039)]

In [31]:
# model performance on training data
dt_yhats = dt_regModel.transform(df_train)
dt_yhats= dt_yhats.withColumn('abs_error', abs(dt_yhats.prediction-dt_yhats.y_target))\
    .withColumn('APE', abs((dt_yhats.prediction-dt_yhats.y_target)/dt_yhats.y_target))

dt_MAPE_train = dt_yhats.agg(avg('APE')).collect()[0][0]

dt_RMSE_train = evaluator.evaluate(dt_yhats)
print("DT model RMSE on train set = "+ '{:.4f}'.format(dt_RMSE_train))
print("DT model MAPE on train set = "+ '{:.2%}'.format(dt_MAPE_train))


DT model RMSE on train set = 1.7689
DT model MAPE on train set = 362.45%


In [32]:
# model performance on valid/test set
dt_preds = dt_regModel.transform(df_test)

dt_preds= dt_preds.withColumn('abs_error', abs(dt_preds.prediction-dt_preds.y_target))\
    .withColumn('APE', abs((dt_preds.prediction-dt_preds.y_target)/dt_preds.y_target))

dt_MAPE_test = dt_preds.agg(avg('APE')).collect()[0][0]
dt_RMSE_test = evaluator.evaluate(dt_preds)

print("DT model RMSE on test set = "+ '{:.4f}'.format(dt_RMSE_test))
print("DT model MAPE on test set = "+ '{:.2%}'.format(dt_MAPE_test))


DT model RMSE on test set = 2.8725
DT model MAPE on test set = 313.30%


In [33]:
# model performance on valid/test set per group

print("DT model MAPE of prediction on signal within 1s before an actual fail = "\
      + '{:.2%}'.format(dt_preds.filter(dt_preds.y_target<=1).agg(avg('APE')).collect()[0][0]))
print("DT model MAPE of prediction on signal 1~5s before an actual fail = "\
      + '{:.2%}'.format(dt_preds.filter(dt_preds.y_target>1).filter(dt_preds.y_target<=5).agg(avg('APE')).collect()[0][0]))
print("DT model MAPE of prediction on signal 5~10s before an actual fail = "\
      + '{:.2%}'.format(dt_preds.filter(dt_preds.y_target>5).filter(dt_preds.y_target<=10).agg(avg('APE')).collect()[0][0]))
print("DT model MAPE of prediction on signal over 10s before an actual fail = "\
      + '{:.2%}'.format(dt_preds.filter(dt_preds.y_target>10).agg(avg('APE')).collect()[0][0]))


DT model MAPE of prediction on signal within 1s before an actual fail = 2317.56%
DT model MAPE of prediction on signal 1~5s before an actual fail = 89.72%
DT model MAPE of prediction on signal 5~10s before an actual fail = 21.61%
DT model MAPE of prediction on signal over 10s before an actual fail = 11.99%


In [None]:
# tuning Gridsearch cv ...

In [80]:
cv = CrossValidator().setEstimator(dt_reg).setEvaluator(evaluator).setNumFolds(5) # set

In [114]:
dt_reg_paramGrid = ParamGridBuilder()\
    .addGrid(dt_reg.maxBins, [2**x for x in range(1,7)]) \
    .addGrid(dt_reg.maxDepth, list(range(5,31,5)))\
    .addGrid(dt_reg.minInfoGain,[0.1,1.0,10.0,15.0,20.0,30.0,50.0])\
    .addGrid(dt_reg.minInstancesPerNode, [5,10,20,30,50,100])\
    .build()
             

In [4]:
cv.setEstimatorParamMaps(dt_reg_paramGrid)
dt_reg_cvmodel = cv.fit(df_train)


In [116]:
dt_reg_cvmodel.bestModel._java_obj.getMaxBins(),\
dt_reg_cvmodel.bestModel._java_obj.getMaxDepth(),\
dt_reg_cvmodel.bestModel._java_obj.getMinInfoGain(),\
dt_reg_cvmodel.bestModel._java_obj.getMinInstancesPerNode()
# to narrow down the grid search range, repeate the step above

NameError: name 'dt_reg_cvmodel' is not defined

In [None]:
dt_reg_grid_test_rmse = evaluator.evaluate(dt_reg_cvmodel.bestModel.transform(df_test))
dt_reg_grid_test_rmse 

## RandomForest Regression

In [34]:
rf_reg = RandomForestRegressor(featuresCol = 'features', labelCol='y_target')
rf_regModel = rf_reg.fit(df_train)


In [35]:
rf_reg_feature_importance = list(zip([feature_names[i] for i in rf_regModel.featureImportances.indices],\
                                rf_regModel.featureImportances.values))
rf_reg_feature_importance.sort(key = lambda x: np.abs(x[1]),reverse=True)
[(x[0],np.round(x[1],4)) for x in rf_reg_feature_importance if x[1]>0.001]


[('q05_roll_std_10', 0.2453),
 ('q05_roll_std_100', 0.1909),
 ('q01_roll_std_10', 0.1372),
 ('q01_roll_std_100', 0.1184),
 ('ave_roll_std_100', 0.0447),
 ('q05_roll_std_1000', 0.039),
 ('Hann_window_mean', 0.0145),
 ('ave_roll_std_10', 0.0115),
 ('q95_roll_std_10', 0.0103),
 ('ave10', 0.0088),
 ('mean_change_rate_last_50000', 0.0086),
 ('avg_first_10000', 0.0073),
 ('min_roll_std_10', 0.0071),
 ('avg_first_50000', 0.0069),
 ('min_roll_std_100', 0.0065),
 ('abs_max', 0.0062),
 ('min_roll_std_1000', 0.0059),
 ('max_roll_std_1000', 0.0054),
 ('Moving_average_700_mean', 0.0051),
 ('mean', 0.0048),
 ('max_to_min_diff', 0.0047),
 ('max_first_10000', 0.0044),
 ('min_first_10000', 0.0043),
 ('q01_roll_std_1000', 0.0042),
 ('mean_change_rate_first_10000', 0.0041),
 ('q80', 0.0039),
 ('max_to_min', 0.0039),
 ('ave_roll_std_1000', 0.0037),
 ('std_last_50000', 0.0036),
 ('q999', 0.0034),
 ('skew', 0.0033),
 ('max1', 0.0033),
 ('abs_sum', 0.0032),
 ('mean_change_rate', 0.0032),
 ('Hilbert_mean', 0.

In [37]:
# model performance on training set

rf_yhats = rf_regModel.transform(df_train)
rf_yhats= rf_yhats.withColumn('abs_error', abs(rf_yhats.prediction-rf_yhats.y_target))\
    .withColumn('APE', abs((rf_yhats.prediction-rf_yhats.y_target)/rf_yhats.y_target))

rf_MAPE_train = rf_yhats.agg(avg('APE')).collect()[0][0]

print("RF model MAPE on train set = "+ '{:.2%}'.format(rf_MAPE_train))

rf_RMSE_train = evaluator.evaluate(rf_yhats)
print("RF model RMSE on train set = "+ '{:.4f}'.format(rf_RMSE_train))

RF model MAPE on train set = 842.64%
RF model RMSE on train set = 1.6356


In [38]:
# model performance on valid/test set
rf_preds = rf_regModel.transform(df_test)
rf_preds= rf_preds.withColumn('abs_error', abs(rf_preds.prediction-rf_preds.y_target))\
    .withColumn('APE', abs((rf_preds.prediction-rf_preds.y_target)/rf_preds.y_target))

rf_MAPE_test = rf_preds.agg(avg('APE')).collect()[0][0]

rf_RMSE_test = evaluator.evaluate(rf_preds)

print("RF model RMSE on test set = "+ '{:.4f}'.format(rf_RMSE_test))
print("RF model MAPE on test set = "+ '{:.2%}'.format(rf_MAPE_test))


RF model RMSE on test set = 2.6725
RF model MAPE on test set = 309.00%


In [39]:
# model performance on valid/test set per group

print("RF model MAPE of prediction on signal within 1s before an actual fail = "\
      + '{:.2%}'.format(rf_preds.filter(rf_preds.y_target<=1).agg(avg('APE')).collect()[0][0]))
print("RF model MAPE of prediction on signal 1~5s before an actual fail = "\
      + '{:.2%}'.format(rf_preds.filter(rf_preds.y_target>1).filter(rf_preds.y_target<=5).agg(avg('APE')).collect()[0][0]))
print("RF model MAPE of prediction on signal 5~10s before an actual fail = "\
      + '{:.2%}'.format(rf_preds.filter(rf_preds.y_target>5).filter(rf_preds.y_target<=10).agg(avg('APE')).collect()[0][0]))
print("RF model MAPE of prediction on signal over 10s before an actual fail = "\
      + '{:.2%}'.format(rf_preds.filter(rf_preds.y_target>10).agg(avg('APE')).collect()[0][0]))


RF model MAPE of prediction on signal within 1s before an actual fail = 2312.60%
RF model MAPE of prediction on signal 1~5s before an actual fail = 76.82%
RF model MAPE of prediction on signal 5~10s before an actual fail = 17.52%
RF model MAPE of prediction on signal over 10s before an actual fail = 14.99%


### grid search for linear reg

In [117]:
cv = CrossValidator().setEstimator(rf_reg).setEvaluator(evaluator).setNumFolds(5) # set

In [118]:
rf_reg_paramGrid = ParamGridBuilder()\
    .addGrid(rf_reg.maxBins, [2**x for x in range(1,7)]) \
    .addGrid(rf_reg.maxDepth, list(range(5,31,5)))\
    .addGrid(rf_reg.minInfoGain,[0.1,1.0,10.0,15.0,20.0,30.0,50.0])\
    .addGrid(rf_reg.minInstancesPerNode, [5,10,20,30,50,100])\
    .addGrid(rf_reg.subsamplingRate,[0.2,0.5,0.8,1.0])\
    .build()
             

In [121]:
cv.setEstimatorParamMaps(rf_reg_paramGrid)
rf_reg_cvmodel = cv.fit(df_train)


In [None]:
dt_reg_cvmodel.bestModel._java_obj.getMaxBins(),\
dt_reg_cvmodel.bestModel._java_obj.getMaxDepth(),\
dt_reg_cvmodel.bestModel._java_obj.getMinInfoGain(),\
dt_reg_cvmodel.bestModel._java_obj.getMinInstancesPerNode(),\
dt_reg_cvmodel.bestModel._java_obj.getSubsamplingRate()
# to narrow down the grid search range, repeate the step above

In [None]:
# grid-search best model performance on valid/test set
rf_grid_preds = rf_reg_cvmodel.bestModel.transform(df_test)

rf_reg_grid_test_rmse = evaluator.evaluate(rf_grid_preds)
print("grid search best RF model RMSE on test set = "+ '{:.4f}'.format(rf_reg_grid_test_rmse))

rf_grid_preds= rf_grid_preds.withColumn('abs_error', abs(rf_grid_preds.prediction-rf_grid_preds.y_target))\
    .withColumn('APE', abs((rf_grid_preds.prediction-rf_grid_preds.y_target)/rf_grid_preds.y_target))

rf_grid_MAPE_test = rf_grid_preds.agg(avg('APE')).collect()[0][0]
print("grid search best RF model MAPE on test set = "+ '{:.2%}'.format(rf_grid_MAPE_test))


In [None]:
# model performance on valid/test set per group

print("RF model MAPE of prediction on signal within 1s before an actual fail = "\
      + '{:.2%}'.format(rf_grid_preds.filter(rf_grid_preds.y_target<=1).agg(avg('APE')).collect()[0][0]))
print("RF model MAPE of prediction on signal 1~5s before an actual fail = "\
      + '{:.2%}'.format(rf_grid_preds.filter(rf_grid_preds.y_target>1).filter(rf_grid_preds.y_target<=5).agg(avg('APE')).collect()[0][0]))
print("RF model MAPE of prediction on signal 5~10s before an actual fail = "\
      + '{:.2%}'.format(rf_grid_preds.filter(rf_grid_preds.y_target>5).filter(rf_grid_preds.y_target<=10).agg(avg('APE')).collect()[0][0]))
print("RF model MAPE of prediction on signal over 10s before an actual fail = "\
      + '{:.2%}'.format(rf_grid_preds.filter(rf_grid_preds.y_target>10).agg(avg('APE')).collect()[0][0]))


In [None]:
#predict
# dt_reg_cvmodel.bestModel.transform(#actual test dataset#)