In [1]:
## Spark Libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression ## Logistic regression operations
from pyspark.ml.evaluation import MulticlassClassificationEvaluator ## create an evaluator
from pyspark import SparkContext ## Creating spark context
from pyspark.mllib.util import MLUtils ## Load,save ,preprocess the data in ml library
from pyspark.ml.feature import VectorAssembler ## convert features columns into vector 
import pandas as pd #for dataframe related operations
from pyspark.sql import Row ## New rows can be created
from pyspark.ml.feature import StandardScaler ## Scaling data of different metrics into a common scale
from pyspark.sql.functions import when ##for ifelse statement for preprocessing data
from pyspark.ml import Pipeline, PipelineModel, feature, regression, classification, evaluation ## all ml related 
from pyspark.ml.classification import DecisionTreeClassifier## decision tree operations
from pyspark.mllib.evaluation import BinaryClassificationMetrics ##evaluation performance of the model by auc score 
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metrics 
from sklearn.metrics import roc_curve, auc, classification_report # evaluation metrics
from matplotlib import pyplot as plt ## Plotting operations

In [2]:
# Echo the `spark` session object. Nothing is created here: `spark` is never assigned
# in this notebook, so it is presumably pre-defined by the Databricks runtime.
spark

In [3]:
# Load the registered table 'credit_card_csv' into a Spark DataFrame.
df= spark.read.table('credit_card_csv')

In [4]:
# Render the freshly loaded rows with the notebook's display() helper.
display(df)

In [5]:
df = df.drop('_c0')
df = df.drop('?ID')

In [6]:
df=df.withColumn('SEX', when(df.SEX==1, 0).otherwise(1)) 

In [7]:
df=df.withColumn('MARRIAGE', when(df.MARRIAGE==1, 0).otherwise(1))

In [8]:
#displaying refined data
display(df)

In [9]:
## Splitting Data Set
# Ratios: train = 60%, validation = 30%, test = 10%.
# Without an explicit seed randomSplit picks a new random seed on each call, so every
# re-run of the notebook produced different partitions (and therefore different model
# metrics). Pinning the seed makes the split repeatable.
SPLIT_SEED = 42
split = df.randomSplit([0.6, 0.3, 0.1], seed=SPLIT_SEED)
train, validation, test = split

In [10]:
x = train.count()
z = validation.count()
y = test.count()


In [11]:
## Creating Dataframe  for training, validation and testing data 
l = [('Training',x),('Validation',z),('Testing',y)] ##put values for dataframe into list
rdd = sc.parallelize(l)
datasplit = rdd.map(lambda x: Row(DataType=x[0], Size=int(x[1])))
datasplt = sqlContext.createDataFrame(datasplit)

In [12]:
# Plain-text preview of the partition-size table.
datasplt.show()

In [13]:
# The same table again: once as text via show(), once through display().
datasplt.show()
display(datasplt)

In [14]:
# Inspect the rows of the training partition.
display(train)

In [16]:
assembler1 = VectorAssembler(inputCols=["LIMIT_BAL","AGE","BILL_AMT1","BILL_AMT2","BILL_AMT3","BILL_AMT4","BILL_AMT5","BILL_AMT6","PAY_AMT1","PAY_AMT2","PAY_AMT3","PAY_AMT4","PAY_AMT5","PAY_AMT6"],outputCol = 'scalingfeatures')

In [17]:
scaler_1 = StandardScaler(inputCol="scalingfeatures",outputCol="scaledFeatures",withStd=True, withMean=False)
scaled_training = assembler1.transform(df)
scalerModel = scaler_1.fit(scaled_training)

In [18]:
assembler2 = VectorAssembler(inputCols=["SEX","EDUCATION","MARRIAGE","PAY_0","PAY_2","PAY_3","PAY_4","PAY_5","PAY_6","scaledFeatures"],outputCol = 'features')

In [19]:
lr_2 = LogisticRegression(featuresCol='features', labelCol='default payment next month')
pipe_lr= Pipeline(stages=[assembler1,scaler_1,assembler2,lr_2])
pipe_model= pipe_lr.fit(train)

In [20]:
predictions_pipe = pipe_model.transform(validation)

In [21]:
print("Binomial coefficients: " + str(pipe_model.stages[3].coefficientMatrix))
print("Binomial intercepts: " + str(pipe_model.stages[3].interceptVector))

In [22]:
# Collect `.coefficients` from every fitted stage of `pipe_model` that exposes that
# attribute, then wrap the resulting list in a pandas DataFrame.
coeff=[stage.coefficients for stage in pipe_model.stages if hasattr(stage, "coefficients")]
coeff = pd.DataFrame(coeff)


In [23]:
from pyspark.sql import SQLContext

In [24]:
# (Re)bind `sqlContext` on top of the existing SparkContext `sc`, convert the pandas
# coefficient table to a Spark DataFrame and render it.
sqlContext = SQLContext(sc)
sqlCxt = sqlContext.createDataFrame(coeff)
display(sqlCxt)

In [25]:
# Render the validation predictions of the logistic-regression pipeline.
display(predictions_pipe)

In [26]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
bce = BinaryClassificationEvaluator()
bce.setLabelCol('default payment next month')
auclr = bce.evaluate(pipe_model.transform(validation))
auclr

In [27]:
predicted = predictions_pipe.select("features","prediction","default payment next month")

In [28]:
display(predicted)

In [29]:
## Defining Accuracy Evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="default payment next month", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions_pipe)

In [30]:
accu1 = accuracy*100
print accu1

In [31]:

reg_param = 0.02
en_param = 0.3

lr_enr = classification.LogisticRegression(labelCol= 'default payment next month' , featuresCol="features").setRegParam(reg_param).\
       setElasticNetParam(en_param)
pipe_enr= Pipeline(stages=[assembler1,scaler_1,assembler2,lr_enr])
lr_2model  = pipe_enr.fit(train)


In [32]:
print("Binomial coefficients: " + str(lr_2model.stages[3].coefficientMatrix))
print("Binomial intercepts: " + str(lr_2model.stages[3].interceptVector))

In [33]:
from pyspark.sql import SQLContext

In [34]:
sqlContext = SQLContext(sc)
# Tabulate the coefficients of the elastic-net model (`lr_2model`) fitted in the
# preceding cells. This cell previously re-displayed `coeff`, which still held the
# coefficients of the first logistic-regression pipeline (`pipe_model`), so the table
# did not belong to the model being discussed. `coeff` itself is left untouched.
coeff_enr = pd.DataFrame(
    [stage.coefficients for stage in lr_2model.stages if hasattr(stage, "coefficients")])
sqlCxt = sqlContext.createDataFrame(coeff_enr)
display(sqlCxt)

In [35]:
## Predictions based on Elastic Net Regression
prediction_enr = lr_2model.transform(validation)

In [36]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
bce = BinaryClassificationEvaluator()
bce.setLabelCol('default payment next month')
aucENR = bce.evaluate(lr_2model.transform(validation))
aucENR

In [37]:
dt = DecisionTreeClassifier(labelCol="default payment next month", featuresCol="features")
pipe_dt= Pipeline(stages=[assembler1,scaler_1,assembler2,dt])

In [38]:
model1 = pipe_dt.fit(train)

In [39]:
treeModel = model1.stages[3]

In [40]:
predictions2 = model1.transform(validation)

In [41]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
bce = BinaryClassificationEvaluator()
bce.setLabelCol('default payment next month')
aucDT = bce.evaluate(model1.transform(validation))
aucDT

In [42]:
display(predictions2)

In [43]:

from pyspark.mllib.evaluation import BinaryClassificationMetrics as metrics
# Bring the (probability vector, label) pairs of the decision-tree predictions to the driver.
results = predictions2.select(['probability', 'default payment next month'])
results_collected = results.collect()
# Score = entry 0 of the probability vector; the label is flipped (1 - label) so that
# it refers to the same class as that entry.
results_list = [(float(prob[0]), 1.0 - float(label)) for prob, label in results_collected]
scoresandlabels = sc.parallelize(results_list)

# Note: this rebinds `metrics` from the imported class to the computed metrics object.
metrics = metrics(scoresandlabels)
# Overwrites the aucDT value obtained from the evaluator in the earlier cell.
aucDT = metrics.areaUnderROC
print(aucDT)

In [44]:
predictions2.select("prediction", "default payment next month", "features").show(5)

In [45]:
accuracy2 = evaluator.evaluate(predictions2)

In [46]:
aacu2 = accuracy2*100
print aacu2

In [47]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="default payment next month", featuresCol="features", numTrees=10)
pipe_rf= Pipeline(stages=[assembler1,scaler_1,assembler2,rf])

In [48]:
model2 = pipe_rf.fit(train)
predictions3 = model2.transform(validation)

In [49]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
bce = BinaryClassificationEvaluator()
bce.setLabelCol('default payment next month')
aucRF = bce.evaluate(model2.transform(validation))
aucRF

In [50]:
# Peek at the first five random-forest predictions next to the true label.
predictions3.select("features","default payment next month","prediction").show(5)

In [51]:
# Feature importances reported by the fitted forest (stage index 3 of the pipeline).
importances = model2.stages[3].featureImportances
importances

In [52]:
# Validation accuracy of the random forest, expressed as a percentage.
accuracy3 = evaluator.evaluate(predictions3)
accu3 = accuracy3*100
# NOTE(review): with the Python 2 print statement this prints a tuple; with the
# Python 3 print function it prints the three values separated by spaces.
print ("accuracy of Random Forest : ", accu3,"%")

In [53]:
from pyspark.ml.classification import GBTClassifier
# Gradient-boosted trees (10 iterations) behind the shared pre-processing stages.
gbt = GBTClassifier(
    maxIter=10,
    featuresCol="features",
    labelCol="default payment next month",
)
pipe_gbt = Pipeline(stages=[assembler1, scaler_1, assembler2, gbt])

model4 = pipe_gbt.fit(train)

predictions4 = model4.transform(validation)

from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluator with the label column set at construction; metric left at its default.
bce = BinaryClassificationEvaluator(labelCol='default payment next month')
aucGBM = bce.evaluate(model4.transform(validation))
aucGBM


In [54]:
# Peek at the first five gradient-boosting predictions next to the true label.
predictions4.select("prediction", "default payment next month", "features").show(5)

# Validation accuracy of the gradient-boosting pipeline.
accuracy4 = evaluator.evaluate(predictions4)

# Accuracy as a percentage.
accu4 = accuracy4*100
# NOTE(review): prints a tuple under the Python 2 print statement, three
# space-separated values under the Python 3 print function.
print("Accuracy of Gradient boostin",accu4,"%")

In [55]:
## AUC of the GBM model computed with the mllib metrics class
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metrics
# Bring the (probability vector, label) pairs of the GBM predictions to the driver.
results = predictions4.select(['probability', 'default payment next month'])
results_collected = results.collect()
# Score = entry 0 of the probability vector; the label is flipped (1 - label) so that
# it refers to the same class as that entry.
results_list = [(float(prob[0]), 1.0 - float(label)) for prob, label in results_collected]
scoresandlabels = sc.parallelize(results_list)

# Note: this rebinds `metrics` from the imported class to the computed metrics object.
metrics = metrics(scoresandlabels)
# Overwrites the aucGBM value obtained from the evaluator in the earlier cell.
aucGBM = metrics.areaUnderROC
print(aucGBM)

In [56]:
##Creating Dataframe for visualization 
from pyspark.sql import Row
from pyspark.sql import SQLContext
l = [('Logistic Regression',auclr*100),('Elastic Net',aucENR*100),('Decision Tree',aucDT*100),('Random Forest',aucRF*100),('Gradient Boosting',aucGBM*100)]
rdd = sc.parallelize(l)
performance = rdd.map(lambda x: Row(Model=x[0], Auc_Score=int(x[1])))
modelperformance = sqlContext.createDataFrame(performance)

In [57]:
modelperformance.show()

In [58]:
display(modelperformance)

In [59]:
## Best Model is Random Forest and Gradient boosting
## Running Random Forest on Test Data
# `model2` is the fitted random-forest pipeline; `test` is the 10% partition that has
# not been used for fitting or model comparison above.
predictions_test = model2.transform(test)

In [60]:
# Accuracy of the random forest on the test partition (same evaluator as before).
accuracy_final= evaluator.evaluate(predictions_test)
accuracy_final

In [61]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
bce = BinaryClassificationEvaluator()
bce.setLabelCol('default payment next month')
# The final AUC belongs to the held-out test partition, matching `accuracy_final`.
# The original evaluated `validation` here, which merely reproduced `aucRF` and
# reported it as the final test score.
auc_final = bce.evaluate(model2.transform(test))
auc_final

In [62]:
### Calculating AUC score and plotting ROC curve
fpr = dict()
tpr = dict()
roc_auc = dict()
 
y_test1 = [i[1] for i in results_list]
y_score1 = [i[0] for i in results_list]
 
fpr, tpr, threshold = roc_curve(y_test1, y_score1)
roc_auc = auc(fpr, tpr)
 
%matplotlib inline
plt.figure()
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()

display()

In [63]:
## input values
sex= 0
marriage= 0
Pay_0=-1
Pay_2=2
Pay_3=2
Pay_4=4
Pay_5=3
Pay_6=2
Limit_Bal=100000
Age=46
Bill_AMT1=6277
PAY_AMT1=1000

from pyspark.sql import Row
from pyspark.sql import SQLContext
l = [(sex,marriage,Pay_0,Pay_2,Pay_3,Pay_4,Pay_5,Pay_6,Limit_Bal,Age,Bill_AMT1,PAY_AMT1)]
rdd = sc.parallelize(l)
testing_performance = rdd.map(lambda x: Row(SEX=x[0], MARRIAGE=int(x[1]),PAY_0=int(x[2]),PAY_2=int(x[3]),PAY_3=int(x[4]),PAY_4=int(x[5]),PAY_5=int(x[6]),PAY_6=int(x[7]),LIMIT_BAL=int(x[8]), AGE=int(x[9]),BILL_AMT1=int(x[10]),PAY_AMT1=int(x[11])))
testingperformance = sqlContext.createDataFrame(testing_performance)

In [64]:
testingperformance.show()

In [65]:
## model building
assembler_test = VectorAssembler(inputCols=["LIMIT_BAL","AGE","BILL_AMT1","PAY_AMT1"],outputCol = 'scalingfeatures')

In [66]:
scaler_test = StandardScaler(inputCol="scalingfeatures",outputCol="scaledFeatures",withStd=True, withMean=False)
scaled_training = assembler_test.transform(df)
scaler_testModel = scaler_test.fit(scaled_training)

In [67]:
assembler2_test = VectorAssembler(inputCols=["SEX","MARRIAGE","PAY_0","PAY_2","PAY_3","PAY_4","PAY_5","PAY_6","scaledFeatures"],outputCol = 'features')

In [68]:
reg_param=0.02
en_param=0.3

lr_enr_test = classification.LogisticRegression(labelCol= 'default payment next month' , featuresCol="features").setRegParam(reg_param).\
       setElasticNetParam(en_param)
pipe_enr_test= Pipeline(stages=[assembler_test,scaler_test,assembler2_test,lr_enr_test])
lr_2model_test = pipe_enr_test.fit(train)

In [69]:
lr_2model_test.stages[3].coefficientMatrix

In [70]:

test_prediction = lr_2model_test.transform(testingperformance)

In [71]:
test_prediction.select("probability","prediction").show()