In [49]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import pandas as pd
import numpy as np
#import matplotlib.pyplot as plt
from imblearn.over_sampling import SMOTE

# --- Configuration -------------------------------------------------------
# Only `depth`, `random_state` and `n_estimators` are passed to the forest
# below; the remaining names are defined here but not used in this cell.
best_prec = 0
depth = 9
random_state = 132
criterion = "gini"
max_depth = 5
max_features = "sqrt"
n_estimators = 150
max_leaf_nodes = 30
bootstrap = False

# --- Load data -----------------------------------------------------------
data_table = pd.read_csv('data/german.csv')

# Every column except the 'Creditability' target is a model input.
feature_frame = data_table.drop('Creditability', axis=1)
features = feature_frame.columns.to_list()
X = feature_frame.values
y = data_table['Creditability'].values

# --- Split, then oversample the training part only -----------------------
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.30, random_state=random_state)

sm = SMOTE(random_state=random_state)
X_train, y_train = sm.fit_resample(X_train, y_train)

# --- Fit and evaluate ----------------------------------------------------
r_clf = RandomForestClassifier(
    n_estimators=n_estimators, max_depth=depth, random_state=random_state)
r_clf.fit(X_train, y_train)
y_pred = r_clf.predict(X_test)

for metric_label, metric_fn in (
        ('Accuracy Score is', accuracy_score),
        ('Precision is', precision_score),
        ('Recall is', recall_score),
        ('F1-Score is', f1_score)):
    print(metric_label, metric_fn(y_test, y_pred))


class PathExtractor():
    """Enumerate the leaf rules of a fitted tree ensemble and probe its predictions.

    A rule is one root-to-leaf path of one tree, described by a half-open value
    interval per feature that appears on the path.  Two data sets are tracked
    while walking a tree: ``(X, y)`` yields each rule's coverage, class
    distribution and fidelity, and ``(X_train, y_train)`` decides each rule's
    output class.

    Class labels are assumed to be the integers ``0 .. n_categories-1`` because
    per-class counts are taken with ``y == i`` for ``i in range(n_categories)``.
    """

    def __init__(self, forest, X, y, X_train, y_train, features):
        """Store the model and data and pre-compute per-feature bounds.

        Args:
            forest: fitted ensemble exposing ``n_estimators``,
                ``estimators_[i].tree_`` and ``predict``.
            X, y: reference samples/labels used for feature bounds, coverage
                and fidelity.
            X_train, y_train: samples/labels used to pick each leaf's output.
            features: feature names, one per column of ``X``.
        """
        self.forest = forest
        self.n_estimators = forest.n_estimators

        self.features = features
        self.n_features = len(self.features)

        self.X = X
        self.y = y
        self.X_train = X_train
        self.y_train = y_train
        self.n_examples = len(self.y)
        self.n_examples2 = len(self.y_train)

        self.categories = np.unique(y).tolist()
        self.n_categories = len(self.categories)
        # [per-feature minimum, per-feature maximum nudged up by 1e-9] so the
        # maximum itself still satisfies a strict "< upper bound" test.
        self.feature_range = [np.min(X, axis=0), np.max(X, axis=0)+1e-9]
        self.category_total = [np.sum(self.y == i)
                               for i in range(self.n_categories)]

    def predict(self, X):
        """Return the wrapped forest's predictions for the rows of ``X``."""
        return self.forest.predict(X)

    def get_paths(self, min_impurity_decrease=0.0):
        """Collect the rules of every tree.

        Args:
            min_impurity_decrease: an internal node whose weighted impurity
                decrease is below this value is treated as a leaf.

        Returns:
            A list with one entry per estimator; each entry is the list of
            rule dicts produced by ``dfs`` for that tree.
        """
        self.paths = [[] for i in range(self.n_estimators)]
        self.min_impurity_decrease = min_impurity_decrease
        for i in range(self.n_estimators):
            self.dfs(i, 0, {}, np.ones(self.n_examples),
                     np.ones(self.n_examples2))
        return self.paths

    def dfs(self, i, u, feature_range, vec_examples, vec_examples2):
        """Walk tree ``i`` from node ``u`` and append a rule dict for each leaf reached.

        Args:
            i: index of the tree in ``forest.estimators_``.
            u: current node id.
            feature_range: ``{feature index: [low, high]}`` accumulated so far.
            vec_examples: 0/1 vector marking which rows of ``self.X`` reach ``u``.
            vec_examples2: 0/1 vector marking which rows of ``self.X_train`` reach ``u``.

        A rule is only recorded when at least one row of ``self.X`` reaches it.
        """
        tr = self.forest.estimators_[i].tree_

        def impurity_decrease(tr, u):
            # Weighted impurity decrease of splitting node u:
            # N_t/N * (I_t - N_r/N_t*I_r - N_l/N_t*I_l)
            N_t = tr.n_node_samples[u]
            I_t = tr.impurity[u]
            N = tr.n_node_samples[0]
            Lc = tr.children_left[u]
            Rc = tr.children_right[u]
            N_l = tr.n_node_samples[Lc]
            I_l = tr.impurity[Lc]
            N_r = tr.n_node_samples[Rc]
            I_r = tr.impurity[Rc]
            return N_t/N*(I_t-N_r/N_t*I_r-N_l/N_t*I_l)

        def cpy(m):
            # Copy the dict and each [low, high] list so siblings do not share state.
            return {key: m[key].copy() for key in m}

        # The impurity test is only evaluated for nodes that have both children.
        is_leaf = tr.children_left[u] < 0 or tr.children_right[u] < 0
        if is_leaf or impurity_decrease(tr, u) < self.min_impurity_decrease:
            distribution = [np.dot(vec_examples, self.y == cid)
                            for cid in range(self.n_categories)]
            distribution2 = [np.dot(vec_examples2, self.y_train == cid)
                             for cid in range(self.n_categories)]
            # The output class is the majority class among the X_train rows.
            output = np.argmax(distribution2)
            coverage = sum(distribution)
            if coverage > 0:
                self.paths[i].append({
                    "name": 'r%d-%d' % (len(self.paths[i]), i),
                    "tree_index": i,
                    "rule_index": len(self.paths[i]),
                    "range": {str(key): feature_range[key].copy() for key in feature_range},
                    "distribution": distribution,
                    "coverage": coverage,
                    "fidelity": distribution[int(output)] / coverage,
                    "sample": vec_examples,
                    "output": str(output)
                })
        else:
            feature = tr.feature[u]
            threshold = tr.threshold[u]

            # Left child: rows with value <= threshold; tighten the upper bound.
            _feature_range = cpy(feature_range)
            if not feature in feature_range:
                # The additional 1e-9 on this branch is kept from the original code.
                _feature_range[feature] = [self.feature_range[0]
                                           [feature], self.feature_range[1][feature]+1e-9]
            _feature_range[feature][1] = min(
                _feature_range[feature][1], threshold)

            _vec_examples = vec_examples*(self.X[:, feature] <= threshold)
            _vec_examples2 = vec_examples2 * \
                (self.X_train[:, feature] <= threshold)

            self.dfs(
                i, tr.children_left[u], _feature_range, _vec_examples, _vec_examples2)

            # Right child: rows with value > threshold; raise the lower bound.
            _feature_range = cpy(feature_range)
            if not feature in feature_range:
                _feature_range[feature] = [self.feature_range[0]
                                           [feature], self.feature_range[1][feature]]
            _feature_range[feature][0] = threshold

            _vec_examples = vec_examples*(self.X[:, feature] > threshold)
            _vec_examples2 = vec_examples2 * \
                (self.X_train[:, feature] > threshold)
            self.dfs(
                i, tr.children_right[u], _feature_range, _vec_examples, _vec_examples2)

    def getRange(self, X, fid):
        """Find how far feature ``fid`` of sample ``X`` can move without changing the prediction.

        The feature is moved in steps of 0.5% of its observed range, first
        downwards and then upwards from its original value, until the
        prediction changes or the feature bound is reached.

        Args:
            X: 1-D array holding one sample.
            fid: index of the feature to vary.

        Returns:
            dict with ``L``/``R`` (lower/upper end reached, clipped to the
            feature bounds) and ``LC``/``RC`` (the prediction observed at the
            last step taken on that side; equal to the original prediction when
            the bound was reached without a change).
        """
        lower_bound = self.feature_range[0][fid]
        upper_bound = self.feature_range[1][fid]
        step = (upper_bound-lower_bound)*0.005
        L, R = X[fid], X[fid]
        ei = np.array([1 if i == fid else 0 for i in range(self.n_features)])
        result0 = self.predict([X])[0]

        # Scan downwards from the original value.
        Xi = X.copy()
        result1 = result0
        while(result1 == result0 and L > lower_bound):
            Xi = Xi-step*ei
            result1 = self.predict([Xi])[0]
            L -= step
        L = max(L, lower_bound)
        LC = result1

        # Scan upwards; the comparison value must be reset, otherwise a change
        # found on the left side would stop this loop before its first step.
        Xi = X.copy()
        result1 = result0
        while(result1 == result0 and R < upper_bound):
            Xi = Xi+step*ei
            result1 = self.predict([Xi])[0]
            R += step
        R = min(R, upper_bound)
        RC = result1
        return {
            "L": L,
            "LC": LC,  # the prediction when X[fid]=L-eps
            "R": R,
            "RC": RC,  # the prediction when X[fid]=R+eps
        }

# Wrap the fitted forest: (X, y) is the full data set read from the CSV,
# (X_train, y_train) is the SMOTE-resampled training split.
rf = PathExtractor(r_clf, X, y, X_train, y_train, features)

Accuracy Score is 0.7733333333333333
Precision is 0.8341463414634146
Recall is 0.8341463414634146
F1-Score is 0.8341463414634146


In [48]:
# Print the entries of the first row of the first tree's decision_path(X) result.
for k in r_clf.estimators_[0].decision_path(X)[0]:
    print(k)

  (0, 0)	1
  (0, 1)	1
  (0, 2)	1
  (0, 20)	1
  (0, 38)	1
  (0, 39)	1
  (0, 40)	1
  (0, 41)	1
  (0, 42)	1
  (0, 43)	1


In [51]:
from os import path
from copy import deepcopy

def visit_boosting_tree(tree, path=None):
    """Flatten one dumped boosting tree into a list of leaf rules.

    Args:
        tree: nested dict for a node.  A node without a ``'decision_type'`` key
            is treated as a leaf and must carry ``'leaf_value'`` and
            ``'leaf_weight'``; any other node must carry ``'split_feature'``,
            ``'threshold'``, ``'left_child'`` and ``'right_child'``.
        path: ``{split_feature: [low, high]}`` accumulated from the root;
            omit it when calling on the root.

    Returns:
        One dict per leaf with keys ``'range'`` (the interval map only),
        ``'value'`` and ``'weight'``.

    Thresholds are compared with ``min``/``max``, so they are expected to be
    numeric.
    """
    # A fresh dict per top-level call: a mutable default would be shared.
    if path is None:
        path = {}

    if 'decision_type' not in tree:
        # Leaf: do not write value/weight into `path`, which is returned as the
        # interval map and must contain feature keys only.
        return [{
            'range': path,
            'value': tree['leaf_value'],
            'weight': tree['leaf_weight'],
        }]

    key = tree['split_feature']
    thres = tree['threshold']
    ret = []

    # Left child: cap the upper bound at the threshold.
    leftpath = deepcopy(path)
    if key in leftpath:
        r = leftpath[key]
        leftpath[key] = [r[0], min(r[1], thres)]
    else:
        leftpath[key] = [-1e9, thres]
    ret += visit_boosting_tree(tree['left_child'], leftpath)

    # Right child: raise the lower bound to the threshold.
    rightpath = deepcopy(path)
    if key in rightpath:
        r = rightpath[key]
        rightpath[key] = [max(r[0], thres), r[1]]
    else:
        rightpath[key] = [thres, 1e9]
    ret += visit_boosting_tree(tree['right_child'], rightpath)

    return ret

def visit_decision_tree(tree, index=0, path=None):
    """Flatten a tree given as parallel node arrays into a list of leaf rules.

    Args:
        tree: object exposing ``children_left``, ``children_right``,
            ``feature`` and ``threshold`` indexed by node id, with ``-1``
            marking a missing child.
        index: node id to start from (0 for the root).
        path: ``{feature: [low, high]}`` accumulated from the root; omit it
            when calling on the root.

    Returns:
        One dict per leaf with ``'range'`` (the interval map) and the
        placeholders ``'value': 0`` and ``'weight': 1``.
    """
    # A fresh dict per top-level call: with a mutable default, a single-leaf
    # tree would hand the same shared dict to every caller.
    if path is None:
        path = {}

    if tree.children_left[index] == -1 and tree.children_right[index] == -1:
        return [{
            'range': path,
            'value': 0,
            'weight': 1,
        }]

    key = tree.feature[index]
    thres = tree.threshold[index]
    ret = []

    # Left child: cap the upper bound at the threshold.
    leftpath = deepcopy(path)
    if key in leftpath:
        r = leftpath[key]
        leftpath[key] = [r[0], min(r[1], thres)]
    else:
        leftpath[key] = [-1e9, thres]
    ret += visit_decision_tree(tree, tree.children_left[index], leftpath)

    # Right child: raise the lower bound to the threshold.
    rightpath = deepcopy(path)
    if key in rightpath:
        r = rightpath[key]
        rightpath[key] = [max(r[0], thres), r[1]]
    else:
        rightpath[key] = [thres, 1e9]
    ret += visit_decision_tree(tree, tree.children_right[index], rightpath)

    return ret

def assign_value_for_random_forest(paths, data):
    """Set each rule's ``'value'`` to the majority label of the samples it covers.

    Args:
        paths: list of rule dicts with a ``'range'`` map
            ``{feature index: [low, high]}``; each dict is updated in place.
        data: ``(X, y)`` where ``X`` is a 2-D array and ``y`` holds 0/1 labels
            (they are mapped to -1/+1 via ``2*y - 1``).

    A sample is covered when ``low < x <= high`` for every feature in the
    range.  This matches how the intervals are built from tree splits: the
    left child (``x <= threshold``) gets ``threshold`` as its upper bound and
    the right child (``x > threshold``) gets it as its lower bound.

    ``'value'`` becomes 1 when covered positives outnumber covered negatives,
    otherwise -1 (ties and uncovered rules give -1).
    """
    X, y = data
    signed_labels = 2 * y - 1
    for rule in paths:
        votes = signed_labels
        for key, (low, high) in rule['range'].items():
            column = X[:, int(key)]
            # Multiplying by the 0/1 mask zeroes the vote of uncovered samples.
            votes = votes * (column > low) * (column <= high)
        pos = (votes == 1).sum()
        neg = (votes == -1).sum()
        rule['value'] = 1 if pos > neg else -1

def path_extractor(model, model_type, data = None):
    """Return the list of leaf rules for ``model``.

    ``model_type`` selects the traversal:
      * ``'decision tree'`` - rules of ``model.tree_``;
      * ``'lightgbm'``      - rules of every tree in ``model._Booster.dump_model()``;
      * ``'random forest'`` - rules of every estimator, after which ``data``
        (an ``(X, y)`` pair) is used to assign each rule's value.
    Any other ``model_type`` yields an empty list.
    """
    if model_type == 'decision tree':
        return visit_decision_tree(model.tree_)

    if model_type == 'lightgbm':
        dumped = model._Booster.dump_model()
        collected = []
        for tree_info in dumped['tree_info']:
            collected.extend(visit_boosting_tree(tree_info['tree_structure']))
        return collected

    if model_type == 'random forest':
        collected = []
        for estimator in model.estimators_:
            collected.extend(path_extractor(estimator, 'decision tree'))
        assign_value_for_random_forest(collected, data)
        return collected

    return []
    

# Extract the leaf rules of every tree in the forest and label them using
# the (resampled) training data.
paths = path_extractor(r_clf, 'random forest', (X_train, y_train))

In [53]:
# Inspect the second extracted rule.
paths[1]

{'range': {0: [-1000000000.0, 3.5],
  14: [-1000000000.0, 1.5],
  1: [-1000000000.0, 15.5],
  5: [-1000000000.0, 3.5],
  18: [-1000000000.0, 1.5],
  4: [-1000000000.0, 2996.0],
  10: [3.5, 1000000000.0],
  7: [-1000000000.0, 2.5]},
 'value': 1,
 'weight': 1}

In [62]:
import pulp
import numpy as np
from sklearn.neighbors import LocalOutlierFactor

class Extractor:
    """Select a subset of a forest's rules with a linear program.

    Public entry points: ``compute_accuracy_on_train``,
    ``compute_accuracy_on_test`` and ``extract``.
    """

    def __init__(self, rf, X_train, y_train, X_test, y_test):
        """Collect all rules from ``rf`` and keep the data splits.

        Args:
            rf: object whose ``get_paths()`` returns a list of rule lists
                (one per tree); each rule is a dict with at least
                ``'coverage'``, ``'output'``, ``'range'`` and ``'name'``.
            X_train, y_train: training data (stored as ``X_raw``/``y_raw``).
            X_test, y_test: held-out data used when reporting accuracy.
        """
        self.rf = rf
        self.X_raw = X_train
        self.y_raw = y_train
        self.X_test = X_test
        self.y_test = y_test
        _paths = rf.get_paths()
        # Flatten the per-tree lists and order rules by decreasing coverage.
        self.paths = [p for r in _paths for p in r]
        self.paths.sort(key=lambda x: -x['coverage'])

    def _vote_accuracy(self, X, y, paths):
        """Fraction of rows of ``X`` that are covered and whose net rule vote is >= 0.

        Rows covered by no rule count as incorrect, so an empty rule list
        gives 0.0.
        """
        if len(paths) == 0:
            return 0.0
        Mat = self.getMat(X, y, paths)
        # Keep only the samples (columns) touched by at least one rule.
        covered = ~np.all(Mat == 0, axis=0)
        right = np.sum(Mat[:, covered], axis=0)
        return np.sum(np.where(right >= 0, 1, 0)) / len(X)

    def compute_accuracy_on_train(self, paths):
        """Accuracy of the rule list ``paths`` on the training data."""
        return self._vote_accuracy(self.X_raw, self.y_raw, paths)

    def compute_accuracy_on_test(self, paths):
        """Accuracy of the rule list ``paths`` on the test data."""
        return self._vote_accuracy(self.X_test, self.y_test, paths)

    def extract(self, max_num, tau):
        """Select rules using all rules of ``rf`` and the training data.

        Args:
            max_num: upper bound on the sum of the rule-selection variables.
            tau: margin each sample should reach before it stops being penalised.

        Returns:
            ``(names of selected rules, selected rule dicts,
            test accuracy with all rules, test accuracy with selected rules)``.
        """
        Mat = self.getMat(self.X_raw, self.y_raw, self.paths)
        w = self.getWeight(Mat)
        new_paths, new_path_indexes = self.LP_extraction(w, Mat, max_num, tau)
        accuracy_origin = self.compute_accuracy_on_test(self.paths)
        accuracy_new = self.compute_accuracy_on_test(new_paths)
        return new_path_indexes, new_paths, accuracy_origin, accuracy_new

    def path_score(self, path, X, y):
        """Per-sample score of one rule: +1 covered and label matches the rule's
        output, -1 covered and label differs, 0 not covered.

        A sample is covered when ``low <= x < high`` for every feature in the
        rule's range.
        """
        ans = 2 * (y == int(path.get('output'))) - 1
        m = path.get('range')
        for key in m:
            ans = ans * (X[:, int(key)] >= m[key][0]) * (X[:, int(key)] < m[key][1])
        return ans

    def getMat(self, X_raw, y_raw, paths):
        """Coverage matrix: one row per rule, one column per sample (float)."""
        Mat = np.array([self.path_score(p, X_raw, y_raw) for p in paths]).astype('float')
        return Mat

    def getWeight(self, Mat):
        """Per-sample weights derived from a local-outlier-factor score.

        Samples are compared by the Jaccard distance between the sets of rules
        covering them; the outlier scores are rescaled to [1, 3] and then
        normalised to sum to 1.
        """
        RXMat = np.abs(Mat)
        XRMat = RXMat.transpose()
        # |A & B|: number of rules covering both samples.
        XXAnd = np.dot(XRMat, RXMat)
        XROne = np.ones(XRMat.shape)
        # After symmetrising this equals |A| + |B| - |A & B| = |A | B|.
        XXOr = 2 * np.dot(XROne, RXMat) - XXAnd
        XXOr = (XXOr + XXOr.transpose()) / 2
        # Two samples covered by no rule at all have an empty union; treat them
        # as identical (distance 0) so the matrix never contains 0/0 = NaN.
        similarity = np.divide(XXAnd, XXOr,
                               out=np.ones_like(XXOr, dtype=float),
                               where=XXOr > 0)
        XXDis = 1 - similarity
        K = int(np.ceil(np.sqrt(len(self.X_raw))))
        clf = LocalOutlierFactor(n_neighbors=K, metric="precomputed")
        clf.fit(XXDis)
        XW = -clf.negative_outlier_factor_
        MXW, mXW = np.max(XW), np.min(XW)
        span = MXW - mXW
        if span > 0:
            XW = 1 + (3 - 1) * (XW - mXW) / span
        else:
            # All scores equal: fall back to uniform weights.
            XW = np.ones_like(XW)
        return XW / np.sum(XW)

    def LP_extraction(self, w, Mat, max_num, tau):
        """Solve the selection LP and return the chosen rules.

        Variables: ``x_i`` in [0, 1] per rule and ``k_j >= 0`` per sample.
        Objective: minimise ``sum_j w[j] * k_j``.
        Constraints: ``sum_i x_i <= max_num`` and, for each sample ``j``,
        ``k_j >= 1000 + tau - sum_i x_i * Mat[i][j]`` and ``k_j >= 1000``.
        The ``x_i`` are continuous; a rule is selected when its solved value
        exceeds 0.5.

        Returns:
            ``(selected rule dicts, names of the selected rules)``.
        """
        n_rules = len(self.paths)
        n_samples = len(w)

        m = pulp.LpProblem(sense=pulp.LpMinimize)
        var = []
        for i in range(n_rules):
            var.append(pulp.LpVariable(f'x{i}', cat=pulp.LpContinuous, lowBound=0, upBound=1))
        for i in range(n_samples):
            var.append(pulp.LpVariable(f'k{i}', cat=pulp.LpContinuous, lowBound=0))

        # Objective.
        m += pulp.lpSum([w[j] * (var[j + n_rules])
                         for j in range(n_samples)])

        # Budget on the selection variables.
        m += (pulp.lpSum([var[j] for j in range(n_rules)]) <= max_num)

        # k_j is bounded below by both expressions, i.e. by their maximum.
        for j in range(n_samples):
            m += (var[j + n_rules] >= 1000 + tau - pulp.lpSum([var[k] * Mat[k][j] for k in range(n_rules)]))
            m += (var[j + n_rules] >= 1000)

        m.solve(pulp.PULP_CBC_CMD())
        chosen = [i for i in range(n_rules) if var[i].value() > 0.5]
        new_paths = [self.paths[i] for i in chosen]
        new_path_indexes = [self.paths[i]['name'] for i in chosen]
        return new_paths, new_path_indexes


In [63]:
# Build the rule selector on the resampled training split and the test split,
# then run the extraction with max_num=100 and tau=2.
ex = Extractor(rf, X_train, y_train, X_test, y_test)
ret = ex.extract(100, 2)

In [61]:
# ret[1] holds the selected rule dicts (second element returned by extract).
indexes = [p['tree_index'] for p in ret[1]]
# Number of selected rules.
print(len(ret[1]))
# Number of distinct trees those rules come from.
print(len(np.unique(indexes)))

343
139


In [64]:
# (test accuracy with all rules, test accuracy with the selected rules)
print(ret[2:])

(0.7733333333333333, 0.62)


In [17]:
import shap

# Build a SHAP explainer for the fitted forest and evaluate it on every row of X.
explainer = shap.Explainer(r_clf)
shap_values = explainer(X)


In [20]:

import pickle

# Per-feature metadata for the export: name, observed bounds (taken from the
# PathExtractor) and the forest's feature importance.
# NOTE(review): this rebinds `features`, which earlier held the plain list of
# column names.
features=[
    {
        "name": rf.features[i],
        "lbound":rf.feature_range[0][i],
        "rbound":rf.feature_range[1][i],
        "importance":r_clf.feature_importances_[i],
        "options":"+",
    } for i in range(rf.n_features)
]

# NOTE(review): `all_paths` is not defined in any visible cell, so this cell
# depends on leftover kernel state and fails on a fresh Restart & Run All.
# Confirm which rule list is meant and define it explicitly.
data = {
    'paths': all_paths,
    'features': features,
    'selected': ret[0],   # names of the selected rules
    'shap_values': shap_values,
}

# Use a context manager so the file is flushed and closed even if dumping fails.
with open('output/german.pkl', 'wb') as pkl_file:
    pickle.dump(data, pkl_file)

In [21]:
# Show the SHAP explanation computed for the first row of X.
print(shap_values[0])

.values =
array([[ 1.09110125e-01, -1.09110125e-01],
       [ 3.01264698e-03, -3.01264698e-03],
       [-1.24874332e-01,  1.24874332e-01],
       [ 2.10891519e-03, -2.10891519e-03],
       [-1.99799022e-02,  1.99799022e-02],
       [ 3.40283799e-02, -3.40283799e-02],
       [ 3.85628726e-02, -3.85628726e-02],
       [ 5.60976830e-03, -5.60976830e-03],
       [ 3.36153339e-02, -3.36153339e-02],
       [ 3.85308858e-03, -3.85308858e-03],
       [-2.39386825e-02,  2.39386825e-02],
       [ 7.02421386e-03, -7.02421386e-03],
       [ 1.96232689e-02, -1.96232689e-02],
       [-1.74483971e-02,  1.74483971e-02],
       [ 6.65478065e-02, -6.65478065e-02],
       [ 6.73321595e-03, -6.73321595e-03],
       [ 1.58162875e-03, -1.58162875e-03],
       [-6.93273863e-05,  6.93273863e-05],
       [ 5.12965848e-03, -5.12965848e-03],
       [ 2.51444887e-03, -2.51444887e-03]])

.base_values =
array([0.50028956, 0.49971044])

.data =
array([   1,   18,    4,    2, 1049,    1,    2,    4,    2,    1,    4,

In [32]:
# Name of the fourth selected rule (ret[0] holds the selected rules' names).
ret[0][3]

'r99-69'

In [18]:
# Read 'data/german.data' as comma-separated text.
# NOTE(review): this rebinds `data`, which another cell of this notebook uses
# for the dict that gets pickled.
data = pd.read_csv('data/german.data', sep=',')

In [34]:
# Check whether the fourth column was parsed with object dtype.
data[data.columns[3]].dtype == 'O'

True