## To find spark in this instance and Creating spark context and session and reading data

In [2]:
import findspark
findspark.init()
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
#sc.stop()
sc = SparkContext('local')
spark = SparkSession(sc)

## Reading Binned train data to train the model using Decision Tree which is implemented

In [3]:
text_file = sc.textFile("COPY_binned_train_200.csv")

## Map Reduce 1 which takes entire data as input and generate ("class label", count) as key value pairs

In [39]:
mapRed1op = text_file.flatMap(lambda line: line.split("\n")) \
             .map(lambda y: (y[0], 1)) \
             .reduceByKey(lambda a, b: a + b)
mapRed1op.collect()

[('0', 241), ('1', 153)]

## Total Entropy calculation by collecting rdd and changing to dictionary form

In [5]:
map1op=mapRed1op.collect()
map1opDic=dict(map1op)
print("Converting mapReduce 1 output to dictionarty:"+str(map1opDic))
import math
n0=map1opDic.get('0')
n1=map1opDic.get('1')
n=n0+n1
totEntr=-(((n1/n)*(math.log(n1/n,2)))+((n0/n)*(math.log(n0/n,2))))
print("Total Entropy:"+str(totEntr))

Converting mapReduce 1 output to dictionarty:{'0': 241, '1': 153}
Total Entropy:0.9637100175188218


In [6]:
import itertools
all_tr_data=text_file.flatMap(lambda line: line.split("\n"))

## Custom Methods To implement Decision Tree and get the tree form 

### Func: 'get_branching_with_bestAttr' 
#### Inputs: Data(in rdd form) and Total entropy for Data
#### Ouput: Dictionary with best Attribute to split as 'index' and splits of data based on index as 'groups'

In [7]:
def get_branching_with_bestAttr(dataset,totEntr):
    index_val=get_bestAttr(dataset,totEntr)
    print("Best Attribute is "+str(index_val))
    groups=get_branches(index_val,dataset)
    display(groups)
    return {'index':index_val, 'groups':groups}

### Func: 'get_branches' 
#### Inputs: index (column number) based on which split should be done and Data(in rdd form)
#### Ouput: Two subsets of data as left and right ( left is with rows[index]= '0.0' and right is with rows[index]='1.0'

In [41]:
def get_branches(index, dataset):
    left =dataset.filter(lambda y: y.split(",")[index+1]=='0.0')
    right=dataset.filter(lambda y: y.split(",")[index+1]=='1.0')
    return left, right

## Map Reduce2 (Phase 2) to calculate key value pairs which will be used to calculate best attribute 
#### Here final (key,value) pairs will be ('col_colNumber_colValue_classLabel', count)

### Func: 'get_bestAttr' 
#### Inputs: index (column number) based on which split should be done and Data(in rdd form)
#### Ouput: Two subsets of data as left and right ( left is with rows[index]= '0.0' and right is with rows[index]='1.0'

In [8]:
def get_bestAttr(dataset,totEntr):
    a=dataset \
    .map(lambda  y:[("col_"+str(i+2)+"_"+str(v)+"_"+str(u),1) for i,(v,u) in enumerate(zip(y.split(",")[1:],itertools.repeat(y.split(",")[0])))]) \
    .flatMap(lambda z:z) \
    .reduceByKey(lambda x,y:(x+y))
    tm2op=a.collect()
    print("Printing first five key-value pairs after MapReduce2")
    print(tm2op[:5])
    tm2opDic=dict(tm2op)
    ind=get_Index(tm2opDic,totEntr)
    return ind  

### Func: 'get_Index' 
#### Inputs: MapReduce2 Output as dictionary and Total Entropy for that data
#### Ouput: Best Attribute based on the entropy and weighted entropy and Information gain

In [9]:
def get_Index(map2opDic,totEntr):
    gainlist=[]
    for i in range(2,17):
        tmplist1=[]
        tmplist2=[]
        for j in range(0,2):
            a0=map2opDic.get("col_"+str(i)+"_"+str(j)+".0"+"_0.0",0)
            a1=map2opDic.get("col_"+str(i)+"_"+str(j)+".0"+"_1.0",0)
            a=a1+a0
            tmpEntr=0
            if a0==0 and a1!=0:
                tmpEntr=tmpEntr+(-(((a1/a)*(math.log(a1/a,2)))))
            elif a1==0 and a0!=0:
                tmpEntr=tmpEntr+(-(((a0/a)*(math.log(a0/a,2)))))
            elif a1!=0 and a0!=0:
                tmpEntr=tmpEntr+(-(((a1/a)*(math.log(a1/a,2)))+((a0/a)*(math.log(a0/a,2)))))
            else:
                tmpEntr=0
            tmplist1.append(tmpEntr)
            tmplist2.append(a)
        sumEnt=0
        totalRec=0
        for k in range(len(tmplist1)):
            sumEnt=sumEnt+(tmplist1[k]*tmplist2[k])
            totalRec=totalRec+tmplist2[k]
        gainlist.append((totEntr-float(sumEnt/totalRec)))
    return gainlist.index(max(gainlist))

### Func: 'split' - split the main node recursively until given depth
#### Inputs: Main Node (Dictionary with left and right keys), max_depth (depth at which decision tree should stop),min_size( no of records in particular split at which no futhur split continue on that branch), Total entropy
#### Ouput:  Final Tree with best attribute values at each level and groups

### Func: 'terminate' - function to calculate the class label at end of each decision tree for that branch based on majority of classes in that particular group
#### Inputs: Group (Dictionary with left and right subsets of data)
#### Ouput: Class label based on majority (Here Map Reduce is used to calculate count of class labels) 

In [10]:
from operator import itemgetter

# Create a terminal node value
def terminate(group):
    #outcomes = [row[0] for row in group]
    tmp = group \
             .map(lambda y: (y[0], 1)) \
             .reduceByKey(lambda a, b: a + b)
    x=tmp.collect()
    #return x;
    if len(x)!=0:
        return max(x,key=itemgetter(1))[0]
    else:
        return 'NULL'
 
# Create child splits for a node or make terminal
def split(node, max_depth, min_size, depth,totEntr):
    left, right = node['groups']
    #left= left.cache()
    #right=right.cache()
    del(node['groups'])
    # check for a no split
    if not left or not right:
        node['left'] = node['right'] = terminate(left + right)
        return
    # check for max depth
    if depth >= max_depth:
        node['left'], node['right'] = terminate(left), terminate(right)
        return
    # process left child
    if len(left.collect()) <= min_size:
        node['left'] = terminate(left)
    else:
        node['left'] = get_branching_with_bestAttr(left,totEntr)
        split(node['left'], max_depth, min_size, depth+1,totEntr)
    # process right child
    if len(right.collect()) <= min_size:
        node['right'] = terminate(right)
    else:
        node['right'] = get_branching_with_bestAttr(right,totEntr)
        split(node['right'], max_depth, min_size, depth+1,totEntr)

## Calculating Root Node  (first best attribute and first split)

In [11]:
root=get_branching_with_bestAttr(all_tr_data,totEntr)

Printing first five key-value pairs after MapReduce2
[('col_2_0.0_0.0', 166), ('col_3_0.0_0.0', 183), ('col_4_1.0_0.0', 95), ('col_5_0.0_0.0', 186), ('col_6_0.0_0.0', 147)]
Best Attribute is 3


(PythonRDD[13] at RDD at PythonRDD.scala:52,
 PythonRDD[14] at RDD at PythonRDD.scala:52)

## From root node splitting and getting decision tree upto depth 3

In [12]:
split(root, 3, 5, 1,totEntr)

Printing first five key-value pairs after MapReduce2
[('col_2_0.0_0.0', 130), ('col_3_0.0_0.0', 147), ('col_4_1.0_0.0', 66), ('col_5_0.0_0.0', 186), ('col_6_0.0_0.0', 122)]
Best Attribute is 0


(PythonRDD[20] at RDD at PythonRDD.scala:52,
 PythonRDD[21] at RDD at PythonRDD.scala:52)

Printing first five key-value pairs after MapReduce2
[('col_2_0.0_0.0', 130), ('col_3_0.0_0.0', 104), ('col_4_1.0_0.0', 51), ('col_5_0.0_0.0', 130), ('col_6_0.0_0.0', 87)]
Best Attribute is 0


(PythonRDD[27] at RDD at PythonRDD.scala:52,
 PythonRDD[28] at RDD at PythonRDD.scala:52)

Printing first five key-value pairs after MapReduce2
[('col_2_1.0_0.0', 56), ('col_3_0.0_0.0', 43), ('col_4_0.0_0.0', 41), ('col_5_0.0_0.0', 56), ('col_6_1.0_0.0', 21)]
Best Attribute is 1


(PythonRDD[44] at RDD at PythonRDD.scala:52,
 PythonRDD[45] at RDD at PythonRDD.scala:52)

Printing first five key-value pairs after MapReduce2
[('col_2_0.0_0.0', 36), ('col_3_0.0_0.0', 36), ('col_4_1.0_0.0', 29), ('col_5_1.0_0.0', 55), ('col_6_1.0_0.0', 30)]
Best Attribute is 1


(PythonRDD[61] at RDD at PythonRDD.scala:52,
 PythonRDD[62] at RDD at PythonRDD.scala:52)

Printing first five key-value pairs after MapReduce2
[('col_2_0.0_0.0', 24), ('col_3_0.0_0.0', 36), ('col_4_1.0_0.0', 19), ('col_5_1.0_0.0', 36), ('col_6_1.0_0.0', 20)]
Best Attribute is 12


(PythonRDD[68] at RDD at PythonRDD.scala:52,
 PythonRDD[69] at RDD at PythonRDD.scala:52)

Printing first five key-value pairs after MapReduce2
[('col_2_0.0_0.0', 12), ('col_3_1.0_0.0', 19), ('col_4_1.0_0.0', 10), ('col_5_1.0_0.0', 19), ('col_6_0.0_0.0', 9)]
Best Attribute is 7


(PythonRDD[85] at RDD at PythonRDD.scala:52,
 PythonRDD[86] at RDD at PythonRDD.scala:52)

## Final Decision tree calculated based on implementation using two map reduce programs in a loop at different places

#### Here left is such that index ==0 and right is such that index ==1 and index is best attribute at that level in decision Tree 

In [13]:
display(root)

{'index': 3,
 'left': {'index': 0,
  'left': {'index': 0, 'left': '0', 'right': 'NULL'},
  'right': {'index': 1, 'left': '0', 'right': '0'}},
 'right': {'index': 1,
  'left': {'index': 12, 'left': '0', 'right': '0'},
  'right': {'index': 7, 'left': '0', 'right': '1'}}}

## Reading Binned test data to calculate performance metrics of  the model using Decision Tree which is implemented## 

In [14]:
test_file = sc.textFile("COPY_binned_test_200.csv")

In [15]:
all_test_data=test_file.flatMap(lambda line: line.split("\n"))

In [16]:
test_list=all_test_data.collect()

In [17]:
test_list_df=[]
for rw in test_list:
    test_list_df.append(rw.split(","))

### Func: 'predict' - function to calculate predictions for records in test data 
#### Inputs: Decision Tree and particular row (record)
#### Ouput: Class label based on tree

In [18]:
def predict(node, row):
    if row[node['index']+1] == '0.0':
        if isinstance(node['left'], dict):
            return predict(node['left'], row)
        else:
            return node['left']
    else:
        if isinstance(node['right'], dict):
            return predict(node['right'], row)
        else:
            return node['right']

## Iterating over test data and caclulating predictions

In [19]:
predictions=[]
org=[]
for row in test_list_df:
    pre=predict(root,row)
    predictions.append(pre)
    org.append(row[0])

## Custom Methods to calculate performance metrics

In [43]:
def create_conf_matrix(expected, predicted, n_classes):
    m = [[0] * n_classes for i in range(n_classes)]
    for pred, exp in zip(predicted, expected):
        if exp=='0.0':
            exp=0
        else:
            exp=1
        m[int(exp)][int(pred)] += 1
    return m

def recall_1(con_mat):
    if (con_mat[1][1] + con_mat[1][0])==0:
        return float('NaN')
    val=con_mat[1][1] / (con_mat[1][1] + con_mat[1][0] )
    return val
def recall_0(con_mat):
    if (con_mat[0][0] + con_mat[0][1])==0:
        return float('NaN')
    val=con_mat[0][0] / (con_mat[0][1] + con_mat[0][0] )
    return val
def prec_1(con_mat):
    if (con_mat[1][1] + con_mat[0][1])==0:
        return float('NaN')
    val=con_mat[1][1] / (con_mat[1][1] + con_mat[0][1] )
    return val
def prec_0(con_mat):
    if (con_mat[0][0] + con_mat[1][0])==0:
        return float('NaN')
    val=con_mat[0][0] / (con_mat[1][0] + con_mat[0][0] )
    return val
def accuracy(con_mat):
    val=(con_mat[0][0] +con_mat[1][1])/ (con_mat[0][0] + con_mat[1][0] +con_mat[0][1]+con_mat[1][1])
    return val

## Performance metrics calculated for Decision Tree Implemented

In [44]:
conMat=create_conf_matrix(org,predictions,2)
recall_1(conMat)
recall_0(conMat)
prec_1(conMat)
prec_0(conMat)
print('Precision of label-1 (M) ', prec_1(conMat))
print('Precision of label-0 (B)', prec_0(conMat))
print( 'Recall of label-1 (M)    ', recall_1(conMat))
print( 'Recall of label-0 (B)   ', recall_0(conMat))
#print ('F-1 Score         ', metrics.fMeasure())
print ('Accuracy        ', accuracy(conMat))
print ('Confusion Matrix\n', conMat)

Precision of label-1 (M)  0.94
Precision of label-0 (B) 0.904
Recall of label-1 (M)     0.7966101694915254
Recall of label-0 (B)    0.9741379310344828
Accuracy         0.9142857142857143
Confusion Matrix
 [[113, 3], [12, 47]]


## Tree Structure for Decision Tree Implemented

In [26]:
display(root)

{'index': 3,
 'left': {'index': 0,
  'left': {'index': 0, 'left': '0', 'right': 'NULL'},
  'right': {'index': 1, 'left': '0', 'right': '0'}},
 'right': {'index': 1,
  'left': {'index': 12, 'left': '0', 'right': '0'},
  'right': {'index': 7, 'left': '0', 'right': '1'}}}

## To Compare results on implemented Decision Tree with MLlib Decision Tree results

## Reading files which are having same set of train and test records but not binned

In [27]:
spark = SparkSession(sc)
training_data_1 = spark.read.csv('COPY_nobin_train_org_200.csv', header = False, inferSchema = True)
testing_data_1=spark.read.csv('COPY_nobin_test_org_200.csv', header = False, inferSchema = True)

## Decision Tree Classifier using MLlib with maxDepth=3 and impurity =entropy and maxBins=2

In [28]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

def labelData(data):
    # label: row[end], features: row[0:end-1]
    return data.rdd.map(lambda row: LabeledPoint(row[0], row[1:]))

training_data = labelData(training_data_1)
testing_data=labelData(testing_data_1)

model = DecisionTree.trainClassifier(training_data, numClasses=2, maxDepth=3,
                                     categoricalFeaturesInfo={},
                                     impurity='entropy', maxBins=2)
print(model.toDebugString())

from pyspark.mllib.evaluation import MulticlassMetrics



DecisionTreeModel classifier of depth 3 with 13 nodes
  If (feature 3 <= 0.06445999999999999)
   If (feature 0 <= 19.075)
    Predict: 0.0
   Else (feature 0 > 19.075)
    If (feature 1 <= 561.15)
     Predict: 0.0
    Else (feature 1 > 561.15)
     Predict: 0.0
  Else (feature 3 > 0.06445999999999999)
   If (feature 1 <= 561.15)
    If (feature 12 <= 0.1313)
     Predict: 0.0
    Else (feature 12 > 0.1313)
     Predict: 0.0
   Else (feature 1 > 561.15)
    If (feature 7 <= 25.91)
     Predict: 0.0
    Else (feature 7 > 25.91)
     Predict: 1.0



## Performance metrics calculated for MLlib's Decision Tree of our dataset 

In [29]:
def getPredictionsLabels(model, test_data):
    predictions = model.predict(test_data.map(lambda r: r.features))
    return predictions.zip(test_data.map(lambda r: r.label))

def printMetrics(predictions_and_labels):
    metrics = MulticlassMetrics(predictions_and_labels)
    print('Precision of label-1 (M) ', metrics.precision(1))
    print('Precision of label-0 (B)', metrics.precision(0))
    print( 'Recall of label-1 (M)    ', metrics.recall(1))
    print( 'Recall of label-0 (B)   ', metrics.recall(0))
    print ('F-1 Score         ', metrics.fMeasure())
    print ('Accuracy        ', metrics.accuracy)
    print ('Confusion Matrix\n', metrics.confusionMatrix().toArray())

predictions_and_labels = getPredictionsLabels(model, testing_data)
printMetrics(predictions_and_labels)

Precision of label-1 (M)  0.94
Precision of label-0 (B) 0.904
Recall of label-1 (M)     0.7966101694915254
Recall of label-0 (B)    0.9741379310344828
F-1 Score          0.9142857142857143
Accuracy         0.9142857142857143
Confusion Matrix
 [[113.   3.]
 [ 12.  47.]]


## Final results

#### b. The choice of parameters and attribute selection metric (Gini index, info gain, etc.) used

I am using same parameters which I have used in problem 1, but here I have chosen no of bins=2. Since the library I have used is based on CART algorithm and always node at any level will have two child nodes (this internal best splits will be decided based on entropy and it will take every value in record as range for bin and caclulates best splits which takes more time). If no of bins=2 then CART algorithm is similar to ID3 algorithm which is easy to process)

## c. Any Assumptions Made

Same assumptions which are made in problem 1 are assumed here as follows :

1) Removed 'id' column assuming it does not effect 'diagnosis' label

2) Here highly correlated features are dropped by considering only one feature from highly correlated features assuming those feature won't effect the 'diagonsis' the column. (radius_mean, perimeter_mean, area_mean are correlated with each other so I am considering only area_mean. Apart from these, radius_se, perimeter_se and area_se are correlated and I only used area_se. radius_worst, perimeter_worst and area_worst are correlated so I used area_worst. Also compactness_mean, concavity_mean and concave points_mean are correlated with each other.Therefore I only choose concavity_mean. compactness_worst, concavity_worst and concave points_worst so I used concavity_worst. Compactness_se, concavity_se and concave points_se so I used concavity_se. texture_mean and texture_worst are correlated and I used texture_mean. area_worst and area_mean are correlated, I used area_mean. concavity_worst and concavity_mean are correlated, I used concavity_mean)

3) Tried with different seeds and data has been splitted 70:30 and randomly took seed 200 for no of bins =2 for which decent accuracy has been obtained (91.4%)

4) As mentioned above no of bins =2 has been considered and compared the both api and implemented decision tree preassuming both results will be same

5) CART implementation works same as ID3 when no of bins =2

6) Cross Validation has not been done while doing decision tree implementation as main purpose is to compare the results with that of Api. I got same results for both decision tree. (Also I have run with different seeds which is nothing but for different combination of training data and observed for seed =200 is giving good results which can be considered as validation test.)

## e. Decision tree Obtained

In [47]:
display(root)

{'index': 3,
 'left': {'index': 0,
  'left': {'index': 0, 'left': '0', 'right': 'NULL'},
  'right': {'index': 1, 'left': '0', 'right': '0'}},
 'right': {'index': 1,
  'left': {'index': 12, 'left': '0', 'right': '0'},
  'right': {'index': 7, 'left': '0', 'right': '1'}}}

#### Note: 
1)Here condition for left is value=='0.0' and condition for right is value=='1.0'

2)Here Null indicates that side no branching (pruned). To have uniform length/nodes at every level considering dummy brnach and finally assigning null value (Though while predicting it will never travers through that branch)

## f. Performance shown by the confusion matrix.

In [49]:
conMat=create_conf_matrix(org,predictions,2)
recall_1(conMat)
recall_0(conMat)
prec_1(conMat)
prec_0(conMat)
print('Precision of label-1 (M) ', prec_1(conMat))
print('Precision of label-0 (B)', prec_0(conMat))
print( 'Recall of label-1 (M)    ', recall_1(conMat))
print( 'Recall of label-0 (B)   ', recall_0(conMat))
#print ('F-1 Score         ', metrics.fMeasure())
print ('Accuracy        ', accuracy(conMat))
print ('Confusion Matrix\n', conMat)

Precision of label-1 (M)  0.94
Precision of label-0 (B) 0.904
Recall of label-1 (M)     0.7966101694915254
Recall of label-0 (B)    0.9741379310344828
Accuracy         0.9142857142857143
Confusion Matrix
 [[113, 3], [12, 47]]


## Comparing Results Comments

We got same tree and same performance metrics for both decision tree

## References:

1)  https://machinelearningmastery.com/implement-decision-tree-algorithm-scratch-python/

2)  https://spark.apache.org/docs/2.2.0/mllib-decision-tree.html

3)  https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html

4)  https://mapr.com/blog/churn-prediction-pyspark-using-mllib-and-ml-packages/

5)  https://www.kaggle.com/uciml/breast-cancer-wisconsin-data/kernels