In [1]:
# importing data
import numpy as np
D = []
file_path = "Homework 2 data/Druns.txt"

with open(file_path, "r") as file:
    for line in file:
        # Split the line into dimensions and label
        parts = line.strip().split()
        if len(parts) == 3:  # Check if the line has both dimensions and a label
            try:
                # Parse dimensions and label as floats and integer
                dim1 = float(parts[0])
                dim2 = float(parts[1])
                label = int(parts[2])

                D.append([dim1, dim2, label])
            except ValueError:
                print(f"Skipping invalid line: {line}")

D = np.array(D)
D

array([[ 0.1, -2. ,  0. ],
       [ 0. , -1. ,  1. ],
       [ 0. ,  0. ,  0. ],
       [ 0. ,  1. ,  0. ],
       [ 0. ,  2. ,  0. ],
       [ 0. ,  3. ,  0. ],
       [ 0. ,  4. ,  0. ],
       [ 0. ,  5. ,  0. ],
       [ 0. ,  6. ,  1. ],
       [ 0. ,  7. ,  0. ],
       [ 0. ,  8. ,  1. ]])

In [2]:
class Node:
    def __init__(self):
        self.is_leaf = False

class LeafNode(Node):
    def __init__(self, class_label=None):
        super().__init__()
        self.is_leaf = True
        self.class_label = class_label

class InternalNode(Node):
    def __init__(self, split_feature=None, split_threshold=None):
        super().__init__()
        self.split_feature = split_feature  # Feature used for splitting
        self.split_threshold = split_threshold  # Threshold for the split
        self.left_child = None  
        self.right_child = None  

    def add_left_child(self, left_child):
        self.left_child = left_child
    
    def add_right_child(self, right_child):
        self.right_child = right_child


In [3]:
# Notes: GainRatio(D,S) = InfoGain(D,S)/H_D(S)
#        InfoGain(D,S) = H_D(S) - H_D(Y|S)
#        H_D(S) = -sum(P(y)*log_2(P(y)))        <- emperically determined 
#        H_D(Y|S) = f-sum(P(S=s)H(Y|S=s)) where 
#        H(Y|S=s) = -sum(P(Y=y|S=s)*log_2(P(Y=y|S=s)))

def MakeSubtree(D):

    """ MakeSubtree(set of training instances D)
      C = DetermineCandidateSplits(D)
      if stopping criteria is met
          make a leaf node N
          determine class label for N
      else
          make an internal node N
          S = FindBestSplit(D, C)
          for each group k of S
                  Dk = subset of training data in group k
                  kth child of N = MakeSubtree(Dk)
      return subtree rooted at N """
    
    if len(D) == 0: 
        N = LeafNode() # leaf node N
        N.class_label = 1 # classify y=1 if there's no data
        return N
    
    # check if all the labels are the same. Create leaf node if so
    # Check if all elements in the last column are the same value
    if np.all(D[:, -1] == D[0][-1]):
        N = LeafNode()
        N.class_label = int(D[0][-1])
        return N 
    
    C_x0 = DetermineCandidateSplits(D,0)
    C_x1 = DetermineCandidateSplits(D,1)
    C = np.concatenate((C_x0, C_x1), axis=0)

    if stoppingCriteria(D,C):
        # make leaf node 
        N = LeafNode()
        # determine class label
        labels = D[:, 2]
        # Calculate the mode of the label column
        mode_label = np.argmax(np.bincount(labels))
        # set the leaf value to the most frequent label
        N.class_label = int(mode_label)
    else:
        # make an internal node 
        N = InternalNode()
        S = FindBestSplit(D,C) # returns a single split of the form [x1, x2, label, keyFeatureIndex]

        # get split feature and threshold value
        N.split_feature = int(S[-1])
        featureAxis = int(S[-1])
        N.split_threshold = S[featureAxis]  

        # split the data into left and right childs
        # first, find the indices that sort D along the featureAxis
        sorted_indices = np.argsort(D[:, featureAxis])

        # then, sort D according to these indices 
        Dsorted = D[sorted_indices]

        # find the index in Dsorted corresponding to the best split S
        splitIndex = np.where(np.all(Dsorted == S[0:3], axis=1))
        splitIndex = int(splitIndex[0][0])
        # if there are key feature values in Dsorted with the same featue value but are behind the splitIndex, move the splitIndex down. 
        for j in range(splitIndex):
            # check if we're at the first index
            if splitIndex == 0:
                break
            if Dsorted[splitIndex-1][featureAxis] == S[featureAxis]:
                splitIndex = splitIndex - 1

        Dright = Dsorted[:splitIndex]
        Dleft = Dsorted[splitIndex:]
        Dsplit = [Dleft, Dright]

        for i in range(2): # let i=0 be the left group, i=1 be the right
            Dk = Dsplit[i] 
            if i == 0:
                N.add_left_child(MakeSubtree(Dk))
            else:
                N.add_right_child(MakeSubtree(Dk))

    return N 


def FindBestSplit(D,C):
    gain_ratios = []
    for i in range(len(C)):
        gain_ratios.append(GainRatio_and_SplitEntropy(D,C[i])[0])
    max_index = gain_ratios.index(max(gain_ratios))
    return C[max_index]

# works correctly with example 
def GainRatio_and_SplitEntropy(D,s):
    """ 
    Parameters:
        D: 2D np.array - data features and labels 
        
        s: np.array - a single split with the feature values, label, and feature index intended to split on.
            e.g.: np.array[float, float, int, int] e.g.: [0.234, 1.356, 0, 0]
    Output:
        G: list - the information gain ratio associated to a given split and the entropy of that split.
       """
    def HY(D):
        labels = D[:,2] # array[0,1,1,0,...]
        p_y1 = np.sum(labels)/len(labels) 
        p_y2 = 1 - p_y1
        if p_y1 == 0 or p_y2 == 0:
            return 0 # Entropy is zero is one of the probabilities is zero
        return -(p_y1*np.log2(p_y1) + p_y2*np.log2(p_y2))

    # calculate H_D(Y) = -sum[P(y)log(P(y))]                    TERM 1
    entropyY = HY(D)

    # calculate H_D(Y|S)                                        TERM 2
    #sort D by xi values into Dleft (>= s) and Dright (< s)
    # sorted_indices = np.argsort(D[:, int(s[3])])
    # Dsorted = D[sorted_indices]

    # # Find the indices where D equals the specified feature values
    # splitIndex = np.where(np.all(Dsorted == s[0:3], axis=1))

    # Dright = Dsorted[:splitIndex[0][0]]
    # Dleft = Dsorted[splitIndex[0][0]:]

    # split the data into left and right childs
    # first, find the indices that sort D along the featureAxis
    featureAxis = int(s[3])
    sorted_indices = np.argsort(D[:, featureAxis])

    # then, sort D according to these indices 
    Dsorted = D[sorted_indices]

    # find the index in Dsorted corresponding to the best split S
    splitIndex = np.where(np.all(Dsorted == s[0:3], axis=1))
    splitIndex = int(splitIndex[0][0])
    # if there are key feature values in Dsorted with the same featue value but are behind the splitIndex, move the splitIndex down. 
    for j in range(splitIndex):
        # check if we're at the first index
        if splitIndex == 0:
            break
        if Dsorted[splitIndex-1][featureAxis] == s[featureAxis]:
            splitIndex = splitIndex - 1

    Dright = Dsorted[:splitIndex]
    Dleft = Dsorted[splitIndex:]


    # calculate H(Y|Sleft) and H(Y|Sright)
    entropyY_given_Sleft = HY(Dleft)
    entropyY_given_Sright = HY(Dright)

    # finally calculate H_D(Y|S)
    Pleft = len(Dleft)/len(D)
    Pright = 1 - Pleft
    entropyY_givenS = Pleft*entropyY_given_Sleft + Pright*entropyY_given_Sright

    # calcualte H_D(S)                                          DENOMINATOR
    l = len(Dleft)
    r = len(Dright)
    t = len(D)
    entropyS = -(l/t*np.log2(l/t) + r/t*np.log2(r/t))

    # put it all together                                       FULL EXPRESSION
    GainRatio = (entropyY - entropyY_givenS)/entropyS   

    # check for zero entropies:
    if np.isnan(GainRatio):
        GainRatio = 0.0
    if np.isnan(entropyS):
        entropyS = 0.0
          
    return [GainRatio, entropyS]           


def stoppingCriteria(D,C):
    """ 
     Parameters: D  - np.array with data containing features and labels
                 C  - np.array with rows from D indicating candidate splits and an 
                      extra column indicating the index of the key splitting feature.

     Returns:    bool: weather or not to stop splitting
       """
    if len(D) == 0:
        return True
    # see if H_D(S) = 0 for ALL candidate split
    G = []
    H_S = []
    for i in range(len(C)):
        # calculate GainRatio G and split entropy H_S
        G.append(GainRatio_and_SplitEntropy(D,C[i])[0])
        H_S.append(GainRatio_and_SplitEntropy(D,C[i])[1])
    
    # if the GainRatio or SplitEntropy are zero for all candidate splits, stop.
    if all(item == 0 for item in H_S) or all(item == 0 for item in G):
        return True
    else:
        return False


def DetermineCandidateSplits(D, Xi):
    """ // Run this subroutine for each numeric feature at each node of DT induction
DetermineCandidateNumericSplits(set of training instances D, feature Xi)
      C = {} // initialize set of candidate splits for feature Xi
      let vj denote the value of Xi
      for the jth data point
      sort the dataset using vj
      as the key for each data point
      for each pair of adjacent vj, vj+1 in the sorted order
          if the corresponding class labels are different
              add candidate split vj to C
return C """

    """ Parameters: D  - np.array with two dimensions with features and labels
                    Xi - int index of feature to use for splitting
        Returns:    C  - np.array with two dimensions with candidate splits. The last column
                      in C is key feature that the split is based on.
       """
    
    C = [] # candidate splits for feature Xi
    sorted_indices = np.argsort(D[:, Xi])
    featureColumn = []
    v = D[sorted_indices]
    for i in range(len(v)-1):
        # check for when labels change 
        if v[i,2] != v[i+1,2]:
            # check for duplicate splits 
            # extract column Xi from C
            if len(C) != 0:
                featureColumn = [row[Xi] for row in C]
            # check if v[Xi] is in column. If yes, duplicate is True. Else, duplicate is False.
            duplicate = v[i+1][Xi] in featureColumn
            if (not duplicate):
                C.append(v[i+1])
    C = np.array(C)

    # add column indicating key feature index
    new_column = np.full((C.shape[0], 1), Xi, dtype=int)

    # Append the 'new_column' to C
    C = np.concatenate((C, new_column), axis=1)
    return C

# C = DetermineCandidateSplits(D,0)
# C
# D = np.array([[0,0,1],[0.1,0.2,0],[0.3,0.4,1]])
# s = [0.3,0.4,1,0]
# print(D)
# print(s)
# print(GainRatio_and_SplitEntropy(D,s))
#D = np.vstack((D, [0.2,0.1,0]))
myTree = MakeSubtree(D)
print('Done')

  p_y1 = np.sum(labels)/len(labels)
  entropyS = -(l/t*np.log2(l/t) + r/t*np.log2(r/t))
  entropyS = -(l/t*np.log2(l/t) + r/t*np.log2(r/t))
  p_y1 = np.sum(labels)/len(labels)
  entropyS = -(l/t*np.log2(l/t) + r/t*np.log2(r/t))
  entropyS = -(l/t*np.log2(l/t) + r/t*np.log2(r/t))
  p_y1 = np.sum(labels)/len(labels)
  entropyS = -(l/t*np.log2(l/t) + r/t*np.log2(r/t))
  entropyS = -(l/t*np.log2(l/t) + r/t*np.log2(r/t))
  p_y1 = np.sum(labels)/len(labels)
  entropyS = -(l/t*np.log2(l/t) + r/t*np.log2(r/t))
  entropyS = -(l/t*np.log2(l/t) + r/t*np.log2(r/t))


Done


In [4]:
# visualizing tree
def visualize_tree(node, depth=0, parent_prefix="", prefix=""):
    if node is not None:
        if isinstance(node, LeafNode):
            label = "Leaf Node (Class: {})".format(node.class_label)
        elif isinstance(node, InternalNode):
            label = "Internal Node\n(Split: {}, Threshold: {})".format(node.split_feature, node.split_threshold)
        else:
            label = "Node"

        line_prefix = "│   " * depth + parent_prefix
        if depth > 0:
            line_prefix += "├───"

        print(line_prefix + prefix + label)

        if isinstance(node, InternalNode):
            visualize_tree(node.left_child, depth + 1, "└───", "Left: ")
            visualize_tree(node.right_child, depth + 1, "└───", "Right: ")

visualize_tree(myTree)

Internal Node
(Split: 1, Threshold: 8.0)
│   └───├───Left: Leaf Node (Class: 1)
│   └───├───Right: Internal Node
(Split: 1, Threshold: 0.0)
│   │   └───├───Left: Internal Node
(Split: 1, Threshold: 6.0)
│   │   │   └───├───Left: Internal Node
(Split: 1, Threshold: 7.0)
│   │   │   │   └───├───Left: Leaf Node (Class: 0)
│   │   │   │   └───├───Right: Leaf Node (Class: 1)
│   │   │   └───├───Right: Leaf Node (Class: 0)
│   │   └───├───Right: Internal Node
(Split: 0, Threshold: 0.1)
│   │   │   └───├───Left: Leaf Node (Class: 0)
│   │   │   └───├───Right: Leaf Node (Class: 1)
