In [1]:
# Create the Spark Session Object
!pip install pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('sml').getOrCreate()

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=7f0d8426a255ef35eb521dd96658ba3fe8f5ade4befe8b1d5c65a30e4eec04e6
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
# read the data
filename = "sample_data/bank-full.csv"

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv(filename, header=True, inferSchema=True, sep=';')
df.show()

+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management| married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician|  single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur| married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar| married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
| 33|     unknown|  single|  unknown|     no|      1|     no|  no|unknown|  5|  may

In [3]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)



In [4]:
# assemble feature vectors
from pyspark.ml.feature import VectorAssembler

# assemble individual columns to one column - 'features'
def assemble_vectors(df, features_list, target_variable_name):
    stages = []

    #assemble vectors
    assembler = VectorAssembler(inputCols=features_list, outputCol='features')
    stages = [assembler]

    #select all the columns + target + newly created 'features' column
    selectedCols = [target_variable_name, 'features'] + features_list

    #use pipeline to process sequentially
    pipeline = Pipeline(stages=stages)

    #assembler model
    assembleModel = pipeline.fit(df)

    #apply assembler model on data
    df = assembleModel.transform(df).select(selectedCols)

    return df

In [5]:
from pyspark.ml import Pipeline

# select the independent variables
linear_df = df.select(['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous'])
target_variable_name = 'balance'

# exclude target variable and select all other feature vectors
features_list = linear_df.columns

# features_list = char_vars #this option is used only for ChiSqselector
features_list.remove(target_variable_name)

# apply the function on our DataFrame
df = assemble_vectors(linear_df, features_list, target_variable_name)

# fit the regression model
from pyspark.ml.regression import LinearRegression
reg = LinearRegression(featuresCol='features', labelCol='balance')
reg_model = reg.fit(df) # fit model

# view the coefficients and intercepts for each variable
import pandas as pd
for k, v in df.schema["features"].metadata["ml_attr"]["attrs"].items():
    features_df = pd.DataFrame(v)

# print coefficient and intercept
print("The coefficients for each independent variable are as follows:", reg_model.coefficients, "and the intercept is: ", reg_model.intercept)

The coefficients for each independent variable are as follows: [28.083972908930026,3.30554636194966,0.2488284197090184,-14.142676297161422,-0.08248810233031972,23.46299280076253] and the intercept is:  124.9213009281818


In [6]:
df.show()

+-------+--------------------+---+---+--------+--------+-----+--------+
|balance|            features|age|day|duration|campaign|pdays|previous|
+-------+--------------------+---+---+--------+--------+-----+--------+
|   2143|[58.0,5.0,261.0,1...| 58|  5|     261|       1|   -1|       0|
|     29|[44.0,5.0,151.0,1...| 44|  5|     151|       1|   -1|       0|
|      2|[33.0,5.0,76.0,1....| 33|  5|      76|       1|   -1|       0|
|   1506|[47.0,5.0,92.0,1....| 47|  5|      92|       1|   -1|       0|
|      1|[33.0,5.0,198.0,1...| 33|  5|     198|       1|   -1|       0|
|    231|[35.0,5.0,139.0,1...| 35|  5|     139|       1|   -1|       0|
|    447|[28.0,5.0,217.0,1...| 28|  5|     217|       1|   -1|       0|
|      2|[42.0,5.0,380.0,1...| 42|  5|     380|       1|   -1|       0|
|    121|[58.0,5.0,50.0,1....| 58|  5|      50|       1|   -1|       0|
|    593|[43.0,5.0,55.0,1....| 43|  5|      55|       1|   -1|       0|
|    270|[41.0,5.0,222.0,1...| 41|  5|     222|       1|   -1|  

To represent this model in an equation:
# Predicted_balance = 124.92 + 28.08 * age + 3.30 * day + 0.25 * duration - 14.14 * campaign -0.08 * pdays + 23.46 * previous

When two or more variables are correlated, the model results are not interpretable. In a linear regression model, we should avoid multi-collinearity issues. Otherwise, the model output is not valid.

To check for multi-collinearity: Variance Inflation Factor (VIF). The industry standard for VIF's value is 10. When the VIF value is 1, the input features are completely uncorrelated. PCA components usually have a VIF close to 1. In our final model, we should include input variables that end up having a VIF value of less than 10.

In [7]:
def vif_calculator(df, features_list):
    vif_list = []
    for i in features_list:
        temp_features_list = features_list.copy()
        temp_features_list.remove(i)
        temp_target = i
        assembler = VectorAssembler(inputCols=temp_features_list,
        outputCol='features')
        temp_df = assembler.transform(df)
        reg = LinearRegression(featuresCol='features', labelCol=i)
        reg_model = reg.fit(temp_df) # fit model
        temp_vif = 1/(1 - reg_model.summary.r2)
        vif_list.append(temp_vif)
    return vif_list

features_df['vif'] = vif_calculator(linear_df, features_list)
print(features_df)

# From the below statistics, we can see that the vif for all independent variables are close to 1 which means that collinearity does not exist between them.

   idx      name       vif
0    0       age  1.000917
1    1       day  1.034350
2    2  duration  1.007627
3    3  campaign  1.039907
4    4     pdays  1.276182
5    5  previous  1.261321


In [8]:
from pyspark.ml.regression import RandomForestRegressor

reg = RandomForestRegressor(featuresCol='features', labelCol="balance")
reg_model = reg.fit(df)

print(reg_model.featureImportances)
print(reg_model.toDebugString)

(6,[0,1,2,3,4,5],[0.32223876802346274,0.2534157828808158,0.10844083061917757,0.058317632708494896,0.17904232897600897,0.07854465679204006])
RandomForestRegressionModel: uid=RandomForestRegressor_69f44be2eac5, numTrees=20, numFeatures=6
  Tree 0 (weight 1.0):
    If (feature 0 <= 54.5)
     If (feature 2 <= 894.5)
      If (feature 4 <= 192.5)
       If (feature 5 <= 0.5)
        Predict: 1195.159598927066
       Else (feature 5 > 0.5)
        If (feature 1 <= 9.5)
         Predict: 1421.1469957081545
        Else (feature 1 > 9.5)
         Predict: 1876.7004647232784
      Else (feature 4 > 192.5)
       If (feature 2 <= 191.5)
        If (feature 0 <= 30.5)
         Predict: 604.5823863636364
        Else (feature 0 > 30.5)
         Predict: 895.0575275397797
       Else (feature 2 > 191.5)
        If (feature 5 <= 8.5)
         Predict: 1160.8030933967875
        Else (feature 5 > 8.5)
         Predict: 1629.2738095238096
     Else (feature 2 > 894.5)
      If (feature 5 <= 1.5)
    

In [9]:
# Gradient Boosting is useful for modelling imbalanced target classes and reducing bias in predictions.
from pyspark.ml.regression import GBTRegressor

reg = GBTRegressor(featuresCol='features', labelCol="balance")
reg_model = reg.fit(df)

print(reg_model.featureImportances)
print(reg_model.toDebugString)

(6,[0,1,2,3,4,5],[0.3344285546485345,0.16732428493676402,0.22370187628356802,0.10159459300327969,0.12168151112857775,0.051269179999275966])
GBTRegressionModel: uid=GBTRegressor_c95175a1825c, numTrees=20, numFeatures=6
  Tree 0 (weight 1.0):
    If (feature 0 <= 54.5)
     If (feature 0 <= 37.5)
      If (feature 1 <= 17.5)
       If (feature 1 <= 3.5)
        If (feature 4 <= 275.5)
         Predict: 1282.604967948718
        Else (feature 4 > 275.5)
         Predict: 2310.1063829787236
       Else (feature 1 > 3.5)
        If (feature 4 <= 24.5)
         Predict: 937.5397395002658
        Else (feature 4 > 24.5)
         Predict: 1198.0977011494253
      Else (feature 1 > 17.5)
       If (feature 1 <= 21.5)
        If (feature 0 <= 29.5)
         Predict: 917.4211287988422
        Else (feature 0 > 29.5)
         Predict: 1723.0375874125873
       Else (feature 1 > 21.5)
        If (feature 4 <= 3.5)
         Predict: 952.7784339457568
        Else (feature 4 > 3.5)
         Predict: 

In [10]:
# Load the data in a new dataframe df1 to be able to implement other algorithms
df1 = spark.read.csv(filename, header=True, inferSchema=True, sep=';')
df1.show(5)

+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
| 33|     unknown| single|  unknown|     no|      1|     no|  no|unknown|  5|  may|     19

In [11]:
# We need to convert the categorical variable 'y' into numerical variable
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

# Converting string column y into numerical values using StringIndexer.
indexer = StringIndexer(inputCol = "y", outputCol = "y_Index")
df1 = indexer.fit(df1).transform(df1)

#ohe = OneHotEncoder(inputCol="yIndex", outputCol="yOHEVector")
#df = ohe.fit(df).transform(df)

df1.show()

+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+-------+
|age|         job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|y_Index|
+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+-------+
| 58|  management| married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|    0.0|
| 44|  technician|  single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|    0.0|
| 33|entrepreneur| married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|    0.0|
| 47| blue-collar| married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|    0.0|
| 33|     unknown|  single|

In [12]:
from pyspark.sql.types import IntegerType
# We need to convert y_Index from double into integer to be able to use the Decision Tree Classifier algorithm

df1 = df1.withColumn('y_Index', df1['y_Index'].cast(IntegerType()))

In [13]:
df1.printSchema() # Verification of the 'y_Index' variable is indeed integer type

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)
 |-- y_Index: integer (nullable = true)



In [14]:
from pyspark.ml import Pipeline

target_variable_name = "y_Index" # Now as integer
logistic_df1 = df1.select(['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous', 'y_Index'])

#exclude target variable and select all other feature vectors
features_list = logistic_df1.columns

#features_list = char_vars #this option is used only for ChiSqselector
features_list.remove(target_variable_name)

# apply the function on our dataframe
df1 = assemble_vectors(logistic_df1, features_list, target_variable_name)

In [15]:
# It is preferable to use multinomial fit when the target has more than 2 classes.
import numpy as np
from pyspark.ml.classification import LogisticRegression

binary_clf = LogisticRegression(featuresCol='features', labelCol="y_Index", family="binomial")

multinomial_clf = LogisticRegression(featuresCol='features', labelCol="y_Index", family="multinomial")

binary_clf_model = binary_clf.fit(df1) # fit binary model

multinomial_clf_model = multinomial_clf.fit(df1) # fit multinomial model
np.set_printoptions(precision=3, suppress=True)

#model coefficients for binary model
print("The model coefficients for binary model", binary_clf_model.coefficients)

#model coefficients for multinomial model
np.set_printoptions(precision=4, suppress=True)
print(multinomial_clf_model.coefficientMatrix)

print(binary_clf_model.intercept) #model intercept for binary model

#model intercept for multinomial model
print(multinomial_clf_model.interceptVector)

The model coefficients for binary model [0.007959273224588061,3.71814071340423e-05,-0.0016496709451332176,0.00363719757809717,-0.12804344622031283,0.002113573301657069,0.0859380305073598]
DenseMatrix([[-0.004 , -0.    ,  0.0008, -0.0018,  0.064 , -0.0011, -0.043 ],
             [ 0.004 ,  0.    , -0.0008,  0.0018, -0.064 ,  0.0011,  0.043 ]])
-3.469907028389409
[1.7349523781744063,-1.7349523781744063]


The equation for the binary model is provided here:

# log(odds class1) = -3.47 + 0.008*age + 0.0000372*balance - 0.0017*day + 0.0036*duration - 0.128*campaign + 0.0021*pdays + 0.0859*previous
 Similarly, the equation for the multinomial model is provided here:

# log(odds class0) = 1.735 - 0.004*age – 0*balance + 0.0008*day - 0.0018*duration + 0.064*campaign - 0.0011*pdays - 0.043*previous
 and

# log(odds class1) = -1.735 +0.004*age + 0*balance - 0.0008*day + 0.0018*duration - 0.064*campaign + 0.0011*pdays + 0.043*previous

The rule of thumb for interpreting odds ratio:

Odds ratio of 1 means there is no difference in impact.

Odds ratio greater than 1 means the odds increase as the variable changes.

Odds ratio less than 1 means the odds decrease as the variable changes.

In [16]:
from pyspark.ml.classification import RandomForestClassifier

clf = RandomForestClassifier(featuresCol='features', labelCol="y_Index")
clf_model = clf.fit(df1)

print(clf_model.featureImportances)
print(clf_model.toDebugString)

(7,[0,1,2,3,4,5,6],[0.08927537919392198,0.008538320447672772,0.014312732083670057,0.7335004663567367,0.004991685069896515,0.10977084794304692,0.03961056890505503])
RandomForestClassificationModel: uid=RandomForestClassifier_3f2a5333f4e3, numTrees=20, numClasses=2, numFeatures=7
  Tree 0 (weight 1.0):
    If (feature 0 <= 60.5)
     If (feature 3 <= 493.5)
      Predict: 0.0
     Else (feature 3 > 493.5)
      If (feature 5 <= 31.0)
       If (feature 3 <= 700.5)
        Predict: 0.0
       Else (feature 3 > 700.5)
        If (feature 3 <= 910.0)
         Predict: 0.0
        Else (feature 3 > 910.0)
         Predict: 1.0
      Else (feature 5 > 31.0)
       If (feature 3 <= 700.5)
        If (feature 5 <= 92.5)
         Predict: 1.0
        Else (feature 5 > 92.5)
         Predict: 0.0
       Else (feature 3 > 700.5)
        Predict: 1.0
    Else (feature 0 > 60.5)
     If (feature 6 <= 1.5)
      If (feature 1 <= 1398.5)
       If (feature 3 <= 202.5)
        Predict: 0.0
       Else 

In [17]:
from pyspark.ml.classification import GBTClassifier # Gradient Boosting

clf = GBTClassifier(featuresCol='features', labelCol="y_Index")
clf_model = clf.fit(df1)

print(clf_model.featureImportances)
print(clf_model.toDebugString)

(7,[0,1,2,3,4,5,6],[0.14269202781702842,0.08883553452582624,0.13612888126384667,0.44070060823510665,0.039929163142451325,0.13710655093271443,0.01460723408302615])
GBTClassificationModel: uid = GBTClassifier_ccbaf49088af, numTrees=20, numClasses=2, numFeatures=7
  Tree 0 (weight 1.0):
    If (feature 3 <= 556.5)
     If (feature 5 <= 9.0)
      If (feature 3 <= 318.5)
       If (feature 0 <= 59.5)
        If (feature 3 <= 130.5)
         Predict: -0.9774143302180686
        Else (feature 3 > 130.5)
         Predict: -0.9017945457114597
       Else (feature 0 > 59.5)
        If (feature 3 <= 130.5)
         Predict: -0.9005524861878453
        Else (feature 3 > 130.5)
         Predict: -0.45161290322580644
      Else (feature 3 > 318.5)
       If (feature 0 <= 59.5)
        If (feature 3 <= 426.5)
         Predict: -0.803076923076923
        Else (feature 3 > 426.5)
         Predict: -0.6484261501210654
       Else (feature 0 > 59.5)
        If (feature 4 <= 3.5)
         Predict: -0.045

In [18]:
from pyspark.ml.classification import LinearSVC

clf = LinearSVC(featuresCol='features', labelCol="y_Index")
clf_model = clf.fit(df1)
print(clf_model.intercept, clf_model.coefficients)

-1.000000057476241 [1.497445206471147e-10,-0.0,6.766960380352564e-10,1.4885182296856278e-10,-4.99308247508715e-09,-0.0,8.620405476582408e-09]


In [19]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

#output_layer is set to 2 because of binary target
clf = MultilayerPerceptronClassifier(featuresCol='features', labelCol="y_Index", layers=[4, 4, 2])
clf_model = clf.fit(df1)


In [20]:
print(clf_model)

MultilayerPerceptronClassificationModel: uid=MultilayerPerceptronClassifier_635c14772df6, numLayers=3, numClasses=2, numFeatures=4


# Interpretation of the model results
How does a decision tree pick the most important attribute/variable? Gini, Entropy and Information gain.

Entropy is the measure of randomness in the data. If entropy = 0, there is no randomness whereas if entropy  = 1, the dataset is completely random.

Information Gain, IG = Entropy (parent node) – (Entropy each child node *
proportion of observations in each child node)

Gini impurity measures the impurity in a split. When Gini = 0, the split is pure and when it is 0.5, the split is random and completely impure.

Gini = 1 - Summation of ((p(x))^2

In [21]:
from pyspark.ml.classification import DecisionTreeClassifier

clf = DecisionTreeClassifier(featuresCol='features', labelCol="y_Index", impurity="gini") #gini based model
clf_model = clf.fit(df1)

clf2 = DecisionTreeClassifier(featuresCol='features', labelCol="y_Index", impurity="entropy") #entropy based model
clf_model2 = clf2.fit(df1)

clf_model.transform(df) #future predictions

# gini feature importance
print(clf_model.featureImportances)
#output - (7,[0,3,5],[0.063,0.723,0.214])

print(clf_model2.featureImportances)
#output - (7,[0,2,3,4,5],[0.018,0.001,0.727,0.0004,0.254])

(7,[0,1,2,3,4,5],[0.0566760902031683,0.0018009374676099367,0.0032899037873435765,0.7193567572330268,0.002293538337891141,0.21658277297096015])
(7,[0,3,4,5],[0.057295855454643066,0.7282804581108963,0.0016368408718558146,0.2127868455626049])


In [22]:
from pyspark.ml.regression import DecisionTreeRegressor

reg = DecisionTreeRegressor(featuresCol='features', labelCol="balance", impurity="variance")
reg_model = reg.fit(df1)

print(reg_model.featureImportances) #feature importance
reg_model.transform(df1) #future predictions

(7,[0,1,2,3,4,5,6],[0.014930752401876964,0.9567897723288119,0.008926498758154103,0.013297676349694943,0.0012884322082402954,0.004753380771262643,1.3487181959169986e-05])


DataFrame[y_Index: int, features: vector, age: int, balance: int, day: int, duration: int, campaign: int, pdays: int, previous: int, prediction: double]

In [23]:
clf_model.toDebugString
reg_model.toDebugString

'DecisionTreeRegressionModel: uid=DecisionTreeRegressor_bc35959ef131, depth=5, numNodes=59, numFeatures=7\n  If (feature 1 <= 7713.0)\n   If (feature 1 <= 2358.5)\n    If (feature 1 <= 754.5)\n     If (feature 1 <= 223.5)\n      If (feature 1 <= -296.0)\n       Predict: -608.1291834002677\n      Else (feature 1 > -296.0)\n       Predict: 40.10420920887952\n     Else (feature 1 > 223.5)\n      If (feature 1 <= 500.5)\n       Predict: 351.0680972818312\n      Else (feature 1 > 500.5)\n       Predict: 618.1738192280029\n    Else (feature 1 > 754.5)\n     If (feature 1 <= 1403.5)\n      If (feature 1 <= 1033.5)\n       Predict: 885.9440973364025\n      Else (feature 1 > 1033.5)\n       Predict: 1213.117891816921\n     Else (feature 1 > 1403.5)\n      If (feature 1 <= 1965.5)\n       Predict: 1663.0956459671663\n      Else (feature 1 > 1965.5)\n       Predict: 2156.338958180484\n   Else (feature 1 > 2358.5)\n    If (feature 1 <= 4820.5)\n     If (feature 1 <= 3602.5)\n      If (feature 1 <=

In [24]:
filename = "sample_data/bank-full.csv"

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

df2 = spark.read.csv(filename, header=True, inferSchema=True, sep=';')

indexer1 = StringIndexer(inputCol = "job", outputCol = "job_Index")
df2 = indexer1.fit(df2).transform(df2)

df2.show()

+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+---------+
|age|         job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|job_Index|
+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+---------+
| 58|  management| married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|      1.0|
| 44|  technician|  single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|      2.0|
| 33|entrepreneur| married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|      7.0|
| 47| blue-collar| married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|      0.0|
| 33|     unk

In [25]:
from pyspark.sql.types import IntegerType

df2 = df2.withColumn('job_Index', df2['job_Index'].cast(IntegerType()))

In [26]:
indexer2 = StringIndexer(inputCol = "education", outputCol = "education_Index")
df2 = indexer2.fit(df2).transform(df2)

In [27]:
df2 = df2.withColumn('education_Index', df2['education_Index'].cast(IntegerType()))

In [28]:
df2.show()

+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+---------+---------------+
|age|         job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|job_Index|education_Index|
+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+---------+---------------+
| 58|  management| married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|        1|              1|
| 44|  technician|  single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|        2|              0|
| 33|entrepreneur| married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|        7|              0|
| 47| blue-collar| married|  unknown|     no|   1506|    y

In [29]:
target_variable_name = "education_Index"
multiclass_df = df2.select(['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous', 'job_Index', 'education_Index'])
features_list = multiclass_df.columns

#features_list = char_vars #this option is used only for ChiSqselector
features_list.remove(target_variable_name)

# apply the function on our dataframe
multiclass_df = assemble_vectors(multiclass_df, features_list, target_variable_name)

# fitting the one-vs-rest classifier
from pyspark.ml.classification import RandomForestClassifier, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# generate the train/test split.
(train, test) = multiclass_df.randomSplit([0.7, 0.3])

# instantiate the base classifier.
clf = RandomForestClassifier(featuresCol='features', labelCol="education_Index")

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=clf, featuresCol="features", labelCol="education_Index")

# train the multiclass model.
ovrModel = ovr.fit(train)

# score the model on test data.
predictions = ovrModel.transform(test)

# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy", labelCol="education_Index")
# compute the classification error on test data.

accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

## NEED TO CONVERT CATEGORICAL VARIABLES JOB AND EDUCATION INTO NUMERICAL VARIABLES TO PROCEED FURTHER

Test Error = 0.331091


In [30]:
indexer3 = StringIndexer(inputCol = "y", outputCol = "y_Index")
df2 = indexer3.fit(df2).transform(df2)

In [31]:
df2 = df2.withColumn('y_Index', df2['y_Index'].cast(IntegerType()))

In [32]:
target_variable_name = "y_Index"
nonneg_df = df2.select(['age', 'day', 'duration', 'campaign', 'previous', 'y_Index'])

#exclude target variable and select all other feature vectors
features_list = nonneg_df.columns

#features_list = char_vars #this option is used only for ChiSqselector
features_list.remove(target_variable_name)

# apply the function on our DataFrame
nonneg_df = assemble_vectors(nonneg_df, features_list, target_variable_name)

# fit Naïve Bayes model
from pyspark.ml.classification import NaiveBayes
clf = NaiveBayes(featuresCol='features', labelCol="y_Index")
clf_model = clf.fit(nonneg_df)

In [None]:
#A linear or logistic regression model can be built with a perceptron neural network(without a hidden layer)

#New weight = old weight – learningrate * error delta
#          Error delta = input * error * derivative(output)

In [None]:
# Other codes
from typing import Dict, List
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

class PysparkOnehotEncoder:
    def __init__(self):
        self.conditions: Dict[str, Dict[str, int]] = {}
        self.categories: Dict[str, Dict[str, int]] = {}

    def fit(self, df: DataFrame, cat_cols: List[str]) -> None:
        for col in cat_cols:
            cats = df.select(col).distinct().collect()
            length = len(cats)
            cats: List[str] = [df.select(col).distinct().collect()[cat][0] for cat  in range(length)]

            self.conditions[col] = {}
            self.categories[col] = {}
            for idx, cat in enumerate(cats):
                self.conditions[col][f"{col} == '{cat}'"] = idx
                self.categories[col][cat] = idx # useful for reverse_transform at some point

    def predict(self, df: DataFrame) -> DataFrame:
        for col, cats_dict in self.categories.items():
            for cat, _idx in cats_dict.items():
                # Build the dynamic expression using F.when and F.otherwise
                expression = F.expr(f"CASE WHEN {col} == '{cat}' THEN 1 ELSE 0 END")

                # Apply the expression to the DataFrame
                df = df.withColumn(str(col) + "_" + str(cat), F.lit(0))
                df = df.withColumn(str(col) + "_" + str(cat), expression)
            df = df.drop(col)
        return df

    def fit_predict(self, df: DataFrame, cat_cols: List[str]):
        self.fit(df, cat_cols)
        df = self.predict(df)
        return df

In [None]:
# Create features storing categorical & numerical variables, omitting the last column
categorical_cols = [item[0] for item in data.dtypes if item[1].startswith('string')]
print(categorical_cols)

numerical_cols = [item[0] for item in data.dtypes if item[1].startswith('int') | item[1].startswith('double')][:-1]
print(numerical_cols)

['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'poutcome', 'y']
['age', 'balance', 'day', 'duration', 'campaign', 'pdays']


In [None]:
# Print number of categorical as well as numerical features
print(str(len(categorical_cols)) + ' categorical features')
print(str(len(numerical_cols)) + ' numerical features')

10 categorical features
6 numerical features


In [None]:
# Use StringIndexer to convert string/text values into numerical values followed by OneHotEncoderEstimator
# Spark MLLib convert each Stringindexed or transformed values into One Hot Encoded values
# VectorAssembler assemble all the features into 1 vector from multiple columns that contain type double
# Appending every step of the process in a stages array

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
stages = []
for categoricalCol in categorical_cols:
  stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
  #print(stringIndexer)
  OHencoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "_catVec"])
  #print(OHencoder)

stages += [stringIndexer, OHencoder]
print(stages)
assemblerInputs = [c + "_catVec" for c in categorical_cols] + numerical_cols
print(assemblerInputs)
Vectassembler = VectorAssembler(inputCols = assemblerInputs, outputCol="features")

stages += [Vectassembler]

[StringIndexer_cab87a457fe6, OneHotEncoder_31e766b3bc35]
['job_catVec', 'marital_catVec', 'education_catVec', 'default_catVec', 'housing_catVec', 'loan_catVec', 'contact_catVec', 'month_catVec', 'poutcome_catVec', 'y_catVec', 'age', 'balance', 'day', 'duration', 'campaign', 'pdays']
VectorAssembler_eb847efdd671
[StringIndexer_cab87a457fe6, OneHotEncoder_31e766b3bc35, VectorAssembler_eb847efdd671]
