In [1]:
import numpy as np
import pandas as pd
import time
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from sklearn.linear_model import LogisticRegression
import queue
import math
from enum import Enum
from sklearn.metrics import accuracy_score

In [2]:
data_path = 'D:\\Projects\\MCTS\\datasets\\'
file_names = [{'name':'blood-transfusion-service-center.csv', 'target_class': 'Class', 'pos_class': 'numerical'},
             {'name':'credit-g.csv', 'target_class': 'class', 'pos_class': 'good'},
             {'name':'kr-vs-kp.csv', 'target_class': 'class', 'pos_class': '\'won\''},
             {'name':'monks-problems-2.csv', 'target_class': 'class', 'pos_class': 'numerical'},
             {'name':'diabetes.csv', 'target_class': 'class', 'pos_class': 'tested_positive'},
             {'name':'qsar-biodeg.csv', 'target_class': 'Class', 'pos_class': 'numerical'},
             {'name':'steel-plates-fault.csv', 'target_class': 'Class', 'pos_class': 'numerical'},
             {'name':'tic-tac-toe.csv', 'target_class': 'Class', 'pos_class': 'positive'},
             {'name':'wdbc.csv', 'target_class': 'Class', 'pos_class': 'numerical'},
             {'name':'hill-valley.csv', 'target_class': 'Class', 'pos_class': 'numerical'}]

## Metrics class

In [3]:
class BuildInMetrics:
    def __init__(self):
        self.__metrics = {
            'acc': accuracy_score
        }
        
    def get_metric(self, name):
        if name in self.__metrics:
            return self.__metrics[name]
        else:
            return None

## Scoring function

In [4]:
class ScoringFunctions():
    def __init__(self, name, params):
        self._params = params
        self._name = name
    
    def get_score(self, node, other_scores):
        if(self._name == 'default'):
            return self.default_scoring(node)
        else:
            return None
    
    def default_scoring(self, node):
        if(node._parent_node == None or node.T == 0):
            return float("Inf")
        else:
            return node.get_score() + math.sqrt(self._params['c_e'] * math.log(node._parent_node.T)/node.T)

## Node class

In [5]:
class Node:
    def __init__(self, feature_name, parent_node = None, is_subtree_full = False):
        self.feature_name = feature_name
        self.child_nodes = []
        self.T = 0
        self.score_sum = 0
        self._is_subtree_full = is_subtree_full
        self._parent_node = parent_node
        
    def add_child_node(self, node_name, is_subtree_full = False):
        new_node = Node(node_name, self, is_subtree_full)
        self.child_nodes.append(new_node)
        
    def add_child_nodes(self, node_names):
        for name in node_names:
            self.add_child_node(name, len(node_names) == 1)
      
    def update_node(self, score):
        self.score_sum += score
        self.T += 1
        
        if(len(self.child_nodes) != 0):
            is_subtree_full = True
            for node in self.child_nodes:
                if(not node._is_subtree_full):
                    is_subtree_full = False
                    break
            self._is_subtree_full = is_subtree_full
    
    def get_score(self):
        return float('Inf') if self.T == 0 else self.score_sum/self.T
    
    def update_scores_up(self, score):
        current_node = self
        
        while(current_node != None):
            current_node.update_node(score)
            current_node = current_node._parent_node

In [None]:
# Scenario 1: Creating a Node
print("Scenario 1:")
root = Node("")
print(root.feature_name == "")
print(root.T == 0)
print(root._is_subtree_full == False)
print(root._parent_node == None)
print(root.T == 0)
print(root.get_score() == float('Inf'))

# Scenario 2: Adding child nodes
print("Scenario 2:")
root.add_child_nodes(['b','c','d'])
print(len(root.child_nodes) == 3)
print(root.child_nodes[0].feature_name == 'b')
print(root.child_nodes[1].feature_name == 'c')
print(root.child_nodes[2].feature_name == 'd')
root.child_nodes[0].update_scores_up(1)
print(root.child_nodes[0].T == 1)
print(root.child_nodes[0].get_score() == 1)
print(root.get_score() == 1)
print(root.T == 1)
root.child_nodes[1].update_scores_up(0.8)
print(root.child_nodes[1].T == 1)
print(root.child_nodes[1].get_score() == 0.8)
print(root.get_score() == 0.9)
print(root.T == 2)
root.child_nodes[2].update_scores_up(0.6)
print(root.child_nodes[2].T == 1)
print(root.child_nodes[2].get_score() == 0.6)
print(root.get_score() == 2.4/3)
print(root.T == 3)

# Another level
print("Scenario 3")
root.child_nodes[0].add_child_nodes(['c','d'])
print(len(root.child_nodes[0].child_nodes) == 2)
print(root.child_nodes[0].child_nodes[0].feature_name == 'c')
print(root.child_nodes[0].child_nodes[1].feature_name == 'd')
print(root.child_nodes[0].child_nodes[0]._is_subtree_full == False)
root.child_nodes[0].child_nodes[0].update_scores_up(0.2)
print(root.child_nodes[0].child_nodes[0].T == 1)
print(root.child_nodes[0].child_nodes[0].get_score() == 0.2)
print(root.child_nodes[0].T == 2)
print(root.child_nodes[0].get_score() == 0.6)
print(root.T == 4)
print(root.get_score() == 2.6/4)
# Scenario 3: last 
print("Scenario 4:")
root.child_nodes[0].child_nodes[0].add_child_nodes(['d'])
print(len(root.child_nodes[0].child_nodes[0].child_nodes) == 1)
print(root.child_nodes[0].child_nodes[0].child_nodes[0]._is_subtree_full)
print(root.child_nodes[0].child_nodes[0]._is_subtree_full == False)
root.child_nodes[0].child_nodes[0].child_nodes[0].update_scores_up(0.4)
print(root.child_nodes[0].child_nodes[0]._is_subtree_full)
print(root.child_nodes[0].child_nodes[1]._is_subtree_full == False)
print(root.child_nodes[0]._is_subtree_full == False)
print(root.child_nodes[0].child_nodes[0].child_nodes[0].T == 1)
print(root.child_nodes[0].child_nodes[0].child_nodes[0].get_score() == 0.4)
print(root.child_nodes[0].child_nodes[0].T == 2)

## Multiarm strategies

In [176]:
class MultiArmStrategies:
    def __init__(self, name, all_node_names):
        self._name = name
        self._all_node_names = all_node_names
        
    def multiarm_strategy(self, node, used_features, scoring_function, other_scores):
        if(self._name == 'default'):
            return self._default_strategy(node, used_features, scoring_function, other_scores)
        else:
            return None    
    
    def _default_strategy(self, node, used_features, scoring_function, other_scores):
        #print("default strategy: " + node.feature_name)
        if(len(node.child_nodes) == 0):
            #print("first if")
            self._add_child_nodes(node, used_features)
            return node.child_nodes[0]
        else:
            #print("else")
            best_score = 0
            best_node = None
            tmp_score = 0
            
            for child_node in node.child_nodes:
                score = scoring_function(child_node, other_scores)
                if(score > best_score):
                    best_score = score
                    best_node = child_node
            return best_node
  
    def _add_child_nodes(self, node, used_features):
        #print('adding nodes to ' + node.feature_name + ' :' + ' '.join(self._all_node_names - used_features))
        node.add_child_nodes(self._all_node_names - used_features)   

## EndStrategies

In [177]:
class EndStrategies:
    def __init__(self, name):
        self._name = name
    
    def are_calculations_over(self, node, params):
        if(self._name == 'default'):
            return self._first_new_strategy(node, params)
        else:
            return None
        
    def _first_new_strategy(self, node, params):
        #print("_first_new_strategy:")
        if(node.T > 0 and not node._is_subtree_full):
            #print("first if")
            return False
        else:
            if(node._parent_node == None):
                #print("second if, no parent node")
                return False
            else:
                #print("else")
                return True


## DefaultSettings

In [178]:
class DefaultSettings:
    @staticmethod
    def get_default_params():
        return {
            "c_e": 2
        }
    

## MCTS

In [188]:
class MCTS:
    def __init__(self, 
                 task,
                 model,
                 calculactions_done_conditions = {'type': 'iterations', 'max_val': 10},
                 params = None,
                 metric = 'acc', 
                 scoring_function = 'default', 
                 multiarm_strategy = 'default', 
                 end_strategy = 'default'):
        self._metric_name = metric
        self._scoring_function_name = scoring_function
        self._multiarm_strategy_name = multiarm_strategy
        self._end_strategy_name = end_strategy
        self._best_features = None
        self._best_score = 0
        self._task = task
        self._root = Node("")
        self._feature_names = None
        self._calculactions_done_conditions = calculactions_done_conditions
        self._model = model
        self._time = 0
        self._iterations = 0
        
        if(params is None):
            self._params = DefaultSettings.get_default_params()
        
    def fit(self, data, out_variable):
        
        if(self._task == 'classification'):
            self._classification_fit(data, out_variable)
        else:
            self._regression_fit(data, out_variable)
            
    def _classification_fit(self, data, out_variable):
        self._init_fitting_values()
        while not self._is_fitting_over():
            self._single_classification_iteration(data, out_variable)
        
        self._model.fit(data.loc[:, self._best_features], out_variable)
    
    def _regression_fit(self, data, out_variable):
        return None
    
    def _single_classification_iteration(self, data, out_variable):
        #print('classification iteration')
        used_features = set()
        node = self._root
        is_iteration_over = False
        while not is_iteration_over:
            node = self._multiarm_strategy.multiarm_strategy(node, used_features, self._scoring_function.get_score, self._other_scores)
            #print("selected feature: " + node.feature_name)
            is_iteration_over = self._end_strategy.are_calculations_over(node, self._params)
            used_features.add(node.feature_name)
            #print("--------")
        
        X_train, X_test, y_train, y_test = train_test_split(data.loc[:,used_features], out_variable, test_size = 0.25, random_state = 123)
        self._model.fit(X_train, y_train)
        predicted = self._model.predict(X_test)
        score = self._metric(y_test, predicted)
        #print('score: ' + str(score))
        node.update_scores_up(score)
        #print(used_features)
        if(score > self._best_score):
            self._best_score = score
            self._best_features = used_features
        
        if(self._longest_tree_branch < len(used_features)):
            self._longest_tree_branch = len(used_features)
        
        #print("----------END OF ITERATION----------")
    
    def _single_regression_iteration(self):
        return None
    
    def _init_fitting_values(self):
        self._feature_names = set(data.columns)
        self._multiarm_strategy = MultiArmStrategies(self._multiarm_strategy_name, self._feature_names)
        self._end_strategy = EndStrategies(self._end_strategy_name)
        self._scoring_function = ScoringFunctions(self._scoring_function_name, self._params)
        self._metric = BuildInMetrics().get_metric(self._metric_name)
        self._other_scores = {}
        self._best_features = None
        self._best_score = 0
        self._iterations = 0
        self._longest_tree_branch = 0
        self._time = time.time()
    
    def _is_fitting_over(self):
        if(self._root._is_subtree_full):
            print('Whole tree searched, finishing prematurely')
            return True
        
        if(self._calculactions_done_conditions['type'] == 'iterations'):
            self._iterations += 1
            return self._iterations > self._calculactions_done_conditions['max_val'] 
        else:
            #print('Time ellapsed: ' + str(time.time() - self._time))
            return (time.time() - self._time) > self._calculactions_done_conditions['max_val'] 
        
    def predict(self, data):
        return self._model.predict(data.loc[:, self._best_features])

In [189]:
data = pd.read_csv(data_path + file_names[0]['name'])
data_labels = data.loc[:,file_names[0]['target_class']]
data = data.drop(columns = [file_names[0]['target_class']])
X_train, X_test, y_train, y_test = train_test_split(data, data_labels, test_size = 0.25, random_state = 123)

In [190]:
mcts = MCTS("classification", XGBClassifier(), calculactions_done_conditions = {'type': 'time', 'max_val': 10})
mcts.fit(X_train, y_train)
predicted = mcts.predict(X_test)
(predicted == y_test).sum()/len(y_test)

Whole tree searched, finishing prematurely


0.7593582887700535

In [191]:
mcts._best_features

{'V3', 'V4'}

In [149]:
data = pd.read_csv(data_path + file_names[9]['name'])
data_labels = data.loc[:,file_names[9]['target_class']]
data = data.drop(columns = [file_names[9]['target_class']])
X_train, X_test, y_train, y_test = train_test_split(data, data_labels, test_size = 0.25, random_state = 123)

In [150]:
mcts = MCTS("classification", XGBClassifier(), calculactions_done_conditions = {'type': 'time', 'max_val': 600})
mcts.fit(X_train, y_train)

In [151]:
predicted = mcts.predict(X_test)
(predicted == y_test).sum()/len(y_test)

0.5313531353135313