In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os 
import sys 
# project_dir_path = '/home/onoue/ws/lukasiewicz_1'
project_dir_path = '/Users/keisukeonoue/ws/lukasiewicz_1/'
sys.path.append(project_dir_path)

import pandas as pd
import numpy as np

from sklearn.model_selection import KFold
from sklearn.ensemble import RandomForestClassifier

from src.rulefit import RuleFitClassifier
from src.setup_problem_primal_modular import Setup


# RuleFitClassifier のなかの tree generator (random forest)


In [3]:
import os
from typing import Dict, Any, List
import json

# from .setup_problem import Setup
class Setup_:
    """Placeholder class used only for type hints (avoids a circular import)."""

    def __init__(self) -> None:
        # Intentionally empty: instances carry no state.
        return None

import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, roc_auc_score, precision_score, recall_score, f1_score
# from sklearn.metrics import confusion_matrix, classification_report

from src.misc import is_symbol
from src.operators import negation



class EvaluateModelRuleFit:
    """Evaluate the `tree_generator` of a fitted model on one train/test split.

    The evaluator reads the discretized data set, fits ``model`` on the rows
    selected by ``train_index``, predicts on the rows selected by
    ``test_index`` with ``model.tree_generator`` and stores classification
    metrics plus a per-rule violation report in ``self.result_dict``.

    Parameters
    ----------
    path_discretized : str
        Path of the CSV file to read; it must contain an ``Outcome`` column.
    model : object
        Object exposing ``fit(X, y, feature_names=...)`` and a
        ``tree_generator`` attribute with ``predict`` / ``predict_proba``.
    KB_origin : List[List[str]]
        Knowledge base; each rule is a list of tokens.
    random_state : int
        Stored on the instance; not used by this class itself.
    n_splits : int
        Stored on the instance; not used by this class itself.
    train_index, test_index : np.ndarray
        Row labels (after ``reset_index``) of the train and test rows.
    name, note : str, optional
        Free-form metadata copied into ``result_dict``.

    Notes
    -----
    ``result_dict['len_U']`` is initialised to ``None`` and never filled in
    by this class.
    """

    def __init__(self,
                 path_discretized: str,
                 model: object,
                 KB_origin: List[List[str]],
                 random_state: int,
                 n_splits: int,
                 train_index: np.ndarray,
                 test_index: np.ndarray,
                 name: str = None,
                 note: str = None) -> None:

        self.path_discretized = path_discretized
        self.model = model
        self.KB_origin = KB_origin
        self.random_state = random_state
        self.n_splits = n_splits

        self.train_index = train_index
        self.test_index = test_index

        self.result_dict = {
            'name'     : name,
            'note'     : note,
            'Accuracy' : None,
            'Precision': None,
            'Recall'   : None,
            'F1-score' : None,
            'Auc'      : None,
            'len_U'    : None,
            'Rules'    : {'violation': 0, 'total': len(self.KB_origin)},
            'Rules_detail': {}
        }

    def _predict(self, X_test: np.ndarray):
        """Return ``(labels, scores)`` predicted by ``self.model.tree_generator``.

        Labels equal to 0 are mapped to -1; scores are column 1 of
        ``predict_proba``.
        """
        estimator = self.model.tree_generator
        y_pred = estimator.predict(X_test)
        y_pred_interpreted = np.where(y_pred == 0, -1, y_pred)
        y_score = estimator.predict_proba(X_test)[:, 1]
        return y_pred_interpreted, y_score

    def _parse_outcome_rules(self) -> List[tuple]:
        """Return ``(rule, assignment)`` pairs for every rule mentioning "Outcome".

        ``assignment`` maps each non-symbol token to the value it must take:
        1 when not preceded by '¬', 0 for a negated feature, and -1 for a
        negated "Outcome". The original rule is kept next to its assignment
        so that reports always show the text of the rule actually evaluated.
        """
        parsed = []
        for rule in self.KB_origin:
            if "Outcome" not in rule:
                continue
            assignment = {}
            for idx, item in enumerate(rule):
                if is_symbol(item):
                    continue
                negated = idx > 0 and rule[idx - 1] == '¬'
                if not negated:
                    assignment[item] = 1
                elif item != "Outcome":
                    assignment[item] = 0
                else:
                    assignment[item] = -1
            parsed.append((rule, assignment))
        return parsed

    @staticmethod
    def _is_violated(antecedent: Dict[str, int],
                     outcome: int,
                     X_test: pd.DataFrame,
                     y_pred: pd.Series) -> int:
        """Return 1 if any row matching ``antecedent`` is not predicted as ``outcome``.

        Rows match when every column in ``antecedent`` equals its required
        value; an empty antecedent matches every row. Returns 0 when no row
        matches or all matching rows are predicted as ``outcome``.
        """
        mask = pd.Series(True, index=X_test.index)
        for column, value in antecedent.items():
            mask &= (X_test[column] == value)
        return int((y_pred[mask] != outcome).any())

    def calculate_scores(self) -> None:
        """Fit the model, then fill ``result_dict`` with metrics and rule violations."""
        data = pd.read_csv(self.path_discretized, index_col=0)
        data = data.reset_index(drop=True)

        X = data.drop('Outcome', axis=1)
        # Map label 0 to -1 without mutating `data` in place.
        y = data['Outcome'].replace(0, -1)

        X_train, y_train = X.loc[self.train_index, :], y.loc[self.train_index]
        X_test, y_test = X.loc[self.test_index, :], y.loc[self.test_index]

        feature_names = list(X_train.columns)
        self.model.fit(X_train.values, y_train.values, feature_names=feature_names)

        y_pred_interpreted, y_score = self._predict(X_test.values)
        y_true = y_test.values

        # Standard classification metrics.
        self.result_dict['Accuracy'] = float(accuracy_score(y_true, y_pred_interpreted))
        self.result_dict['Precision'] = float(precision_score(y_true, y_pred_interpreted))
        self.result_dict['Recall'] = float(recall_score(y_true, y_pred_interpreted))
        self.result_dict['F1-score'] = float(f1_score(y_true, y_pred_interpreted))
        self.result_dict['Auc'] = float(roc_auc_score(y_true, y_score))

        # Rule violations. Reset first so that calling this method again
        # (e.g. re-running a notebook cell) does not accumulate counts.
        self.result_dict['Rules']['violation'] = 0
        self.result_dict['Rules_detail'] = {}

        predictions = pd.Series(y_pred_interpreted, index=X_test.index)
        for i, (rule, assignment) in enumerate(self._parse_outcome_rules()):
            outcome = assignment["Outcome"]
            antecedent = {column: value
                          for column, value in assignment.items()
                          if column != "Outcome"}

            violation_bool = self._is_violated(antecedent, outcome, X_test, predictions)
            self.result_dict['Rules']['violation'] += violation_bool
            self.result_dict['Rules_detail'][i] = {
                'rule': " ".join(rule),
                'violation': violation_bool,
            }

    def save_result_as_json(self, file_path) -> None:
        """Write ``result_dict`` to ``file_path`` as JSON, creating parent directories."""
        parent_dir = os.path.dirname(file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(self.result_dict, f, indent=4)

    def evaluate(self, save_file_path: str = './result_1.json') -> None:
        """Run ``calculate_scores`` and save the result to ``save_file_path``."""
        self.calculate_scores()
        self.save_result_as_json(file_path=save_file_path)

In [10]:
# Build KB_origin from the rule file of fold 0.
data_dir_path = os.path.join(project_dir_path, 'inputs/pima_indian_diabetes_cv_3/fold_0')
# os.listdir returns entries in arbitrary order; sort so that the file lists
# handed to Setup are the same on every run and platform.
file_list = sorted(os.listdir(os.path.join(data_dir_path, 'train')))

# Supervised (L*.csv) and unsupervised (U*.csv) data files in the train folder.
L_files = [filename for filename in file_list
           if filename.startswith('L') and filename.endswith('.csv')]

U_files = [filename for filename in file_list
           if filename.startswith('U') and filename.endswith('.csv')]

file_names_dict = {
    'supervised': L_files,
    'unsupervised': U_files,
    'rule': ['rules_3.txt']
}

problem_instance = Setup(data_dir_path,
                         file_names_dict,
                         None)

problem_instance.load_rules()

# Show how many rules were loaded, then each rule as a token list.
print(len(problem_instance.KB_origin))

for formula in problem_instance.KB_origin:
    print(formula)

load_rules took 0.0004918575286865234 seconds!
24
['Pregnancies_Low', '→', '¬', 'Outcome']
['Pregnancies_High', '→', 'Outcome']
['Glucose_Low', '→', '¬', 'Outcome']
['Glucose_High', '→', 'Outcome']
['BMI_Low', '→', '¬', 'Outcome']
['BMI_Medium', '→', 'Outcome']
['DiabetesPedigreeFunction_Low', '→', '¬', 'Outcome']
['Age_Low', '→', '¬', 'Outcome']
['Age_Medium', '→', 'Outcome']
['¬', 'Pregnancies_Medium', '⊗', 'Glucose_High', '⊗', '¬', 'BMI_Low', '⊗', '¬', 'DiabetesPedigreeFunction_Low', '⊗', 'BMI_Medium', '→', 'Outcome']
['¬', 'Glucose_Low', '⊗', '¬', 'DiabetesPedigreeFunction_Low', '⊗', 'Age_Medium', '⊗', 'BloodPressure_Medium', '⊗', '¬', 'BMI_Low', '⊗', 'Glucose_Medium', '→', 'Outcome']
['¬', 'Glucose_High', '⊗', '¬', 'Glucose_Low', '⊗', '¬', 'Pregnancies_High', '⊗', 'DiabetesPedigreeFunction_Low', '⊗', '¬', 'BloodPressure_Medium', '⊗', '¬', 'BloodPressure_Low', '⊗', '¬', 'Age_Low', '⊗', '¬', 'SkinThickness_Medium', '→', '¬', 'Outcome']
['¬', 'BMI_Low', '⊗', 'Glucose_Medium', '⊗', '¬

In [11]:
# Evaluate the tree generator inside RuleFitClassifier on each CV fold.
path_discretized = "./data/diabetes_discretized.csv"
data = pd.read_csv(path_discretized, index_col=0)
data = data.reset_index(drop=True)
X = data.drop('Outcome', axis=1)
# Map label 0 to -1 without mutating `data` in place.
y = data['Outcome'].replace(0, -1)

random_state = 42
n_splits = 5

kf = KFold(n_splits=n_splits)

for i, (train_index, test_index) in enumerate(kf.split(X)):

    rfmode = 'classify'
    tree_generator = RandomForestClassifier(random_state=random_state)
    # NOTE (original author): setting the seed on the random forest here has no effect.

    model = RuleFitClassifier(rfmode=rfmode,
                              tree_generator=tree_generator,
                              random_state=random_state,
                              exp_rand_tree_size=False)

    save_file_path = f'./../../outputs/pima_indian_diabetes_5/fold_{i}/result_rulefit_1.json'
    model_name = "random forest (rulefit)"
    note = None

    # `note` is forwarded so that editing it above actually reaches the saved result.
    evaluate_model = EvaluateModelRuleFit(path_discretized=path_discretized,
                                          model=model,
                                          name=model_name,
                                          note=note,
                                          random_state=random_state,
                                          n_splits=n_splits,
                                          train_index=train_index,
                                          test_index=test_index,
                                          KB_origin=problem_instance.KB_origin)

    evaluate_model.evaluate(save_file_path=save_file_path)

# RuleFitClassifier そのもの

In [13]:
class EvaluateModelRuleFit2:
    """Evaluate a fitted model itself (``model.predict``) on one train/test split.

    The evaluator reads the discretized data set, fits ``model`` on the rows
    selected by ``train_index``, predicts on the rows selected by
    ``test_index`` with ``model.predict`` / ``model.predict_proba`` and stores
    classification metrics plus a per-rule violation report in
    ``self.result_dict``.

    Parameters
    ----------
    path_discretized : str
        Path of the CSV file to read; it must contain an ``Outcome`` column.
    model : object
        Object exposing ``fit(X, y, feature_names=...)``, ``predict`` and
        ``predict_proba``.
    KB_origin : List[List[str]]
        Knowledge base; each rule is a list of tokens.
    random_state : int
        Stored on the instance; not used by this class itself.
    n_splits : int
        Stored on the instance; not used by this class itself.
    train_index, test_index : np.ndarray
        Row labels (after ``reset_index``) of the train and test rows.
    name, note : str, optional
        Free-form metadata copied into ``result_dict``.

    Notes
    -----
    ``result_dict['len_U']`` is initialised to ``None`` and never filled in
    by this class.
    """

    def __init__(self,
                 path_discretized: str,
                 model: object,
                 KB_origin: List[List[str]],
                 random_state: int,
                 n_splits: int,
                 train_index: np.ndarray,
                 test_index: np.ndarray,
                 name: str = None,
                 note: str = None) -> None:

        self.path_discretized = path_discretized
        self.model = model
        self.KB_origin = KB_origin
        self.random_state = random_state
        self.n_splits = n_splits

        self.train_index = train_index
        self.test_index = test_index

        self.result_dict = {
            'name'     : name,
            'note'     : note,
            'Accuracy' : None,
            'Precision': None,
            'Recall'   : None,
            'F1-score' : None,
            'Auc'      : None,
            'len_U'    : None,
            'Rules'    : {'violation': 0, 'total': len(self.KB_origin)},
            'Rules_detail': {}
        }

    def _predict(self, X_test: np.ndarray):
        """Return ``(labels, scores)`` predicted by ``self.model``.

        Labels equal to 0 are mapped to -1; scores are column 1 of
        ``predict_proba``.
        """
        y_pred = self.model.predict(X_test)
        y_pred_interpreted = np.where(y_pred == 0, -1, y_pred)
        y_score = self.model.predict_proba(X_test)[:, 1]
        return y_pred_interpreted, y_score

    def _parse_outcome_rules(self) -> List[tuple]:
        """Return ``(rule, assignment)`` pairs for every rule mentioning "Outcome".

        ``assignment`` maps each non-symbol token to the value it must take:
        1 when not preceded by '¬', 0 for a negated feature, and -1 for a
        negated "Outcome". The original rule is kept next to its assignment
        so that reports always show the text of the rule actually evaluated.
        """
        parsed = []
        for rule in self.KB_origin:
            if "Outcome" not in rule:
                continue
            assignment = {}
            for idx, item in enumerate(rule):
                if is_symbol(item):
                    continue
                negated = idx > 0 and rule[idx - 1] == '¬'
                if not negated:
                    assignment[item] = 1
                elif item != "Outcome":
                    assignment[item] = 0
                else:
                    assignment[item] = -1
            parsed.append((rule, assignment))
        return parsed

    @staticmethod
    def _is_violated(antecedent: Dict[str, int],
                     outcome: int,
                     X_test: pd.DataFrame,
                     y_pred: pd.Series) -> int:
        """Return 1 if any row matching ``antecedent`` is not predicted as ``outcome``.

        Rows match when every column in ``antecedent`` equals its required
        value; an empty antecedent matches every row. Returns 0 when no row
        matches or all matching rows are predicted as ``outcome``.
        """
        mask = pd.Series(True, index=X_test.index)
        for column, value in antecedent.items():
            mask &= (X_test[column] == value)
        return int((y_pred[mask] != outcome).any())

    def calculate_scores(self) -> None:
        """Fit the model, then fill ``result_dict`` with metrics and rule violations."""
        data = pd.read_csv(self.path_discretized, index_col=0)
        data = data.reset_index(drop=True)

        X = data.drop('Outcome', axis=1)
        # Map label 0 to -1 without mutating `data` in place.
        y = data['Outcome'].replace(0, -1)

        X_train, y_train = X.loc[self.train_index, :], y.loc[self.train_index]
        X_test, y_test = X.loc[self.test_index, :], y.loc[self.test_index]

        feature_names = list(X_train.columns)
        self.model.fit(X_train.values, y_train.values, feature_names=feature_names)

        y_pred_interpreted, y_score = self._predict(X_test.values)
        y_true = y_test.values

        # Standard classification metrics.
        self.result_dict['Accuracy'] = float(accuracy_score(y_true, y_pred_interpreted))
        self.result_dict['Precision'] = float(precision_score(y_true, y_pred_interpreted))
        self.result_dict['Recall'] = float(recall_score(y_true, y_pred_interpreted))
        self.result_dict['F1-score'] = float(f1_score(y_true, y_pred_interpreted))
        self.result_dict['Auc'] = float(roc_auc_score(y_true, y_score))

        # Rule violations. Reset first so that calling this method again
        # (e.g. re-running a notebook cell) does not accumulate counts.
        self.result_dict['Rules']['violation'] = 0
        self.result_dict['Rules_detail'] = {}

        predictions = pd.Series(y_pred_interpreted, index=X_test.index)
        for i, (rule, assignment) in enumerate(self._parse_outcome_rules()):
            outcome = assignment["Outcome"]
            antecedent = {column: value
                          for column, value in assignment.items()
                          if column != "Outcome"}

            violation_bool = self._is_violated(antecedent, outcome, X_test, predictions)
            self.result_dict['Rules']['violation'] += violation_bool
            self.result_dict['Rules_detail'][i] = {
                'rule': " ".join(rule),
                'violation': violation_bool,
            }

    def save_result_as_json(self, file_path) -> None:
        """Write ``result_dict`` to ``file_path`` as JSON, creating parent directories."""
        parent_dir = os.path.dirname(file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(self.result_dict, f, indent=4)

    def evaluate(self, save_file_path: str = './result_1.json') -> None:
        """Run ``calculate_scores`` and save the result to ``save_file_path``."""
        self.calculate_scores()
        self.save_result_as_json(file_path=save_file_path)

In [14]:
# Evaluate RuleFitClassifier itself on each CV fold.
path_discretized = "./data/diabetes_discretized.csv"
data = pd.read_csv(path_discretized, index_col=0)
data = data.reset_index(drop=True)
X = data.drop('Outcome', axis=1)
# Map label 0 to -1 without mutating `data` in place.
y = data['Outcome'].replace(0, -1)
random_state = 42
n_splits = 5

kf = KFold(n_splits=n_splits)

for i, (train_index, test_index) in enumerate(kf.split(X)):

    rfmode = 'classify'
    tree_generator = RandomForestClassifier(random_state=random_state)
    # NOTE (original author): setting the seed on the random forest here has no effect.

    model = RuleFitClassifier(rfmode=rfmode,
                              tree_generator=tree_generator,
                              random_state=random_state,
                              exp_rand_tree_size=False)

    save_file_path = f'./../../outputs/pima_indian_diabetes_5/fold_{i}/result_rulefit_2.json'
    model_name = "RuleFitClassifier"
    note = None

    # `note` is forwarded so that editing it above actually reaches the saved result.
    evaluate_model = EvaluateModelRuleFit2(path_discretized=path_discretized,
                                           model=model,
                                           name=model_name,
                                           note=note,
                                           random_state=random_state,
                                           n_splits=n_splits,
                                           train_index=train_index,
                                           test_index=test_index,
                                           KB_origin=problem_instance.KB_origin)

    evaluate_model.evaluate(save_file_path=save_file_path)