In [1]:
#import necessary libraries and set up pyspark environment
import sys,os,glob,math
import pandas as pd
# Point PySpark at the local interpreter and Spark install before importing it.
python_exe = "C:/Users/z001133/AppData/Local/Continuum/anaconda3/python.exe"
spark_home = "D:/Public/spark"
spark_pylib = spark_home + "/python/lib"
os.environ.update({
    "PYSPARK_PYTHON": python_exe,
    "SPARK_HOME": spark_home,
    "PYLIB": spark_pylib,
})
# Prepend Spark's bundled archives to the import path; pyspark.zip ends up
# ahead of the py4j archive, the same order two successive insert(0, ...) calls give.
sys.path[:0] = [spark_pylib + "/pyspark.zip", spark_pylib + "/py4j-0.10.7-src.zip"]
from pyspark.sql import SparkSession, SQLContext

# Local-mode session using every available core.
spark = (
    SparkSession.builder
    .appName("GrabChallengeSafety")
    .master("local[*]")
    .config('spark.executor.memory', '20G')
    .config('spark.driver.memory', '10G')
    .getOrCreate()
)

In [2]:
#import classification ML libraries from pyspark.ml
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StandardScaler, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression,LogisticRegressionModel,DecisionTreeClassifier,\
    DecisionTreeClassificationModel,RandomForestClassifier,GBTClassifier,NaiveBayes, NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# sklearn.cross_validation was removed in scikit-learn 0.20; prefer its
# replacement and fall back to the legacy module only on very old installs.
try:
    from sklearn.model_selection import train_test_split
except ImportError:
    from sklearn.cross_validation import train_test_split
import xgboost as xgb
from xgboost import XGBClassifier



In [3]:
#get the preprocessed data in .csv file to a dataframe
os.chdir('C:\\Users\\z001133\\IdeaProjects\\pyLearning\\data\\for_model')
# glob returns files in an OS-dependent order; sorting keeps the concatenated
# row order stable from run to run.
csv_files = sorted(glob.glob("*.csv"))
if not csv_files:
    # Fail early with a clear message instead of an obscure schema-inference
    # error from Spark on an empty pandas frame.
    raise FileNotFoundError("no .csv files found in " + os.getcwd())
# Read every part first and concatenate once: growing a frame with
# DataFrame.append inside the loop is quadratic, and append() was removed in
# pandas 2.0.
frames = []
for file in csv_files:
    print(file)
    frames.append(pd.read_csv(file))
df_01 = pd.concat(frames, ignore_index=True)
del frames
df_02 = spark.createDataFrame(df_01)
# The pandas copy is no longer needed once Spark holds the data.
del df_01
# 90/10 train/test split with a fixed seed.
(trainingData, testData) = df_02.randomSplit([0.90, 0.10], seed = 100)

part-00000-bde078b0-f39b-4de5-bef9-12ab14e0b2c2-c000.csv


In [4]:
#build spark stages for model building pipeline
def build_model_pipeline(algo):
    """Assemble the three-stage training pipeline for the requested algorithm.

    The stages are: a StringIndexer turning ``label`` into ``class`` (rows with
    invalid labels are skipped), a VectorAssembler packing the numeric columns
    into ``features``, and the classifier selected by ``algo``.

    ``algo`` may be ``'log'``, ``'decisiontree'``, ``'randomforest'``,
    ``'gbt'``, ``'nbayes'`` or ``'xgb'``; any other value falls back to a
    10-tree random forest.

    Returns an unfitted ``Pipeline``.
    """
    feature_columns = [
        "bearing", "change_in_accuracy", "change_in_bearing", "change_in_speed",
        "degreesZ", "speed", "acc_magn", "time_per_turn", "distance_per_turn", "no_of_turns",
        "change_in_dist", "change_in_accelaration", "total_ditance", "change_in_gyro",
        "accuracy", "total_time",
    ]
    label_indexer = StringIndexer(inputCol="label", outputCol="class", handleInvalid="skip")
    feature_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

    # Flat elif chain; comparisons happen in the same order as before.
    if algo == 'log':
        classifier = build_logistic()
    elif algo == 'decisiontree':
        classifier = DecisionTreeClassifier(labelCol="class", featuresCol="features", maxBins=405)
    elif algo == "randomforest":
        classifier = RandomForestClassifier(labelCol="class", featuresCol="features", numTrees=10)
    elif algo == "gbt":
        classifier = GBTClassifier(labelCol="class", featuresCol="features", maxIter=10)
    elif algo == "nbayes":
        classifier = build_naive_bayes()
    elif algo == "xgb":
        classifier = build_xgboost_classifier()
    else:
        # Unrecognised name: default to the random forest configuration.
        classifier = RandomForestClassifier(labelCol="class", featuresCol="features", numTrees=10)

    return Pipeline(stages=[label_indexer, feature_assembler, classifier])

In [5]:
#function to predict the classification and its confidence level by probability value
def accur(model, filenm, df,
          result_dir="C:\\Users\\z001133\\IdeaProjects\\pyLearning\\data\\result"):
    """Score ``model`` on ``df``, export the predictions and print two metrics.

    Parameters
    ----------
    model : fitted model exposing ``transform`` (e.g. a fitted pipeline).
    filenm : str
        Base name, without extension, of the CSV written to ``result_dir``.
    df : Spark DataFrame to score; must carry a ``label`` column.
    result_dir : str, optional
        Directory for the prediction CSV. Defaults to the location that was
        previously hard-coded, so existing calls behave as before.

    Prints the accuracy (share of rows whose ``label`` equals ``prediction``)
    and the F1 score, and writes all predictions, ordered by ``probability``
    descending, to ``<result_dir>\\<filenm>.csv``. Returns ``None``.
    """
    preds = model.transform(df)
    accuracy = preds.filter(preds.label == preds.prediction).count() / float(preds.count())
    # MulticlassClassificationEvaluator computes F1 by default, not ROC-AUC.
    # The metric is now requested explicitly and reported under its real name.
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
    f1_score = evaluator.evaluate(preds)
    preds.orderBy("probability", ascending=False) \
        .toPandas() \
        .to_csv(result_dir + "\\" + filenm + ".csv")
    print("Accuracy Score: {0:.4f}".format(accuracy))
    print("F1 Score: {0:.4f}".format(f1_score))
    return

In [6]:
#function to define the logistic regression estimator
def build_logistic():
    """Return an unfitted binomial logistic-regression estimator.

    Configured with at most 100 iterations, regularisation strength 0.3 and
    ``elasticNetParam=0``.
    """
    # `family` takes a string; the bare name `binomial` is undefined and
    # raised NameError as soon as this function was called.
    # NOTE(review): labelCol is left at its default ("label"), whereas the tree
    # classifiers in build_model_pipeline train on "class" - confirm intended.
    return LogisticRegression(maxIter=100, regParam=0.3, elasticNetParam=0, family="binomial")

In [9]:
#building logistic regression classification model
model = build_model_pipeline("log").fit(trainingData)
test_result = "log_model_result"
accur(model,test_result,testData)
# Despite the ".pkl" name, Spark writes a model directory here, not a pickle file.
PicklePath = "C:\\Users\\z001133\\IdeaProjects\\pyLearning\\model\\log\\log_model.pkl"
# save() raises if the target already exists, which breaks re-running this
# cell; overwrite the previous model instead.
model.write().overwrite().save(PicklePath)

Accuracy Score: 0.7578
ROC-AUC: 0.6667


In [10]:
#building decision tree classification model
model = build_model_pipeline("decisiontree").fit(trainingData)
test_result = "dt_model_result"
accur(model,test_result,testData)
# Despite the ".pkl" name, Spark writes a model directory here, not a pickle file.
PicklePath = "C:\\Users\\z001133\\IdeaProjects\\pyLearning\\model\\dt\\dt_model.pkl"
# save() raises if the target already exists, which breaks re-running this
# cell; overwrite the previous model instead.
model.write().overwrite().save(PicklePath)

Accuracy Score: 0.7752
ROC-AUC: 0.7125


In [11]:
#building random forest classification model
model = build_model_pipeline("randomforest").fit(trainingData)
test_result = "rf_model_result"
accur(model,test_result,testData)
# Despite the ".pkl" name, Spark writes a model directory here, not a pickle file.
PicklePath = "C:\\Users\\z001133\\IdeaProjects\\pyLearning\\model\\rf\\rf_model.pkl"
# save() raises if the target already exists, which breaks re-running this
# cell; overwrite the previous model instead.
model.write().overwrite().save(PicklePath)

Accuracy Score: 0.7726
ROC-AUC: 0.7101


In [12]:
#building gradient-boosted tree classification model
model = build_model_pipeline("gbt").fit(trainingData)
test_result = "gbt_model_result"
accur(model,test_result,testData)
# Despite the ".pkl" name, Spark writes a model directory here, not a pickle file.
PicklePath = "C:\\Users\\z001133\\IdeaProjects\\pyLearning\\model\\gbt\\gbt_model.pkl"
# save() raises if the target already exists, which breaks re-running this
# cell; overwrite the previous model instead.
model.write().overwrite().save(PicklePath)

Accuracy Score: 0.7731
ROC-AUC: 0.7160
