## Project
# The following procedural of the pipeline were followed:

1. Data Loading

2. Model Learning

3. Model Evaluation

# The following logistic regression models have been deployed and the accuracies observed against each are:

1. RandomForestClassifier (79%)
2. LogisticRegression (81%)
3. DecisionTreeClassifier (82)%
4. MultilayerPerceptronClassifier (82%)



In [169]:
# Import findspark 
import findspark

# Or use this alternative
findspark.init()

In [170]:
# Import SparkSession
from pyspark.sql import SparkSession

# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Project") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext
#sc.stop()

In [171]:
from pyspark.sql import SQLContext

In [172]:
sqlContext = SQLContext(sc) 

In [173]:
from pyspark.sql.types import *

In [195]:
sqlContext = SQLContext(sc)
schema = StructType([ \
    StructField("ID", DoubleType(), True), \
    StructField("LIMIT_BAL", DoubleType(), True), \
    StructField("SEX", DoubleType(), True), \
    StructField("EDUCATION",DoubleType() , True), \
    StructField("MARRIAGE",DoubleType() , True), \
    StructField("AGE", DoubleType(), True), \
    StructField("PAY_0", DoubleType(), True), \
    StructField("PAY_2", DoubleType(), True), \
    StructField("PAY_3", DoubleType(), True), \
    StructField("PAY_4", DoubleType(), True), \
    StructField("PAY_5", DoubleType(), True), \
    StructField("PAY_6", DoubleType(), True), \
    StructField("BILL_AMT1", DoubleType(), True), \
    StructField("BILL_AMT2", DoubleType(), True), \
    StructField("BILL_AMT3", DoubleType(), True), \
    StructField("BILL_AMT4", DoubleType(), True), \
    StructField("BILL_AMT5", DoubleType(), True), \
    StructField("BILL_AMT6", DoubleType(), True), \
    StructField("PAY_AMT1", DoubleType(), True), \
    StructField("PAY_AMT2", DoubleType(), True), \
    StructField("PAY_AMT3", DoubleType(), True), \
    StructField("PAY_AMT4", DoubleType(), True), \
    StructField("PAY_AMT5", DoubleType(), True), \
    StructField("PAY_AMT6", DoubleType(), True), \
    StructField("default payment next month", DoubleType(), True)])


In [175]:
default_data = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .load('C:\\Users\\sures\\Downloads\\default_credit_card.csv', schema = schema)

In [176]:
#default_data = spark.read.load("C:\\Users\\sures\\Downloads\\default_credit_card.csv", header= 'true',format = "csv")
#default_data.show(5)

In [177]:
#default_data.na.drop()
default_data.take(5)

[Row(ID=None, LIMIT_BAL=None, SEX=None, EDUCATION=None, MARRIAGE=None, AGE=None, PAY_0=None, PAY_2=None, PAY_3=None, PAY_4=None, PAY_5=None, PAY_6=None, BILL_AMT1=None, BILL_AMT2=None, BILL_AMT3=None, BILL_AMT4=None, BILL_AMT5=None, BILL_AMT6=None, PAY_AMT1=None, PAY_AMT2=None, PAY_AMT3=None, PAY_AMT4=None, PAY_AMT5=None, PAY_AMT6=None, default payment next month=None),
 Row(ID=1.0, LIMIT_BAL=20000.0, SEX=2.0, EDUCATION=2.0, MARRIAGE=1.0, AGE=24.0, PAY_0=2.0, PAY_2=2.0, PAY_3=-1.0, PAY_4=-1.0, PAY_5=-2.0, PAY_6=-2.0, BILL_AMT1=3913.0, BILL_AMT2=3102.0, BILL_AMT3=689.0, BILL_AMT4=0.0, BILL_AMT5=0.0, BILL_AMT6=0.0, PAY_AMT1=0.0, PAY_AMT2=689.0, PAY_AMT3=0.0, PAY_AMT4=0.0, PAY_AMT5=0.0, PAY_AMT6=0.0, default payment next month=1.0),
 Row(ID=2.0, LIMIT_BAL=120000.0, SEX=2.0, EDUCATION=2.0, MARRIAGE=2.0, AGE=26.0, PAY_0=-1.0, PAY_2=2.0, PAY_3=0.0, PAY_4=0.0, PAY_5=0.0, PAY_6=2.0, BILL_AMT1=2682.0, BILL_AMT2=1725.0, BILL_AMT3=2682.0, BILL_AMT4=3272.0, BILL_AMT5=3455.0, BILL_AMT6=3261.0, PAY_

In [178]:
(train, test) = default_data.randomSplit([0.75, 0.25])

In [179]:
print("number of records in training data are: ", train.count())
print("number of records in test data are: ", test.count())

number of records in training data are:  22415
number of records in test data are:  7589


In [180]:
#from pyspark.ml.feature import StringIndexer

In [181]:
#label_indexer = StringIndexer(inputCol = 'churned', outputCol = 'label')

In [182]:
#plan_indexer = StringIndexer(inputCol = 'intl_plan', outputCol = 'intl_plan_indexed')

In [183]:
from pyspark.ml.feature import VectorAssembler

In [184]:
numeric_cols = ["LIMIT_BAL", "SEX", "EDUCATION",
                        "MARRIAGE", "AGE", "PAY_0", "PAY_2","PAY_3","PAY_4","PAY_5","PAY_6",
                        "BILL_AMT1", "BILL_AMT2","BILL_AMT3","BILL_AMT4","BILL_AMT5","BILL_AMT6",
                        "PAY_AMT1", "PAY_AMT2","PAY_AMT3","PAY_AMT4","PAY_AMT5","PAY_AMT6"]

In [185]:
assembler = VectorAssembler(
    inputCols = numeric_cols,
    outputCol = 'features')

In [186]:
assembler.setHandleInvalid("skip").transform(default_data).show

<bound method DataFrame.show of DataFrame[ID: double, LIMIT_BAL: double, SEX: double, EDUCATION: double, MARRIAGE: double, AGE: double, PAY_0: double, PAY_2: double, PAY_3: double, PAY_4: double, PAY_5: double, PAY_6: double, BILL_AMT1: double, BILL_AMT2: double, BILL_AMT3: double, BILL_AMT4: double, BILL_AMT5: double, BILL_AMT6: double, PAY_AMT1: double, PAY_AMT2: double, PAY_AMT3: double, PAY_AMT4: double, PAY_AMT5: double, PAY_AMT6: double, default payment next month: double, features: vector]>

In [187]:
from pyspark.ml.classification import RandomForestClassifier
# labelCol = Target Variable , featuresCol = Input Variables
classifier = RandomForestClassifier(labelCol = 'default payment next month', featuresCol = 'features')


from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, classifier])
model = pipeline.fit(train)
predictions = model.transform(test)

In [188]:
predictions

DataFrame[ID: double, LIMIT_BAL: double, SEX: double, EDUCATION: double, MARRIAGE: double, AGE: double, PAY_0: double, PAY_2: double, PAY_3: double, PAY_4: double, PAY_5: double, PAY_6: double, BILL_AMT1: double, BILL_AMT2: double, BILL_AMT3: double, BILL_AMT4: double, BILL_AMT5: double, BILL_AMT6: double, PAY_AMT1: double, PAY_AMT2: double, PAY_AMT3: double, PAY_AMT4: double, PAY_AMT5: double, PAY_AMT6: double, default payment next month: double, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [189]:
# now we look at the predictions
predictions.select(predictions["LIMIT_BAL"],predictions["AGE"],predictions["SEX"], predictions["EDUCATION"], predictions["default payment next month"]).show(5)

+---------+----+---+---------+--------------------------+
|LIMIT_BAL| AGE|SEX|EDUCATION|default payment next month|
+---------+----+---+---------+--------------------------+
| 120000.0|26.0|2.0|      2.0|                       1.0|
|  70000.0|30.0|1.0|      2.0|                       1.0|
|  50000.0|23.0|2.0|      3.0|                       0.0|
|  50000.0|47.0|2.0|      3.0|                       0.0|
|  10000.0|22.0|1.0|      2.0|                       0.0|
+---------+----+---+---------+--------------------------+
only showing top 5 rows



In [190]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="default payment next month")

accuracy = evaluator.evaluate(predictions)

print("The accuracy of the RandomForest model on the test data is: ", accuracy)

The accuracy of the RandomForest model on the test data is:  0.8035711414885208


In [191]:
#sc.stop()

In [192]:
#from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression


classifier = LogisticRegression(labelCol = 'default payment next month', featuresCol = 'features', maxIter=10)

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, classifier])

# train the model
model = pipeline.fit(train)

# select example rows to display.
predictions = model.transform(test)
predictions.show(5)

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="default payment next month", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy of Logistic Regression = " + str(accuracy))

+----+---------+---+---------+--------+----+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+--------------------+--------------------+--------------------+----------+
|  ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE| AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|default payment next month|            features|       rawPrediction|         probability|prediction|
+----+---------+---+---------+--------+----+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+--------------------+--------------------+--------------------+----------+
| 2.0| 120000.0|2.0|      2.0|     2.0|26.0| -1.0|  2.0|  0.0|  0.0|  0.0|  2.0|   2682.0|   1725.0|   2682.0|  

In [193]:
from pyspark.ml.classification import DecisionTreeClassifier

#dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

classifier = DecisionTreeClassifier(labelCol = 'default payment next month', featuresCol = 'features')

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, classifier])

# train the model
model = pipeline.fit(train)

# select example rows to display.
predictions = model.transform(test)
predictions.show(5)

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="default payment next month", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy of DecisionTree is = " + str(accuracy))


+----+---------+---+---------+--------+----+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+--------------------+----------------+--------------------+----------+
|  ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE| AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|default payment next month|            features|   rawPrediction|         probability|prediction|
+----+---------+---+---------+--------+----+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+--------------------+----------------+--------------------+----------+
| 2.0| 120000.0|2.0|      2.0|     2.0|26.0| -1.0|  2.0|  0.0|  0.0|  0.0|  2.0|   2682.0|   1725.0|   2682.0|   3272.0|   3

In [194]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [4, 5, 4, 3]

# create the trainer and set its parameters
classifier = MultilayerPerceptronClassifier(labelCol = 'default payment next month', featuresCol = 'features', maxIter=100, layers=layers, blockSize=128, seed=1234)
#trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# train the model
model = pipeline.fit(train)

# compute accuracy on the test set
result = model.transform(test)
predictionAndLabels = result.select("prediction", "default payment next month")
predictions.show(5)
#evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
evaluator = MulticlassClassificationEvaluator(labelCol="default payment next month", predictionCol="prediction",
                                              metricName="accuracy")
print("Test set accuracy pf Perceptron is = " + str(evaluator.evaluate(predictionAndLabels)))

+----+---------+---+---------+--------+----+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+--------------------+----------------+--------------------+----------+
|  ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE| AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|default payment next month|            features|   rawPrediction|         probability|prediction|
+----+---------+---+---------+--------+----+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+--------------------+----------------+--------------------+----------+
| 2.0| 120000.0|2.0|      2.0|     2.0|26.0| -1.0|  2.0|  0.0|  0.0|  0.0|  2.0|   2682.0|   1725.0|   2682.0|   3272.0|   3