<a href="https://colab.research.google.com/github/malikbaqi12/Applied-data-science-using-pyspark-code-files/blob/main/Supervised_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Default Parameters

In [1]:
filename = "bank-full.csv"
target_variable_name = "y"

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=11fbb664f200d4c97f2bbf0b8457ba230a8650eac6ec8bc3202c516637b5d33f
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


# Load Dataset

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = spark.read.csv(filename, header=True, inferSchema=True, sep=';')
data.show()

+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management| married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician|  single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur| married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar| married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
| 33|     unknown|  single|  unknown|     no|      1|     no|  no|unknown|  5|  may

# Length of the data

In [5]:
data.count()

45211

# Describe data

In [7]:
data.describe().toPandas()

Unnamed: 0,summary,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
0,count,45211.0,45211,45211,45211,45211,45211.0,45211,45211,45211,45211.0,45211,45211.0,45211.0,45211.0,45211.0,45211,45211
1,mean,40.93621021432837,,,,,1362.2720576850766,,,,15.80641879188693,,258.1630797814691,2.763840658246887,40.19782796222158,0.5803233726305546,,
2,stddev,10.618762040975408,,,,,3044.7658291685243,,,,8.322476153044596,,257.5278122651709,3.098020883279184,100.12874599059812,2.3034410449312204,,
3,min,18.0,admin.,divorced,primary,no,-8019.0,no,no,cellular,1.0,apr,0.0,1.0,-1.0,0.0,failure,no
4,max,95.0,unknown,single,unknown,yes,102127.0,yes,yes,unknown,31.0,sep,4918.0,63.0,871.0,275.0,unknown,yes


# Check Data types of each column

In [8]:
data.groupby('marital').count().show()

+--------+-----+
| marital|count|
+--------+-----+
|divorced| 5207|
| married|27214|
|  single|12790|
+--------+-----+



In [9]:
data.dtypes

[('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('balance', 'int'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('day', 'int'),
 ('month', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('y', 'string')]

In [10]:
data.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)



In [11]:
from pyspark.sql.functions import * 
data.groupBy(target_variable_name).agg({'balance':'avg', 'age': 'avg'}).show()

+---+------------------+------------------+
|  y|      avg(balance)|          avg(age)|
+---+------------------+------------------+
| no|1303.7149691899203| 40.83898602274435|
|yes|1804.2679145396105|41.670069956513515|
+---+------------------+------------------+



# Cardinality Check

In [12]:
from pyspark.sql.functions import approxCountDistinct, countDistinct

"""
Note: approxCountDistinct and countDistinct can be used interchangeably. Only difference is the computation time. 

"approxCountDistinct" is useful for large datasets 
"countDistinct" for small and medium datasets.

"""

def cardinality_calculation(df, cut_off=1):
    cardinality = df.select(*[approxCountDistinct(c).alias(c) for c in df.columns])
    
    ## convert to pandas for efficient calculations
    final_cardinality_df = cardinality.toPandas().transpose()
    final_cardinality_df.reset_index(inplace=True) 
    final_cardinality_df.rename(columns={0:'Cardinality'}, inplace=True) 
    
    #select variables with cardinality of 1
    vars_selected = final_cardinality_df['index'][final_cardinality_df['Cardinality'] <= cut_off] 
    
    return final_cardinality_df, vars_selected

cardinality_df, cardinality_vars_selected = cardinality_calculation(data)



In [13]:
cardinality_df

Unnamed: 0,index,Cardinality
0,age,76
1,job,11
2,marital,3
3,education,4
4,default,2
5,balance,7375
6,housing,2
7,loan,2
8,contact,3
9,day,32


In [14]:
cardinality_vars_selected

Series([], Name: index, dtype: object)

# Missing value check

In [15]:
#missing values check
from pyspark.sql.functions import count, when, isnan, col

# miss_percentage is set to 80% as discussed in the book
def missing_calculation(df, miss_percentage=0.80):
    
    #checks for both NaN and null values
    missing = df.select(*[count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
    length_df = df.count()
    ## convert to pandas for efficient calculations
    final_missing_df = missing.toPandas().transpose()
    final_missing_df.reset_index(inplace=True) 
    final_missing_df.rename(columns={0:'missing_count'}, inplace=True) 
    final_missing_df['missing_percentage'] = final_missing_df['missing_count']/length_df
    
    #select variables with cardinality of 1
    vars_selected = final_missing_df['index'][final_missing_df['missing_percentage'] >= miss_percentage] 
    
    return final_missing_df, vars_selected

In [16]:
missing_df, missing_vars_selected = missing_calculation(data)

In [17]:
missing_df

Unnamed: 0,index,missing_count,missing_percentage
0,age,0,0.0
1,job,0,0.0
2,marital,0,0.0
3,education,0,0.0
4,default,0,0.0
5,balance,0,0.0
6,housing,0,0.0
7,loan,0,0.0
8,contact,0,0.0
9,day,0,0.0


In [18]:
missing_vars_selected

Series([], Name: index, dtype: object)

# Identify variable types

In [19]:
def variable_type(df):
    
    vars_list = df.dtypes
    char_vars = []
    num_vars = []
    for i in vars_list:
        if i[1] in ('string'):
            char_vars.append(i[0])
        else:
            num_vars.append(i[0])
    
    return char_vars, num_vars

In [20]:
char_vars, num_vars = variable_type(data)

In [21]:
char_vars

['job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'poutcome',
 'y']

In [22]:
num_vars

['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous']

In [23]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

def category_to_index(df, char_vars):
    
    char_df = df.select(char_vars)
    indexers = [StringIndexer(inputCol=c, outputCol=c+"_index", handleInvalid="keep") for c in char_df.columns]
    pipeline = Pipeline(stages=indexers)
    char_labels = pipeline.fit(char_df)
    df = char_labels.transform(df)
    return df, char_labels

In [24]:
data, char_labels = category_to_index(data, char_vars)

In [25]:
data.dtypes

[('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('balance', 'int'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('day', 'int'),
 ('month', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('y', 'string'),
 ('job_index', 'double'),
 ('marital_index', 'double'),
 ('education_index', 'double'),
 ('default_index', 'double'),
 ('housing_index', 'double'),
 ('loan_index', 'double'),
 ('contact_index', 'double'),
 ('month_index', 'double'),
 ('poutcome_index', 'double'),
 ('y_index', 'double')]

In [26]:
data = data.select([c for c in data.columns if c not in char_vars])

In [27]:
def rename_columns(df, char_vars):
    mapping = dict(zip([i + '_index' for i in char_vars], char_vars))
    df = df.select([col(c).alias(mapping.get(c, c)) for c in df.columns])
    return df

In [28]:
data = rename_columns(data, char_vars)

In [29]:
data.dtypes

[('age', 'int'),
 ('balance', 'int'),
 ('day', 'int'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('job', 'double'),
 ('marital', 'double'),
 ('education', 'double'),
 ('default', 'double'),
 ('housing', 'double'),
 ('loan', 'double'),
 ('contact', 'double'),
 ('month', 'double'),
 ('poutcome', 'double'),
 ('y', 'double')]

In [30]:
data.groupBy('y').count().show() 

+---+-----+
|  y|count|
+---+-----+
|0.0|39922|
|1.0| 5289|
+---+-----+



In [31]:
linear_df = data.select(['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous'])

In [32]:
linear_df

DataFrame[age: int, balance: int, day: int, duration: int, campaign: int, pdays: int, previous: int]

In [33]:
target_variable_name = 'balance'

# Assemble input vectors

In [34]:
from pyspark.ml.feature import VectorAssembler

#assemble individual columns to one column - 'features'
def assemble_vectors(df, features_list, target_variable_name):
    stages = []
    #assemble vectors
    assembler = VectorAssembler(inputCols=features_list, outputCol='features')
    stages = [assembler]
    #select all the columns + target + newly created 'features' column
    selectedCols = [target_variable_name, 'features'] + features_list
    #use pipeline to process sequentially
    pipeline = Pipeline(stages=stages)
    #assembler model
    assembleModel = pipeline.fit(df)
    #apply assembler model on data
    df = assembleModel.transform(df).select(selectedCols)

    return df

In [35]:
#exclude target variable and select all other feature vectors
features_list = linear_df.columns
#features_list = char_vars #this option is used only for ChiSqselector
features_list.remove(target_variable_name)

In [36]:
features_list

['age', 'day', 'duration', 'campaign', 'pdays', 'previous']

In [37]:
# apply the function on our dataframe
df = assemble_vectors(linear_df, features_list, target_variable_name)

In [38]:
df.show()

+-------+--------------------+---+---+--------+--------+-----+--------+
|balance|            features|age|day|duration|campaign|pdays|previous|
+-------+--------------------+---+---+--------+--------+-----+--------+
|   2143|[58.0,5.0,261.0,1...| 58|  5|     261|       1|   -1|       0|
|     29|[44.0,5.0,151.0,1...| 44|  5|     151|       1|   -1|       0|
|      2|[33.0,5.0,76.0,1....| 33|  5|      76|       1|   -1|       0|
|   1506|[47.0,5.0,92.0,1....| 47|  5|      92|       1|   -1|       0|
|      1|[33.0,5.0,198.0,1...| 33|  5|     198|       1|   -1|       0|
|    231|[35.0,5.0,139.0,1...| 35|  5|     139|       1|   -1|       0|
|    447|[28.0,5.0,217.0,1...| 28|  5|     217|       1|   -1|       0|
|      2|[42.0,5.0,380.0,1...| 42|  5|     380|       1|   -1|       0|
|    121|[58.0,5.0,50.0,1....| 58|  5|      50|       1|   -1|       0|
|    593|[43.0,5.0,55.0,1....| 43|  5|      55|       1|   -1|       0|
|    270|[41.0,5.0,222.0,1...| 41|  5|     222|       1|   -1|  

# Linear Regression

In [39]:
from pyspark.ml.regression import LinearRegression
reg = LinearRegression(featuresCol='features', labelCol='balance')
reg_model = reg.fit(df) # fit model

In [40]:
import pandas as pd
for k, v in df.schema["features"].metadata["ml_attr"]["attrs"].items():
    features_df = pd.DataFrame(v)

# print coefficient and intercept
print(reg_model.coefficients, reg_model.intercept)

features_df['coefficients'] = reg_model.coefficients

[28.08397290892997,3.3055463619496286,0.24882841970901756,-14.142676297161454,-0.08248810233032043,23.462992800762525] 124.92130092818479


In [41]:
features_df

Unnamed: 0,idx,name,coefficients
0,0,age,28.083973
1,1,day,3.305546
2,2,duration,0.248828
3,3,campaign,-14.142676
4,4,pdays,-0.082488
5,5,previous,23.462993


In [42]:
#prediction result
pred_result = reg_model.transform(df)

In [43]:
pred_result

DataFrame[balance: int, features: vector, age: int, day: int, duration: int, campaign: int, pdays: int, previous: int, prediction: double]

In [44]:
reg_model.summary.r2

0.010568116511551984

In [45]:
features_list

['age', 'day', 'duration', 'campaign', 'pdays', 'previous']

# Variance Inflation factor

In [46]:
def vif_calculator(df, features_list):
    vif_list = []
    for i in features_list:
        temp_features_list = features_list.copy()
        temp_features_list.remove(i)
        temp_target = i
        assembler = VectorAssembler(inputCols=temp_features_list, outputCol='features')
        temp_df = assembler.transform(df)
        reg = LinearRegression(featuresCol='features', labelCol=i)
        reg_model = reg.fit(temp_df) # fit model
        temp_vif = 1/(1 - reg_model.summary.r2)
        vif_list.append(temp_vif)
    return vif_list

In [47]:
features_df['vif'] = vif_calculator(linear_df, features_list)

In [48]:
features_df

Unnamed: 0,idx,name,coefficients,vif
0,0,age,28.083973,1.000917
1,1,day,3.305546,1.03435
2,2,duration,0.248828,1.007627
3,3,campaign,-14.142676,1.039907
4,4,pdays,-0.082488,1.276182
5,5,previous,23.462993,1.261321


# Logistic Regression

In [80]:
target_variable_name = "y"
logistic_df = data.select(['age', 'balance', 'day', 'duration', 'campaign', 
'pdays', 'previous', 'y'])
#exclude target variable and select all other feature vectors
features_list = logistic_df.columns
#features_list = char_vars #this option is used only for ChiSqselector
features_list.remove(target_variable_name)
# apply the function on our dataframe
df = assemble_vectors(logistic_df, features_list, target_variable_name)

In [81]:
import numpy as np
from pyspark.ml.classification import LogisticRegression
binary_clf = LogisticRegression(featuresCol='features', labelCol='y', 
family='binomial')

In [84]:
multinomial_clf = LogisticRegression(featuresCol='features', labelCol='y', 
family='multinomial')
binary_clf_model = binary_clf.fit(df) # fit binary model
multinomial_clf_model = multinomial_clf.fit(df) # fit multinomial model
np.set_printoptions(precision=3, suppress=True) #model coefficients for binary model
print(binary_clf_model.coefficients) #model coefficients for multinomial model
np.set_printoptions(precision=4, suppress=True)
print(multinomial_clf_model.coefficientMatrix)
print(binary_clf_model.intercept) #model intercept for binary model #model intercept for multinomial model
print(multinomial_clf_model.interceptVector)

DenseMatrix([[ 0.0161,  0.    ,  0.0207,  0.0038, -0.3267,  0.0006, -0.0157],
             [ 0.0241,  0.0001,  0.0191,  0.0074, -0.4548,  0.0027,  0.0702],
             [-0.0402, -0.0001, -0.0398, -0.0112,  0.7815, -0.0033, -0.0546]])
[26.4380595938345,22.96815598984732,-49.40621558368182]


# Decision Trees

In [85]:
from pyspark.ml.classification import DecisionTreeClassifier

In [86]:
target_variable_name = "y"
logistic_df = data.select(['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous', 'y'])
#exclude target variable and select all other feature vectors
features_list = logistic_df.columns
#features_list = char_vars #this option is used only for ChiSqselector
features_list.remove(target_variable_name)
# apply the function on our dataframe
binary_df = assemble_vectors(logistic_df, features_list, target_variable_name)

In [87]:
target_variable_name = "balance"
linear_df = data.select(['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous'])
#exclude target variable and select all other feature vectors
features_list = linear_df.columns
#features_list = char_vars #this option is used only for ChiSqselector
features_list.remove(target_variable_name)
# apply the function on our dataframe
continuous_df = assemble_vectors(linear_df, features_list, target_variable_name)

In [88]:
from pyspark.ml.classification import DecisionTreeClassifier

clf = DecisionTreeClassifier(featuresCol='features', labelCol='y', impurity='gini')
clf_model = clf.fit(binary_df)
clf2 = DecisionTreeClassifier(featuresCol='features', labelCol='y', impurity='entropy')
clf_model2 = clf2.fit(binary_df)

In [89]:
print(clf_model.featureImportances)

(7,[0,2,3,4,5],[0.06190670882603403,0.004216877896410375,0.7193629411427086,0.0032743507248847495,0.21123912140996226])


In [90]:
print(clf_model2.featureImportances)

(7,[0,3,5],[0.06411554347284239,0.7110985153536679,0.22478594117348974])


In [91]:
from pyspark.ml.regression import DecisionTreeRegressor

reg = DecisionTreeRegressor(featuresCol='features', labelCol='balance', impurity='variance')
reg_model = reg.fit(continuous_df)

In [92]:
clf_model.toDebugString

'DecisionTreeClassificationModel: uid=DecisionTreeClassifier_4f3073b9d34a, depth=5, numNodes=41, numClasses=3, numFeatures=7\n  If (feature 3 <= 491.5)\n   If (feature 5 <= 9.5)\n    If (feature 0 <= 60.5)\n     Predict: 0.0\n    Else (feature 0 > 60.5)\n     If (feature 3 <= 129.5)\n      Predict: 0.0\n     Else (feature 3 > 129.5)\n      If (feature 3 <= 208.5)\n       Predict: 0.0\n      Else (feature 3 > 208.5)\n       Predict: 1.0\n   Else (feature 5 > 9.5)\n    If (feature 3 <= 160.5)\n     Predict: 0.0\n    Else (feature 3 > 160.5)\n     If (feature 5 <= 189.5)\n      If (feature 5 <= 93.5)\n       Predict: 1.0\n      Else (feature 5 > 93.5)\n       Predict: 0.0\n     Else (feature 5 > 189.5)\n      Predict: 0.0\n  Else (feature 3 > 491.5)\n   If (feature 3 <= 871.5)\n    If (feature 5 <= 8.5)\n     If (feature 3 <= 678.5)\n      If (feature 0 <= 60.5)\n       Predict: 0.0\n      Else (feature 0 > 60.5)\n       Predict: 1.0\n     Else (feature 3 > 678.5)\n      If (feature 0 <= 

## String to Json parser

In [93]:
def parse(lines):
    block = []
    while lines :

        if lines[0].startswith('If'):
            bl = ' '.join(lines.pop(0).split()[1:]).replace('(', '').replace(')', '')
            block.append({'id':bl, 'children':parse(lines)})


            if lines[0].startswith('Else'):
                be = ' '.join(lines.pop(0).split()[1:]).replace('(', '').replace(')', '')
                block.append({'id':be, 'children':parse(lines)})
        elif not lines[0].startswith(('If','Else')):
            block2 = lines.pop(0)
            block.append({'id':block2})
        else:
            break
    return block

def tree_json(tree):
    data = []
    for line in tree.splitlines() : 
        if line.strip():
            line = line.strip()
            data.append(line)
        else : break
        if not line : break
    res = []
    res.append({'id':'Root', 'children':parse(data[1:])})
    return res[0]

In [94]:
result = tree_json(clf_model.toDebugString)

In [95]:
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructField, StructType

cSchema = StructType([StructField("age", IntegerType())\
                      ,StructField("gender", IntegerType())\
                      ,StructField("y", IntegerType())])

test_list = [[30, 0, 1], 
             [25, 1, 0], 
             [45, 0, 0], 
             [57, 1, 1],
             [27, 0, 1], 
             [54, 1, 1], 
             [35, 1, 1]]


test_df = spark.createDataFrame(test_list, schema=cSchema)
test_df.show()

+---+------+---+
|age|gender|  y|
+---+------+---+
| 30|     0|  1|
| 25|     1|  0|
| 45|     0|  0|
| 57|     1|  1|
| 27|     0|  1|
| 54|     1|  1|
| 35|     1|  1|
+---+------+---+



In [100]:
test_df = assemble_vectors(test_df, ['age','gender'], 'y')
test_clf = DecisionTreeClassifier(featuresCol='features', labelCol='y')
test_clf_model = test_clf.fit(test_df)

IllegalArgumentException: ignored

In [98]:
clf_model.featureImportances

SparseVector(7, {0: 0.0386, 1: 0.0045, 2: 0.0177, 3: 0.7391, 4: 0.0022, 5: 0.1016, 6: 0.0964})

In [99]:
test_clf_model.toDebugString

'DecisionTreeClassificationModel: uid=DecisionTreeClassifier_f9ab41be2ee9, depth=3, numNodes=7, numClasses=2, numFeatures=2\n  If (feature 0 <= 26.0)\n   Predict: 0.0\n  Else (feature 0 > 26.0)\n   If (feature 0 <= 40.0)\n    Predict: 1.0\n   Else (feature 0 > 40.0)\n    If (feature 0 <= 49.5)\n     Predict: 0.0\n    Else (feature 0 > 49.5)\n     Predict: 1.0\n'

# Random Forest

In [96]:
from pyspark.ml.classification import RandomForestClassifier

clf = RandomForestClassifier(featuresCol='features', labelCol='y')
clf_model = clf.fit(binary_df)
print(clf_model.featureImportances)
print(clf_model.toDebugString)

(7,[0,1,2,3,4,5,6],[0.038557953092905356,0.004465787812830624,0.01766791759408475,0.7390687544878465,0.00223711556797955,0.10164267136787343,0.09635980007647986])
RandomForestClassificationModel: uid=RandomForestClassifier_b6f970d193c0, numTrees=20, numClasses=3, numFeatures=7
  Tree 0 (weight 1.0):
    If (feature 5 <= 8.5)
     If (feature 2 <= 4.5)
      If (feature 6 <= 6.5)
       If (feature 3 <= 477.5)
        Predict: 0.0
       Else (feature 3 > 477.5)
        If (feature 1 <= -307.0)
         Predict: 1.0
        Else (feature 1 > -307.0)
         Predict: 0.0
      Else (feature 6 > 6.5)
       If (feature 3 <= 76.5)
        Predict: 0.0
       Else (feature 3 > 76.5)
        Predict: 1.0
     Else (feature 2 > 4.5)
      If (feature 3 <= 654.5)
       Predict: 0.0
      Else (feature 3 > 654.5)
       If (feature 3 <= 843.5)
        Predict: 0.0
       Else (feature 3 > 843.5)
        Predict: 1.0
    Else (feature 5 > 8.5)
     If (feature 5 <= 187.5)
      If (feature 3 <

In [101]:
from pyspark.ml.regression import RandomForestRegressor

reg = RandomForestRegressor(featuresCol='features', labelCol='balance')
reg_model = reg.fit(continuous_df)
print(reg_model.featureImportances)
print(reg_model.toDebugString)

(6,[0,1,2,3,4,5],[0.3454518221845037,0.2499752699818787,0.09618437019664351,0.04905892770748404,0.21060501575167243,0.048724594177817424])
RandomForestRegressionModel: uid=RandomForestRegressor_0fe0558268ba, numTrees=20, numFeatures=6
  Tree 0 (weight 1.0):
    If (feature 0 <= 52.5)
     If (feature 1 <= 17.5)
      If (feature 4 <= 30.0)
       If (feature 0 <= 35.5)
        If (feature 3 <= 16.5)
         Predict: 919.2836909871245
        Else (feature 3 > 16.5)
         Predict: 2352.148936170213
       Else (feature 0 > 35.5)
        If (feature 0 <= 42.5)
         Predict: 1161.107847341338
        Else (feature 0 > 42.5)
         Predict: 1329.5615006150063
      Else (feature 4 > 30.0)
       If (feature 5 <= 4.5)
        If (feature 1 <= 2.5)
         Predict: 1582.1331269349846
        Else (feature 1 > 2.5)
         Predict: 1184.1075334143377
       Else (feature 5 > 4.5)
        If (feature 4 <= 168.5)
         Predict: 2702.2953586497893
        Else (feature 4 > 168.5)


# Gradient boosting

In [102]:
from pyspark.ml.classification import GBTClassifier

clf = GBTClassifier(featuresCol='features', labelCol='y')
clf_model = clf.fit(binary_df)
print(clf_model.featureImportances)
print(clf_model.toDebugString)

(7,[0,1,2,3,4,5,6],[0.11376371243731193,0.09554440652822713,0.1408168583959305,0.4543466406308598,0.04521532518556668,0.12804410659759036,0.02226895022451372])
GBTClassificationModel: uid = GBTClassifier_0f28c6c5abec, numTrees=20, numClasses=2, numFeatures=7
  Tree 0 (weight 1.0):
    If (feature 3 <= 475.5)
     If (feature 5 <= 16.0)
      If (feature 0 <= 59.5)
       If (feature 3 <= 204.5)
        If (feature 0 <= 28.5)
         Predict: -0.8707360861759426
        Else (feature 0 > 28.5)
         Predict: -0.9660419670030435
       Else (feature 3 > 204.5)
        If (feature 0 <= 25.5)
         Predict: -0.518324607329843
        Else (feature 0 > 25.5)
         Predict: -0.8520653218059558
      Else (feature 0 > 59.5)
       If (feature 3 <= 204.5)
        If (feature 3 <= 120.5)
         Predict: -0.9247648902821317
        Else (feature 3 > 120.5)
         Predict: -0.6050955414012739
       Else (feature 3 > 204.5)
        If (feature 0 <= 66.5)
         Predict: -0.3626373

In [103]:
from pyspark.ml.regression import GBTRegressor

reg = GBTRegressor(featuresCol='features', labelCol='balance')
reg_model = reg.fit(continuous_df)
print(reg_model.featureImportances)
print(reg_model.toDebugString)

(6,[0,1,2,3,4,5],[0.18572781726960613,0.3753777882263563,0.263284691474992,0.07600049084927761,0.0679712754044124,0.03163793677535541])
GBTRegressionModel: uid=GBTRegressor_aa90b6f897bc, numTrees=20, numFeatures=6
  Tree 0 (weight 1.0):
    If (feature 0 <= 54.5)
     If (feature 0 <= 37.5)
      If (feature 1 <= 17.5)
       If (feature 1 <= 3.5)
        If (feature 4 <= 377.5)
         Predict: 1289.7027450980393
        Else (feature 4 > 377.5)
         Predict: 3244.75
       Else (feature 1 > 3.5)
        If (feature 4 <= 25.5)
         Predict: 937.5397395002658
        Else (feature 4 > 25.5)
         Predict: 1198.0977011494253
      Else (feature 1 > 17.5)
       If (feature 1 <= 21.5)
        If (feature 0 <= 29.5)
         Predict: 917.4211287988422
        Else (feature 0 > 29.5)
         Predict: 1723.0375874125873
       Else (feature 1 > 21.5)
        If (feature 4 <= 3.5)
         Predict: 952.7784339457568
        Else (feature 4 > 3.5)
         Predict: 1535.129541864

# Support vector machines

In [109]:
from pyspark.sql.functions import when

In [110]:
binary_df = binary_df.withColumn("y", when(binary_df["y"] == 2, 1).otherwise(binary_df["y"]))

In [111]:
from pyspark.ml.classification import LinearSVC

In [112]:
clf = LinearSVC(featuresCol='features', labelCol='y')
clf_model = clf.fit(binary_df)
print(clf_model.intercept, clf_model.coefficients)

-1.0000000574762387 [1.497446446917058e-10,-0.0,6.76696042699316e-10,1.488516520343328e-10,-4.993085808396494e-09,-0.0,8.620405268852088e-09]


In [113]:
from pyspark.ml.classification import LinearSVC
clf = LinearSVC(featuresCol='features', labelCol='y')
clf_model = clf.fit(binary_df)
print(clf_model.intercept, clf_model.coefficients)

-1.0000000574762387 [1.497446446917058e-10,-0.0,6.76696042699316e-10,1.488516520343328e-10,-4.993085808396494e-09,-0.0,8.620405268852088e-09]


In [117]:
import numpy as np
from pyspark.ml.classification import LogisticRegression

In [118]:
binary_clf = LogisticRegression(featuresCol='features', labelCol='y', family='binomial')

In [119]:
multinomial_clf = LogisticRegression(featuresCol='features', labelCol='y', family='multinomial')

In [122]:
target_variable_name = "y"
logistic_df = data.select(['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous', 'y'])
features_list = logistic_df.columns
features_list.remove(target_variable_name)

# assemble feature vectors using vector assembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=features_list, outputCol="features")
df = assembler.transform(logistic_df).select("features", target_variable_name)

# train multinomial logistic regression model
from pyspark.ml.classification import LogisticRegression
multinomial_clf = LogisticRegression(featuresCol='features', labelCol='y', family='multinomial')
multinomial_clf_model = multinomial_clf.fit(df)

# print model coefficients
import numpy as np
np.set_printoptions(precision=4, suppress=True)
print(multinomial_clf_model.coefficientMatrix)
print(multinomial_clf_model.interceptVector)


DenseMatrix([[ 0.0161,  0.    ,  0.0207,  0.0038, -0.3267,  0.0006, -0.0157],
             [ 0.0241,  0.0001,  0.0191,  0.0074, -0.4548,  0.0027,  0.0702],
             [-0.0402, -0.0001, -0.0398, -0.0112,  0.7815, -0.0033, -0.0546]])
[26.4380595938345,22.96815598984732,-49.40621558368182]


# Neural networks

In [124]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

#output_layer is set to 2 because of binary target
clf = MultilayerPerceptronClassifier(featuresCol='features', labelCol='y', layers=[4, 4, 2])
clf_model = clf.fit(binary_df)

# One vs rest classifier

In [125]:
target_variable_name = "education"
multiclass_df = data.select(['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous', 'job', 'education'])
features_list = multiclass_df.columns
#features_list = char_vars #this option is used only for ChiSqselector
features_list.remove(target_variable_name)
# apply the function on our dataframe
multiclass_df = assemble_vectors(multiclass_df, features_list, target_variable_name)

In [126]:
from pyspark.ml.classification import RandomForestClassifier, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# generate the train/test split.
(train, test) = multiclass_df.randomSplit([0.7, 0.3])
# instantiate the base classifier.
clf = RandomForestClassifier(featuresCol='features', labelCol='education')
# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=clf, featuresCol='features', labelCol='education')
# train the multiclass model.
ovrModel = ovr.fit(train)
# score the model on test data.
predictions = ovrModel.transform(test)
# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy", labelCol='education')
# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.33684


# Naive Bayes classifier

In [127]:
target_variable_name = "y"
nonneg_df = data.select(['age', 'day', 'duration', 'campaign', 'previous', 'y'])
#exclude target variable and select all other feature vectors
features_list = nonneg_df.columns
#features_list = char_vars #this option is used only for ChiSqselector
features_list.remove(target_variable_name)
# apply the function on our dataframe
nonneg_df = assemble_vectors(nonneg_df, features_list, target_variable_name)

In [128]:
from pyspark.ml.classification import NaiveBayes

#output_layer is set to 2 because of binary target
clf = NaiveBayes(featuresCol='features', labelCol='y')
clf_model = clf.fit(nonneg_df)