# subscribed a deposit with LogisticRegression


In [224]:
import findspark
findspark.init() 

In [225]:
import pyspark
from pyspark.sql import SparkSession
spark= SparkSession.builder.getOrCreate()

## read file

In [226]:
df =spark.read.csv('bank-full.csv', header=True, inferSchema=True)

In [227]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- Target: string (nullable = true)



In [228]:
# Find Count of Null, None, NaN of All DataFrame Columns
from pyspark.sql.functions import col,isnan, when, count
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()

+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+------+
|age|job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|Target|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+------+
|  0|  0|      0|        0|      0|      0|      0|   0|      0|  0|    0|       0|       0|    0|       0|       0|     0|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+------+



Input variables:
   # bank client data:
   1 - age (numeric)
   
   2 - job : type of job (categorical: "admin.","unknown","unemployed","management","housemaid","entrepreneur","student",
                                       "blue-collar","self-employed","retired","technician","services")
                                       
   3 - marital : marital status (categorical: "married","divorced","single"; note: "divorced" means divorced or widowed)
   
   4 - education (categorical: "unknown","secondary","primary","tertiary")
   
   5 - default: has credit in default? (binary: "yes","no")
   
   6 - balance: average yearly balance, in euros (numeric)
   
   7 - housing: has housing loan? (binary: "yes","no")
   
   8 - loan: has personal loan? (binary: "yes","no")
   # related with the last contact of the current campaign:
   9 - contact: contact communication type (categorical: "unknown","telephone","cellular")
   
  10 - day: last contact day of the month (numeric)
  
  11 - month: last contact month of year (categorical: "jan", "feb", "mar", ..., "nov", "dec")
  
  12 - duration: last contact duration, in seconds (numeric)
   # other attributes:
  13 - campaign: number of contacts performed during this campaign and for this client (numeric, includes last contact)
  
  14 - pdays: number of days that passed by after the client was last contacted from a previous campaign (numeric, -1 means client was not previously contacted)
  
  15 - previous: number of contacts performed before this campaign and for this client (numeric)
  
  16 - poutcome: outcome of the previous marketing campaign (categorical: "unknown","other","failure","success")

  Output variable (desired target):
  17 - y - has the client subscribed a term deposit? (binary: "yes","no")

## indexing the output

In [229]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler 

In [230]:
indexer = StringIndexer(inputCol='Target', outputCol='TargetOHE')

In [231]:
indexer_fitted = indexer.fit(df)

In [232]:
df = indexer_fitted.transform(df)


In [233]:
df.show(5)

+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+------+---------+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|Target|TargetOHE|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+------+---------+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown|    no|      0.0|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown|    no|      0.0|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown|    no|      0.0|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown|    no|      0.0|

## split data to train and test


In [234]:
trainDF, testDF= df.randomSplit([0.8,0.2], seed= 42)

In [235]:
trainDF.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- Target: string (nullable = true)
 |-- TargetOHE: double (nullable = false)



## OneHotEncoding


In [236]:
#extract the categorical columns using list comprehension
CatCols= [c for (c,d) in trainDF.dtypes if ((d=='string') and c !='Target')]
CatCols

['job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'poutcome']

In [237]:
indCols= [c+ "Index" for c in CatCols]
indCols 

['jobIndex',
 'maritalIndex',
 'educationIndex',
 'defaultIndex',
 'housingIndex',
 'loanIndex',
 'contactIndex',
 'monthIndex',
 'poutcomeIndex']

In [238]:
oheCols= [c+ "OHE" for c in CatCols]
oheCols

['jobOHE',
 'maritalOHE',
 'educationOHE',
 'defaultOHE',
 'housingOHE',
 'loanOHE',
 'contactOHE',
 'monthOHE',
 'poutcomeOHE']

In [239]:
StrInd= StringIndexer(inputCols= CatCols, outputCols=indCols)

In [240]:
oheEncoder= OneHotEncoder(inputCols= indCols, outputCols=oheCols)

In [241]:
numericCols= [c for (c,d) in trainDF.dtypes if (d=='int')]
numericCols

['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous']

In [242]:
assemblerCols= oheCols + numericCols # kol el coloums
assemblerCols

['jobOHE',
 'maritalOHE',
 'educationOHE',
 'defaultOHE',
 'housingOHE',
 'loanOHE',
 'contactOHE',
 'monthOHE',
 'poutcomeOHE',
 'age',
 'balance',
 'day',
 'duration',
 'campaign',
 'pdays',
 'previous']

In [243]:
VecAssembler = VectorAssembler(inputCols=assemblerCols, outputCol='features') 


## Create model 

In [244]:
from pyspark.ml.classification import LogisticRegression

In [245]:
logreg = LogisticRegression(featuresCol='features',labelCol='TargetOHE')

In [246]:
from pyspark.ml import Pipeline
pipeline_LogReg= Pipeline(stages= [StrInd,oheEncoder,VecAssembler,logreg])

In [247]:
pipeline_LogReg_model=pipeline_LogReg.fit(trainDF)

In [248]:
predDF_LogReg = pipeline_LogReg_model.transform(testDF)

In [249]:
predDF_LogReg.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- Target: string (nullable = true)
 |-- TargetOHE: double (nullable = false)
 |-- defaultIndex: double (nullable = false)
 |-- loanIndex: double (nullable = false)
 |-- monthIndex: double (nullable = false)
 |-- poutcomeIndex: double (nullable = false)
 |-- educationIndex: double (nullable = false)
 |-- contactIndex: double (nullable = false)
 |-- maritalIndex: double (nu

In [250]:
predDF_LogReg.select('TargetOHE','prediction').show()

+---------+----------+
|TargetOHE|prediction|
+---------+----------+
|      0.0|       0.0|
|      0.0|       0.0|
|      1.0|       1.0|
|      0.0|       0.0|
|      1.0|       1.0|
|      0.0|       0.0|
|      1.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      1.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      1.0|       0.0|
+---------+----------+
only showing top 20 rows



### Make model evaluation 


In [251]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="TargetOHE", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predDF_LogReg)
print("Test Error = %g " % (1.0 - accuracy))
print("Test accuracy = %g " % (accuracy))

Test Error = 0.096817 
Test accuracy = 0.903183 


### Save Model

In [269]:
pipeline_LogReg_model.save('Untitled Folder 2')

In [270]:
pipeline_LogReg_model.write().overwrite().save('pipeline_LogReg_model')