## Import the Libraries and Start Spark

In [None]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql import SparkSession
# Start (or reuse) a SparkSession. Later cells read data through `spark`;
# `sc` is kept for anything that still wants the raw SparkContext.
conf = pyspark.SparkConf().setAppName("Decision Tree Regression")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

## Download the Dataset

In [10]:
!wget https://raw.githubusercontent.com/mananparasher/Spark-Datasets/master/bank_data.csv

--2020-06-21 19:21:54--  https://raw.githubusercontent.com/mananparasher/Spark-Datasets/master/bank_data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.52.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.52.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 918960 (897K) [text/plain]
Saving to: ‘bank_data.csv.2’


2020-06-21 19:21:55 (1.80 MB/s) - ‘bank_data.csv.2’ saved [918960/918960]



## Load the Data into a Spark DataFrame

In [2]:
# Read the CSV with its header row as column names; inferSchema types the
# numeric columns (age, balance, ...) as integers, as the printed schema shows.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('bank_data.csv')
)
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



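## Data Processing for the Machine Learning Model

Each categorical column is converted to a numeric index with `StringIndexer`, and the indexed columns plus the numeric columns are combined into a single `features` vector with `VectorAssembler`. `balance` is the regression target.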

In [3]:
# Columns fed to the model; `balance` is the regression target and is not listed here.
# NOTE(review): `day` and `month` are in the schema but not used as features — confirm intended.
categorical_columns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome', 'deposit']
numerical_columns = ['age', 'duration', 'campaign', 'pdays', 'previous']

# One StringIndexer per categorical column, writing to "<column>indexer".
indexed_columns = [f"{name}indexer" for name in categorical_columns]
pipeline_stages = [
    StringIndexer(inputCol=name, outputCol=indexed)
    for name, indexed in zip(categorical_columns, indexed_columns)
]

# Feature vector = indexed categoricals, then numeric columns (this order fixes the feature indices).
combined_columns = indexed_columns + numerical_columns
vector_assembler = VectorAssembler(inputCols=combined_columns, outputCol="features")
pipeline_stages.append(vector_assembler)

In [4]:
# Expect nine StringIndexers (one per categorical column) followed by the VectorAssembler.
list(pipeline_stages)

[StringIndexer_4835827940c8e276ec35,
 StringIndexer_42ba8eb918a0df020e65,
 StringIndexer_46ee93620497ebeb195a,
 StringIndexer_44e5a1d27cb7cf138441,
 StringIndexer_475381175977db0cbf5c,
 StringIndexer_4c62a1ff3cd7ab7a2f32,
 StringIndexer_4e1eaf9fdc390c59e86b,
 StringIndexer_4ea9bfaba09ead8bb4a0,
 StringIndexer_408a962bd73cbc6a687b,
 VectorAssembler_4f69b29b64a38e1625b3]

In [5]:
# Fit the indexing/assembly pipeline, then keep only the model inputs: feature vector + label.
# NOTE(review): the pipeline is fit on the full dataset before the train/test split,
# so the StringIndexers also see the test rows.
pipeline = Pipeline(stages=pipeline_stages)
pipeline_model = pipeline.fit(df)
pipeline_fit = pipeline_model.transform(df)  # transformed DataFrame (not the fitted model)
df = pipeline_fit.select("features", "balance")
df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- balance: integer (nullable = true)



## Data Splitting

Split the data randomly into 90% training and 10% test rows.

In [6]:
# 90/10 train/test split. A fixed seed makes the split (and every count,
# prediction and metric below) reproducible under Restart & Run All.
SPLIT_SEED = 42
training_data, testing_data = df.randomSplit([0.9, 0.1], seed=SPLIT_SEED)
print(f"Training Dataset Count: {training_data.count()}")
print(f"Test Dataset Count: {testing_data.count()}")

Training Dataset Count: 10026
Test Dataset Count: 1136


## Model Implementation and Fitting

In [9]:
# Maximum depth of the regression tree.
MAX_DEPTH = 10

decisiontreeregressor = DecisionTreeRegressor(
    featuresCol='features',
    labelCol='balance',
    maxDepth=MAX_DEPTH,
)
model = decisiontreeregressor.fit(training_data)
model

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_45f0ad2b97c083e6e9e7) of depth 10 with 877 nodes

## Model Prediction

In [8]:
# Score the held-out split and show a few predictions next to the true balance.
predictions = model.transform(testing_data)
predictions.select("features", "prediction", "balance").show(10)

# A handful of rows is not an evaluation: report standard regression metrics on the test split.
evaluator = RegressionEvaluator(labelCol="balance", predictionCol="prediction")
test_metrics = {
    metric: evaluator.evaluate(predictions, {evaluator.metricName: metric})
    for metric in ("rmse", "mae", "r2")
}
test_metrics

+--------------------+------------------+-------+
|            features|        prediction|balance|
+--------------------+------------------+-------+
|(14,[0,1,2,3,9,10...|              -4.0|     24|
|(14,[0,1,2,4,9,10...|            597.76|    426|
|(14,[0,1,2,4,9,10...| 1344.556511056511|   2565|
|(14,[0,1,2,4,9,10...| 582.2363636363636|      0|
|(14,[0,1,2,4,9,10...| 1344.556511056511|    130|
|(14,[0,1,2,4,9,10...|320.30379746835445|    418|
|(14,[0,1,2,4,9,10...|           1862.25|    392|
|(14,[0,1,2,4,9,10...| 1344.556511056511|  13107|
|(14,[0,1,2,4,9,10...| 1344.556511056511|     31|
|(14,[0,1,2,6,9,10...|            3744.7|     79|
+--------------------+------------------+-------+
only showing top 10 rows

