In [1]:
bankdata=spark.read.options(header="true",inferSchema="true",delimiter = ";").csv("/home/mojeed/bank-full.csv")


In [2]:
datapd=bankdata.toPandas()

In [3]:
bankdata.printSchema()


root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)



In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer,VectorAssembler

In [14]:
categoricalcols=["marital","education","default","housing","loan","contact","month","campaign","poutcome"]

In [15]:
numericalcols=["age","balance","day","duration","campaign","pdays"]

In [16]:
stages=[]
for category in categoricalcols:
    stringIndexer=StringIndexer(inputCol=category,outputCol=category+"index")
    encoder=OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],outputCols=[category+"classVec"])
    stages+=[stringIndexer,encoder]
    

In [19]:
label_stringIdx=StringIndexer(inputCol="y",outputCol="label")
stages+=[label_stringIdx]

In [33]:
assemblerInputs=[c+"classVec" for c in categoricalcols]+numericalcols
assembler=VectorAssembler(inputCols=assemblerInputs,outputCol="features")
stages+=[assembler]

In [34]:
from pyspark.ml.classification import LogisticRegression

In [35]:
partialPipeline=Pipeline().setStages(stages)

In [36]:
ppelineModel=partialPipeline.fit(bankdata)

In [37]:
preppedDataDF=ppelineModel.transform(bankdata)

In [38]:
lrModel=LogisticRegression().fit(preppedDataDF)

In [39]:
cols=bankdata.columns

In [41]:
selectedcols=["label","features"]+cols

In [42]:
dataset=preppedDataDF.select(selectedcols)

In [45]:
(train,test)=dataset.randomSplit([0.7,0.3],seed=100)

In [47]:
print(train.count())

31661


In [48]:
print(test.count())

13550


In [50]:
lr=LogisticRegression(labelCol="label",featuresCol="features",maxIter=10)

In [52]:
lrModel=lr.fit(train)

In [53]:
predictions=lrModel.transform(test)

In [54]:
selected=predictions.select("label","prediction")

In [56]:
selected.show(4)

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 4 rows



In [58]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [59]:
evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.9095245532633057

In [60]:
evaluator.getMetricName()

'areaUnderROC'

In [61]:
print(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The

In [64]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator

In [66]:
paramGrid=(ParamGridBuilder().addGrid(lr.regParam,[0.01,0.5,2.0])
          .addGrid(lr.elasticNetParam,[0.0,0.5,1.0])
          .addGrid(lr.maxIter,[1,5,10]).build())

In [70]:
cv=CrossValidator(estimator=lr,estimatorParamMaps=paramGrid,evaluator=evaluator,numFolds=5)

In [71]:
cvModel=cv.fit(train)

In [72]:
predictions=cvModel.transform(test)

In [73]:
evaluator.evaluate(predictions)

0.9089000769592595

In [75]:
from pyspark.ml.classification import RandomForestClassifier
rf=RandomForestClassifier(labelCol="label",featuresCol="features")
rfModel=rf.fit(train)
prediction=rfModel.transform(test)
predictions.printSchema()
selected=predictions.select("label","prediction")
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator=BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



0.9089000769592595

In [76]:
paramGrid=(ParamGridBuilder().addGrid(rf.maxDepth,[2,4,6])
          .addGrid(rf.maxBins,[20,60])
          .addGrid(rf.numTrees,[5,20]).build())

In [77]:
cv=CrossValidator(estimator=rf,estimatorParamMaps=paramGrid,evaluator=evaluator,numFolds=5)

In [78]:
cvModel=cv.fit(train)

In [79]:
predictions=cvModel.transform(test)

In [80]:
evaluator.evaluate(predictions)

0.9009351573698009