In [None]:
!pip install -q findspark
!pip install -q pyspark

[K     |████████████████████████████████| 204.2MB 75kB/s 
[K     |████████████████████████████████| 204kB 55.4MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import  math
import pandas as pd
from sklearn.model_selection import * 
import numpy as np

In [None]:
# Create the Spark entry points used by the rest of the notebook: a SparkContext
# with the application name "Credit Risk" and an SQLContext built on top of it.
sc = SparkContext(appName="Credit Risk")
sqlContext = SQLContext(sc)
# NOTE: the string below is a bare expression, not a comment; as the last
# statement of the cell its value is what the notebook echoes as cell output.
"""defining the sparkcontext which used for establishing a connection to the spark cluster and sql context which enables us to perform sql-like operations on our dataset"""

'defining the sparkcontext which used for establishing a connection to the spark cluster and sql context which enables us to perform sql-like operations on our dataset'

In [None]:
# Mount Google Drive inside the Colab VM so the CSV can be read from
# /content/drive.
from google.colab import drive
drive.mount('/content/drive')
# NOTE: the string below is a bare expression, not a comment; as the last
# statement of the cell its value is what the notebook echoes as cell output.
"""since the size of the dataset is large, in order to upload the dataset, we store it in a google drive and mount it on colab"""

Mounted at /content/drive


'since the size of the dataset is large, in order to upload the dataset, we store it in a google drive and mount it on colab'

In [None]:
data = sqlContext.read.option("header","true").option("inferSchema","true").csv("/content/drive/MyDrive/BigDataProject/credit_card_approval1.csv")
# Read the credit-card approval CSV from the mounted drive into a Spark
# DataFrame; the first line is treated as the header and column types are
# inferred from the data.

In [None]:
# Shuffle the rows by ordering on a random column.
# NOTE(review): rand() is called without a seed, so the row order (and anything
# derived from it downstream) presumably differs between runs - confirm whether
# a fixed seed is wanted for reproducibility.
from pyspark.sql.functions import rand
data = data.orderBy(rand())

In [None]:
# Preview the first 10 rows of the shuffled DataFrame.
data.show(n=10)

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+----------+------------+------+------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL| NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|       JOB|BEGIN_MONTHS|STATUS|TARGET|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+----------+------------+------+------+
|5053861|          M|           Y|              Y| 2+ children|        121500.0|Secondary / secon...|             Married|House / apartment|    -10991|         -637|         1|              0|         0|         0|  Laborers|          -8|   

In [None]:
def FindCount(rows):
    """Tally how many rows carry each label.

    The label of a row is its last element (the target column).

    Args:
        rows: iterable of indexable rows.

    Returns:
        dict mapping each label to its number of occurrences, with keys in
        order of first appearance.
    """
    tally = {}
    for record in rows:
        target = record[-1]
        tally[target] = tally.get(target, 0) + 1
    return tally

In [None]:
def label_counts(rows):
    """Return the most frequent label among ``rows``.

    The label of a row is its last element (the target column). When several
    labels share the highest count, the one that appeared first in ``rows``
    is returned.

    Args:
        rows: iterable of indexable rows.

    Returns:
        The majority label, or ``None`` when ``rows`` is empty.
    """
    cnts = {}
    for r in rows:
        lab = r[-1]
        cnts[lab] = cnts.get(lab, 0) + 1

    # Nothing to vote on: report "no label" instead of failing on an
    # unassigned local.
    if not cnts:
        return None

    # Select the winner once, after counting, rather than re-sorting the whole
    # tally for every row. max() keeps the first maximal key it encounters, and
    # dict iteration follows insertion order, so ties resolve to the
    # first-seen label.
    return max(cnts, key=cnts.__getitem__)

In [None]:
def CheckNumeric(value):
    """Report whether ``value`` is an ``int`` or ``float`` instance."""
    return isinstance(value, (int, float))

In [None]:
class criterion:
    """A single split test of the form "column <op> value".

    Stores a column index, a reference value and the list of column names.
    Rows are tested with ``>=`` when the row's value in that column is numeric
    (as decided by ``CheckNumeric``) and with ``==`` otherwise.
    """

    def __init__(self, column, value, header):
        self.column, self.value, self.header = column, value, header

    def CompAndMatch(self, example):
        """Return whether ``example`` satisfies this criterion.

        The comparison operator is chosen from the type of the example's own
        value in ``self.column``.
        """
        candidate = example[self.column]
        return candidate >= self.value if CheckNumeric(candidate) else candidate == self.value

    def __repr__(self):
        """Render the criterion as a question, e.g. ``Is <name> >= <value>?``.

        The operator shown is chosen from the type of the stored value.
        """
        operator = ">=" if CheckNumeric(self.value) else "=="
        column_name = self.header[self.column]
        return "Is %s %s %s?" % (column_name, operator, str(self.value))

In [None]:
def datasetPartition(rows, crit):
    """Split ``rows`` in two according to a criterion.

    Args:
        rows: iterable of rows.
        crit: object exposing ``CompAndMatch(row)``.

    Returns:
        ``(rows_T, rows_F)``: the rows for which the criterion holds and the
        rows for which it does not, each in their original relative order.
    """
    matching, rejected = [], []
    for record in rows:
        destination = matching if crit.CompAndMatch(record) else rejected
        destination.append(record)
    return matching, rejected

In [None]:
def Cal_GiniImp(rows):
    """Compute the Gini impurity of ``rows``.

    Starting from 1, the squared share of every label (as counted by
    ``FindCount``) is subtracted in turn. With no rows the result is 1.

    Args:
        rows: sized collection of rows whose last element is the label.

    Returns:
        The Gini impurity of the label distribution.
    """
    impurity = 1
    for occurrences in FindCount(rows).values():
        share = occurrences / float(len(rows))
        impurity -= share ** 2
    return impurity

In [None]:
def CalcEntropy(rows):
    """Compute the base-2 entropy of the labels in ``rows``.

    Implements -sum(p_i * log2(p_i)) over the label shares obtained from
    ``FindCount``. With no rows the result is 0.

    Args:
        rows: sized collection of rows whose last element is the label.

    Returns:
        The entropy of the label distribution.
    """
    weighted_logs = 0
    for occurrences in FindCount(rows).values():
        share = occurrences / float(len(rows))
        weighted_logs += share * math.log(share, 2)
    return -weighted_logs

In [None]:
def Calc_InfoGain(left, right, unc_curr, impurity=None):
    """Compute the gain of a split: parent impurity minus weighted child impurity.

    gain = unc_curr - p * impurity(left) - (1 - p) * impurity(right),
    where p is the fraction of rows that fall in ``left``.

    Args:
        left: rows of the first child.
        right: rows of the second child.
        unc_curr: impurity of the parent node, measured with the same function
            as the children.
        impurity: optional callable mapping a list of rows to an impurity
            value. Defaults to ``Cal_GiniImp`` (Gini impurity), which is what
            this function has always used; pass ``CalcEntropy`` to obtain an
            entropy-based information gain instead.

    Returns:
        The impurity reduction achieved by the split.

    Raises:
        ZeroDivisionError: if both ``left`` and ``right`` are empty.
    """
    # Resolve the default at call time so the signature does not depend on
    # definition order between notebook cells.
    if impurity is None:
        impurity = Cal_GiniImp
    p = float(len(left)) / (len(left) + len(right))
    return unc_curr - p * impurity(left) - (1 - p) * impurity(right)

In [None]:
def best_sp(rows, header):
    """Find the split criterion with the highest gain for ``rows``.

    Every distinct value of every feature column (all columns except the last,
    which is the label) is tried as a criterion. Splits that leave one side
    empty are ignored.

    Args:
        rows: list of rows; the last element of each row is the label.
        header: list of column names, passed on to each ``criterion``.

    Returns:
        ``(highest_gain, best_criterion)``. ``best_criterion`` is ``None`` and
        the gain is 0 when ``rows`` is empty or no candidate actually splits
        the rows.
    """
    highest_gain = 0  # best gain seen so far
    best_criterion = None  # criterion that produced highest_gain

    # An empty dataset has no first row to take the column count from and
    # nothing to split, so report "no gain" right away.
    if len(rows) == 0:
        return highest_gain, best_criterion

    unc_curr = Cal_GiniImp(rows)
    no_of_feat = len(rows[0]) - 1  # every column but the label

    for col in range(no_of_feat):
        # Distinct values observed in this column are the candidate thresholds.
        values = {r[col] for r in rows}

        for v in values:
            crit = criterion(col, v, header)
            rows_T, rows_F = datasetPartition(rows, crit)

            # A candidate that puts every row on one side is not a split.
            if len(rows_T) == 0 or len(rows_F) == 0:
                continue

            gain = Calc_InfoGain(rows_T, rows_F, unc_curr)

            # ">=" means a later candidate with an equal gain replaces the
            # earlier one; using ">" would keep the first and may change the
            # tree's shape slightly.
            if gain >= highest_gain:
                highest_gain, best_criterion = gain, crit

    return highest_gain, best_criterion

In [None]:
class Leaf_node:
    """Terminal node of the decision tree.

    Holds the label tally of the training rows that reached it (``preds``),
    the label it predicts (``lab_predicted``), and the node's ``id``,
    ``Tree_depth`` and ``lab`` exactly as passed in.
    """

    def __init__(self, rows, id, Tree_depth, lab=""):
        # Bookkeeping supplied by the caller.
        self.id, self.Tree_depth, self.lab = id, Tree_depth, lab
        # Label statistics derived from the rows that ended up here.
        self.preds = FindCount(rows)
        self.lab_predicted = label_counts(rows)

In [None]:
class Dec_Node:
    """Internal (decision) node of the decision tree.

    Attributes:
        crit: the split criterion tested at this node.
        branch_T: subtree followed when the criterion holds.
        branch_F: subtree followed when the criterion does not hold.
        Tree_depth: depth of this node in the tree.
        id: identifier of this node.
        rows: the training rows that reached this node.
        pruned: flag stored as given (defaults to 0).
    """

    def __init__(self,crit,branch_T,branch_F,Tree_depth,id,rows,pruned=0):
        self.crit = crit
        self.branch_T = branch_T
        self.branch_F = branch_F
        self.Tree_depth = Tree_depth  # assigned once; the duplicate assignment was redundant
        self.id = id
        self.rows = rows
        self.pruned = pruned

In [None]:
def build_tree(rows, header, Tree_depth, id=0):
    """Recursively grow a decision tree over ``rows``.

    The best split is obtained from ``best_sp``. When it reports a gain of 0
    the rows become a ``Leaf_node`` (which receives ``header`` as its ``lab``
    argument). Otherwise the rows are partitioned and a subtree is built for
    each side, one level deeper.

    Node ids: the true child of node ``id`` is ``2*id + 2`` and the false
    child is ``2*id + 1``.

    Args:
        rows: list of rows; the last element of each row is the label.
        header: list of column names.
        Tree_depth: depth assigned to the node being built.
        id: identifier assigned to the node being built.

    Returns:
        A ``Leaf_node`` or a ``Dec_Node`` rooted at this position.
    """
    best_gain, split = best_sp(rows, header)

    # No split improves on the current rows: stop here with a leaf.
    if best_gain == 0:
        return Leaf_node(rows, id, Tree_depth, header)

    matched, unmatched = datasetPartition(rows, split)
    child_depth = Tree_depth + 1

    # The true side is grown first, then the false side.
    true_subtree = build_tree(matched, header, child_depth, 2 * id + 2)
    false_subtree = build_tree(unmatched, header, child_depth, 2 * id + 1)

    return Dec_Node(split, true_subtree, false_subtree, Tree_depth, id, rows)

In [None]:
def Tree_Pruning(node, pruningList):
    """Collapse selected decision nodes of a tree into leaves.

    Walks the tree from ``node``. A decision node whose ``int(id)`` is found
    in ``pruningList`` is replaced by a ``Leaf_node`` built from that node's
    rows, id and depth (with the node's criterion value as ``lab``); its
    subtrees are not visited. Other decision nodes are updated in place with
    the pruned versions of their branches.

    Args:
        node: root of the (sub)tree to prune.
        pruningList: container tested with ``int(node.id) in pruningList``.

    Returns:
        The root of the pruned (sub)tree: ``node`` itself, or the leaf that
        replaces it.
    """
    # Leaves are already terminal.
    if isinstance(node, Leaf_node):
        return node

    # This node is selected for pruning: everything below it becomes one leaf.
    if int(node.id) in pruningList:
        return Leaf_node(node.rows, node.id, node.Tree_depth, node.crit.value)

    # Otherwise prune both children (true side first) and re-attach them.
    for side in ("branch_T", "branch_F"):
        setattr(node, side, Tree_Pruning(getattr(node, side), pruningList))

    return node

In [None]:
def TFbranch_classify(r, node):
    """Classify row ``r`` by walking the tree from ``node`` down to a leaf.

    At each decision node the row is tested against the node's criterion and
    the walk continues into the true or the false branch accordingly.

    Args:
        r: the row to classify.
        node: root of the (sub)tree to use.

    Returns:
        The ``lab_predicted`` of the leaf that the row reaches.
    """
    current = node
    while not isinstance(current, Leaf_node):
        current = current.branch_T if current.crit.CompAndMatch(r) else current.branch_F
    return current.lab_predicted

In [None]:
def PrintTree(node, Tree_depth = 0, spacing=""):
    """Print the tree rooted at ``node``, indenting each level by two spaces.

    Leaves are printed with their id, label tally and predicted class.
    Decision nodes are printed with their criterion, depth and id, followed by
    their true and false subtrees.

    Args:
        node: root of the (sub)tree to print.
        Tree_depth: depth of ``node``. It is not used for printing (the depth
            shown is read from the node itself) and is kept for interface
            compatibility.
        spacing: indentation prefix for this level.
    """
    # Base case: a leaf prints a single line.
    if isinstance(node, Leaf_node):
        print(spacing + "Leaf_node with node id " + str(node.id) + " Predict " + str(node.preds) + " Class = " + str(node.lab_predicted))
        return

    # The question asked at this node.
    print(spacing + str(node.crit) + " Tree_depth = " + str(node.Tree_depth) + " id = " + str(node.id))

    # Recurse with the child's numeric depth; previously a formatted string
    # was passed in the Tree_depth position.
    print(spacing + '--> True:')
    PrintTree(node.branch_T, node.Tree_depth + 1, spacing + "  ")

    print(spacing + '--> False:')
    PrintTree(node.branch_F, node.Tree_depth + 1, spacing + "  ")

In [None]:
def PrintLeafNode(cnts):
    """Convert a label tally into whole-number percentage strings.

    Args:
        cnts: dict mapping label -> count.

    Returns:
        dict mapping each label to its share of the total, truncated to an
        integer and suffixed with "%" (e.g. ``"25%"``).
    """
    grand_total = sum(cnts.values()) * 1.0
    return {
        label: str(int(occurrences / grand_total * 100)) + "%"
        for label, occurrences in cnts.items()
    }

In [None]:
def get_Node_List(node, Node_List=None, LeafNode_List=None):
    """Collect the ids of the decision nodes and leaf nodes of a tree.

    The tree is walked depth-first, true branch before false branch.

    Args:
        node: root of the (sub)tree to walk.
        Node_List: optional list to which decision-node ids are appended; a
            fresh list is created when omitted.
        LeafNode_List: optional list to which leaf-node ids are appended; a
            fresh list is created when omitted.

    Returns:
        ``(Node_List, LeafNode_List)``, also when ``node`` itself is a leaf.
    """
    # Fresh accumulators for every top-level call: list defaults written in
    # the signature are created once and would keep growing across calls.
    if Node_List is None:
        Node_List = []
    if LeafNode_List is None:
        LeafNode_List = []

    # Base case: record the leaf and stop descending.
    if isinstance(node, Leaf_node):
        LeafNode_List.append(node.id)
        return Node_List, LeafNode_List

    Node_List.append(node.id)

    # Forward both accumulators so the whole walk shares the same lists.
    get_Node_List(node.branch_T, Node_List, LeafNode_List)
    get_Node_List(node.branch_F, Node_List, LeafNode_List)

    return Node_List, LeafNode_List

In [None]:
def get_LeafNode_List(node, Leaf_Nodes=None):
    """Collect the leaf nodes of a tree.

    The tree is walked depth-first, true branch before false branch.

    Args:
        node: root of the (sub)tree to walk.
        Leaf_Nodes: optional list to which the leaf node objects are appended;
            a fresh list is created when omitted.

    Returns:
        The list of leaf nodes, also when ``node`` itself is a leaf.
    """
    # A fresh accumulator for every top-level call: a list default written in
    # the signature is created once and would keep growing across calls.
    if Leaf_Nodes is None:
        Leaf_Nodes = []

    # Base case: record the leaf and stop descending.
    if isinstance(node, Leaf_node):
        Leaf_Nodes.append(node)
        return Leaf_Nodes

    get_LeafNode_List(node.branch_T, Leaf_Nodes)
    get_LeafNode_List(node.branch_F, Leaf_Nodes)

    return Leaf_Nodes

In [None]:
# Module-level accumulator that get_Inner_Nodes appends to. Re-run this cell
# to reset it before collecting the nodes of another tree.
inner_Nodes =[]

In [None]:
def get_Inner_Nodes(node):
    """Append every decision (non-leaf) node of a tree to ``inner_Nodes``.

    The tree is walked depth-first, true branch before false branch, and the
    starting node is included when it is not a leaf. Nodes are appended to the
    module-level ``inner_Nodes`` list, so calling this repeatedly without
    resetting that list accumulates entries.

    Args:
        node: root of the (sub)tree to walk.

    Returns:
        The module-level ``inner_Nodes`` list, also when ``node`` is a leaf
        (in which case nothing is appended).
    """
    # Base case: leaves are not collected. Return the list rather than None so
    # that a tree consisting of a single leaf still yields a list.
    if isinstance(node, Leaf_node):
        return inner_Nodes

    inner_Nodes.append(node)

    get_Inner_Nodes(node.branch_T)
    get_Inner_Nodes(node.branch_F)

    return inner_Nodes

In [None]:
def calc_Acc(rows, node):
  """Measure how often the tree rooted at ``node`` predicts the row's label.

  Args:
    rows: sized collection of rows; the last element of each row is the label.
    node: root of the decision tree used for classification.

  Returns:
    The fraction of correctly classified rows rounded to 2 decimals, or 0
    when ``rows`` is empty.
  """
  total = len(rows)
  if total == 0:
    return 0
  hits = sum(1 for r in rows if r[-1] == TFbranch_classify(r, node))
  return round(hits / total, 2)

In [None]:
# Bring the whole DataFrame to the driver and convert every Row into a plain
# Python list, the row format the tree-building functions index into.
data1 = [list(row) for row in data.collect()]

In [None]:
# Column names in the same order as the CSV columns; the last one (TARGET) is
# the label. Used only to render split criteria in readable form.
header = ['ID','CODE_GENDER','FLAG_OWN_CAR','FLAG_OWN_REALTY','CNT_CHILDREN','AMT_INCOME_TOTAL','NAME_EDUCATION_TYPE','NAME_FAMILY_STATUS','NAME_HOUSING_TYPE','DAYS_BIRTH','DAYS_EMPLOYED','FLAG_MOBIL','FLAG_WORK_PHONE','FLAG_PHONE','FLAG_EMAIL','JOB','BEGIN_MONTHS','STATUS','TARGET']

In [None]:
# Grow a decision tree over the full collected dataset, starting at depth 0
# with the default root id 0.
tree = build_tree(data1,header,0)

In [None]:
# Print the tree, then list its leaf nodes and its decision nodes with their
# ids and depths.
PrintTree(tree)
# Ids of the decision nodes and of the leaf nodes (not used further below).
nodes, leafNodes = get_Node_List(tree)
print("------------- Leaf nodes ------------------")
LNode_List = get_LeafNode_List(tree)
for leaf in LNode_List:
    print("id = " + str(leaf.id) + " depth =" + str(leaf.Tree_depth))
print("------------- Non-leaf nodes -----------------")
# Rebinds the module-level inner_Nodes name to the list returned by the call.
inner_Nodes = get_Inner_Nodes(tree)
for inner in inner_Nodes:
    print("id = " + str(inner.id) + " depth =" + str(inner.Tree_depth))

Is STATUS == 5? Tree_depth = 0 id = 0
--> True:
  Leaf_node with node id 2 Predict {1: 1087} Class = 1
--> False:
  Is STATUS == 2? Tree_depth = 1 id = 1
  --> True:
    Leaf_node with node id 4 Predict {1: 542} Class = 1
  --> False:
    Is STATUS == 3? Tree_depth = 2 id = 3
    --> True:
      Leaf_node with node id 8 Predict {1: 181} Class = 1
    --> False:
      Is STATUS == 4? Tree_depth = 3 id = 7
      --> True:
        Leaf_node with node id 16 Predict {1: 152} Class = 1
      --> False:
        Leaf_node with node id 15 Predict {0: 3449} Class = 0
------------- Leaf nodes ------------------
id = 2 depth =1
id = 4 depth =2
id = 8 depth =3
id = 16 depth =4
id = 15 depth =4
id = 2 depth =1
id = 4 depth =2
id = 8 depth =3
id = 16 depth =4
id = 15 depth =4
------------- Non-leaf nodes -----------------
id = 0 depth =0
id = 1 depth =1
id = 3 depth =2
id = 7 depth =3


In [None]:
# Prune the tree, classify one sample row, and score the tree on the first
# 150 rows.
max_acc=0
# NOTE(review): inner_Nodes holds node objects, while Tree_Pruning tests
# `int(node.id) in pruningList`; this call therefore looks like it matches
# nothing. Its return value is also discarded. Confirm the intended pruning
# list (node ids?) and whether the result should be assigned back to `tree`.
Tree_Pruning(tree,inner_Nodes)
test = data1[0]
lab = TFbranch_classify(test, tree)
# NOTE(review): these 150 rows are part of data1, which the tree was built
# from, so this is accuracy on training data rather than on held-out data.
test = data1[0:150]
acc=calc_Acc(test, tree)
if(acc>max_acc):
	max_acc=acc
print("Maximum acc = "+str(max_acc))

Maximum acc = 1.0


In [None]:
# Show the label predicted for the single sample row classified earlier and
# the criterion tested at the root of the tree.
print(lab)
print(tree.crit)

0
Is STATUS == 5?


In [None]:
# Split the data, build a tree on the first part and measure its accuracy on
# the second part.
# NOTE(review): with weights [0.2, 0.8] the first split (used for training) is
# the smaller one, roughly 20% of the rows - confirm this order is intended.
# NOTE(review): no seed is passed to randomSplit, so the split presumably
# changes between runs - confirm whether a fixed seed is wanted.
df = data.randomSplit([0.2,0.8])
trainDF = df[0]
testDF = df[1]
# Collect both parts to the driver as lists of plain Python lists.
train = [list(r) for r in trainDF.collect()]
test = [list(r) for r in testDF.collect()]
tree = build_tree(train, header,0)
final_acc = calc_Acc(test, tree)
print("acc on test = " + str(final_acc))

acc on test = 0.9895
