In [0]:
#Import Libraries
#import mlflow
#import mlflow.spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
#from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import PolynomialExpansion, PCA
from pyspark.sql.functions import length, col, explode
from pyspark.ml.functions import vector_to_array



## Basic EDA

In [0]:
# Read the input dataset from parquet.
# getOrCreate() returns the already-running session when the notebook
# environment provides one and creates a session otherwise, so this cell no
# longer depends on a pre-injected `spark` global after a kernel restart.
spark = SparkSession.builder.getOrCreate()
data = spark.read.parquet("/tmp/output/higgsdf.parquet")

In [0]:
#Check data
data.show(5)

In [0]:
#Check the label distribution
labelcounts = data.groupBy('label').count().show()

In [0]:
# pyspark SQL helpers, aliased as `f`
import pyspark.sql.functions as f

# Count the null entries of every column: one aggregate expression per column,
# each aliased back to the column it describes.
data_agg = data.agg(
    *(
        f.count(f.when(f.isnull(column), column)).alias(column)
        for column in data.columns
    )
)
data_agg.show()

In [0]:
# Input column names: feature1 through feature28
features = ['feature' + str(idx) for idx in range(1, 29)]

## Vanilla Pipeline

In [0]:
stages = [] #stages in pipeline

In [0]:
#Start by using vector assembler to truncate all the columns
vecAssembler = VectorAssembler(inputCols = features, outputCol="features")
stages += [vecAssembler]

In [0]:
#Check to see vector assembler output

testdf = vecAssembler.transform(data)
testdf.head().features

In [0]:
#Scale the data
scaler = MinMaxScaler( min=-1.0, max=1.0,inputCol="features", outputCol="scaledFeatures")
stages += [scaler]
stages

In [0]:
#Test the scaler
# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(testdf)

# rescale each feature to range [min, max].
testdf2 = scalerModel.transform(testdf)

In [0]:
# Build the baseline classifier.
# It must read "scaledFeatures": the MinMaxScaler stage already in `stages`
# writes that column, and pointing the classifier at the raw "features" column
# would leave the scaling stage computed but unused.
# NOTE: any manually recorded baseline AUC should be re-measured after this change.
lr = LogisticRegression(labelCol="label",
                        featuresCol="scaledFeatures",
                        maxIter=10,
                        regParam=0.3)

stages += [lr]

In [0]:
#Split train and test label
train,test = data.randomSplit([0.8, 0.2])

In [0]:
#Check the ratio, if balanced no need to do stratified sampling
train.groupBy('label').count().show()

In [0]:


# Baseline pipeline: chain the collected stages, fit on train, score test
#with mlflow.start_run(run_name='logreg_baseline') as run:
Regpipeline = Pipeline(stages=stages)
model = Regpipeline.fit(train)
predicted = model.transform(test)

evaluator = BinaryClassificationEvaluator(
    labelCol='label',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
    numBins=1000,
)

# Despite its name, `accuracy` holds the area under the ROC curve
accuracy = evaluator.evaluate(predicted)



In [0]:
# Manually recorded results per model, collected for the later comparison
model_metrics = {
    'Baseline': {
        'Time': 5.80,
        'Features': 28,
        'AUC': 0.66,
    },
}

print(model_metrics)

In [0]:
predicted = model.transform(train)

In [0]:
predicted.select('features', 'label', 'rawPrediction', 'probability', 'prediction').show()

In [0]:


evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC', numBins=1000)

accuracy = evaluator.evaluate(predicted)

In [0]:
print(accuracy)

# Pipeline 2nd Iteration - observe classifier change

In [0]:
stages2 =[]
stages2 += [vecAssembler]

In [0]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
stages2 += [rf]

In [0]:
Rfpipeline = Pipeline(stages = stages2) 
  
model = Rfpipeline.fit(train)
  
predicted = model.transform(test)

In [0]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC', numBins=1000)

accuracy = evaluator.evaluate(predicted)

print(accuracy)

In [0]:
# Manually recorded results for the random forest baseline
model_metrics['RFBaseline'] = {'Time': 9.95, 'Features': 28, 'AUC': 0.73}
print(model_metrics)

# Pipeline 3rd Iteration - Feature Expansion

In [0]:
#Initialise stages of 3rd pipeline
stages3 = []
stages3 += [vecAssembler]

In [0]:
#Instantiate polyexpansion object
polyExpansion = PolynomialExpansion(degree=2, inputCol="features", outputCol="polyFeatures")

#Add object to stages
stages3 += [polyExpansion]

In [0]:
rf2 = RandomForestClassifier(featuresCol = 'polyFeatures', labelCol = 'label')

In [0]:
#Carryover randomforestclassifier object from previous pipeline over
stages3 += [rf2]

#Check stages
stages3

In [0]:
#Build pipeline 3
pipeline3 = Pipeline(stages = stages3) 
  
model3 = pipeline3.fit(train)
  
predicted3 = model3.transform(test)

In [0]:
#Check the number of features that was blown out by the 2nd degree poly expansion
predicted3.head()

In [0]:
evaluator3 = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC', numBins=1000)

accuracy = evaluator3.evaluate(predicted3)

In [0]:
print(accuracy)

In [0]:
# Manually recorded results for the degree-2 expansion + random forest run.
# A degree-2 polynomial expansion of 28 inputs produces 28 linear + 28 squared
# + 378 pairwise terms = 434 features (C(28 + 2, 2) - 1), so the feature count
# recorded here is 434 rather than 414.
model_metrics['RFPoly1'] = {}
model_metrics['RFPoly1']['Time'] = 34
model_metrics['RFPoly1']['Features'] = 434
model_metrics['RFPoly1']['AUC'] = 0.74
print(model_metrics)

# Pipeline Iteration 4 (3rd-degree polynomial expansion) (insufficient memory on the Community Edition cluster; will run on a paid cluster)

In [0]:
#Initialise stages of 4th pipeline
stages4 = []
stages4 += [vecAssembler]

In [0]:
#Instantiate polyexpansion object
polyExpansion2 = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")

#Add object to stages
stages4 += [polyExpansion2]

In [0]:

#Carryover randomforestclassifier object from previous pipeline over
stages4 += [rf2]

#Check stages
stages4

In [0]:
#Build pipeline 4
pipeline4 = Pipeline(stages = stages4) 
  
model4 = pipeline4.fit(train)
  
predicted4 = model4.transform(test)

In [0]:
evaluator4 = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC', numBins=1000)
 
accuracy = evaluator4.evaluate(predicted4)

# Pipeline iteration 5: PCA on 2nd-degree expansion

In [0]:
#Initialise stages of 4th pipeline
stages5 = []
stages5 += [vecAssembler]

In [0]:
#Instantiate polyexpansion object
polyExpansion3 = PolynomialExpansion(degree=2, inputCol="features", outputCol="polyFeatures")

#Add object to stages
stages5 += [polyExpansion3]

In [0]:
pca = PCA(k=5, inputCol="polyFeatures",outputCol="PCAfeatures")

stages5 += [pca]

In [0]:
rf3 = RandomForestClassifier(featuresCol = 'PCAfeatures', labelCol = 'label')

In [0]:
#Check stages before building pipelien
stages5 += [rf3]
stages5

In [0]:
#Build pipeline 5
pipeline5 = Pipeline(stages = stages5) 
  
model5 = pipeline5.fit(train)
  
predicted5 = model5.transform(test)

In [0]:
evaluator5 = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC', numBins=1000)
 
accuracy = evaluator5.evaluate(predicted5)

In [0]:
predicted5.head()

In [0]:
print(accuracy)

In [0]:
# Manually recorded results for the degree-2 expansion + PCA(k=5) + random forest run
model_metrics['RFPolyPCA5'] = dict(Time=37, Features=5, AUC=0.59)
print(model_metrics)

# Pipeline iteration 6: PCA on original dataset

In [0]:
#Initialise stages of 6th pipeline
stages6 = []
stages6 += [vecAssembler]

In [0]:
pca = PCA(k=15, inputCol="features" ,outputCol="PCAfeatures")

stages6 += [pca]

In [0]:
stages6

In [0]:
rf4 = RandomForestClassifier(featuresCol = 'PCAfeatures', labelCol = 'label')
stages6 += [rf4]

In [0]:
#Build pipeline 6
pipeline6 = Pipeline(stages = stages6) 
  
model6 = pipeline6.fit(train)
  
predicted6 = model6.transform(test)

In [0]:
evaluator6 = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC', numBins=1000)
 
accuracy = evaluator6.evaluate(predicted6)

print(accuracy)

In [0]:
# Manually recorded results for the PCA(k=15) + random forest run
model_metrics['RFPPCA15'] = {
    'Time': 14,
    'Features': 15,
    'AUC': 0.61,
}
print(model_metrics)