## Import the Libraries

In [14]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Build one Spark configuration and obtain both entry points from it.
# `SparkContext` is reached through the `pyspark` package because only the
# package itself (not the class) is imported above.
conf = pyspark.SparkConf().setAppName("Random Forest Regressor")
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# The loading cell below reads through `spark`, so create (or reuse) the
# session explicitly instead of relying on the kernel to predefine it.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

## Download the Dataset

In [10]:
!wget https://raw.githubusercontent.com/mananparasher/Spark-Datasets/master/bank_data.csv

--2020-06-21 19:21:54--  https://raw.githubusercontent.com/mananparasher/Spark-Datasets/master/bank_data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.52.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.52.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 918960 (897K) [text/plain]
Saving to: ‘bank_data.csv.2’


2020-06-21 19:21:55 (1.80 MB/s) - ‘bank_data.csv.2’ saved [918960/918960]



## Load the Data into a Spark DataFrame

In [2]:
# Read the downloaded CSV: the first row supplies column names and column
# types are inferred from the data rather than left as strings.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('bank_data.csv')
)

# Print the inferred schema to confirm which columns are numeric vs. string.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



## Data Processing for Machine Learning Model

In [3]:
# String-typed columns that are index-encoded, and numeric columns that are
# passed to the assembler unchanged.
categorical_columns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome','deposit']
numerical_columns = ['age', 'duration', 'campaign', 'pdays', 'previous']

# One StringIndexer per categorical column; each writes to "<column>indexer".
pipeline_stages = [
    StringIndexer(inputCol=name, outputCol=name + "indexer")
    for name in categorical_columns
]

# Assembler inputs: the indexed categorical columns first, then the numeric ones.
combined_columns = [name + "indexer" for name in categorical_columns]
combined_columns.extend(numerical_columns)

# The assembler runs last so that every indexed column exists when it executes.
vector_assembler = VectorAssembler(inputCols=combined_columns, outputCol="features")
pipeline_stages.append(vector_assembler)

In [4]:
# Display the stage list built above (indexers followed by the assembler);
# a bare final expression lets the notebook render its repr.
pipeline_stages

[StringIndexer_48b6a8e4fea56b20d271,
 StringIndexer_493a93c2a773bd960c00,
 StringIndexer_42bb957c57264040e3ec,
 StringIndexer_4d39b1703f42ddddf07d,
 StringIndexer_40a9bdb4a20c99a339fe,
 StringIndexer_4100983e1bc85d37d044,
 StringIndexer_49a587483bbf78f0c3a5,
 StringIndexer_4b25b143d5edcee0e2a1,
 StringIndexer_4bc398e61726ff5a759b,
 VectorAssembler_40c6ac62146e644b2e49]

In [5]:
# Fit all stages on the loaded frame, then apply the fitted pipeline to that
# same frame to obtain the indexed columns and the assembled feature vector.
pipeline = Pipeline(stages=pipeline_stages)
pipeline_model = pipeline.fit(df)
pipeline_fit = pipeline_model.transform(df)

# Keep only the feature vector and the regression target.
# NOTE: this rebinds `df`, so the cell cannot be re-run without reloading the data first.
df = pipeline_fit.select("features", "balance")
df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- balance: integer (nullable = true)



## Data Splitting

In [6]:
# 90/10 train/test split. The seed is fixed so that re-running the notebook
# yields the same partition (and therefore comparable counts and predictions).
training_data, testing_data = df.randomSplit([0.9, 0.1], seed=42)

# Report the size of each partition as a quick sanity check on the split.
print("Training Dataset Count: " + str(training_data.count()))
print("Test Dataset Count: " + str(testing_data.count()))

Training Dataset Count: 10021
Test Dataset Count: 1141


## Model Implementation and Fitting

In [10]:
# Random forest regressor predicting `balance` from the assembled `features`
# vector. The seed pins the randomness used during training so the fitted
# model is repeatable across runs.
randomforestregressor = RandomForestRegressor(
    featuresCol='features',
    labelCol='balance',
    maxDepth=30,
    seed=42,
)
model = randomforestregressor.fit(training_data)

# Bare final expression: show the fitted model's summary repr.
model

RandomForestRegressionModel (uid=RandomForestRegressor_4965b6448c2c4d10aff1) with 20 trees

## Model Prediction

In [11]:
# Score the held-out rows; the model adds a `prediction` column.
predictions = model.transform(testing_data)

# Preview the first ten rows with the predicted value next to the true balance.
preview_columns = ["features", "prediction", "balance"]
predictions.select(*preview_columns).show(n=10)

+--------------------+------------------+-------+
|            features|        prediction|balance|
+--------------------+------------------+-------+
|(14,[0,1,2,4,9,10...|         1383.2875|   -759|
|(14,[0,1,2,4,9,10...|1375.1083333333331|    258|
|(14,[0,1,2,4,9,10...|             917.1|    463|
|(14,[0,1,2,4,9,10...|  955.046696018229|   2744|
|(14,[0,1,2,4,9,10...| 717.1021409334832|   2565|
|(14,[0,1,2,4,9,10...| 1106.439553161086|    184|
|(14,[0,1,2,4,9,10...| 6115.849867724867|    744|
|(14,[0,1,2,4,9,10...|1669.2091666666668|   1005|
|(14,[0,1,2,4,9,10...|          764.2975|      5|
|(14,[0,1,2,4,9,10...|            655.95|      4|
+--------------------+------------------+-------+
only showing top 10 rows

