# This is an example of the solution for assignment 2 question 1

## Import Spark and create a Spark session

In [None]:
import pyspark
import os
from pyspark.sql import SparkSession
import datetime
import  numpy as np
from pyspark.sql.types import *

In [None]:
import os
import subprocess
def module(*args):
    """Run ``/usr/bin/modulecmd python <args>`` and execute what it prints.

    The command words may be given as separate arguments
    (``module('load', 'x')``) or as one list (``module(['load', 'x'])``).
    Whatever the command writes to stdout is executed as Python code in
    this process.

    Raises IndexError when called with no arguments.
    """
    first = args[0]
    command = first if isinstance(first, list) else list(args)
    process = subprocess.Popen(
        ['/usr/bin/modulecmd', 'python'] + command,
        stdout=subprocess.PIPE,
    )
    # Only stdout is piped, so the stderr slot returned by communicate() is None.
    output, _ = process.communicate()
    # NOTE(review): exec() runs whatever modulecmd printed; this is only safe
    # while that binary and the loaded module files are trusted.
    exec(output)
# Load the JDK module before the Spark session is created further down.
module('load', 'apps/java/jdk1.8.0_102/binary')
# Point PySpark at the Python interpreter of the "jupyter-spark" conda environment.
os.environ['PYSPARK_PYTHON'] = f"{os.environ['HOME']}/.conda/envs/jupyter-spark/bin/python"


In [None]:
# Build (or reuse) the Spark session; scratch space and the SQL/Hive
# warehouse directories are all placed under /fastdata/acq18mc.
spark = (
    SparkSession.builder
    .appName("COM6012 Assignment2 Question 1")
    .config("spark.local.dir", "/fastdata/acq18mc")
    .config("spark.sql.warehouse.dir", "/fastdata/acq18mc/pyspark/spark-warehouse/")
    .config("hive.metastore.warehouse.dir", "/fastdata/acq18mc/pyspark/spark-warehouse/")
    .getOrCreate()
)
# Only log errors so the cell output stays readable.
spark.sparkContext.setLogLevel("ERROR")


## load in the dataset

In [None]:
# Load the gzipped HIGGS CSV as a DataFrame and report how long it takes.
print("start")
startTime = datetime.datetime.now()
# Schema inference is switched off; the columns are cast to double in a later step.
data = spark.read.option("inferschema", False).csv("/fastdata/acq18mc/HIGGS.csv.gz")
# cache() is lazy: the count() inside the print below is the first action
# and is what actually reads the file and fills the cache.
data.cache()
# Make the DataFrame queryable from Spark SQL under the name "data".
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
data.createOrReplaceTempView("data")
print(f"whole data size is {data.count()} loading data takes {(datetime.datetime.now()-startTime).total_seconds()}")

## convert string to double type

In [None]:
columnName = data.columns

# The first column is used as the label, every other column as a feature.
labelColumnName = columnName[0]
featureColumnName = columnName[1:]

# SELECT list that casts every column to double, e.g.
# "cast(_c0 as double), cast(_c1 as double), ...".
convertToDoubleSQLString = ", ".join(f"cast({name} as double)" for name in columnName)

startTime = datetime.datetime.now()
# Run the cast through Spark SQL against the "data" temp view.
doubleData = spark.sql(f"select {convertToDoubleSQLString} from data")
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
doubleData.createOrReplaceTempView("doubleData")
doubleData.cache()
# NOTE: no Spark action is triggered above, so the time printed here covers
# building the lazy query plan rather than the cast itself.
print(f"transfered to double, and takes {(datetime.datetime.now()- startTime).total_seconds()}")


## convert data to vectors

In [None]:
#convert to vector
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler


# Pack all feature columns into one vector column named "Feature".
assembler = VectorAssembler(inputCols=featureColumnName, outputCol="Feature")
processedData = assembler.transform(doubleData)

# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
processedData.createOrReplaceTempView("TempData")
# Rename to "label" / "features", the default column names of the Spark ML estimators.
processedData = spark.sql("select _c0 as label, Feature as features from TempData")
print("get vector for feature ")

## split dataset

 1. extract a 5% subset and split it
 2. split the whole dataset
 3. cache each dataset

In [None]:
from pyspark.ml.classification import *
from pyspark.ml.regression import *
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

SEED = 1234
# Draw roughly 5% of the rows; the other 95% of this split is discarded.
subSetData, _ = processedData.randomSplit([0.05, 0.95], seed=SEED)
subSetData.cache()
print(f"sub set size is {subSetData.count()}")

# 70/30 train/test split of the 5% subset.
(subTrainingData, subTestData) = subSetData.randomSplit([0.7, 0.3], seed=SEED)
subTrainingData.cache()
subTestData.cache()
print(f"sub set training set size is {subTrainingData.count()}")

# 60/40 train/test split of the whole dataset.
trainingData, testData = processedData.randomSplit([0.6, 0.4], seed=SEED)
# Mark the training split for caching *before* counting it, so the count()
# action fills the cache instead of computing the split once uncached and
# again when it is first used for training.
trainingData.cache()
print(f"whole training set size is {trainingData.count()}")
testData.cache()

## training
 1. check model name
 2. setup param grid
 3. cross validation on 5% dataset
 4. extract the best parameters and report classification results
 5. train on the whole dataset and report the results
 6. find the best three features

In [None]:
# Runs the same tune -> evaluate -> retrain -> report sequence for one
# tree-ensemble classifier, selected by its class name.
def findBestParametersAndTrainAgain(modelName):
    """Tune a classifier on the 5% subset, then retrain it on the full data.

    Steps:
      1. Validate ``modelName``.
      2. Run 3-fold cross validation over a model-specific parameter grid on
         ``subTrainingData``, scored with ``BinaryClassificationEvaluator``
         (default metric; areaUnderROC per the PySpark docs).
      3. Print that metric and the accuracy on ``subTestData``, then print
         every parameter of the best model.
      4. Fit a fresh estimator with those parameters on ``trainingData`` and
         print the same two scores on ``testData``.
      5. Print the feature importances and the index of the largest one.

    Reads the module-level DataFrames ``subTrainingData``, ``subTestData``,
    ``trainingData`` and ``testData``.

    Args:
        modelName: ``"RandomForestClassifier"`` or ``"GBTClassifier"``.

    Returns:
        None. All results are printed.

    Raises:
        TypeError: if ``modelName`` is not a string.
        ValueError: if ``modelName`` is not one of the supported names.
    """
    # Both errors subclass Exception, so callers catching Exception still work.
    if not isinstance(modelName, str):
        raise TypeError("Wrong Input")
    if modelName not in ("RandomForestClassifier", "GBTClassifier"):
        raise ValueError("Wrong Input")

    # Explicit class selection instead of eval(): only these two classes can
    # ever be instantiated here.
    if modelName == "RandomForestClassifier":
        estimatorClass = RandomForestClassifier
    else:
        estimatorClass = GBTClassifier

    startTime = datetime.datetime.now()

    model = estimatorClass(featuresCol='features', labelCol='label', predictionCol='prediction')
    # Binary evaluator drives model selection; the multiclass evaluator is
    # used only to report accuracy. Accuracy must be computed on the double
    # "prediction" column (the default), not on the "rawPrediction" vector.
    evaluator = BinaryClassificationEvaluator()
    multiEvaluator = MulticlassClassificationEvaluator(metricName='accuracy')

    # Each model gets its own grid; value order is kept as-is because it
    # fixes the order in which candidate models are tried.
    if modelName == "RandomForestClassifier":
        paramGrid = (
            ParamGridBuilder()
            .addGrid(model.maxDepth, [10, 5, 15])
            .addGrid(model.maxBins, [32, 20, 15])
            .addGrid(model.minInfoGain, [0.0, 0.2, 0.1])
            .addGrid(model.impurity, ['gini', 'entropy'])
            .build()
        )
    else:
        paramGrid = (
            ParamGridBuilder()
            .addGrid(model.maxDepth, [15, 10, 5])
            .addGrid(model.maxBins, [32, 20, 15])
            .addGrid(model.minInfoGain, [0.0, 0.2, 0.3])
            .build()
        )

    pipeline = Pipeline(stages=[model])
    crossVal = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)
    cvModel = crossVal.fit(subTrainingData)
    prediction = cvModel.transform(subTestData)
    print(f"for {modelName}, cross validation best model  {evaluator.getMetricName()} :  {evaluator.evaluate(prediction)}, training takes time {(datetime.datetime.now()-startTime).total_seconds()} ")
    print(f"for {modelName} , acuracy is {multiEvaluator.evaluate(prediction)}")

    # The pipeline has a single stage, so stage 0 is the fitted classifier.
    bestModel = cvModel.bestModel.stages[0]
    bestParams = bestModel.extractParamMap()
    for param, value in bestParams.items():
        print(f"{param.name} : {value}")

    # Retrain on the whole dataset with the best parameters found above.
    startTime = datetime.datetime.now()
    newEstimator = estimatorClass()
    # Param objects are tied to the uid of the object they came from, and an
    # estimator silently ignores params owned by a different uid. Re-key the
    # best values onto the new estimator's own params by name so that they
    # are actually applied.
    retrainParams = {
        newEstimator.getParam(param.name): value
        for param, value in bestParams.items()
        if newEstimator.hasParam(param.name)
    }
    newModel = newEstimator.fit(trainingData, retrainParams)
    newPrediction = newModel.transform(testData)

    print(f"new model for {modelName}  {evaluator.getMetricName()} :  {evaluator.evaluate(newPrediction)}, training takes time {(datetime.datetime.now()-startTime).total_seconds()} ")
    print(f"new model for {modelName} , acuracy is {multiEvaluator.evaluate(newPrediction)}")

    # Both supported models are tree ensembles and expose featureImportances.
    featureImportances = list(newModel.featureImportances.toArray())
    print(f"{modelName} feature importance is {featureImportances} with length {len(featureImportances)}")
    maxIndex = np.argmax(featureImportances)
    print(f"{modelName} best feature index in {maxIndex} with importance  {featureImportances[maxIndex]}")

In [None]:

# Tune, retrain and report results for the random forest classifier.
findBestParametersAndTrainAgain("RandomForestClassifier")



In [None]:
# Tune, retrain and report results for the gradient-boosted tree classifier.
findBestParametersAndTrainAgain("GBTClassifier")