## Import the Libraries

In [1]:
import pyspark
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Obtain the notebook's Spark session (reuses an existing one if present).
spark = (
    SparkSession.builder
    .appName("Gradient Boosted Tree Classifier")
    .getOrCreate()
)

## Download the Dataset

In [2]:
!wget https://raw.githubusercontent.com/mananparasher/Spark-Datasets/master/bank_data.csv

--2020-06-20 23:46:00--  https://raw.githubusercontent.com/mananparasher/Spark-Datasets/master/bank_data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.124.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.124.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 918960 (897K) [text/plain]
Saving to: ‘bank_data.csv.1’


2020-06-20 23:46:00 (1.57 MB/s) - ‘bank_data.csv.1’ saved [918960/918960]



## Load the Data into a Spark DataFrame

In [2]:
# Read the downloaded CSV: the first row is the header and column types are
# inferred from the data.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv('bank_data.csv')
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



## Data Processing for Machine Learning Model

In [3]:
# String-typed columns; each is mapped to a numeric index column.
# NOTE(review): 'month' (string) and 'day' (integer) appear in the schema but
# are not used as features here — confirm this is intentional.
categorical_columns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
# Integer columns that go into the feature vector unchanged.
numerical_columns = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']

# One StringIndexer per categorical column, writing to "<column>indexer".
pipeline_stages = [
    StringIndexer(inputCol=name, outputCol=name + "indexer")
    for name in categorical_columns
]

# Index the 'deposit' target column into the 'label' column.
target_column = StringIndexer(inputCol='deposit', outputCol='label')
pipeline_stages.append(target_column)

# Assemble the indexed categorical columns followed by the numeric columns
# into a single "features" vector; this is the final stage.
combined_columns = [name + "indexer" for name in categorical_columns] + numerical_columns
vector_assembler = VectorAssembler(inputCols=combined_columns, outputCol="features")
pipeline_stages.append(vector_assembler)

In [4]:
# Show the assembled stage list (the indexers followed by the vector assembler).
pipeline_stages

[StringIndexer_49c88743a157ae1ef1c6,
 StringIndexer_45669e08de7f584b3647,
 StringIndexer_45f8b73b82257d6af515,
 StringIndexer_4c909de6bf56c2f849c0,
 StringIndexer_43539a11ce02fe8daaf6,
 StringIndexer_47b5b9445e742f879e30,
 StringIndexer_4470b0b1e94450033040,
 StringIndexer_4c798817b64666f9e8a3,
 StringIndexer_4921894900c8f9e0c18c,
 VectorAssembler_4087a8ec1215aa374047]

In [5]:
# Fit every stage on the loaded data, then apply the fitted stages to it.
pipeline = Pipeline(stages=pipeline_stages)
fitted_pipeline = pipeline.fit(df)
pipeline_fit = fitted_pipeline.transform(df)

# Keep only the model inputs. Note that this rebinds `df`, so the raw columns
# are no longer reachable through that name after this cell runs.
df = pipeline_fit.select("features", "label")
df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



##  Data Splitting 

In [6]:
# Fixed seed so that the 90/10 split (and therefore the reported counts and
# evaluation metric) is reproducible across kernel restarts.
SPLIT_SEED = 42
training_data, testing_data = df.randomSplit([0.9, 0.1], seed=SPLIT_SEED)
print("Training Dataset Count: " + str(training_data.count()))
print("Test Dataset Count: " + str(testing_data.count()))

Training Dataset Count: 10056
Test Dataset Count: 1106


## Model Implementation and Fitting 

In [7]:
# Configure a gradient-boosted tree classifier limited to 10 boosting
# iterations, reading the assembled "features" vector and the "label" column.
gbtclassifier = GBTClassifier(
    featuresCol='features',
    labelCol='label',
    maxIter=10,
)

# Train on the training split only; the bare `model` shows the fitted summary.
model = gbtclassifier.fit(training_data)
model

GBTClassificationModel (uid=GBTClassifier_463284b24035ba6c57b9) with 10 trees

## Model Prediction  

In [8]:
# Score the held-out split with the fitted model.
predictions = model.transform(testing_data)

# Preview the first 10 rows, showing prediction next to the true label.
preview_columns = ["features", "prediction", "label"]
predictions.select(*preview_columns).show(10)

+--------------------+----------+-----+
|            features|prediction|label|
+--------------------+----------+-----+
|(14,[0,1,2,4,8,10...|       0.0|  0.0|
|(14,[0,1,2,4,8,10...|       0.0|  0.0|
|(14,[0,1,2,4,8,10...|       0.0|  1.0|
|(14,[0,1,2,4,8,10...|       0.0|  0.0|
|(14,[0,1,2,6,8,10...|       1.0|  1.0|
|(14,[0,1,2,8,9,10...|       1.0|  1.0|
|(14,[0,1,2,8,9,10...|       1.0|  1.0|
|(14,[0,1,2,8,9,10...|       0.0|  1.0|
|(14,[0,1,2,8,9,10...|       0.0|  1.0|
|(14,[0,1,2,8,9,10...|       0.0|  1.0|
+--------------------+----------+-----+
only showing top 10 rows



## Model Evaluation

In [12]:
# Evaluate the test-set predictions, explicitly requesting area under the ROC
# curve through a param override passed to evaluate().
gbtclassifierevaluator = BinaryClassificationEvaluator()
metric_override = {gbtclassifierevaluator.metricName: "areaUnderROC"}
test_auc = gbtclassifierevaluator.evaluate(predictions, metric_override)
print("Test Area Under ROC: " + str(test_auc))

Test Area Under ROC: 0.8792857424027731
