In [0]:
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql import Row

import numpy as np
np.set_printoptions(suppress=True)

In [0]:
import json

# Root directory of the artificial datasets and the run (folder) to analyse.
base_path = '/mnt/sparky/data/artificial/'
folder_name = '04-05-2021T13-55-04'

# Folder names for different datasets
# 04-05-2021T13-55-04    70 useful 30 useless
# 06-05-2021T16-26-37    30 useful 70 useless
# 06-05-2021T16-31-36    50 useful 50 useless

# Both files of a run live in <base_path><folder_name>/ and are prefixed with the folder name.
xy_path = f'{base_path}{folder_name}/{folder_name}-xy.csv'
prof_path = f'{base_path}{folder_name}/{folder_name}-profiler.json'

# Feature/label table: read as a header-less CSV (the `_c0` column is renamed further down).
df = spark.read.format('csv').option('header', 'false').load(xy_path)
df.cache()

# Profiler output for the same run, read as JSON.
prof = spark.read.format('json').load(prof_path)
prof.cache()

In [0]:
# Preview the feature/label table exactly as it was loaded from the CSV.
display(df)

In [0]:
# Shuffling dataset

import random

# Single seed reused for every randomised step in the notebook.
seed = 2143

# shuffle columns (features)
# A dedicated Random instance seeded with `seed` makes the permutation
# reproducible; shuffle() reorders the list in place.
cols = df.columns
random.Random(seed).shuffle(cols)
df = df.select(cols)

# shuffle rows (data points) by sorting on a seeded random column
df = df.orderBy(F.rand(seed))

# sanity check columns randomiser
# cols = df.columns
# random.Random(seed).shuffle(cols)
# print(cols)

# sanity check row randomiser
# display(df.orderBy(F.rand(seed=seed)))
# display(df.orderBy(F.rand(seed=seed)))

In [0]:
# Preview the table after the column and row shuffles.
display(df)

In [0]:
# Preview the profiler data loaded from the run's JSON file.
display(prof)

In [0]:
# Name used for the class-label column throughout the notebook.
label = 'label'

# rename the label column (`_c0` is treated as the class label)
df = df.withColumnRenamed('_c0', label)

# replace -1 with 0, some library does not like negative values
df = df.withColumn(
  label,
  F.when(F.col(label) == -1, 0).otherwise(F.col(label)),
)

# convert everything to float (each column keeps its own name)
df = df.select(*[F.col(name).cast('float').alias(name) for name in df.columns])

# convert the label column to double, the evaluator likes doubles
df = df.withColumn(label, F.col(label).cast('double'))

In [0]:
# transforming continuous variables to discrete
from pyspark.ml.feature import QuantileDiscretizer

# Add a 100-bucket quantile-binned twin ("<name>_discrete") for every column
# except the label. df is re-bound on each pass, so the discretised columns
# accumulate next to the continuous ones.
for col in df.columns:
  if col != "label":
    col_name = col + "_discrete"
    discretizer = QuantileDiscretizer(numBuckets=100, inputCol=col, outputCol=col_name)
    df = discretizer.fit(df).transform(df)

In [0]:
# Names of the discretised columns only: keep a column when it carries the
# "_discrete" marker and is not the label, preserving df's column order.
cols_discr = [
  name for name in df.columns
  if "_discrete" in name and name != "label"
]


In [0]:
# Discrete view of the data: the discretised columns followed by the label.
df_discr = df.select(*cols_discr, "label")
display(df_discr)

In [0]:
# Remove the discretised columns from df again; they now live in df_discr.
# NOTE: this re-binds df, so cells above that list df.columns see a different
# set of columns if they are re-run after this one.
df = df.drop(*cols_discr)

In [0]:
def get_useful_feats(profiler):
  """Return the known useful feature indices from the profiler, sorted ascending.

  Reads the `idx_useful_feat` field of the first collected row and subtracts
  one from every entry (presumably converting 1-based indices to 0-based —
  confirm against the profiler's format).
  """
  first_row = profiler.select('idx_useful_feat').collect()[0]
  shifted = [idx - 1 for idx in first_row.idx_useful_feat]
  return np.sort(shifted)

def get_chi_ranking(vals):
  """Return the positions of `vals` ordered from the largest value to the smallest.

  `vals` must provide an `argsort()` method (ascending order); the result is
  that ordering reversed.
  """
  ascending = vals.argsort()
  return ascending[::-1]

def get_chi_vals(chi_test):
  """Return the `statistics` field of the first row of a chi-square test result."""
  first_row = chi_test.select("statistics").collect()[0]
  return first_row.statistics

def get_inters(arr_1, arr_2):
  """Return the elements of `arr_1` that also occur in `arr_2`.

  Used to intersect the known useful features with the selected ones. The
  order (and any duplicates) of `arr_1` is preserved.
  """
  shared = []
  for item in arr_1:
    if item in arr_2:
      shared.append(item)
  return shared

def get_diff(arr_1, arr_2):
  """Return the elements of `arr_1` that do not occur in `arr_2`.

  Used to list the known useful features that a feature selection method
  missed. The order (and any duplicates) of `arr_1` is preserved.
  """
  missing = []
  for item in arr_1:
    if item not in arr_2:
      missing.append(item)
  return missing

In [0]:
from pyspark.ml.feature import VectorAssembler

# create vector assembler for discrete values
# Every column of df_discr except the label becomes one slot of the 'features'
# vector, in df_discr's column order.
# NOTE: `featuresCols` and `vectorAssembler` are re-bound for the continuous
# data further down, so the chi-square cell must run before that one.
featuresCols = list(df_discr.columns)
featuresCols.remove(label)
vectorAssembler = VectorAssembler(outputCol='features', inputCols=featuresCols)

In [0]:
from pyspark.ml.stat import ChiSquareTest

In [0]:
# Calculating the chi-square test for all features
# The discrete columns are assembled into 'features' and tested against the label.
chiSqTest = ChiSquareTest.test(
  dataset=vectorAssembler.transform(df_discr),
  featuresCol='features',
  labelCol=label,
)

In [0]:
# Comparing best known features from artificial dataset with top features selected from the chi-square test
bestKnown = get_useful_feats(prof)
chiVals = get_chi_vals(chiSqTest)

# The chi-square statistics are indexed by POSITION in the assembled 'features'
# vector, i.e. by the order of the discretised columns in df_discr. The columns
# were shuffled when the dataset was loaded, so a vector position is not the
# feature's original index. Look up the column name behind each position and
# read the feature number back out of it ('_c<k>_discrete' -> k - 1), so that
# bestChi lives in the same index space as bestKnown.
# Assumes bestKnown holds 0-based feature numbers where feature i is column
# '_c<i + 1>' (that is how the indices are turned into column names later on).
chiInputCols = [c for c in df_discr.columns if c != label]

# order the most important n features where n = no. useful features
chiTopPositions = get_chi_ranking(chiVals)[:len(bestKnown)]
bestChi = np.sort(np.array(
  [int(chiInputCols[pos][len('_c'):-len('_discrete')]) - 1 for pos in chiTopPositions],
  dtype=int,
))

In [0]:
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier

# Testing different parameter values of numTrees
rfEnsemble = RandomForestClassifier(
  labelCol=label,
  featuresCol='features',
  numTrees=30,  # default numTrees=20
  seed=seed,
)

In [0]:
# create vector assembler for continuous values
# Every column of df except the label becomes one slot of the 'features'
# vector, in df's column order.
# NOTE: this re-binds `featuresCols` and `vectorAssembler`, replacing the
# discrete versions defined earlier.
featuresCols = list(df.columns)
featuresCols.remove(label)
vectorAssembler = VectorAssembler(outputCol='features', inputCols=featuresCols)

In [0]:
from pyspark.ml import Pipeline

# Chain feature assembly and the random forest, then fit the chain on df.
rfPipeline = Pipeline(stages=[
  vectorAssembler,
  rfEnsemble,
])
rfPipeline_model = rfPipeline.fit(df)

In [0]:
# The fitted classifier is the last stage of the pipeline model (the assembler comes first).
rfTree = rfPipeline_model.stages[-1]
# Feature importances of the fitted forest as a NumPy array.
rfGinis = rfTree.featureImportances.toArray()

In [0]:
# rfGinis is indexed by POSITION in the feature vector the forest was trained
# on. The columns were shuffled when the dataset was loaded, so a position is
# not the feature's original index. The assembler stored as the first stage of
# the fitted pipeline knows which column sits at each position; read the
# feature number back out of the column name ('_c<k>' -> k - 1) so that
# rf_best_ginis lives in the same index space as bestKnown.
# Assumes bestKnown holds 0-based feature numbers where feature i is column
# '_c<i + 1>' (that is how the indices are turned into column names later on).
rfInputCols = rfPipeline_model.stages[0].getInputCols()

# keep the n most important features where n = no. useful features
rfTopPositions = rfGinis.argsort()[::-1][:len(bestKnown)]
rf_best_ginis = np.sort(np.array(
  [int(rfInputCols[pos][len('_c'):]) - 1 for pos in rfTopPositions],
  dtype=int,
))

print('Known useful features:\t', bestKnown)

#Chi-square results
print('Best Chi-sq features:\t', bestChi)
# share of the known useful features that the chi-square selection recovered
print('Chi-sq feature selection accuracy: \t', len(get_inters(bestKnown, bestChi)) / len(bestKnown))
print('Best known features not selected by Chi-sq: \t', get_diff(bestKnown, bestChi))

#RF results
print('Best RF features:\t', rf_best_ginis)
# share of the known useful features that the random forest selection recovered
print('RF feature selection accuracy: \t', len(get_inters(bestKnown, rf_best_ginis)) / len(bestKnown))
print('Best known features not selected by RF: \t', get_diff(bestKnown, rf_best_ginis))

In [0]:
# Each block below turns a list of feature indices into column names and
# projects df onto those columns plus the label. Feature index i is looked up
# as column '_c<i + 1>'.
# NOTE(review): that naming is only right when the index arrays hold feature
# numbers in the original CSV layout, not positions in the shuffled feature
# vector — confirm this for bestChi and rf_best_ginis.

# new dataframe with just Chi chosen features
chiColNames = ["_c" + str(idx + 1) for idx in bestChi] + ['label']
chiDf = df.select(chiColNames)

# new dataframe with just best known features
bestColNames = ["_c" + str(idx + 1) for idx in bestKnown] + ['label']
bestDf = df.select(bestColNames)

# new dataframe with just RF chosen features
rfColNames = ["_c" + str(idx + 1) for idx in rf_best_ginis] + ['label']
rfDf = df.select(rfColNames)

In [0]:
# training SVM on different datasets
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
# before preprocessing
# 70/30 train/test split of the full (continuous) dataset.
train, test = df.randomSplit(weights=[0.7, 0.3], seed=seed)

# Assemble the feature vector for both parts with the current `vectorAssembler`.
trainData, testData = (vectorAssembler.transform(part) for part in (train, test))

# One linear SVM configuration, reused for every dataset variant below.
svc = LinearSVC(regParam=0.01, maxIter=100)

In [0]:
# Train the SVM on the training part of the full dataset.
svcModel = svc.fit(trainData)

In [0]:
# Score the held-out part with the baseline model.
prediction = svcModel.transform(testData)

# evaluation
# The same evaluator instance is reused for every model further down.
evaluator = MulticlassClassificationEvaluator(
  metricName="accuracy",
  labelCol="label",
  predictionCol="prediction",
)
accuracy = evaluator.evaluate(prediction)

print("Accuracy (no pre-processing): ", accuracy)

In [0]:
# best known selection training

# create vector assembler for the best-known-features df
bestFeaturesCols = list(bestDf.columns)
bestFeaturesCols.remove(label)
bestVectorAssembler = VectorAssembler(outputCol='features', inputCols=bestFeaturesCols)

# NOTE(review): this split is drawn separately from the baseline `train`/`test`
# split, so the accuracies may not be measured on the same rows — confirm
# whether that is intended.
bestTrain, bestTest = bestDf.randomSplit(weights=[0.7, 0.3], seed=seed)

bestTrainData, bestTestData = (
  bestVectorAssembler.transform(part) for part in (bestTrain, bestTest)
)

In [0]:
# Train the SVM on the training part restricted to the known useful features.
bestSvcModel = svc.fit(bestTrainData)

In [0]:
# Score the held-out part of the best-known-features dataset.
bestPrediction = bestSvcModel.transform(bestTestData)

# Accuracy, computed with the evaluator defined for the baseline model.
bestAccuracy = evaluator.evaluate(bestPrediction)
print("Accuracy (best features): ", bestAccuracy)

In [0]:
# chi squared selection training

# create vector assembler for new chi squared df
chiFeaturesCols = list(chiDf.columns)
chiFeaturesCols.remove(label)
chiVectorAssembler = VectorAssembler(outputCol='features', inputCols=chiFeaturesCols)

# NOTE(review): this split is drawn separately from the baseline `train`/`test`
# split, so the accuracies may not be measured on the same rows — confirm
# whether that is intended.
chiTrain, chiTest = chiDf.randomSplit(weights=[0.7, 0.3], seed=seed)

chiTrainData, chiTestData = (
  chiVectorAssembler.transform(part) for part in (chiTrain, chiTest)
)

In [0]:
# Train the SVM on the training part restricted to the chi-square-selected features.
chiSvcModel = svc.fit(chiTrainData)

In [0]:
# Score the held-out part of the chi-square-selected dataset.
chiPrediction = chiSvcModel.transform(chiTestData)

# Accuracy, computed with the evaluator defined for the baseline model.
chiAccuracy = evaluator.evaluate(chiPrediction)
print("Accuracy (Chi-Squared): ", chiAccuracy)

In [0]:
# random forest selection training

# create vector assembler for new RF df
rfFeaturesCols = list(rfDf.columns)
rfFeaturesCols.remove(label)
rfVectorAssembler = VectorAssembler(outputCol='features', inputCols=rfFeaturesCols)

# NOTE(review): this split is drawn separately from the baseline `train`/`test`
# split, so the accuracies may not be measured on the same rows — confirm
# whether that is intended.
rfTrain, rfTest = rfDf.randomSplit(weights=[0.7, 0.3], seed=seed)

rfTrainData, rfTestData = (
  rfVectorAssembler.transform(part) for part in (rfTrain, rfTest)
)

In [0]:
# Train the SVM on the training part restricted to the random-forest-selected features.
rfSvcModel = svc.fit(rfTrainData)

In [0]:
# Score the held-out part of the random-forest-selected dataset.
rfPrediction = rfSvcModel.transform(rfTestData)

# Accuracy, computed with the evaluator defined for the baseline model.
rfAccuracy = evaluator.evaluate(rfPrediction)
print("Accuracy (RF): ", rfAccuracy)