In [1]:
from models import *
from evaluate import *
import os
from keras.callbacks import EarlyStopping
from torch.utils.data import Dataset, DataLoader
import random
# 设置随机种子
seed = 42  # 你可以更改为任意整数
os.environ['PYTHONHASHSEED'] = str(seed)  # 影响 Python 内部哈希算法的随机性
random.seed(seed)  # 固定 Python 的随机性
np.random.seed(seed)  # 固定 NumPy 的随机性
tf.random.set_seed(seed)  # 固定 TensorFlow 的随机性

# 确保 TensorFlow 计算图的确定性
tf.config.experimental.enable_op_determinism()

# 配置 TensorFlow 以获得确定性行为
os.environ["TF_DETERMINISTIC_OPS"] = "1"

# 关闭 GPU 计算中的非确定性优化（如 cuDNN）
os.environ["TF_CUDNN_DETERMINISTIC"] = "1"

 The versions of TensorFlow you are currently using is 2.12.0 and is not supported. 
Some things might work, some things might not.
If you were to encounter a bug, do not file an issue.
If you want to make sure you're using a tested and supported configuration, either change the TensorFlow version or the TensorFlow Addons's version. 
You can find the compatibility matrix in TensorFlow Addon's readme:
https://github.com/tensorflow/addons


In [2]:
def transform_labels(y_train, y_test):
    """
    Transform label to min equal zero and continuous
    For example if we have [1,3,4] ---> [0,1,2]


    Parameters
    ----------
    y_train: array
        Labels of the train set

    y_test: array
        Labels of the test set


    Returns
    -------
    new_y_train: array
        Transformed y_train array

    new_y_test: array
        Transformed y_test array
    """

    # Initiate the encoder
    encoder = LabelEncoder()

    # Concatenate train and test to fit
    y_train_test = np.concatenate((y_train, y_test), axis=0)

    # Fit the encoder
    encoder.fit(y_train_test.ravel())

    # Transform to min zero and continuous labels
    new_y_train_test = encoder.transform(y_train_test.ravel())

    # Resplit the train and test
    new_y_train = new_y_train_test[0 : len(y_train)]
    new_y_test = new_y_train_test[len(y_train) :]

    return new_y_train, new_y_test

In [3]:
path=r'data'
X = np.load(path + "/X.npy")
y = np.load(path + "/y.npy")

In [4]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Layer

class CrossAttention(tf.keras.layers.Layer):
    def __init__(self, embed_dim, num_heads, **kwargs):
        super(CrossAttention, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.query_dense = Dense(embed_dim)
        self.key_dense = Dense(embed_dim)
        self.value_dense = Dense(embed_dim)

    def call(self, inputs):
        Q, K, V = inputs
        batch_size = tf.shape(Q)[0]
        seq_len_Q = tf.shape(Q)[1]
        seq_len_K = tf.shape(K)[1]
        # print(Q.shape,K.shape,V.shape,2226)
        # 计算 Q, K, V
        Q = self.query_dense(Q)
        K = self.key_dense(K)
        V = self.value_dense(V)
        # print(Q.shape,K.shape,V.shape,2227)
        # 分成多个头
        Q = self.split_heads(Q, batch_size)
        K = self.split_heads(K, batch_size)
        V = self.split_heads(V, batch_size)
        # print(Q.shape,K.shape,V.shape,2228)
        # 计算注意力分数
        scores = tf.matmul(Q, K, transpose_b=True) / tf.math.sqrt(tf.cast(self.embed_dim // self.num_heads, tf.float32))
        # print(scores.shape,1115)
        # 计算注意力权重
        attention_weights = tf.nn.softmax(scores, axis=-1)
        
        # 计算注意力输出
        context = tf.matmul(attention_weights, V)
        # print(context.shape,1118)
        context = self.combine_heads(context, batch_size)
        
        return context
    


    def split_heads(self, x, batch_size):
        depth = self.embed_dim // self.num_heads
        x = tf.reshape(x, (batch_size, -1, self.num_heads, depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def combine_heads(self, x, batch_size):
        x = tf.transpose(x, perm=[0, 2, 1, 3])
        x = tf.reshape(x, (batch_size, -1, self.embed_dim))
        return x


def stcan_model1(input_shape, n_class, embed_dim=512, num_heads=1):
    n = input_shape[0]
    print(n,1111)
    k = input_shape[1]
    print(k,1112)
    input_layer = Input(shape=(n, k, 1))

    # 2D convolution layers
    a = Conv2D(
        filters=64,
        kernel_size=(n, 1),
        strides=(1, 1),
        padding="same",
        input_shape=(n, k, 1),
        name="2D",
    )(input_layer)
    a = BatchNormalization()(a)
    a = Activation("relu", name="2D_Activation")(a)
    a = Conv2D(filters=1, kernel_size=(1, 1), strides=(1, 1))(a)
    a = Activation("relu", name="2D_Reduced_Activation")(a) 
    x = Reshape((n, k))(a)

    # 1D convolution layers
    b = Reshape((n, k))(input_layer)
    b = LSTM(100, return_sequences=True)(b)
    b = LSTM(1, return_sequences=True)(b)
    b = BatchNormalization()(b)
    y = Activation("relu", name="1D_Activation")(b)


    # Cross-Attention
    z = CrossAttention(embed_dim, num_heads)([y,x,x])
    print(x.shape,y.shape,z.shape,1113)
    z = GlobalAveragePooling1D()(z)
    output_layer = Dense(n_class, activation="softmax")(z)

    model = Model(input_layer, output_layer)
    focal_loss = tfa.losses.SigmoidFocalCrossEntropy(alpha=0.5, gamma=4.0)
    model.compile(loss=focal_loss, metrics=["AUC"], optimizer='sgd')
    return model

In [5]:
def stcan_model2(input_shape, n_class, embed_dim=512, num_heads=1):
    n = input_shape[0]
    # print(n,1111)
    k = input_shape[1]
    # print(k,1112)
    input_layer = Input(shape=(n, k, 1))

    # 2D convolution layers
    a = Conv2D(
        filters=64,
        kernel_size=(n, 1),
        strides=(1, 1),
        padding="same",
        input_shape=(n, k, 1),
        name="2D",
    )(input_layer)
    a = BatchNormalization()(a)
    a = Activation("relu", name="2D_Activation")(a)
    a = Conv2D(filters=1, kernel_size=(1, 1), strides=(1, 1))(a)
    a = Activation("relu", name="2D_Reduced_Activation")(a) 
    x = Reshape((n, k))(a)

    # 1D convolution layers
    b = Reshape((n, k))(input_layer)
    b = LSTM(100, return_sequences=True)(b)
    b = LSTM(1, return_sequences=True)(b)
    b = BatchNormalization()(b)
    y = Activation("relu", name="1D_Activation")(b)


    # Cross-Attention
    # z = CrossAttention(embed_dim, num_heads)([y,x,x])
    # Cross-Attention
    z = CrossAttention(embed_dim, num_heads)([x,y,y])
    # print(x.shape,y.shape,z.shape,1113)
    # Reshape z to fit Conv2D
    # z = Reshape((z.shape[1], z.shape[2], 1))(z)
    
    # Add Conv2D classifier
    z_reshaped = Reshape((n, 1, embed_dim))(z)
    # print(z_reshaped.shape,1114)
    # Conv2D layers for classification
    z_conv = Conv2D(filters=32, kernel_size=(3, 3), padding='same', activation='relu')(z_reshaped)
    z_conv = Flatten()(z_conv)
    output_layer = Dense(n_class, activation='softmax')(z_conv)
    
    model = Model(inputs=input_layer, outputs=output_layer)
    focal_loss = tfa.losses.SigmoidFocalCrossEntropy(alpha=0.5, gamma=4.0)
    model.compile(loss=focal_loss, metrics=["AUC"], optimizer='sgd')
    return model




In [6]:
input_shape = (7, 55)
n_class = 2
model = stcan_model(input_shape, n_class)
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 7, 55, 1)]   0           []                               
                                                                                                  
 2D (Conv2D)                    (None, 7, 55, 64)    128         ['input_1[0][0]']                
                                                                                                  
 batch_normalization (BatchNorm  (None, 7, 55, 64)   256         ['2D[0][0]']                     
 alization)                                                                                       
                                                                                                  
 reshape_1 (Reshape)            (None, 7, 55)        0           ['input_1[0][0]']            

In [7]:
stcan2 = stcan_model2(input_shape, n_class=2,embed_dim=55,num_heads=5)
models={'stcan2':stcan2}

In [8]:
model_eval_train = pd.DataFrame({},[])
model_eval_test = pd.DataFrame({},[])

In [9]:
from sklearn.model_selection import StratifiedKFold
import os
import numpy as np
import keras
from keras.callbacks import EarlyStopping, ModelCheckpoint

# 设定随机种子
seed = 5
np.random.seed(seed)

# 5折交叉验证
num_folds = 5
skf = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=seed)

# 存储模型的评估结果
accuracy_list = []
f1_list = []
roc_auc_list = []
mcc_list = []

for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
    print(f'Fold {fold + 1}/{num_folds}')

    # 划分训练集和验证集
    X_train, X_val = X[train_idx], X[val_idx]
    y_train, y_val = y[train_idx], y[val_idx]
    y_train, y_val = transform_labels(y_train, y_val)
    y_train_nonencoded, y_val_nonencoded = y_train, y_val

    # One hot encoding of the labels
    enc = OneHotEncoder()
    enc.fit(np.concatenate((y_train, y_val), axis=0).reshape(-1, 1))
    y_train = enc.transform(y_train.reshape(-1, 1)).toarray()
    y_val = enc.transform(y_val.reshape(-1, 1)).toarray()
    X_train = np.reshape(
        np.array(X_train),
        (X_train.shape[0], X_train.shape[1], X_train.shape[2], 1),
        order="C",
    )
    X_val = np.reshape(
        np.array(X_val),
        (X_val.shape[0], X_val.shape[1], X_val.shape[2], 1),
        order="C",
)

    for name, model in models.items():
        print(name, model, "Training in Fold:", fold + 1)

        # 创建模型保存路径
        path1 = f'best_model/{name}/'
        path2 = f"{fold}/best_weights.hdf5"
        best_weights_filepath = os.path.join(path1, path2)
        os.makedirs(os.path.dirname(best_weights_filepath), exist_ok=True)

        # 训练时使用早停法
        monitor = EarlyStopping(monitor='val_auc', min_delta=1e-5, patience=50, verbose=1, mode='max')
        saveBestModel = ModelCheckpoint(best_weights_filepath, monitor='val_auc', verbose=1, save_best_only=True, mode='max')

        # 训练模型
        h = model.fit(X_train, y_train,
                      callbacks=[monitor, saveBestModel],
                      validation_data=(X_val, y_val),
                      epochs=1000, batch_size=32, verbose=1)

        # 载入当前折最佳权重
        model.load_weights(best_weights_filepath)

        # 评估模型
        model_eval_train, model_eval_test, cf_matrix_train, cf_matrix_test = model_eval_data(
            model, X_train, y_train_nonencoded, 
            X_val, y_val_nonencoded, 
            model_eval_train, model_eval_test, 
            Name=str(fold)+'_'+name
        )

        # 计算当前折的指标
        accuracy = accuracy_score(y_val_nonencoded, model.predict(X_val).argmax(axis=1))
        f1 = f1_score(y_val_nonencoded, model.predict(X_val).argmax(axis=1))
        roc_auc = roc_auc_score(y_val_nonencoded, model.predict(X_val)[:, 1])
        mcc = matthews_corrcoef(y_val_nonencoded, model.predict(X_val).argmax(axis=1))

        accuracy_list.append(accuracy)
        f1_list.append(f1)
        roc_auc_list.append(roc_auc)
        mcc_list.append(mcc)

# 计算五折交叉验证的平均指标
print("\nFinal 5-Fold Cross-Validation Results:")
print(f'Average Accuracy: {np.mean(accuracy_list):.4f} ± {np.std(accuracy_list):.4f}')
print(f'Average F1 Score: {np.mean(f1_list):.4f} ± {np.std(f1_list):.4f}')
print(f'Average ROC AUC: {np.mean(roc_auc_list):.4f} ± {np.std(roc_auc_list):.4f}')
print(f'Average MCC: {np.mean(mcc_list):.4f} ± {np.std(mcc_list):.4f}')


Fold 1/5
stcan2 <keras.engine.functional.Functional object at 0x00000236D9C3A580> Training in Fold: 1
Epoch 1/1000
Epoch 1: val_auc improved from -inf to 0.68020, saving model to best_model/stcan2/0\best_weights.hdf5
Epoch 2/1000
Epoch 2: val_auc did not improve from 0.68020
Epoch 3/1000
Epoch 3: val_auc improved from 0.68020 to 0.68193, saving model to best_model/stcan2/0\best_weights.hdf5
Epoch 4/1000
Epoch 4: val_auc improved from 0.68193 to 0.72155, saving model to best_model/stcan2/0\best_weights.hdf5
Epoch 5/1000
Epoch 5: val_auc improved from 0.72155 to 0.74771, saving model to best_model/stcan2/0\best_weights.hdf5
Epoch 6/1000
Epoch 6: val_auc did not improve from 0.74771
Epoch 7/1000
Epoch 7: val_auc improved from 0.74771 to 0.78306, saving model to best_model/stcan2/0\best_weights.hdf5
Epoch 8/1000
Epoch 8: val_auc did not improve from 0.78306
Epoch 9/1000
Epoch 9: val_auc did not improve from 0.78306
Epoch 10/1000
Epoch 10: val_auc improved from 0.78306 to 0.80146, saving mo

In [10]:
model_eval_test

Unnamed: 0,precision,recall,f1-score,accuracy,ROC,PR,MCC
0_stcan2,0.920455,0.642857,0.757009,0.86802,0.919954,0.87636,0.690639
1_stcan2,0.973333,0.579365,0.726368,0.860406,0.942342,0.91452,0.679436
2_stcan2,0.931373,0.753968,0.833333,0.903553,0.935501,0.915737,0.775001
3_stcan2,0.882883,0.771654,0.823529,0.893401,0.945413,0.917417,0.751138
4_stcan2,0.840336,0.787402,0.813008,0.883249,0.941196,0.913591,0.729082


In [None]:
zzz

In [48]:
# model_eval_test.to_csv('stcan4.csv')