## 导入依赖环境

In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
import random
import time

def set_seed(s=0):
    """Seed Python's `random`, NumPy's global RNG and TensorFlow with one value.

    Args:
        s: Seed passed unchanged to `random.seed`, `np.random.seed` and
            `tf.random.set_seed` (default 0).
    """
    for seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
        seed_fn(s)

from sklearn.model_selection import train_test_split


## 数据导入以及预处理

In [2]:
# Raw string literal: the Windows path contains backslash pairs (\G, \d, \h)
# that are not valid escape sequences. In a normal string they emit a
# DeprecationWarning (SyntaxWarning on Python >= 3.12); the raw string yields
# exactly the same path without the warning.
df = pd.read_csv(r'E:\Graduation_Project\datasets\heart.csv')
# 1. Split the columns into categorical and numeric features.
categorical_features = ['sex', 'cp', 'fbs', 'restecg', 'exang', 'slope', 'ca', 'thal']
# Build the exclusion list once instead of on every loop iteration.
excluded_columns = categorical_features + ['target']
numeric_features = [col for col in df.columns if col not in excluded_columns]
# Store categorical values as strings; a later note in this notebook explains
# that DiCE's encoder works on string categories.
df[categorical_features] = df[categorical_features].astype(str)



# UCI 心脏病数据集（Cleveland Heart Disease Dataset）

该数据集包含患者基本信息、临床检查和心电图结果等共 14 个字段（13 个特征 + 1 个目标变量 Target），用于预测是否患有心脏病。

---

## 字段说明

- **Age**：年龄（岁），年龄越大风险越高。  
- **Sex**：性别（1=男性，0=女性），男性风险通常更高。  
- **CP**：胸痛类型（0=典型心绞痛，1=非典型，2=非心源性，3=无症状），反映心脏病风险差异。注意：本 notebook 后文的输出中 cp 出现了取值 4，说明所用 heart.csv 的编码可能与此处不同（例如 1–4），请以实际数据为准。  
- **Trestbps**：静息血压（mmHg），高血压会增加心脏病风险。  
- **Chol**：血清胆固醇（mg/dl），高胆固醇易导致动脉粥样硬化。  
- **FBS**：空腹血糖>120 mg/dl（1=是，0=否），糖尿病患者风险更高。  
- **RestECG**：静息心电图（0=正常，1=ST-T 异常，2=左心室肥大），反映心脏功能。  
- **Thalach**：最大心率，数值低可能提示心功能异常。  
- **Exang**：运动诱发心绞痛（1=是，0=否），供血不足的表现。  
- **Oldpeak**：运动引起的 ST 段压低（mm），数值大说明缺血风险高。  
- **Slope**：运动心电图 ST 段斜率（0=下斜危险最大，1=平，2=上斜相对正常）。  
- **CA**：主要血管数（0–3），堵塞数量越多风险越大。  
- **Thal**：核医学扫描结果（3=正常，6=固定缺陷，7=可逆缺陷），可逆缺陷提示心肌缺血。注意：本 notebook 所用 heart.csv 中该列以文本标签存储（后文输出中出现 normal、reversible），而非数字 3/6/7。  
- **Target**：心脏病诊断（1=有病，0=无病），预测目标变量。

---

## 总结
这些变量涵盖 **人口学信息、实验室检查、心电图与造影指标**，共同用于预测心脏病的发生。


In [3]:
import dice_ml
from dice_ml.utils.helpers import DataTransfomer
# DiCE helper that will encode raw feature frames with the 'ohe-min-max' function.
transformer = DataTransfomer(func='ohe-min-max')


# Stratified 80/20 train/test split on the full frame (target column still included).
target = df['target']
train_dataset, test_dataset, y_train, y_test = train_test_split(df, 
                                                                target,
                                                                test_size=0.2, 
                                                                random_state=42, 
                                                                stratify=df['target'])



# Raw (un-encoded) feature frames without the outcome column.
X_train_df = train_dataset.drop('target', axis=1)
X_test_df = test_dataset.drop('target', axis=1)
# 1. Describe the full dataset to DiCE: continuous columns and the outcome name.
d = dice_ml.Data(dataframe=df,
                 continuous_features=numeric_features,
                 outcome_name='target')
# 2. Bind the data interface, which defines which columns are continuous / categorical / outcome.
transformer.feed_data_params(d)
# 3. Initialise the transform function (this step was previously missing).
transformer.initialize_transform_func()

# Encoded feature matrices used to train and evaluate the model.
X_train = transformer.transform(X_train_df)
X_test = transformer.transform(X_test_df)


In [4]:
import numpy as np

# Sparse matrices expose `toarray`; densify those, otherwise use X_train as is.
X_dense = X_train.toarray() if hasattr(X_train, "toarray") else X_train

# 1. Centroid of the training data (column-wise mean).
centroid = np.mean(X_dense, axis=0)

# 2. Euclidean distance of every sample to the centroid.
distances = np.linalg.norm(X_dense - centroid, axis=1)

# 3. Position of the sample closest to the centroid (displayed as the cell output).
closest_idx = np.argmin(distances)
closest_idx

84

## 待解释模型训练以及评估

In [5]:
from tensorflow import keras
from tensorflow.keras.utils import to_categorical
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

def build_simple_dnn(input_dim=31):
    """Build and compile a small fully connected softmax classifier.

    Architecture: Dense(16, relu) -> Dense(2, softmax), compiled with the Adam
    optimizer, categorical cross-entropy loss and accuracy as metric, so the
    labels must be one-hot encoded over two classes.

    Args:
        input_dim: Number of input features. Defaults to 31, the width that
            was previously hard-coded, so existing calls behave the same.
            Pass the encoded feature width when the encoding changes.

    Returns:
        The compiled `keras.models.Sequential` model.
    """
    model = keras.models.Sequential()
    model.add(keras.layers.Dense(16, activation='relu', input_shape=(input_dim,)))
    model.add(keras.layers.Dense(2, activation='softmax'))
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# Seed all RNGs before the model is built so weight initialisation is repeatable.
set_seed(1)

model = build_simple_dnn()

# Train the model on the encoded training data with one-hot labels.
model.fit(X_train, to_categorical(y_train), epochs=10, batch_size=8, verbose=1)
model.save_weights('my_model_weights.h5')  # save the weights only

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [6]:
# Predict class probabilities (here on X_test).
y_pred_prob = model.predict(X_test)

# Convert probabilities to class labels (index of the largest probability).
y_pred = y_pred_prob.argmax(axis=1)

# Compute the accuracy on the test split.
from sklearn.metrics import accuracy_score
acc = accuracy_score(y_test, y_pred)
print(f'Accuracy: {acc:.3f}')

Accuracy: 0.836


In [7]:
from sklearn.metrics import classification_report, f1_score
import numpy as np

# Predicted class probabilities.
y_probs = model.predict(X_test)
# Convert to class labels (index of the largest probability).
y_pred = np.argmax(y_probs, axis=1)

# Print the classification report (precision, recall, F1) with four decimals.
print(classification_report(y_test, y_pred, digits=4))

# F1 can also be extracted separately (macro and micro averages).
f1_macro = f1_score(y_test, y_pred, average='macro')
f1_micro = f1_score(y_test, y_pred, average='micro')

print(f"F1 Macro: {f1_macro:.4f}, F1 Micro: {f1_micro:.4f}")

              precision    recall  f1-score   support

           0     0.8400    0.9545    0.8936        44
           1     0.8182    0.5294    0.6429        17

    accuracy                         0.8361        61
   macro avg     0.8291    0.7420    0.7682        61
weighted avg     0.8339    0.8361    0.8237        61

F1 Macro: 0.7682, F1 Micro: 0.8361


## 编译DiCE反事实解释器
### (1)KDtree

In [8]:
import dice_ml
# Step 1: dice_ml.Data describing the dataset (continuous columns and outcome name).
d = dice_ml.Data(dataframe=df,
                 continuous_features=numeric_features,
                 outcome_name='target')
# Step 2: wrap the trained Keras model using the TF2 backend and the
# 'ohe-min-max' input transformation.
m = dice_ml.Model(model=model, 
                  backend="TF2", 
                  func="ohe-min-max")
# Step 3: build the explainer using method="kdtree" for generating CFs.
exp = dice_ml.Dice(d, m,
                   method="kdtree")


In [9]:
from dice_ml.model_interfaces.keras_tensorflow_model import KerasTensorFlowModel
def patched_get_output(self, input_tensor, model_score=True, training=False, transform_data=False):
    """Run the wrapped model on `input_tensor`, encoding raw inputs first.

    Args:
        self: Model wrapper providing `transformer` and `model`.
        input_tensor: Either a TensorFlow tensor or raw data that
            `self.transformer.transform` can encode.
        model_score: Accepted for signature compatibility; not used here.
        training: Forwarded to the model call as `training=`.
        transform_data: Force encoding even when a tensor is passed.

    Returns:
        Whatever `self.model(...)` returns for the (possibly encoded) input.
    """
    import tensorflow as tf

    # Encode when explicitly requested, or whenever the input is not yet a tensor.
    needs_encoding = transform_data or not tf.is_tensor(input_tensor)
    if needs_encoding:
        encoded = self.transformer.transform(input_tensor)
        input_tensor = tf.constant(encoded.to_numpy(), dtype=tf.float32)
    return self.model(input_tensor, training=training)
# Monkey-patch: replace get_output on DiCE's KerasTensorFlowModel class with patched_get_output.
KerasTensorFlowModel.get_output = patched_get_output

#### KDtree方法训练以及评估

In [10]:
# Reset all RNG seeds before generating counterfactuals.
set_seed(0)

# Time the kdtree explainer over the whole test set; every feature except
# 'sex' may be varied, and up to 4 counterfactuals are requested per instance.
start = time.time()
e1_kdtree = exp.generate_counterfactuals(
    X_test_df,
    total_CFs=4,
    desired_class="opposite",
    features_to_vary = X_train_df.columns[X_train_df.columns != 'sex'].tolist()
)
end = time.time()
# Average wall-clock seconds per test instance.
time_kdtree = (end - start)/X_test.shape[0]

100%|██████████| 61/61 [00:23<00:00,  2.57it/s]


In [11]:
# Explain the single training row closest to the centroid (closest_idx) with
# the current explainer and display only the feature values that changed.
exp.generate_counterfactuals(
    X_train_df[closest_idx:closest_idx+1],
    total_CFs=4,
    desired_class="opposite",
    features_to_vary = X_train_df.columns[X_train_df.columns != 'sex'].tolist()
).visualize_as_dataframe(show_only_changes=True)

100%|██████████| 1/1 [00:00<00:00,  1.81it/s]

Query instance (original outcome : 0)





Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
0,54,1,4,140,239,0,0,160,0,1.2,1,0,normal,0



Diverse Counterfactual set (new outcome: 1.0)


Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
31,60,-,-,117,230,1,-,-,1,-,-,2,reversible,1
42,67,-,-,125,254,1,-,-,-,0.2,2,2,reversible,1
124,-,-,-,108,-,1,-,147,-,0.1,-,3,reversible,1
282,59,-,-,110,-,-,2,142,1,-,2,1,reversible,1


### (2)Random

In [12]:

# Rebuild the explainer with method="random"; `exp` is rebound, so later cells
# use this explainer until it is replaced again.
exp = dice_ml.Dice(d, m,
                   method="random")
# Time generation over the whole test set ('sex' is kept fixed).
start = time.time()
e1_random = exp.generate_counterfactuals(
    X_test_df,
    total_CFs=4,
    desired_class="opposite",
    features_to_vary = X_train_df.columns[X_train_df.columns != 'sex'].tolist()
)
end = time.time()
# Average wall-clock seconds per test instance.
time_random = (end - start)/X_test.shape[0]

100%|██████████| 61/61 [00:44<00:00,  1.36it/s]


In [13]:
# Explain the training row closest to the centroid with the current explainer
# and display only the feature values that changed.
exp.generate_counterfactuals(
    X_train_df[closest_idx:closest_idx+1],
    total_CFs=4,
    desired_class="opposite",
    features_to_vary = X_train_df.columns[X_train_df.columns != 'sex'].tolist()
).visualize_as_dataframe(show_only_changes=True)

100%|██████████| 1/1 [00:01<00:00,  1.07s/it]

Query instance (original outcome : 0)





Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
0,54,1,4,140,239,0,0,160,0,1.2,1,0,normal,0



Diverse Counterfactual set (new outcome: 1)


Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
0,-,-,-,-,352,1,-,-,-,-,-,2,reversible,1
1,-,-,-,110,-,-,-,-,1,-,-,3,reversible,1
2,-,-,-,-,-,-,-,-,1,2.5,2,-,reversible,1
3,-,-,-,-,-,1,-,-,-,-,-,2,reversible,1


### (3)Genetic

In [14]:

# Rebuild the explainer with method="genetic"; `exp` is rebound for later cells.
exp = dice_ml.Dice(d, m,
                   method="genetic")
# Time generation over the whole test set ('sex' is kept fixed).
start = time.time()
e1_genetic = exp.generate_counterfactuals(
    X_test_df,
    total_CFs=4,
    desired_class="opposite",
    features_to_vary = X_train_df.columns[X_train_df.columns != 'sex'].tolist()
)
end = time.time()
# Average wall-clock seconds per test instance.
time_genetic = (end - start)/X_test.shape[0]

100%|██████████| 61/61 [02:08<00:00,  2.10s/it]


In [15]:
# Explain the training row closest to the centroid with the current explainer
# and display only the feature values that changed.
exp.generate_counterfactuals(
    X_train_df[closest_idx:closest_idx+1],
    total_CFs=4,
    desired_class="opposite",
    features_to_vary = X_train_df.columns[X_train_df.columns != 'sex'].tolist()
).visualize_as_dataframe(show_only_changes=True)

100%|██████████| 1/1 [00:00<00:00,  1.66it/s]

Query instance (original outcome : 0)





Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
0,54,1,4,140,239,0,0,160,0,1.2,1,0,normal,0



Diverse Counterfactual set (new outcome: 1.0)


Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
0,60,-,-,-,293,-,2,170,-,-,2,2,reversible,1
0,29,-,-,-,-,-,-,139,1,2.0,2,-,reversible,1
0,59,-,-,110,-,-,2,142,1,-,2,1,reversible,1
0,60,-,-,117,230,1,-,-,1,1.4,-,2,reversible,1


### (4)Gradient

In [16]:

def store_cfs(e1, length):
    """Collect (query instance, counterfactuals) pairs from a DiCE result.

    Args:
        e1: Object exposing `cf_examples_list`, whose items carry
            `test_instance_df` and `final_cfs_df`.
        length: Number of leading entries of `cf_examples_list` to collect.

    Returns:
        A list of `(test_instance_df, final_cfs_df)` tuples, in order.
    """
    def _pair(position):
        example = e1.cf_examples_list[position]
        return example.test_instance_df, example.final_cfs_df

    return [_pair(position) for position in range(length)]


注意！！！这里模型在未达到最大迭代数时便收敛到了最优解，但输出中依旧存在一个预测类别与原始样本相同的错误反事实解释。原因在于模型内部在做逆映射时，会把结果舍入到原始数据的精度，而这种微小的改变可能导致预测值大幅变化，这点我已验证。另外，validity（有效率）偏低的原因是：数据中的类别变量存在 int 型数据，而 DiCE 内部是按 string 的形式进行转化的。如果编码器加上 handle_unknown='ignore'，遇到新类别时不会报错，而是全部置零（即该特征的 one-hot 全为 0，模型相当于忽略这个新类别）。我们的 int 型数据虽然肉眼看上去和 string 一样，但编码器并不这么认为，所以会出现大量代表“新类别”的全零编码，从而导致预测结果严重偏差。

In [17]:
# The timer starts before the explainer is built, so construction time is
# included in the gradient method's average.
start = time.time()
exp = dice_ml.Dice(d, m,
                   method="gradient")
# Explain the first test row on its own; the returned object becomes the
# container that collects all later results.
e1_gradient = exp.generate_counterfactuals(
    X_test_df[0:1],
    total_CFs=4,
    desired_class="opposite",
    features_to_vary=X_train_df.columns[X_train_df.columns != 'sex'].tolist(),
    min_iter=0,
    max_iter=800
)

# Explain the remaining test rows one at a time and append each single result
# to e1_gradient, so it ends up with one entry per test row like the other methods.
for i in range(1, X_test.shape[0]):
    e1_i = exp.generate_counterfactuals(
        X_test_df[i:i+1],
        total_CFs=4,
        desired_class="opposite",
        features_to_vary = X_train_df.columns[X_train_df.columns != 'sex'].tolist(),
        min_iter=0,
        max_iter=800
    )
    e1_gradient.cf_examples_list.append(e1_i.cf_examples_list[0])
end = time.time()

# Average wall-clock seconds per test instance.
time_gradient = (end - start)/X_test.shape[0]

Diverse Counterfactuals found! total time taken: 00 min 07 sec
Diverse Counterfactuals found! total time taken: 00 min 11 sec
Diverse Counterfactuals found! total time taken: 00 min 06 sec
Diverse Counterfactuals found! total time taken: 00 min 05 sec
Diverse Counterfactuals found! total time taken: 00 min 37 sec
Diverse Counterfactuals found! total time taken: 00 min 15 sec
Diverse Counterfactuals found! total time taken: 00 min 08 sec
Diverse Counterfactuals found! total time taken: 00 min 06 sec
Diverse Counterfactuals found! total time taken: 00 min 07 sec
Diverse Counterfactuals found! total time taken: 00 min 07 sec
Diverse Counterfactuals found! total time taken: 00 min 10 sec
Diverse Counterfactuals found! total time taken: 00 min 09 sec
Diverse Counterfactuals found! total time taken: 00 min 14 sec
Diverse Counterfactuals found! total time taken: 00 min 14 sec
Diverse Counterfactuals found! total time taken: 00 min 14 sec
Diverse Counterfactuals found! total time taken: 00 min

In [18]:
# Explain the training row closest to the centroid with the gradient explainer
# and display only the changed values. Unlike the batch cell, no min_iter /
# max_iter are passed here.
exp.generate_counterfactuals(
    X_train_df[closest_idx:closest_idx+1],
    total_CFs=4,
    desired_class="opposite",
    features_to_vary = X_train_df.columns[X_train_df.columns != 'sex'].tolist()
).visualize_as_dataframe(show_only_changes=True)

Diverse Counterfactuals found! total time taken: 00 min 47 sec
Query instance (original outcome : 0.03999999910593033)


Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
0,54.0,1,4,140.0,239.0,0,0,160.0,0,1.2,1,0,normal,0.04



Diverse Counterfactual set (new outcome: 1.0)


Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,target
0,49.0,-,-,-,-,1,-,-,-,-,2,3,reversible,1
1,-,-,2,-,-,-,-,152.0,1,-,3,2,reversible,1
2,-,-,-,-,230.0,-,-,-,1,3.0,2,2,-,1
3,-,-,-,-,-,-,-,-,-,6.2,-,2,reversible,1


In [19]:
# Flatten each method's result into a list of (query instance, counterfactuals)
# pairs, one pair per test row.
cfs_kdtree = store_cfs(e1_kdtree, X_test.shape[0])
cfs_random = store_cfs(e1_random, X_test.shape[0])
cfs_genetic = store_cfs(e1_genetic, X_test.shape[0])
cfs_gradient = store_cfs(e1_gradient, X_test.shape[0])

In [24]:
# Diagnostic: compare the model's input shape with the shape of an encoded
# stored query instance.
# NOTE(review): the recorded output shows 31 vs 32 columns, presumably because
# the stored query frame still carries the outcome column - confirm.
print("Expected input dim:", model.input_shape)
print("Current X_org shape:", transformer.transform(cfs_kdtree[0][0]).shape)


Expected input dim: (None, 31)
Current X_org shape: (1, 32)


In [None]:
import importlib
import XAI_metrics   # import once so the module object exists
importlib.reload(XAI_metrics)  # reload to pick up edits without restarting the kernel

from XAI_metrics import calc_valid, calc_sparsity, calc_continuous_proximity, \
    calc_categorical_proximity, calc_manifold_distance, calc_cf_num

# The same six metric helpers are applied to each method's stored pairs; the
# resulting <metric>_<method> variables are assembled into the summary table later.

# --- kdtree ---
valid_kdtree = calc_valid(cfs_kdtree, model, transformer, df.shape[1])
sparsity_kdtree = calc_sparsity(cfs_kdtree, categorical_features)
con_proximity_kdtree = calc_continuous_proximity(cfs_kdtree, numeric_features)
cat_proximity_kdtree = calc_categorical_proximity(cfs_kdtree, categorical_features)
manifold_kdtree = calc_manifold_distance(cfs_kdtree, df, categorical_features)
cf_num_kdtree = calc_cf_num(cfs_kdtree)

# --- random ---
valid_random = calc_valid(cfs_random, model, transformer, df.shape[1])
sparsity_random = calc_sparsity(cfs_random, categorical_features)
con_proximity_random = calc_continuous_proximity(cfs_random, numeric_features)
cat_proximity_random = calc_categorical_proximity(cfs_random, categorical_features)
manifold_random = calc_manifold_distance(cfs_random, df, categorical_features)
cf_num_random = calc_cf_num(cfs_random)

# --- genetic ---
valid_genetic = calc_valid(cfs_genetic, model, transformer, df.shape[1])
sparsity_genetic = calc_sparsity(cfs_genetic, categorical_features)
con_proximity_genetic = calc_continuous_proximity(cfs_genetic, numeric_features)
cat_proximity_genetic = calc_categorical_proximity(cfs_genetic, categorical_features)
manifold_genetic = calc_manifold_distance(cfs_genetic, df, categorical_features)
cf_num_genetic = calc_cf_num(cfs_genetic)

# --- gradient ---
valid_gradient = calc_valid(cfs_gradient, model, transformer, df.shape[1])
sparsity_gradient = calc_sparsity(cfs_gradient, categorical_features)
con_proximity_gradient = calc_continuous_proximity(cfs_gradient, numeric_features)
cat_proximity_gradient = calc_categorical_proximity(cfs_gradient, categorical_features)
manifold_gradient = calc_manifold_distance(cfs_gradient, df, categorical_features)
cf_num_gradient = calc_cf_num(cfs_gradient)


In [35]:
# One list per reported column; positions follow the order given in "method".
results = {
    "method": ["kdtree", "random", "genetic", "gradient"],
    "Avg Time(s)": [
        time_kdtree, time_random, time_genetic, time_gradient,
    ],
    "Validity": [
        valid_kdtree, valid_random, valid_genetic, valid_gradient,
    ],
    "Sparsity": [
        sparsity_kdtree, sparsity_random, sparsity_genetic, sparsity_gradient,
    ],
    "Proximity_con": [
        con_proximity_kdtree, con_proximity_random,
        con_proximity_genetic, con_proximity_gradient,
    ],
    "Proximity_cat": [
        cat_proximity_kdtree, cat_proximity_random,
        cat_proximity_genetic, cat_proximity_gradient,
    ],
    "Manifold": [
        manifold_kdtree, manifold_random, manifold_genetic, manifold_gradient,
    ],
    "Avg CF count": [
        cf_num_kdtree, cf_num_random, cf_num_genetic, cf_num_gradient,
    ],
}

# Build the summary table and keep two decimal places.
df_result = pd.DataFrame(results).round(2)

In [36]:
from pathlib import Path

# DataFrame.to_csv does not create missing parent folders, so make sure
# ./results exists; otherwise the export fails at the very end of a long run.
result_path = Path('./results/DiCE_result_heart.csv')
result_path.parent.mkdir(parents=True, exist_ok=True)
df_result.to_csv(result_path, index=False)