In [None]:
from pyspark.sql import SparkSession
import numpy as np

In [1]:
# Build (or reuse) the Spark session.
# The master is set through builder.master(): the previous key "spark-master"
# is not a Spark property name, so the intended local master was never applied.
# "local[*]" runs Spark locally using all available cores.
spark = (
    SparkSession.builder
    .appName("CSCI316-process_one_DTtree(simple)")
    .master("local[*]")
    .getOrCreate()
)

# Load the CSV with its header row. No schema is supplied, so every column
# is read as a string (see the printed schema below).
df_FD = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("block_1.csv")
)

df_FD.printSchema()
df_FD.show(10)

root
 |-- id_1: string (nullable = true)
 |-- id_2: string (nullable = true)
 |-- cmp_fname_c1: string (nullable = true)
 |-- cmp_fname_c2: string (nullable = true)
 |-- cmp_lname_c1: string (nullable = true)
 |-- cmp_lname_c2: string (nullable = true)
 |-- cmp_sex: string (nullable = true)
 |-- cmp_bd: string (nullable = true)
 |-- cmp_bm: string (nullable = true)
 |-- cmp_by: string (nullable = true)
 |-- cmp_plz: string (nullable = true)
 |-- is_match: string (nullable = true)

+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|37291|53113|0.833333333333333|           ?|           1|           ?|      1|     1|     1|     1|      0|    TRUE|
|39086|47614|                1|           ?|  

In [2]:
# Preprocessing: recode is_match so that "FALSE" becomes "1" and "TRUE" becomes "0"
from pyspark.sql.functions import when   
from pyspark.sql.functions import regexp_replace,col
# Apply the two textual replacements on is_match in order: "FALSE" -> "1", then "TRUE" -> "0"
for pattern, replacement in (("FALSE", "1"), ("TRUE", "0")):
    df_FD = df_FD.withColumn('is_match', regexp_replace(col('is_match'), pattern, replacement))

In [7]:
# Turn every Row of the DataFrame's underlying RDD into a plain Python list
rdd1 = df_FD.rdd.map(lambda row: list(row))
rdd1.first()

['37291',
 '53113',
 '0.833333333333333',
 '?',
 '1',
 '?',
 '1',
 '1',
 '1',
 '1',
 '0',
 '0']

In [8]:
# Drop the first two columns; convert "?" to the integer 2 and every other field to a float.
def preprocessing(pieces):
    """Convert one raw record into a list of numeric scores.

    Only positions 2 through 11 of ``pieces`` are used. An entry equal to
    ``'?'`` becomes the integer 2; every other entry is parsed with ``float``.
    """
    scores = []
    for field in pieces[2:12]:
        if field == '?':
            scores.append(2)
        else:
            scores.append(float(field))
    return scores

# Run the per-record conversion on the RDD and collect the results as a Python list
dataset = rdd1.map(preprocessing).collect()

# Wrap the collected rows in a NumPy array
record_linkage = np.array(dataset)

# Show the type of each container, then the first converted row
for container in (record_linkage, dataset):
    print(type(container))
print(record_linkage[:1])

<class 'list'>
[[0.833333333333333, 2, 1.0, 2, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0], [1.0, 2, 1.0, 2, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0], [1.0, 2, 1.0, 2, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0], [1.0, 2, 1.0, 2, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0], [1.0, 2, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0], [1.0, 2, 1.0, 2, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0], [1.0, 2, 1.0, 2, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0], [1.0, 2, 1.0, 2, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0], [1.0, 2, 1.0, 2, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0], [1.0, 2, 1.0, 2, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0]]


In [25]:
from numpy import *
import time
from scipy import stats
import matplotlib.pyplot as plt


def calAccuracy(pred, label):
    """Return the fraction of predictions that equal the true class.

    ``label`` holds whole records: the true class of sample ``i`` is the last
    element of ``label[i]`` and is compared with ``pred[i]``.
    """
    # Collect matching positions explicitly (len() is used rather than sum(),
    # which a star import from numpy may shadow).
    hits = [i for i in range(len(pred)) if pred[i] == label[i][-1]]
    return float(len(hits)) / len(pred)


def classify2(node, dataSet):
    """Classify a single sample by walking down the decision tree.

    Parameters
    ----------
    node : dict, array-like or scalar
        A tree or subtree. An internal node is a dict with the keys
        'spInd' (feature index), 'spVal' (threshold), 'left' and 'right'.
        A leaf is either an array of per-class counts or a bare class label.
    dataSet : sequence
        The feature values of ONE sample, indexed by 'spInd'.

    Returns
    -------
    int
        The predicted class index.
    """
    # Descend: values strictly below the threshold go left, all others go right.
    while isinstance(node, dict):
        branch = 'left' if dataSet[node['spInd']] < node['spVal'] else 'right'
        node = node[branch]

    # A scalar leaf already IS the class label. np.argmax of a scalar is
    # always 0, so routing it through argmax would predict class 0 for every
    # such leaf regardless of its label.
    if np.ndim(node) == 0:
        return int(node)

    # Count-array leaf: the majority class is the index of the largest count.
    return np.argmax(node)

def classify1(node, dataSet):
    """Classify every sample in ``dataSet`` with the tree rooted at ``node``.

    Returns a float NumPy array holding one prediction per sample, in the
    same order as the samples.
    """
    predictions = [classify2(node, dataSet[i]) for i in range(len(dataSet))]
    return np.array(predictions, dtype=float)

def BinarySplit(dataSet, featIndex, splitVal):
    """Partition the rows of ``dataSet`` on one feature.

    Returns ``(below, at_or_above)``: the rows whose value in column
    ``featIndex`` is strictly less than ``splitVal``, and the rows whose
    value is greater than or equal to it.
    """
    column = dataSet[:, featIndex]
    below_mask = column < splitVal
    at_or_above_mask = column >= splitVal
    return dataSet[below_mask], dataSet[at_or_above_mask]

def NewS(subData0, subData1):
    """Size-weighted entropy of a candidate two-way split.

    Returns infinity when either side is empty, so that such a split can
    never be selected as the best one. Otherwise returns the entropy of each
    side weighted by its share of the rows.
    """
    size0, size1 = len(subData0), len(subData1)
    if size0 == 0 or size1 == 0:
        return float("inf")

    total = size0 + size1
    weight0 = size0 / total
    weight1 = size1 / total
    return ShannoEnt(subData0) * weight0 + ShannoEnt(subData1) * weight1


def BaseS(dataSet):
    """Entropy of the unsplit data set (delegates to ``ShannoEnt``)."""
    return ShannoEnt(dataSet)

def ShannoEnt(dataSet):
    """Base-2 Shannon entropy of the label column (last column) of ``dataSet``.

    The share of one class is taken as the sum of the label column divided by
    the number of rows; the other class gets the remainder. Returns the
    integer 0 when either share is zero.
    """
    labels = dataSet[:, -1]
    share_one = labels.sum() / len(dataSet)
    share_zero = 1 - share_one

    # A pure node has no uncertainty; this also avoids evaluating log2(0).
    if share_zero == 0 or share_one == 0:
        return 0

    term_zero = share_zero * np.log2(share_zero)
    term_one = share_one * np.log2(share_one)
    return -term_zero - term_one


def chooseBestSplit(dataSet, ops=(0.05, 6)):
    """Find the (feature, threshold) split with the lowest weighted entropy.

    Parameters
    ----------
    dataSet : 2-D array
        Rows are samples; every column except the last is a feature and the
        last column is the label.
    ops : tuple
        ``ops[0]`` is the minimum entropy reduction a split must achieve;
        ``ops[1]`` is the minimum number of rows required on each side of
        the chosen split.

    Returns
    -------
    tuple
        ``(feature_index, threshold)`` for the chosen split, or ``(None, 0)``
        when the best split gains less than ``ops[0]`` or leaves fewer than
        ``ops[1]`` rows on either side.
    """
    min_gain = ops[0]
    min_rows = ops[1]
    feature_count = dataSet.shape[1] - 1  # the last column is the label
    base_entropy = BaseS(dataSet)

    best_entropy = float("inf")
    best_feature = 0
    best_threshold = 0

    # Try every distinct value of every feature as a threshold and keep the
    # first candidate with the strictly lowest weighted entropy.
    for feature in range(feature_count):
        for threshold in set(dataSet[:, feature]):
            lower, upper = BinarySplit(dataSet, feature, threshold)
            candidate = NewS(lower, upper)
            if candidate < best_entropy:
                best_entropy = candidate
                best_feature = feature
                best_threshold = threshold

    # Early stop: the entropy reduction of the best split is too small.
    if base_entropy - best_entropy < min_gain:
        return None, 0

    # Early stop: the best split would leave too few rows on one side.
    lower, upper = BinarySplit(dataSet, best_feature, best_threshold)
    if len(lower) < min_rows or len(upper) < min_rows:
        return None, 0

    return best_feature, best_threshold

def induction(dataSet):
    """Recursively build a binary decision tree from ``dataSet``.

    Parameters
    ----------
    dataSet : 2-D array
        Rows are samples; the last column is the label. The class counts
        below are derived from the sum of that column, so labels are
        expected to be 0 or 1.

    Returns
    -------
    dict or numpy.ndarray
        An internal node is a dict with the keys 'spInd' (feature index),
        'spVal' (threshold), 'left' and 'right' (subtrees). A leaf is the
        array ``[count_of_class_0, count_of_class_1]``.
    """
    ones = dataSet[:, -1].sum()
    freqClasses = np.array([len(dataSet) - ones, ones])

    # Pure node: stop splitting. The class-count array is returned (not the
    # bare label) so that every leaf has the same form and np.argmax in the
    # classifier yields the right class; np.argmax of a scalar is always 0.
    if len(set(dataSet[:, -1])) == 1:
        return freqClasses

    bestIndex, bestSplitVal = chooseBestSplit(dataSet)

    # No acceptable split: make this a leaf decided by majority vote.
    if bestIndex is None:
        return freqClasses

    subDataLeft, subDataRight = BinarySplit(dataSet, bestIndex, bestSplitVal)

    return {
        'spInd': bestIndex,
        'spVal': bestSplitVal,
        'left': induction(subDataLeft),
        'right': induction(subDataRight),
    }

class DecisionTree:
    """Decision-tree classifier that is trained when it is constructed."""

    def __init__(self, train_data):
        """Build the tree from ``train_data`` and print the tree and its type."""
        tree = induction(train_data)
        self.train_data, self.root = train_data, tree
        for shown in (tree, type(tree)):
            print(shown)

    def predict(self, test_data):
        """Return one predicted class per row of ``test_data``."""
        return classify1(self.root, test_data)

    def calAcc(self, test_data):
        """Return the accuracy of the tree on ``test_data`` (labels in the last column)."""
        return calAccuracy(self.predict(test_data), test_data)


if __name__ == '__main__':

    def _ratio(numerator, denominator):
        """Return numerator / denominator, or 0.0 when the denominator is zero."""
        return numerator / denominator if denominator else 0.0

    # Fixed seed so the subsample and the train/test split are reproducible.
    SEED = 42
    rng = np.random.RandomState(SEED)

    dataSet = record_linkage

    # Fraction of the data set to keep; 1 keeps every row.
    ratio = 1
    reduced_dataSet_number = int(ratio * len(dataSet))
    print("numer of samples: " + str(reduced_dataSet_number))

    # Shuffle the indices of the WHOLE data set and keep the first
    # `reduced_dataSet_number` of them, so a ratio below 1 draws a random
    # subsample instead of always taking the leading rows.
    shuffled_indices = rng.permutation(len(dataSet))
    reduced_index = shuffled_indices[:reduced_dataSet_number]
    reduced_data = dataSet[reduced_index, :]

    # Random split: the first two thirds train the tree, the rest test it.
    l = len(reduced_data)
    split_at = int(2 * l / 3)
    indices = rng.permutation(l)
    train_idx, test_idx = indices[:split_at], indices[split_at:]
    train_data, test_data = reduced_data[train_idx, :], reduced_data[test_idx, :]

    M_IG = DecisionTree(train_data)

    label = test_data[:, -1]
    pred = M_IG.predict(test_data)

    # Confusion matrix: rows are the true class, columns the predicted class.
    # Class 1 maps to index 0 (treated as "positive"), class 0 to index 1.
    # The loop variables deliberately avoid the name `col`, which would
    # overwrite pyspark.sql.functions.col imported in an earlier cell.
    conf_mat = np.zeros([2, 2])
    for true_cls, pred_cls in zip(label, pred):
        conf_mat[int(1 - true_cls)][int(1 - pred_cls)] += 1

    TP = conf_mat[0][0]
    FP = conf_mat[1][0]
    FN = conf_mat[0][1]
    TN = conf_mat[1][1]
    P = conf_mat[0].sum()
    N = conf_mat[1].sum()
    All = P + N

    # Each metric falls back to 0.0 when its denominator is zero.
    Accuracy = _ratio(TP + TN, All)
    Precision = _ratio(TP, TP + FP)
    Recall = _ratio(TP, TP + FN)
    F1 = _ratio(2 * (Recall * Precision), Recall + Precision)

    print(" ")
    print("Confusion matrix:")
    print("\t", conf_mat[0])
    print("\t", conf_mat[1])
    print("\tAcc: ", Accuracy)
    print("\tPrecision : ", Precision)
    print("\tRecall: ", Recall)
    print("\tF1-score: ", F1)
    print("-------------------------")
    print()
    print("\tAccuracy from scratch: ", Accuracy)

numer of samples: 574913
[  1357. 381918.]
<class 'numpy.ndarray'>
 
Confusion matrix:
	 [190902.      0.]
	 [736.   0.]
	Acc:  0.9961594255836526
	Precision :  0.9961594255836526
	Recall:  1.0
	F1-score:  0.9980760181941758
-------------------------

	Accuracy from scratch:  0.9961594255836526
