In [1]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Start (or attach to) the notebook's Spark session under the application name "Spark_GBM".
spark = SparkSession.builder.appName("Spark_GBM").getOrCreate()

# Display the effective configuration as (key, value) pairs.
# getConf() is the public accessor and hands back a copy, so inspecting it
# does not go through the context's private `_conf` attribute.
spark.sparkContext.getConf().getAll()

[('spark.history.kerberos.keytab', 'none'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.history.ui.port', '18081'),
 ('spark.driver.extraLibraryPath',
  '/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64'),
 ('spark.executor.extraLibraryPath',
  '/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64'),
 ('spark.port.maxRetries', '128'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://hdp002.cac.queensu.ca:8088/proxy/application_1548786446322_4811'),
 ('spark.history.provider',
  'org.apache.spark.deploy.history.FsHistoryProvider'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.yarn.historyServer.address', 'hdp001.cac.queensu.ca:18081'),
 ('spark.driver.port', '36691'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
 ('spark.app.id', 'application_154

In [2]:
# Resource settings for the GBM run, layered on top of the configuration the
# cluster already provided. getConf() returns a copy, so the running context is
# not mutated through its private `_conf` attribute.
conf = spark.sparkContext.getConf().setAll([
    ('spark.executor.memory', '95g'),
    ('spark.app.name', 'Spark_GBM'),
    ('spark.executor.cores', '4'),
    ('spark.cores.max', '4'),
    ('spark.driver.memory', '95g'),
])
# NOTE(review): the session runs in client deploy mode, where the driver JVM is
# already up when this cell executes; 'spark.driver.memory' set here probably has
# no effect and would need to be passed at launch (e.g. --driver-memory) — confirm.

# Stop the current Spark session (this also stops its SparkContext) so that the
# builder below creates a new session instead of returning the existing one.
spark.stop()

# Create a new Spark session with the updated configuration.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
file_location = "/user/mie_sbetancourt/PROJECT/Data/data_reduced_reweighted_FINAL_3.csv"

from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import DoubleType, StringType, IntegerType

# Read the CSV into a Spark DataFrame, taking column names from the header row
# and letting Spark infer the column types.
csv_reader = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
)
data = csv_reader.load(file_location)

# Drop the firmware identifier and replace the "unknown" marker in
# AVProductsEnabled with -1 so the column can be cast to an integer below.
# (An earlier experiment that rewrote classWeightCol to 0.091 / 0.908 is not applied.)
av_products_enabled = data["AVProductsEnabled"]
data = data.drop("Census_FirmwareVersionIdentifier").withColumn(
    "AVProductsEnabled",
    when(av_products_enabled == "unknown", -1).otherwise(av_products_enabled),
)

# Columns that are cast to strings here; string-typed columns are picked up as
# categorical features later in the notebook.
columns_cast_to_string = [
    "AvSigVersion_new",
    "Census_OSBuildNumber",
    "Census_OSBuildRevision",
    "Census_OSUILocaleIdentifier",
    "Census_OSVersion_new",
    "CountryIdentifier",
    "LocaleEnglishNameIdentifier",
    "OsBuild",
    "OsSuite",
]

data_1 = data.withColumn("AVProductsEnabled", data["AVProductsEnabled"].cast(IntegerType()))
for column_name in columns_cast_to_string:
    data_1 = data_1.withColumn(column_name, data[column_name].cast(StringType()))

# Rename the target column to "label" and discard OsBuildLab_new.
data_1 = data_1.withColumnRenamed("HasDetections", "label").drop("OsBuildLab_new")

In [4]:
# Names of every string-typed column in data_1, in schema order.
# A comprehension is used so that no loop variable leaks into the notebook
# namespace; the previous loop variable was called `col` and overwrote the
# `col` function imported from pyspark.sql.functions.
stringCols = [name for name, dtype in data_1.dtypes if dtype == 'string']

In [5]:
# Take the row identifier out of the categorical feature list.
# It is removed by name rather than by position: this does not depend on the
# column order, and re-running the cell raises ValueError instead of silently
# popping a genuine feature column.
stringCols.pop(stringCols.index("MachineIdentifier"))

'MachineIdentifier'

In [6]:
import numpy as np
# Every column of data_1 that is not in the categorical list, as a sorted list
# of unique names.
numericCols = sorted(set(data_1.columns).difference(stringCols))

# Strip the columns that must not be used as model features: the row
# identifier, the class-weight column and the target.
for non_feature in ("MachineIdentifier", "classWeightCol", "label"):
    numericCols.remove(non_feature)

In [7]:
sampling_seed = 1111

# Step 1: sample by label, using fraction 0.1 for label 0 and fraction 1 for label 1.
downsample_fractions = {0: .1, 1: 1}
trainingData1 = data_1.sampleBy("label", fractions=downsample_fractions, seed=sampling_seed)

# Step 2: sample the down-sampled frame again with fraction 0.9 for both labels
# to obtain the training set.
train_fractions = {0: .9, 1: .9}
trainingData = trainingData1.sampleBy("label", fractions=train_fractions, seed=sampling_seed)

# The test set is whatever is left of the down-sampled frame (not of the full
# data set) once the training rows are subtracted.
testData = trainingData1.subtract(trainingData)

In [8]:
# Show how many training rows fall under each label.
label_counts = trainingData.groupBy("label").count()
label_counts.show()

+-----+------+
|label| count|
+-----+------+
|    1|401788|
|    0|402132|
+-----+------+



In [None]:
temp_path = "/user/mie_sbetancourt/PROJECT/"

from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import time
start_time = time.time()

sampling_seed = 1111

# One StringIndexer per categorical column; each writes to "<column>_indexed".
# handleInvalid="keep" is set so invalid values are kept rather than rejected.
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c), handleInvalid="keep")
    for c in stringCols
]

# One OneHotEncoder per indexed column; each writes to "<column>_indexed_encoded".
encoders = [
    OneHotEncoder(dropLast=False,
                  inputCol=indexer.getOutputCol(),
                  outputCol="{0}_encoded".format(indexer.getOutputCol()))
    for indexer in indexers
]

gbt = GBTClassifier(labelCol="label", featuresCol="features")

# Assemble the encoded categorical columns and the numeric columns into the
# single "features" vector consumed by the classifier.
assembler = VectorAssembler(
    inputCols=[encoder.getOutputCol() for encoder in encoders] + numericCols,
    outputCol="features")

pipeline = Pipeline(stages=indexers + encoders + [assembler] + [gbt])

# Hyper-parameter grid: 3 depths x 2 bin counts x 2 step sizes x 1 subsampling rate.
estimatorParam = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [4, 6, 8]) \
    .addGrid(gbt.maxBins, [15, 25])  \
    .addGrid(gbt.stepSize, [0.1, 0.05]) \
    .addGrid(gbt.subsamplingRate, [.7]) \
    .build()

# Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
# is areaUnderROC.
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction")

gbt_crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=estimatorParam,
                              evaluator=evaluator,
                              numFolds=3,
                              parallelism=8,
                              seed=sampling_seed)

gbt_cvmodel = gbt_crossval.fit(trainingData)

# Persist the results. write().overwrite() is used because a plain save() raises
# when the target path already exists, which would fail this cell only after the
# long-running fit above has finished.
# `gbt` is the estimator object constructed above, not the fitted model; the
# tuned, fitted pipeline is `gbt_cvmodel.bestModel`.
gbt_path = temp_path + "/gbt"
gbt.write().overwrite().save(gbt_path)
model_path = temp_path + "/gbt_model"
gbt_cvmodel.bestModel.write().overwrite().save(model_path)


print("--- %s seconds ---" % (time.time() - start_time))

In [9]:
# Persist the estimator and the best fitted pipeline.
# The training cell writes to these same paths, and a plain save() raises when
# the path already exists, so the writes here overwrite whatever is there.
gbt_path = temp_path + "/gbt"
gbt.write().overwrite().save(gbt_path)
model_path = temp_path + "/gbt_model"
gbt_cvmodel.bestModel.write().overwrite().save(model_path)

In [12]:
from pyspark.ml import PipelineModel

# Reload the persisted best pipeline from storage and score both splits with it.
gbt_model_path = temp_path + "/gbt_model"
gbt_mod2 = PipelineModel.load(gbt_model_path)
gbt_predictions = gbt_mod2.transform(testData)
gbt_train_predictions = gbt_mod2.transform(trainingData)
print("The area under ROC for train set after CV  is {}".format(evaluator.evaluate(gbt_train_predictions)))
print("The area under ROC for test set after CV  is {}".format(evaluator.evaluate(gbt_predictions)))

# The classifier is the last stage of the pipeline; its selected hyper-parameters
# are read from the underlying JVM object.
print('Best maxDepth: ', gbt_mod2.stages[-1]._java_obj.getMaxDepth())
print('Best maxBins: ', gbt_mod2.stages[-1]._java_obj.getMaxBins())

The area under ROC for train set after CV  is 0.689483203950502
The area under ROC for test set after CV  is 0.657841253989691
Best maxDepth:  8
Best maxBins:  25


In [13]:
# Read the selected step size from the last pipeline stage's JVM object.
best_step_size = gbt_mod2.stages[-1]._java_obj.getStepSize()
print('Best stepSize: ', best_step_size)

Best stepSize:  0.1


In [None]:
'''
1st try ---- 4 hours

    .addGrid(gbt.maxDepth, [4, 8, 12]) \
    .addGrid(gbt.maxBins, [20, 25])  \
    .addGrid(gbt.stepSize, [0.1, 0.05]) \
    
The area under ROC for train set after CV  is 0.689483203950502
The area under ROC for test set after CV  is 0.657841253989691
Best maxDepth:  8
Best maxBins:  25
Best stepSize:  0.1    

'''


In [None]:
'''2nd place lightGBM
best_hyp = {'boosting_type': 'gbdt',
            'class_weight': None,
            'colsample_bytree': 0.2685319471585845,
            'learning_rate': 0.011114918157721438,
            'min_child_samples': 270,
            'num_leaves': 261,
            'reg_alpha': 0.4182767807212193,
            'reg_lambda': 0.07336659149142766,
            'subsample_for_bin': 40000,
            'subsample': 0.6462594904717385}
'''