# Random forest classifier - Code

In [1]:
import numpy as np
from collections import Counter

In [2]:
class GiniIndex:
    def getClasses(self, group):
        return set(group[:,-1])
    
    def getProportion(self, group, classValue):
        return (group[:,-1] == classValue).sum() / len(group)
    
    def getScore(self, group):
        score = 0.0
        classes = self.getClasses(group)
        for classValue in classes:
            proportion = self.getProportion(group, classValue)
            score += proportion * proportion
        return score
        
    def giniIndexOfGroup(self, group, samplesCount):
        if len(group) == 0:
            return 0
        score = self.getScore(group)
        giniIndexValue = (1.0 - score) * (len(group) / samplesCount)
        return giniIndexValue
        
    def calculate(self, leftGroup, rightGroup):
        samplesCount = len(leftGroup) + len(rightGroup)
        giniIndexValue = self.giniIndexOfGroup(leftGroup, samplesCount)
        giniIndexValue += self.giniIndexOfGroup(rightGroup, samplesCount)
        return giniIndexValue

In [3]:
class Node:
    def __init__(self):
        self.featureIndex = None
        self.treshold = None
        self.giniIndexValue = None
        self.isLeaf = False
        self.outcome = None
        self.leftChild = None
        self.rightChild = None
        self.leftGroup = None
        self.rightGroup = None
        
    def update(self, featureIndex, treshold, giniIndexValue, leftGroup, rightGroup):
        self.featureIndex = featureIndex
        self.treshold = treshold
        self.giniIndexValue = giniIndexValue
        self.leftGroup = leftGroup
        self.rightGroup = rightGroup
        
class DecisionTreeClassifier:
    def __init__(self, maxDepth=100, minSplitSize=2, bootstrap=False, maxFeatures="None"):
        self.rootNode = Node()
        self.maxDepth = maxDepth
        self.minSplitSize = minSplitSize
        self.bootstrap = bootstrap
        self.maxFeatures = maxFeatures
        self.randomFeatureIndices = None
        
    def splitDataset(self, dataset, featureIndex, treshold):
        leftGroup = dataset[dataset[:,featureIndex] < treshold]
        rightGroup = dataset[dataset[:,featureIndex] >= treshold]
        return leftGroup, rightGroup 
        
    def findBestSplit(self, dataset, node):
        leftGroup, rightGroup = None, None
        node.giniIndexValue = 9999999

        for featureIndex in range(len(dataset[0])-1):
            for row in dataset:
                treshold = row[featureIndex]
                leftGroup, rightGroup = self.splitDataset(dataset, featureIndex, treshold)
                giniIndexValue = GiniIndex().calculate(leftGroup, rightGroup)
                if giniIndexValue < node.giniIndexValue:
                    node.update(featureIndex, treshold, giniIndexValue, leftGroup, rightGroup)
                    
    def makeLeafNode(self, node):
        dataset = None
        if len(node.leftGroup) == 0:
            dataset = node.rightGroup
        elif len(node.rightGroup) == 0:
            dataset = node.leftGroup
        else:
            dataset = np.concatenate((node.leftGroup, node.rightGroup))
        node.outcome = Counter(dataset[:,-1]).most_common(1)[0][0]
        node.isLeaf = True
        
    def processNoSplit(self, node):
        if len(node.leftGroup) == 0 or len(node.rightGroup) == 0:
            self.makeLeafNode(node)
            return True
        return False
    
    def processMaxDepth(self, node, depth):
        if depth >= self.maxDepth:
            self.makeLeafNode(node)
            return True
        return False
    
    def processChild(self, node, group, depth):
        if len(group) < self.minSplitSize:
            node.leftGroup, node.rightGroup = group, np.array([])
            self.makeLeafNode(node)
        else:
            self.findBestSplit(group, node)
            self.recursiveSplit(node, depth+1)
            
    def recursiveSplit(self, node, depth):
        if self.processNoSplit(node):
            return
        if self.processMaxDepth(node, depth):
            return
        
        node.leftChild, node.rightChild = Node(), Node()
        self.processChild(node.leftChild, node.leftGroup, depth)
        self.processChild(node.rightChild, node.rightGroup, depth)
        
    def bootstrapSamples(self, data, target):
        indices = np.random.randint(0,data.shape[0], data.shape[0])
        return data[indices], target[indices]
        
    def selectRandomFeatures(self, data):
        featureIndices = [i for i in range(X_train.shape[1])]
        featuresCount = int(round(np.sqrt(X_train.shape[1])))
        self.randomFeatureIndices = np.random.choice(featureIndices, featuresCount)
        return data[:,self.randomFeatureIndices]
        
    def drawDataset(self, data, target):
        if self.maxFeatures == "sqrt":
            data = self.selectRandomFeatures(data)
        if self.bootstrap:
            data, target = self.bootstrapSamples(data, target)
        return data, target
    
    def fit(self, data, target):
        data, target = self.drawDataset(data, target)
        dataset = np.append(data,np.swapaxes([target],0,1), axis=1)
        self.findBestSplit(dataset, self.rootNode)
        self.recursiveSplit(self.rootNode, 1)
    
    def predictRow(self, row, node):
        if node.isLeaf:
            return node.outcome
        else:
            if row[node.featureIndex] < node.treshold:
                return self.predictRow(row, node.leftChild)
            else:
                return self.predictRow(row, node.rightChild)
    
    def accuracyScore(self, predictions, trueValues):
        return sum(predictions == trueValues) / len(predictions)
        
    def predict(self, data, target):
        dataset = np.append(data,np.swapaxes([target],0,1), axis=1)
        if self.maxFeatures != "None":
            dataset = dataset[:,self.randomFeatureIndices]
        predictions = []
        for row in dataset:
            predictions.append(self.predictRow(row, self.rootNode))
        return np.array(predictions)

In [4]:
class RandomForestClassifier:
    def __init__(self, nTrees=30, maxDepth=100, minSplitSize=50):
        self.nTrees = nTrees
        self.maxDepth = 100
        self.minSplitSize = 50
        self.trees = list()
        
    
    def trainDecisionTree(self, data, target):
        decisionTree = DecisionTreeClassifier(maxDepth=self.maxDepth, 
                                              minSplitSize=self.minSplitSize,
                                             bootstrap=True,
                                             maxFeatures="sqrt")
        decisionTree.fit(data, target)
        return decisionTree
        
    def fitTreesSequentially(self, data, target):
        for i in range(self.nTrees):
            tree = self.trainDecisionTree(data, target)
            self.trees.append(tree)
            
    def fit(self, data, target):
        self.fitTreesSequentially(data, target)
        
    def majorityVote(self, arr):
        return Counter(arr).most_common(1)[0][0]  
    
    def averagePredictions(self, predictions):
        return np.apply_along_axis(self.majorityVote, 0, predictions)
    
    def predictSequentially(self, data, target):
        predictions = [] 
        for decisionTree in self.trees:
            predictions.append(decisionTree.predict(data, target))
        return self.averagePredictions(predictions)
            
    def predict(self, data, target):
        predictions = self.predictSequentially(data, target)
        print(DecisionTreeClassifier().accuracyScore(predictions, target))
        return predictions

# Unit tests

In [5]:
import unittest

class TestGiniIndex(unittest.TestCase):    
    def setUp(self):
        self.testGroup = np.array([[2, 1, 1], [3, -1, 0], [0, 0, 2], [0, 0, 2]])
        
    def testGetClasses(self):
        classes = GiniIndex().getClasses(self.testGroup)
        self.assertEqual({1, 0, 2}, classes, "Should be {0, 1, 2}")
        
    def testGetProportion(self):
        proportion = GiniIndex().getProportion(self.testGroup, 1)
        self.assertEqual(0.25, proportion, "Should be 0.25")
        
    def testGetScore(self):
        score = GiniIndex().getScore(self.testGroup)
        self.assertEqual(0.375, score, "Should be 0.375")
        
    def testGiniIndexOfGroup(self):
        giniIndexValue = GiniIndex().giniIndexOfGroup(self.testGroup, 25)
        self.assertEqual(0.1, giniIndexValue, "Should be 0.1")
        
    def testCalculate(self):
        testGroupRight = np.array([[2, 1, 1], [3, -1, 0], [0, 0, 0], [0, 0, 0]])
        giniIndexValue = GiniIndex().calculate(self.testGroup, testGroupRight)
        self.assertEqual(0.5, giniIndexValue, "Should be 0.5")
    
class TestDecisionTreeClassifier(unittest.TestCase):
    def setUp(self):
        self.testDataset = np.array([[2, 1, 1], [3, -1, 0], [0, 0, 2], [0, 0, 2]])
        
    def testSplitDataset(self):
        (leftGroup, rightGroup) = DecisionTreeClassifier().splitDataset(self.testDataset, 1, 0)
        self.assertTrue(([[3, -1, 0]] == leftGroup).all(), "Should be [3, -1, 0]")
        self.assertTrue(([[2, 1, 1], [0, 0, 2], [0, 0, 2]] == rightGroup).all(), 
                        "Should be [[2, 1, 1], [0, 0, 2], [0, 0, 2]] ")
        
    def testFindBestSplit(self):
        node = Node()
        DecisionTreeClassifier().findBestSplit(self.testDataset, node)
        
        self.assertEqual(0, node.featureIndex, "Should be 0")
        self.assertEqual(2, node.treshold, "Should be 2")
    
    def testMakeLeafNode(self):
        node = Node()
        node.leftGroup = np.array([[3, -1, 0]])
        node.rightGroup = np.array([[2, 1, 1], [0, 0, 2], [0, 0, 2]])
        DecisionTreeClassifier().makeLeafNode(node)
        
        self.assertEqual(2, node.outcome, "Should be 2")
        
    def testProcessNoSplit(self):
        node = Node()
        node.leftGroup = np.array([])
        node.rightGroup = np.array([[2, 1, 1], [0, 0, 2], [0, 0, 2]])
        
        self.assertTrue(DecisionTreeClassifier().processNoSplit(node))
        self.assertTrue(node.isLeaf)
        
    def testProcessMaxDepth(self):
        node = Node()
        node.leftGroup = np.array([[3, -1, 0]])
        node.rightGroup = np.array([[2, 1, 1], [0, 0, 2], [0, 0, 2]])
        actual = DecisionTreeClassifier(maxDepth=5).processMaxDepth(node, 5)
        self.assertTrue(actual)
        
    def testProcessChild(self):
        node = Node()
        group = np.array([[2, 1, 0], [1, 5, 0], [-2, 0, 1], [-1, 4, 1]])
        DecisionTreeClassifier().processChild(node, group, 1)
        self.assertFalse(node.isLeaf)
        

In [6]:
unittest.main(argv=[''], verbosity=2, exit=False)

testFindBestSplit (__main__.TestDecisionTreeClassifier) ... ok
testMakeLeafNode (__main__.TestDecisionTreeClassifier) ... ok
testProcessChild (__main__.TestDecisionTreeClassifier) ... ok
testProcessMaxDepth (__main__.TestDecisionTreeClassifier) ... ok
testProcessNoSplit (__main__.TestDecisionTreeClassifier) ... ok
testSplitDataset (__main__.TestDecisionTreeClassifier) ... ok
testCalculate (__main__.TestGiniIndex) ... ok
testGetClasses (__main__.TestGiniIndex) ... ok
testGetProportion (__main__.TestGiniIndex) ... ok
testGetScore (__main__.TestGiniIndex) ... ok
testGiniIndexOfGroup (__main__.TestGiniIndex) ... ok

----------------------------------------------------------------------
Ran 11 tests in 0.025s

OK


<unittest.main.TestProgram at 0x58fa978>

# Tests

Loading libraries

In [7]:
from sklearn import datasets, linear_model, ensemble
from sklearn.model_selection import train_test_split

## Iris dataset

Load data

In [21]:
X = datasets.load_iris()['data']
y = datasets.load_iris()['target']


In [22]:
print(X.shape)
set(y)

(150, 4)


{0, 1, 2}

Train/test split

In [9]:
X_train, X_test, y_train, y_test  = train_test_split(X, y, test_size=0.2)

Traning classifier

In [10]:
tree = RandomForestClassifier(nTrees=100, maxDepth=5, minSplitSize=50)
tree.fit(X_train, y_train)

Prediction and accuraccy

In [11]:
print('Train accuracy: ')
p = tree.predict(X_train, y_train)
print('Test accuracy: ')
p = tree.predict(X_test, y_test)

Train accuracy: 
0.9833333333333333
Test accuracy: 
0.9


Comparison with scikit-learn random forest classifier

In [12]:
cfl = ensemble.RandomForestClassifier(n_estimators=100,max_depth=5, min_samples_split=50)
cfl.fit(X_train, y_train)
print('Train accuracy: ' + str(sum(cfl.predict(X_train) == y_train) / len(X_train)))
print('Test accuracy: ' + str(sum(cfl.predict(X_test) == y_test) / len(X_test)))

Train accuracy: 0.975
Test accuracy: 0.9


# Digits dataset

Load data

In [24]:
X = datasets.load_digits()['data']
y = datasets.load_digits()['target']
X.shape

(1797, 64)

Train/test split

In [16]:
set(y_train)

{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

In [14]:
X_train, X_test, y_train, y_test  = train_test_split(X, y, test_size=0.2)

Training

In [17]:
tree = RandomForestClassifier(nTrees=30, maxDepth=5, minSplitSize=50)
tree.fit(X_train, y_train)

Prediction

In [18]:
print('Train accuracy: ')
p = tree.predict(X_train, y_train)
print('Test accuracy: ')
p = tree.predict(X_test, y_test)

Train accuracy: 
0.9526791927627001
Test accuracy: 
0.9


Comparison with scikit-learn random forest classifier

In [29]:
cfl = ensemble.RandomForestClassifier(n_estimators=30,max_depth=5, min_samples_split=50)
cfl.fit(X_train, y_train)
print('Train accuracy: ' + str(sum(cfl.predict(X_train) == y_train) / len(X_train)))
print('Test accuracy: ' + str(sum(cfl.predict(X_test) == y_test) / len(X_test)))

Train accuracy: 0.953375086986778
Test accuracy: 0.9166666666666666
