## Import the Libraries

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Application-level Spark configuration shared by the context and the session.
conf = pyspark.SparkConf().setAppName("Gradient Boosted Tree Regressor")
# `SparkContext` is not imported by name in this notebook, so reach it through
# the `pyspark` module instead of relying on a leftover kernel variable.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# The data-loading cell reads through `spark`; create the session explicitly so
# the notebook also runs on a fresh kernel outside the `pyspark` shell.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

## Download the Dataset

In [10]:
!wget https://raw.githubusercontent.com/mananparasher/Spark-Datasets/master/bank_data.csv

--2020-06-21 19:21:54--  https://raw.githubusercontent.com/mananparasher/Spark-Datasets/master/bank_data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.52.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.52.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 918960 (897K) [text/plain]
Saving to: ‘bank_data.csv.2’


2020-06-21 19:21:55 (1.80 MB/s) - ‘bank_data.csv.2’ saved [918960/918960]



## Load the Data into a Spark DataFrame

In [2]:
# Read the downloaded CSV: the first row supplies column names and Spark
# infers each column's type from the data.
df = spark.read.csv(
    'bank_data.csv',
    header=True,
    inferSchema=True,
)
# Show the inferred column names and types.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



## Data Processing for Machine Learning Model

In [3]:
# String-typed columns; each one gets its own StringIndexer stage that writes
# the numeric index to "<column>indexer".
categorical_columns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome','deposit']
pipeline_stages = [
    StringIndexer(inputCol=column_name, outputCol=column_name + "indexer")
    for column_name in categorical_columns
]

# Integer columns that are passed to the assembler unchanged.
numerical_columns = ['age', 'duration', 'campaign', 'pdays', 'previous']
indexed_columns = [column_name + "indexer" for column_name in categorical_columns]
combined_columns = indexed_columns + numerical_columns

# Final stage: pack the indexed and numeric columns into one "features" vector.
vector_assembler = VectorAssembler(inputCols=combined_columns, outputCol="features")
pipeline_stages.append(vector_assembler)

In [4]:
# Display the assembled stage list: one StringIndexer per categorical column,
# followed by the VectorAssembler.
pipeline_stages

[StringIndexer_4894986158ffd85d937e,
 StringIndexer_47aca0a8d5a7dbc1363c,
 StringIndexer_401897a9e1c6029394ee,
 StringIndexer_4497a5a197505e842cad,
 StringIndexer_4325827b2bee5dfd8c93,
 StringIndexer_4227851293fcb2787a28,
 StringIndexer_4edd8dbfa3253521d791,
 StringIndexer_41a5b03fd3604db29541,
 StringIndexer_4b5a9650f9311b6e97c0,
 VectorAssembler_4cd8bb2123bb6316424a]

In [5]:
# Chain the indexer and assembler stages into one estimator.
pipeline = Pipeline(stages=pipeline_stages)
# Fit all stages on the full DataFrame, then apply the fitted stages to it.
fitted_pipeline = pipeline.fit(df)
pipeline_fit = fitted_pipeline.transform(df)
# Keep only the assembled feature vector and the "balance" column.
# NOTE: this rebinds `df`, so the cell cannot be re-run without reloading the CSV.
df = pipeline_fit.select("features", "balance")
df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- balance: integer (nullable = true)



## Data Splitting

In [6]:
# 90/10 train/test split. The fixed seed keeps the split repeatable, so the
# counts below and every downstream result are the same on each run.
training_data, testing_data = df.randomSplit([0.9, 0.1], seed=42)
print("Training Dataset Count: " + str(training_data.count()))
print("Test Dataset Count: " + str(testing_data.count()))

Training Dataset Count: 10130
Test Dataset Count: 1032


## Model Implementation and Fitting

In [8]:
# Gradient-boosted tree regressor predicting "balance" from the assembled
# "features" vector. An explicit seed makes repeated fits reproducible.
# NOTE(review): maxDepth=30 is the largest depth Spark accepts and is likely to
# overfit a dataset of this size — consider a much shallower tree; confirm.
gbtregressor = GBTRegressor(
    featuresCol='features',
    labelCol='balance',
    maxDepth=30,
    seed=42,
)
model = gbtregressor.fit(training_data)
# Last expression: show the fitted model's summary line.
model

GBTRegressionModel (uid=GBTRegressor_4866858e4dd3dd61c619) with 20 trees

## Model Prediction

In [9]:
# Score the held-out rows with the fitted model.
predictions = model.transform(testing_data)
# Preview the predicted value next to the actual balance for ten test rows.
predictions.select(["features", "prediction", "balance"]).show(n=10)

+--------------------+--------------------+-------+
|            features|          prediction|balance|
+--------------------+--------------------+-------+
|(14,[0,1,2,4,9,10...|               808.0|    127|
|(14,[0,1,2,4,9,10...|                64.0|    272|
|(14,[0,1,2,4,9,10...|               803.0|    138|
|(14,[0,1,2,4,9,10...|               826.0|    221|
|(14,[0,1,2,4,9,10...|2.626005753863317...|   1489|
|(14,[0,1,2,5,9,10...|             13410.0|    113|
|(14,[0,1,2,5,9,10...|               696.0|     83|
|(14,[0,1,2,6,9,10...|               287.0|  10984|
|(14,[0,1,2,6,9,10...|8.968628667020811...|     29|
|(14,[0,1,2,6,9,10...|               108.0|    122|
+--------------------+--------------------+-------+
only showing top 10 rows

