The goal of this project is to implement the original Isolation Forest algorithm which focus on the anomalies, which are few and different. <br>

In [3]:
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix
import sys

import warnings
warnings.filterwarnings("ignore")

In [4]:
# class for external nodes
class exNode:
    def __init__(self, size):
        self.size = size

# class for internal nodes
class inNode:
    def __init__(self, left, right, attribute, split_val):
        self.left = left
        self.right = right
        self.q = attribute
        self.p = split_val

In [5]:
# IsolationTree class creates a tree and depending on improved value calls the respective functions.
# fit_tree is the original method of fitting a tree and fit_improved splits smartly to fit when noise is added to the data.

class IsolationTree:
    def __init__(self, height_limit):
        self.root = None
        self.height_limit = height_limit
        self.n_nodes = 0

    def efficient_split(self, X, size):
        """
        This function randomly checks for new attributes and new value to split the data
        and compares which split is better. 
        Returns : left_data, right_data, q(attribute), p(value)
        """
        mini = X.shape[0]+1 
        
        # get multiple attributes 
        Q = np.arange(X.shape[1])
        q = np.random.choice(Q, size=size, replace=False) 
            
        for att in q: 
            min_p = X[:,att].min()      
            max_p = X[:,att].max()
            if min_p == max_p:  
                continue
            
            # get multiple split values
            p = np.random.uniform(min_p, max_p, size)
            #p = np.array([min_p + 1, max_p - 1])
            
            for val in p:
                X_left = X[X[:,att] < val]
                X_right = X[X[:,att] >= val]
                min_count = np.minimum(X_left.shape[0], X_right.shape[0])
            
                # check for best split
                if mini > min_count:   
                    mini = min_count
                    l_tree_X = X_left
                    r_tree_X = X_right
                    attribute = att
                    value = val
            
        return l_tree_X, r_tree_X, attribute, value
    
    
    def fit_improved(self, X, length):
        """
        Runs when improved==True
        Splits smartly.
        """
        if (length >= self.height_limit) or (X.shape[0] <=1 ): 
            self.n_nodes += 1
            return exNode(X.shape[0])
        else:
            # when all rows all duplicate
            eq = np.all(X == X[0,:], axis = 0)
            if (np.ones(X.shape[1])==eq).all():  
                self.n_nodes += 1
                return exNode(X.shape[0])
                
            # otherwise
            X_left, X_right, q, p = self.efficient_split(X, 3)
            self.n_nodes += 1
            left_tree = self.fit_tree(X_left, length+1)
            right_tree = self.fit_tree(X_right, length+1)
            
        return inNode(left_tree, right_tree, q, p)
    
    def fit_tree(self, X, length):
        """
        Runs when improved=False.
        """
        if (length >= self.height_limit) or (X.shape[0] <=1 ): 
            self.n_nodes += 1
            return exNode(X.shape[0])
        else:
            Q = np.arange(X.shape[1]) # this gives column index
            q = np.random.choice(Q)   # choose an attribute randomly
            min_p = X[:,q].min()      
            max_p = X[:,q].max()
            
            # all data in Q have same value
            if min_p == max_p:
                self.n_nodes += 1
                return exNode(X.shape[0])
            
            p = np.random.uniform(min_p, max_p)  # choose p randomly between min and max
            X_left = X[X[:,q] < p]
            X_right = X[X[:,q] >= p]
            
            # splitting the tree
            self.n_nodes += 1
            left_tree = self.fit_tree(X_left, length+1)
            right_tree = self.fit_tree(X_right, length+1)
        
        return inNode(left_tree, right_tree, q, p)
        
        
    def fit(self, X:np.ndarray, improved=False):
        """
        Given a 2D matrix of observations, create an isolation tree. Set field
        self.root to the root of that tree and return it.
        If you are working on an improved algorithm, check parameter "improved"
        and switch to your new functionality else fall back on your original code.
        """
        if improved==True:
            self.root = self.fit_improved(X, length=0)
            return self.root
        
        self.root = self.fit_tree(X, length=0)
        return self.root
    


In [6]:
# This class fits n_trees to form an ensembledTree and calculates score given by approach in the paper.
class IsolationTreeEnsemble:
    def __init__(self, sample_size, n_trees=10):
        self.sample_size = sample_size
        self.n_trees = n_trees
        self.trees = []  # store all trees here

    def fit(self, X:np.ndarray, improved=False):
        """
        Given a 2D matrix of observations, create an ensemble of IsolationTree
        objects and store them in a list: self.trees.  Convert DataFrames to
        ndarray objects.
        """
        if isinstance(X, pd.DataFrame):
            X = X.values
        height_limit = np.ceil(np.log2(self.sample_size))
        
        # fit n_trees
        for i in range(self.n_trees):
            X_new = X[np.random.choice(X.shape[0], size=self.sample_size, replace=False)] # take sample and fit a tree on this
            tree = IsolationTree(height_limit)
            tree.root = tree.fit(X_new, improved)
            self.trees.append(tree)
        
        return self
    
    def path_in_one_tree(self, tree, x, height = 0):
        """
        Find path for one instance x in one tree
        """
#         # Recursion way      
#         if isinstance(tree, exNode) == True:   # terminal node
#             return height + self.get_c(tree.size)
#         else:
#             attribute = tree.q
#             if x[attribute] < tree.p: # visit left tree
#                 height += 1
#                 left_tree = tree.left
#                 height = self.path_in_one_tree(left_tree, x, height)
#             elif x[attribute] >= tree.p:                                # visit right tree
#                 height += 1
#                 right_tree = tree.right
#                 height = self.path_in_one_tree(right_tree, x, height)
#         return height
        
        # iterative way
        while isinstance(tree, inNode):
            attribute = tree.q
            if x[attribute] < tree.p:
                height += 1
                tree = tree.left
            else:
                height += 1
                tree = tree.right
                
        return height + self.get_c(tree.size)
    
    def path_length(self, X:np.ndarray) -> np.ndarray:
        """
        Given a 2D matrix of observations, X, compute the average path length
        for each observation in X.  Compute the path length for x_i using every
        tree in self.trees then compute the average for each x_i.  Return an
        ndarray of shape (len(X),1).
        """ 
        if isinstance(X, pd.DataFrame):
            X = X.values
        path = np.zeros(X.shape[0])
        
        for i in range(X.shape[0]):         # get x_i   
            obs = np.zeros(self.n_trees)
            for t in range(self.n_trees):   # get each tree
                
                # get the parent root of the tree first
                obs[t] = self.path_in_one_tree(self.trees[t].root, X[i])  # take height per tree for same obs
            path[i] = np.mean(obs)

        return path
    
    
    def get_c(self, size):
        """
        Calculates c(n)
        """
    
        if size == 2:
            return 1
        elif size > 2:
            H_i = np.log(size-1) + 0.5772156649
            return 2*H_i - 2*(size-1)/(size)
        
        return 0
                
    def anomaly_score(self, X:np.ndarray) -> np.ndarray:
        """
        Given a 2D matrix of observations, X, compute the anomaly score
        for each x_i observation, returning an ndarray of them.
        """

        E = self.path_length(X)
        c = self.get_c(self.sample_size)
        
        return 2**(-(E)/c)
    
    def predict_from_anomaly_scores(self, scores:np.ndarray, threshold:float) -> np.ndarray:
        """
        Given an array of scores and a score threshold, return an array of
        the predictions: 1 for any score >= the threshold and 0 otherwise.
        """

        return np.where(scores>=threshold, 1, 0)
    
    def predict(self, X:np.ndarray, threshold:float) -> np.ndarray:
        "A shorthand for calling anomaly_score() and predict_from_anomaly_scores()."
        
        score = self.anomaly_score(X)
        y_pred = self.predict_from_anomaly_scores(score, threshold)
        
        return y_pred


In [7]:
# find the threshold (this is data specific), this changes for each dataset.
def find_TPR_threshold(y, scores, desired_TPR):
    """
    Start at score threshold 1.0 and work down until we hit desired TPR.
    Step by 0.01 score increments. For each threshold, compute the TPR
    and FPR to see if we've reached to the desired TPR. If so, return the
    score threshold and FPR.
    """
    
    threshold = 1.0
    prev_FPR = 0
    prev_TPR = 0
    flag = False
    
    while flag==False:
        y_pred = np.where(scores>=threshold, 1, 0)
        confusion = confusion_matrix(y, y_pred)
        TN, FP, FN, TP = confusion.flat
        TPR = TP / (TP + FN)
        FPR = FP / (FP + TN)
        
        # drop threshold only when TPR is small
        if TPR < desired_TPR:
            prev_FPR = FPR
            prev_TPR = TPR
            threshold -= 0.01
        
        # stopping condition
        if TPR >= desired_TPR:
            flag = True
            
    # return the one which is nearest to desired_TPR
    if np.abs(desired_TPR-TPR) > np.abs(desired_TPR-prev_TPR):
        return threshold+0.01, prev_FPR
        
    return threshold, FPR


In [None]:
## The END ##