In [1]:
# Importing the library
import pyspark
import findspark
from pyspark.sql import SparkSession

# Creating object of spark session
spark = SparkSession.builder.appName('bank').getOrCreate()

In [2]:
# Read the csv file
df = spark.read.csv('bank.csv' , header = True , inferSchema = True )


In [3]:
# Checking dimensions
print(df.count(),len(df.columns))

11162 17


In [4]:
# Check for schema
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [5]:
# Top 5 rows
df.show(5)

+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|       job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
| 59|    admin.|married|secondary|     no|   2343|    yes|  no|unknown|  5|  may|    1042|       1|   -1|       0| unknown|    yes|
| 56|    admin.|married|secondary|     no|     45|     no|  no|unknown|  5|  may|    1467|       1|   -1|       0| unknown|    yes|
| 41|technician|married|secondary|     no|   1270|    yes|  no|unknown|  5|  may|    1389|       1|   -1|       0| unknown|    yes|
| 55|  services|married|secondary|     no|   2476|    yes|  no|unknown|  5|  may|     579|       1|   -1|       0| unknown|    yes|
| 54|    admin.|married| tertiary|     no|    184|     no|  no|unknown|  5| 

In [6]:
#Droping unwanted columns
my_data = df.drop(*['contact' , 'day' , 'month' , 'default'])

In [7]:
# Null value count of columns
Dict_Null = {col:my_data.filter(my_data[col].isNull()).count() for col in my_data.columns}
Dict_Null

{'age': 0,
 'job': 0,
 'marital': 0,
 'education': 0,
 'balance': 0,
 'housing': 0,
 'loan': 0,
 'duration': 0,
 'campaign': 0,
 'pdays': 0,
 'previous': 0,
 'poutcome': 0,
 'deposit': 0}

In [8]:
my_data.describe().show()

+-------+------------------+-------+--------+---------+------------------+-------+-----+------------------+------------------+------------------+------------------+--------+-------+
|summary|               age|    job| marital|education|           balance|housing| loan|          duration|          campaign|             pdays|          previous|poutcome|deposit|
+-------+------------------+-------+--------+---------+------------------+-------+-----+------------------+------------------+------------------+------------------+--------+-------+
|  count|             11162|  11162|   11162|    11162|             11162|  11162|11162|             11162|             11162|             11162|             11162|   11162|  11162|
|   mean|41.231947679627304|   null|    null|     null|1528.5385235620856|   null| null|371.99381831213043| 2.508421429851281| 51.33040673714388|0.8325568894463358|    null|   null|
| stddev|11.913369192215518|   null|    null|     null| 3225.413325946149|   null| null|34

In [9]:
# value counts of columns
my_data.groupBy('job').count().show()
print()
my_data.groupBy('marital').count().show()
print()
my_data.groupBy('education').count().show()
print()
my_data.groupBy('loan').count().show()
print()
my_data.groupBy('poutcome').count().show()
print()
my_data.groupBy('deposit').count().show()

+-------------+-----+
|          job|count|
+-------------+-----+
|   management| 2566|
|      retired|  778|
|      unknown|   70|
|self-employed|  405|
|      student|  360|
|  blue-collar| 1944|
| entrepreneur|  328|
|       admin.| 1334|
|   technician| 1823|
|     services|  923|
|    housemaid|  274|
|   unemployed|  357|
+-------------+-----+


+--------+-----+
| marital|count|
+--------+-----+
|divorced| 1293|
| married| 6351|
|  single| 3518|
+--------+-----+


+---------+-----+
|education|count|
+---------+-----+
|  unknown|  497|
| tertiary| 3689|
|secondary| 5476|
|  primary| 1500|
+---------+-----+


+----+-----+
|loan|count|
+----+-----+
|  no| 9702|
| yes| 1460|
+----+-----+


+--------+-----+
|poutcome|count|
+--------+-----+
| success| 1071|
| unknown| 8326|
|   other|  537|
| failure| 1228|
+--------+-----+


+-------+-----+
|deposit|count|
+-------+-----+
|     no| 5873|
|    yes| 5289|
+-------+-----+



In [10]:
my_data.dtypes

[('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('balance', 'int'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('deposit', 'string')]

In [11]:
#Preprocessing steps
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# create object of StringIndexer class and specify input and output column
SI_job = StringIndexer(inputCol='job',outputCol='job_Index')
SI_marital = StringIndexer(inputCol='marital',outputCol='marital_Index')
SI_education = StringIndexer(inputCol='education',outputCol='education_Index')
SI_housing = StringIndexer(inputCol='housing',outputCol='housing_Index')
SI_loan = StringIndexer(inputCol='loan',outputCol='loan_Index')
SI_poutcome = StringIndexer(inputCol='poutcome',outputCol='poutcome_Index')
SI_deposit = StringIndexer(inputCol='deposit',outputCol='deposit_Index')


# transform the data
my_data = SI_job.fit(my_data).transform(my_data)
my_data = SI_marital.fit(my_data).transform(my_data)
my_data = SI_education.fit(my_data).transform(my_data)
my_data = SI_housing.fit(my_data).transform(my_data)
my_data = SI_loan.fit(my_data).transform(my_data)
my_data = SI_poutcome.fit(my_data).transform(my_data)
my_data = SI_deposit.fit(my_data).transform(my_data)

In [12]:
# view the transformed data
my_data.select('job', 'job_Index', 'marital', 'marital_Index','housing','housing_Index','poutcome','poutcome_Index','deposit','deposit_Index').show(10)

+----------+---------+--------+-------------+-------+-------------+--------+--------------+-------+-------------+
|       job|job_Index| marital|marital_Index|housing|housing_Index|poutcome|poutcome_Index|deposit|deposit_Index|
+----------+---------+--------+-------------+-------+-------------+--------+--------------+-------+-------------+
|    admin.|      3.0| married|          0.0|    yes|          1.0| unknown|           0.0|    yes|          1.0|
|    admin.|      3.0| married|          0.0|     no|          0.0| unknown|           0.0|    yes|          1.0|
|technician|      2.0| married|          0.0|    yes|          1.0| unknown|           0.0|    yes|          1.0|
|  services|      4.0| married|          0.0|    yes|          1.0| unknown|           0.0|    yes|          1.0|
|    admin.|      3.0| married|          0.0|     no|          0.0| unknown|           0.0|    yes|          1.0|
|management|      0.0|  single|          1.0|    yes|          1.0| unknown|           0

In [13]:
# create object and specify input and output column
OHE = OneHotEncoder(inputCols=['job_Index', 'marital_Index','education_Index','housing_Index','loan_Index','poutcome_Index','deposit_Index'],outputCols=['job_OHE', 'marital_OHE','education_OHE','housing_OHE','loan_OHE','poutcome_OHE','deposit_OHE'])

# transform the data
my_data = OHE.fit(my_data).transform(my_data)

# view and transform the data
my_data.select('job', 'job_Index', 'job_OHE','education','education_Index','education_OHE').show(10)

+----------+---------+--------------+---------+---------------+-------------+
|       job|job_Index|       job_OHE|education|education_Index|education_OHE|
+----------+---------+--------------+---------+---------------+-------------+
|    admin.|      3.0|(11,[3],[1.0])|secondary|            0.0|(3,[0],[1.0])|
|    admin.|      3.0|(11,[3],[1.0])|secondary|            0.0|(3,[0],[1.0])|
|technician|      2.0|(11,[2],[1.0])|secondary|            0.0|(3,[0],[1.0])|
|  services|      4.0|(11,[4],[1.0])|secondary|            0.0|(3,[0],[1.0])|
|    admin.|      3.0|(11,[3],[1.0])| tertiary|            1.0|(3,[1],[1.0])|
|management|      0.0|(11,[0],[1.0])| tertiary|            1.0|(3,[1],[1.0])|
|management|      0.0|(11,[0],[1.0])| tertiary|            1.0|(3,[1],[1.0])|
|   retired|      5.0|(11,[5],[1.0])|secondary|            0.0|(3,[0],[1.0])|
|technician|      2.0|(11,[2],[1.0])|secondary|            0.0|(3,[0],[1.0])|
|  services|      4.0|(11,[4],[1.0])|secondary|            0.0|(

In [14]:
from pyspark.ml.feature import VectorAssembler

# specify the input and output columns of the vector assembler
assembler = VectorAssembler(inputCols=['age',
                                       'job_Index',
                                       'marital_Index',
                                       'education_Index',
                                       'balance',
                                       'housing_Index',
                                       'loan_Index',
                                       'duration',
                                       'campaign',
                                       'pdays',
                                       'previous',
                                       'poutcome_Index',
                                       'job_OHE',
                                       'marital_OHE',
                                       'housing_OHE',
                                       'education_OHE',
                                       'loan_OHE',
                                       'poutcome_OHE'],
                           outputCol='features')

# fill the null values
my_data = my_data.fillna(0)

# transform the data
final_data = assembler.transform(my_data)


In [15]:
# view the transformed vector
final_data.select('features','deposit_Index').show()

+--------------------+-------------+
|            features|deposit_Index|
+--------------------+-------------+
|(33,[0,1,4,5,7,8,...|          1.0|
|(33,[0,1,4,7,8,9,...|          1.0|
|(33,[0,1,4,5,7,8,...|          1.0|
|(33,[0,1,4,5,7,8,...|          1.0|
|(33,[0,1,3,4,7,8,...|          1.0|
|(33,[0,2,3,5,6,7,...|          1.0|
|(33,[0,3,4,5,6,7,...|          1.0|
|(33,[0,1,2,4,5,7,...|          1.0|
|(33,[0,1,4,5,7,8,...|          1.0|
|(33,[0,1,2,4,5,7,...|          1.0|
|(33,[0,1,2,4,5,7,...|          1.0|
|(33,[0,1,4,5,7,8,...|          1.0|
|(33,[0,3,4,5,6,7,...|          1.0|
|(33,[0,1,2,3,4,5,...|          1.0|
|(33,[0,1,2,3,4,5,...|          1.0|
|(33,[0,2,3,4,5,7,...|          1.0|
|(33,[0,1,2,3,4,5,...|          1.0|
|(33,[0,1,4,5,7,8,...|          1.0|
|(33,[0,1,4,5,7,8,...|          1.0|
|(33,[0,1,2,4,5,6,...|          1.0|
+--------------------+-------------+
only showing top 20 rows



In [16]:
#Model_Dataframe
model_df = final_data.select(['features','deposit_Index'])
model_df = model_df.withColumnRenamed("deposit_Index","label")
model_df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



In [17]:
#Split into training & testing Dataframe
training_df,test_df = model_df.randomSplit([0.75,0.25])

In [18]:
#Create a logistic regression model object
from pyspark.ml.classification import LogisticRegression
log_reg=LogisticRegression().fit(training_df)

In [19]:
lr_summary=log_reg.summary

In [20]:
#Overall accuracy of the classification model
lr_summary.accuracy

0.7998570407433881

In [21]:
#Precision of both classes
print(lr_summary.precisionByLabel)

[0.788539662176609, 0.8140973903685768]
