## Environment and Working Directory Configuration

In [None]:
!pip install tensorflow-gpu==2.10.0
!pip install cudnn==8.4.1
!pip install cudatoolkit==11.8.0
!pip install pillow
!pip install scikit-learn
!pip install openpyxl
!pip install opencv
!pip install pandas
!pip install matplotlib

In [None]:
import tensorflow as tf

# Show which TensorFlow build the kernel picked up and where it is installed.
print(tf.__version__)
print(tf.__path__)

2.10.0
['/usr/local/lib/python3.10/dist-packages/keras/api/_v2', '/usr/local/lib/python3.10/dist-packages/tensorflow_estimator/python/estimator/api/_v2', '/usr/local/lib/python3.10/dist-packages/tensorboard/summary/_tf', '/usr/local/lib/python3.10/dist-packages/tensorflow', '/usr/local/lib/python3.10/dist-packages/tensorflow/_api/v2']


In [None]:
# Mount Google Drive at /content/drive so the notebook can reach files stored there
# (this cell only works inside Google Colab, where google.colab is available).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os

# Work from the project folder on the mounted Drive so that the relative
# paths used by the later cells resolve against it.
project_dir = '/content/drive/MyDrive/rdkit'
os.chdir(project_dir)
# Echo the directory we ended up in as a sanity check.
print("The current working directory：", os.getcwd())

当前工作目录： /content/drive/MyDrive/rdkit


## Define parameters and file paths

In [1]:
# Focal Loss Parameters
gamma_values = [1]   # Candidate focusing parameters (gamma) passed to BinaryFocalLoss; typically between 1.0 and 5.0, larger values increase the focus on samples that are more difficult to classify.
pos_weight_values = [1]  # Candidate weights for the positive class passed to BinaryFocalLoss. NOTE(review): confirm against the focal_loss documentation which direction (<1 or >1) emphasizes positive samples.
# BiLSTM Layer Parameters
lstm_units1_values = [32] # Candidate numbers of units for each LSTM layer. (e.g. [32,64,96])
num_lstm_layers_values = [1] # Candidate numbers of stacked bidirectional LSTM layers. (e.g. [1,2,3])
# Dense Layer Parameters
dense_units1_values = [32]  # Candidate numbers of units for each dense layer. (e.g. [32,64,96])
dense_layers_values = [3] # Candidate numbers of dense layers in the network. (e.g. [1,2,3])
# Dropout Layer Parameters
dropout_rate1_values = [0.3] # Candidate dropout rates used by every dropout layer. (e.g. [0.1,0.2,0.3])
dropout_layers_values = [1] # Candidate numbers of dropout layers in the network. (e.g. [1,2,3])

# Number of training runs performed by the training cell. Run i takes entry
# (i % len(list)) from each of the candidate lists above.
num_iterations = 10  # Define the number of iterations for random hyperparameter search.
# Paths of the training set and of the independent test set (both .xlsx files
# read with columns 'sequence' and 'label').
train_file = 'Dataset/24_IL-6/IL6_train.xlsx'
val_file = 'Dataset/24_IL-6/IL6_test.xlsx'

output_path = 'results/results.xlsx'  # The path where the results of training will be saved.

## This code block imports the required modules and defines the helper functions and classes used for model training. Nothing in this cell needs to be modified.

In [None]:
import cv2
import os
import tensorflow as tf
import pandas as pd
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, roc_curve, roc_auc_score
from sklearn.model_selection import KFold, TimeSeriesSplit, StratifiedKFold
from preprocess_data_test import preprocess_seq, get_max_length, preprocess_sequence
from tensorflow.python.keras.callbacks import EarlyStopping
from sklearn.metrics import confusion_matrix, accuracy_score, roc_auc_score, balanced_accuracy_score, roc_curve, \
    matthews_corrcoef, precision_score, recall_score, f1_score
import math
from statistics import stdev
import random
import logging
from focal_loss import BinaryFocalLoss

def map_fn(sequence, label):
    """Turn one (sequence, label) dataset element into (image frames, label).

    The sequence string tensor is handed to ``preprocess_sequence`` through
    ``tf.py_function`` with a declared output dtype of float16; the label is
    passed through unchanged.
    """
    frames = tf.py_function(func=preprocess_sequence, inp=[sequence], Tout=tf.float16)
    return frames, label
# load_and_preprocess_data
def load_and_preprocess_data(sequences, labels, batch_size=16):
    """Build a batched tf.data pipeline from padded sequences and their labels.

    Args:
        sequences: Sequence strings; converted to a tf.string tensor.
        labels: Integer labels aligned with ``sequences``; converted to tf.int8.
        batch_size: Number of elements per batch (defaults to 16, the value
            that used to be hard-coded).

    Returns:
        A ``tf.data.Dataset`` yielding ``(frames, labels)`` batches, where the
        frames come from ``map_fn``.
    """
    sequences = tf.constant(sequences, dtype=tf.string)
    labels = tf.constant(labels, dtype=tf.int8)

    # Create the tf.data.Dataset
    dataset = tf.data.Dataset.from_tensor_slices((sequences, labels))
    dataset = dataset.map(map_fn, num_parallel_calls=tf.data.AUTOTUNE)
    # Cache the decoded frames so later epochs skip the Python preprocessing.
    dataset = dataset.cache()
    dataset = dataset.batch(batch_size)
    # Prefetch last so that whole batches are prepared while the model trains.
    dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
    return dataset

class PrintMetricsCallback(tf.keras.callbacks.Callback):
    """Print loss, accuracy, their validation counterparts and the learning rate after each epoch."""

    def on_epoch_end(self, epoch, logs=None):
        # Keras may call this hook with logs=None, and the val_* entries only
        # exist when validation data is supplied; print nan rather than crash.
        logs = logs or {}
        print(f"Epoch {epoch + 1}:")
        for metric_name in ('loss', 'accuracy', 'val_loss', 'val_accuracy'):
            print(f"  {metric_name}: {logs.get(metric_name, float('nan')):.4f}")
        # Use the model this callback is attached to, not a notebook global
        # that may be undefined or may point at a different model.
        learning_rate = tf.keras.backend.get_value(self.model.optimizer.lr)
        print(f"  learning_rate: {learning_rate:.6f}")

class Attention(tf.keras.layers.Layer):
    """Attention pooling with a single learned context vector.

    The ops in ``call`` (squeeze of the last axis, softmax over the remaining
    last axis, sum over axis 1) imply a rank-3 input; presumably
    (batch, time, features) -- TODO confirm against the caller.
    """

    def __init__(self, **kwargs):
        # Forward the standard Keras layer arguments (name, dtype, trainable, ...)
        # so the layer can be named and configured like any built-in layer.
        super(Attention, self).__init__(**kwargs)

    def build(self, input_shape):
        # One weight per input feature; projects each step to a scalar score.
        self.context_vector = self.add_weight(shape=(input_shape[-1], 1), initializer='glorot_uniform',
                                              trainable=True)

    def call(self, inputs):
        """Return ``(context_vector, attention_weights)`` for ``inputs``."""
        # Compute attention scores: one scalar per step, normalized with softmax
        attention_scores = tf.matmul(inputs, self.context_vector)
        attention_scores = tf.squeeze(attention_scores, axis=-1)
        attention_weights = tf.nn.softmax(attention_scores, axis=-1)

        # Apply attention weights to inputs and sum over axis 1
        weighted_inputs = tf.multiply(inputs, tf.expand_dims(attention_weights, axis=-1))
        context_vector = tf.reduce_sum(weighted_inputs, axis=1)

        return context_vector, attention_weights

# Create a callback function to print relevant information at the end of each epoch
class CustomCallback(tf.keras.callbacks.Callback):
    """Print a one-line summary of train and validation loss/accuracy per epoch."""

    def on_epoch_end(self, epoch, logs=None):
        epoch_number = epoch + 1
        train_loss, train_acc = logs['loss'], logs['accuracy']
        held_out_loss, held_out_acc = logs['val_loss'], logs['val_accuracy']
        print(
            f"Epoch {epoch_number} - Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}, "
            f"Test Loss: {held_out_loss:.4f}, Test Accuracy: {held_out_acc:.4f}")

# Define a learning rate callback function to print the learning rate at the end of each epoch
class LearningRateCallback(tf.keras.callbacks.Callback):
    """Report the optimizer's current learning rate once every epoch."""

    def on_epoch_end(self, epoch, logs=None):
        optimizer = self.model.optimizer
        current_lr = optimizer.lr.numpy()
        print("Epoch {} - Learning Rate: {:.6f}".format(epoch + 1, current_lr))

def step_decay(epoch):
    """Step learning-rate schedule: start at 0.1 and multiply by 0.6 every 3 epochs.

    ``epoch`` is the zero-based epoch index; the first drop happens at epoch 2
    because the step count is computed from ``epoch + 1``.
    """
    initial_lrate, drop, epochs_drop = 0.1, 0.6, 3.0
    completed_drops = math.floor((1 + epoch) / epochs_drop)
    return initial_lrate * math.pow(drop, completed_drops)


# Define the training process
def train_model(train_dataset, test_dataset):
    """Create a fresh model and fit it with checkpointing, LR reduction and early stopping.

    Relies on names defined by the training cell at notebook level:
    ``model_path`` (checkpoint file) and ``reduce_lr`` (ReduceLROnPlateau
    callback), plus everything ``create_model`` reads.

    Args:
        train_dataset: Batched ``tf.data.Dataset`` used for fitting.
        test_dataset: Batched ``tf.data.Dataset`` used as validation data; it
            drives the checkpoint, the LR reduction and the early stopping.

    Returns:
        The fitted Keras model. Early stopping restores the weights with the
        best ``val_loss``, while the checkpoint file keeps the epoch with the
        best ``val_accuracy``.
    """
    # Mirror across at most the first two visible GPUs. Hard-coding
    # ['GPU:0', 'GPU:1'] breaks on single-GPU or CPU-only runtimes; with
    # devices=None MirroredStrategy picks whatever is available.
    gpu_names = [gpu.name for gpu in tf.config.list_logical_devices('GPU')][:2]
    strategy = tf.distribute.MirroredStrategy(devices=gpu_names or None)
    with strategy.scope():
        # Define ModelCheckpoint callback
        checkpoint = tf.keras.callbacks.ModelCheckpoint(model_path, monitor='val_accuracy', save_best_only=True,
                                                        mode='max', verbose=0)
        # EarlyStopping: use the public tf.keras callback so it matches the
        # tf.keras model (tensorflow.python.keras is a private legacy copy).
        early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=40,
                                                          restore_best_weights=True, verbose=1)

        learning_rate = 0.0001
        model = create_model()
        model.optimizer.lr.assign(learning_rate)
        print_learning_rate = tf.keras.callbacks.LambdaCallback(
            on_epoch_begin=lambda epoch, logs: print(
                f"Learning Rate: {tf.keras.backend.get_value(model.optimizer.lr):.6f}"))
        callbacks = [reduce_lr, early_stopping, print_learning_rate, checkpoint]
        # No batch_size here: the datasets are already batched, and Keras
        # documents that batch_size must not be given for Dataset inputs.
        model.fit(train_dataset, epochs=400, validation_data=test_dataset,
                  callbacks=callbacks, verbose=2)
    return model

# Define model
def create_model():
    """Build and compile the sequence classifier.

    Reads these notebook-level names set by the training cell: ``max_length``,
    ``lstm_units1``, ``num_lstm_layers``, ``dense_layers``, ``dense_units1``,
    ``dropout_layers``, ``dropout_rate1``, ``gamma`` and ``pos_weight``.

    The network actually wired to the output is:
    Input(max_length, 32, 32, 3) -> Reshape(max_length, -1) -> stacked
    bidirectional LSTMs -> multiplicative gate -> Flatten -> Dense layers ->
    Dropout layers -> sigmoid unit. It is compiled with Adam (lr 1e-4),
    BinaryFocalLoss and the accuracy metric.

    Returns:
        The compiled ``tf.keras.Model`` (its summary is printed as a side effect).
    """
    learning_rate = 0.0001

    sequence_length = max_length
    input_shape = (sequence_length, 32, 32, 3)
    inputs = tf.keras.Input(shape=input_shape)

    # NOTE(review): the convolutional stack below is built but never reaches
    # the output. The Conv2D inside the first loop is applied to `inputs`
    # instead of `model`, and the Reshape after the stack also starts again
    # from `inputs`, discarding everything computed here. Wiring it in changes
    # the architecture (and its memory cost), so it is left as-is pending a
    # decision and retraining.
    model = tf.keras.layers.TimeDistributed(tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same'))(
        inputs)
    for _ in range(2):
        model = tf.keras.layers.TimeDistributed(
            tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same'))(
            inputs)
        model = tf.keras.layers.TimeDistributed(tf.keras.layers.MaxPooling2D((2, 2), strides=(2, 2)))(model)
    for _ in range(1):
        model = tf.keras.layers.BatchNormalization()(model)  # Adding a normalization layer
    model = tf.keras.layers.TimeDistributed(
        tf.keras.layers.Conv2D(128, (3, 3), strides=(1, 1), activation='relu', padding='same'))(model)
    for _ in range(2):
        model = tf.keras.layers.TimeDistributed(
            tf.keras.layers.Conv2D(128, (3, 3), strides=(1, 1), activation='relu', padding='same'))(model)
        model = tf.keras.layers.TimeDistributed(tf.keras.layers.MaxPooling2D((2, 2), strides=(2, 2)))(model)
    for _ in range(1):
        model = tf.keras.layers.BatchNormalization()(model)  # Adding a normalization layer

    # Flatten every 32x32x3 frame into one feature vector per sequence position.
    model = tf.keras.layers.Reshape((sequence_length, -1))(inputs)
    lstm_units = lstm_units1
    # Create LSTM layers based on num_lstm_layers
    for _ in range(num_lstm_layers):
        model = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=lstm_units1, return_sequences=True))(model)
    # Add Temporal Attention mechanism
    # NOTE(review): the Dense layer below has a single unit with a softmax
    # activation. A softmax over one value is always 1.0, so the Multiply
    # leaves the LSTM output unchanged. A softmax over the time axis would
    # need units=sequence_length; left unchanged because it alters the model.
    model = tf.keras.layers.Reshape((-1, lstm_units * 2))(model)
    permute1 = tf.keras.layers.Permute((2, 1))(model)
    attention_probs = tf.keras.layers.Dense(units=1, activation='softmax')(permute1)
    permute2 = tf.keras.layers.Permute((2, 1))(attention_probs)
    model = tf.keras.layers.Multiply()([model, permute2])
    model = tf.keras.layers.Flatten()(model)
    # Add fully connected layers and dropout layers
    for _ in range(dense_layers):
        model = tf.keras.layers.Dense(units=dense_units1)(model)
    for _ in range(dropout_layers):
        model = tf.keras.layers.Dropout(rate=dropout_rate1)(model)
    num_classes = 1
    outputs = tf.keras.layers.Dense(units=num_classes, activation='sigmoid')(model)

    model = tf.keras.models.Model(inputs=inputs, outputs=outputs)
    # Only Adam is used. The SGD/RMSprop instances that used to be built here
    # were never passed to compile(), and SGD(lr=..., decay=...) used
    # deprecated keyword arguments.
    adam = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    # Compile the model
    model.compile(optimizer=adam, loss=BinaryFocalLoss(gamma=gamma, pos_weight=pos_weight), metrics=['accuracy'])
    # Print the model structure
    model.summary()
    return model

# Residue images keyed by folder path, so each image folder is read from disk
# once instead of once per sequence.
_RESIDUE_IMAGE_CACHE = {}


def _load_residue_images(folder_path):
    """Return {file name without extension: 32x32 float16 image in [0, 1]} for a folder."""
    cached = _RESIDUE_IMAGE_CACHE.get(folder_path)
    if cached is not None:
        return cached

    images = {}
    for file_name in os.listdir(folder_path):
        image = cv2.imread(os.path.join(folder_path, file_name))
        if image is None:
            # cv2.imread returns None for files it cannot decode; skip them
            # instead of crashing inside cv2.resize.
            continue
        image = cv2.resize(image, (32, 32))  # Resize the image to 32x32
        image = tf.cast(image, tf.float16) / 255.0  # Convert to float16 and normalize to [0, 1]
        # Replace NaN with 0
        image = tf.where(tf.math.is_nan(image), tf.zeros_like(image), image)
        # Key by the file name without its extension (e.g. "A_C.png" -> "A_C").
        images[os.path.splitext(file_name)[0]] = image.numpy()

    _RESIDUE_IMAGE_CACHE[folder_path] = images
    return images


def preprocess_sequence(sequence):
    """Convert one padded sequence string into a stack of residue images.

    Image selection per position:
      * the last character before the first 'x' (or the last character when
        there is no 'x') uses the "<char>_C" image;
      * otherwise position 0 uses the "<char>_N" image;
      * every other position, including 'x' characters, uses the "<char>" image.

    Args:
        sequence: Scalar string tensor (as delivered by ``tf.py_function``).

    Returns:
        float16 tensor of shape (1, len(sequence), 32, 32, 3).

    Raises:
        ValueError: if the sequence is empty or a required image is missing
            from the image folder.
    """
    folder_path = "residues32/IA"
    images = _load_residue_images(folder_path)

    input_seq = sequence.numpy().decode("utf-8")
    if not input_seq:
        raise ValueError("Cannot build image frames for an empty sequence")

    # Index of the C-terminal residue: the character just before the first 'x',
    # or the last character when there is no 'x'.
    first_pad = input_seq.find('x')
    c_terminal_index = first_pad - 1 if first_pad != -1 else len(input_seq) - 1

    frames = []
    for position, residue in enumerate(input_seq):
        if position == c_terminal_index:
            image_key = residue + '_C'
        elif position == 0:
            image_key = residue + '_N'
        else:
            image_key = residue
        frame = images.get(image_key)
        if frame is None:
            raise ValueError(
                f"No image named '{image_key}' found in '{folder_path}' "
                f"(needed for position {position} of sequence '{input_seq}')")
        frames.append(frame)

    seq_frames = tf.convert_to_tensor(np.stack(frames, axis=0))
    # Keep the leading axis of size 1 that callers squeeze away before predict().
    return tf.expand_dims(seq_frames, axis=0)

def preprocess_seq(filename, max_length):
    """Read an .xlsx file and return a list of (padded sequence, label) tuples.

    Each entry of the 'sequence' column is stripped of surrounding whitespace
    and right-padded with 'x' up to ``max_length``; labels come from the
    'label' column, in row order.
    """
    frame = pd.read_excel(filename, engine='openpyxl', keep_default_na=False, na_values=[''])
    raw_sequences = frame['sequence'].tolist()
    tags = frame['label'].tolist()
    return [
        (raw_seq.strip().ljust(max_length, 'x'), tag)
        for raw_seq, tag in zip(raw_sequences, tags)
    ]

def get_max_length(filename1,filename2,max_length):
    """Return the longest sequence length across two .xlsx files.

    Args:
        filename1: Path of the first .xlsx file (needs a 'sequence' column).
        filename2: Path of the second .xlsx file (needs a 'sequence' column).
        max_length: Lower bound for the result; the returned value is never
            smaller than this.

    Returns:
        max(max_length, longest sequence in file 1, longest sequence in file 2).
    """
    def count_max_length(data):
        # Length of the longest entry in the 'sequence' column (0 if empty).
        return max((len(seq) for seq in data['sequence'].tolist()), default=0)

    data1 = pd.read_excel(filename1, engine='openpyxl', keep_default_na=False, na_values=[''])
    data2 = pd.read_excel(filename2, engine='openpyxl', keep_default_na=False, na_values=[''])
    # Take the maximum over all three candidates. The previous code compared
    # the second file against the first file instead of the running maximum,
    # so it could return less than the max_length that was passed in.
    return max(max_length, count_max_length(data1), count_max_length(data2))

## The following cell contains the main training code. After training, the results and hyperparameter settings are saved to the Excel file given by `output_path` (`results/results.xlsx` by default).

In [None]:
# Set the logging level
logging.basicConfig(level=logging.INFO)
# NOTE(review): TensorFlow has already been imported at this point, so this
# environment variable may come too late to affect its C++ logging; confirm.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
# Only let the TensorFlow Python logger emit errors
tf.get_logger().setLevel('ERROR')

# Make sure the results folder exists up front, so a missing directory does
# not surface only after the first training run has finished.
output_dir = os.path.dirname(output_path)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)

# Column order of the results file ('Precision' used to be misspelled here,
# which produced one empty column plus a separate 'Precision' column).
RESULT_COLUMNS = [
    'Iteration', 'gamma', 'pos_weight', 'lstm_units1', 'dense_units1',
    'num_lstm_layers', 'dense_layers', 'dropout_rate1', 'dropout_layers', 'TP', 'FP', 'FN', 'TN', 'ACC',
    'BACC', 'Sn', 'Sp', 'MCC', 'AUC', "AUC_prime", 'Precision', 'Recall', 'F1_score'
]

# Best accuracy over all iterations, and the hyperparameters that produced it.
best_accuracy = 0.0
best_combination = None
# Visit the iteration indices in random order, each exactly once
random_indices = random.sample(range(num_iterations), num_iterations)

# Results are collected as one record per iteration
result_rows = []
results_df = pd.DataFrame(columns=RESULT_COLUMNS)

# The data does not depend on the hyperparameters, so load it once.
max_length = 0
max_length = get_max_length(train_file, val_file, max_length)
print(max_length)
data_train = preprocess_seq(train_file, max_length)
data_test = preprocess_seq(val_file, max_length)

X = np.array([sequence for sequence, label in data_train])
y = np.array([label for sequence, label in data_train])
X_val = np.array([sequence for sequence, label in data_test])
y_val = np.array([label for sequence, label in data_test])

val_dataset = load_and_preprocess_data(X_val, y_val)

for i in random_indices:
    # NOTE(review): every list is indexed with the same i, so this walks the
    # lists in lock-step rather than sampling combinations independently.
    gamma = gamma_values[i % len(gamma_values)]
    pos_weight = pos_weight_values[i % len(pos_weight_values)]
    lstm_units1 = lstm_units1_values[i % len(lstm_units1_values)]
    dense_units1 = dense_units1_values[i % len(dense_units1_values)]
    num_lstm_layers = num_lstm_layers_values[i % len(num_lstm_layers_values)]
    dense_layers = dense_layers_values[i % len(dense_layers_values)]
    dropout_rate1 = dropout_rate1_values[i % len(dropout_rate1_values)]
    dropout_layers = dropout_layers_values[i % len(dropout_layers_values)]

    # Define 5-fold cross-validation (only the stratified splitter is used below)
    kf = KFold(n_splits=5, shuffle=True, random_state=42)
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    # Define the model saving path (read by train_model for its checkpoint)
    model_path = 'best_model'+ str(i+1)+'.h5'

    # Best fold of THIS iteration; kept separate from the overall best_accuracy
    # so the cross-iteration comparison further down can actually succeed.
    best_model = None
    best_fold_accuracy = 0
    # Define empty lists to store metrics for each k-fold training
    accuracy_list = []
    auc_list = []
    bacc_list = []
    sensitivity_list = []
    specificity_list = []
    mcc_list = []
    precision_list = []
    recall_list = []
    f1_list = []

    # Read by train_model as a notebook-level global
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=10, verbose=1,
                                                     min_delta=1e-4, mode='min')
    # Set up callbacks
    # NOTE(review): this list is not passed to train_model, which builds its own.
    custom_callback = CustomCallback()
    lr_callback = LearningRateCallback()
    callbacks = [custom_callback, lr_callback]

    for train_index, test_index in skf.split(X, y):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        # Ensure the number of samples in the training and testing sets is a multiple of the batch size
        train_samples = len(X_train) - (len(X_train) % 16)
        test_samples = len(X_test) - (len(X_test) % 16)
        X_train = X_train[:train_samples]
        y_train = y_train[:train_samples]
        X_test = X_test[:test_samples]
        y_test = y_test[:test_samples]
        assert X_test.shape[0] == y_test.shape[0]

        train_dataset = load_and_preprocess_data(X_train, y_train)
        test_dataset = load_and_preprocess_data(X_test, y_test)

        model = train_model(train_dataset, test_dataset)
        # Evaluate the model's performance on the independent test set
        # NOTE(review): the same set is used to pick the best fold model below,
        # so the reported numbers are optimistic for that model.
        val_loss, val_acc = model.evaluate(val_dataset)
        print("The ACC of best model"+str(val_acc))
        print("The loss of best model" + str(val_loss))
        # Keep the best fold model (always keep one, even if accuracy is 0)
        if best_model is None or val_acc > best_fold_accuracy:
            best_model = model
            best_fold_accuracy = val_acc

        # Collect labels and predicted probabilities over the independent test set
        y_true = []
        y_prob = []
        for batch_inputs, batch_labels in val_dataset:
            # Drop the size-1 axis that preprocess_sequence adds in front of the frames
            batch_inputs = np.squeeze(batch_inputs, axis=1)
            batch_predictions = model.predict(batch_inputs)

            y_true.extend(batch_labels.numpy())
            y_prob.extend(np.ravel(batch_predictions))

        # Convert to NumPy arrays
        y_true = np.array(y_true)
        y_prob = np.array(y_prob)
        y_pred_binary = (y_prob > 0.5).astype("int32")

        # Calculate various metrics.
        # ROC/AUC need the continuous scores: on thresholded 0/1 predictions
        # the curve collapses to a single operating point.
        fpr, tpr, thresholds = roc_curve(y_true, y_prob)
        # labels=[0, 1] guarantees a 2x2 matrix even if one class is never predicted
        cm = confusion_matrix(y_true, y_pred_binary, labels=[0, 1])
        TN, FP, FN, TP = cm.ravel()
        accuracy = accuracy_score(y_true, y_pred_binary)
        auc = roc_auc_score(y_true, y_prob)
        auc_prime = np.trapz(tpr, fpr)
        sensitivity = recall_score(y_true, y_pred_binary)
        specificity = recall_score(y_true, y_pred_binary, pos_label=0)
        mcc = matthews_corrcoef(y_true, y_pred_binary)
        bacc = balanced_accuracy_score(y_true, y_pred_binary)
        precision = precision_score(y_true, y_pred_binary)
        recall = sensitivity
        f1 = f1_score(y_true, y_pred_binary)

        # Add the metrics of each k-fold training to the list
        accuracy_list.append(accuracy)
        auc_list.append(auc)
        bacc_list.append(bacc)
        sensitivity_list.append(sensitivity)
        specificity_list.append(specificity)
        mcc_list.append(mcc)
        precision_list.append(precision)
        recall_list.append(recall)
        f1_list.append(f1)

    # Calculate the standard deviation of each metric across the folds
    # NOTE(review): these values are computed but not written to the results file.
    accuracy_std = stdev(accuracy_list)
    auc_std = stdev(auc_list)
    bacc_std = stdev(bacc_list)
    sensitivity_std = stdev(sensitivity_list)
    specificity_std = stdev(specificity_list)
    mcc_std = stdev(mcc_list)
    precision_std = stdev(precision_list)
    recall_std = stdev(recall_list)
    f1_std = stdev(f1_list)

    best_model.save(model_path)

    # NOTE(review): the metrics recorded below are those of the LAST fold, not
    # the fold mean and not necessarily those of the saved best model.
    val_accuracy = accuracy
    # Update the best result across iterations
    if best_combination is None or val_accuracy > best_accuracy:
        best_accuracy = val_accuracy
        best_combination = {
            'gamma': gamma,
            'pos_weight': pos_weight,
            'lstm_units': lstm_units1,
            'dense_units': dense_units1,
            'num_lstm_layers': num_lstm_layers,
            'dense_layers': dense_layers,
            'dropout_rate': dropout_rate1,
            'dropout_layers': dropout_layers,
        }

    # Add the result of this iteration
    result_rows.append({
        'Iteration': i + 1,
        'gamma': gamma,
        'pos_weight': pos_weight,
        'lstm_units1': lstm_units1,
        'dense_units1': dense_units1,
        'num_lstm_layers': num_lstm_layers,
        'dense_layers': dense_layers,
        'dropout_rate1': dropout_rate1,
        'dropout_layers': dropout_layers,
        'TP': TP,
        'FP': FP,
        'FN': FN,
        'TN': TN,
        'ACC': accuracy,
        'BACC': bacc,
        'Sn': sensitivity,
        'Sp': specificity,
        'MCC': mcc,
        'AUC': auc,
        'AUC_prime': auc_prime,
        'Precision': precision,
        'Recall': recall,
        'F1_score': f1
    })
    results_df = pd.DataFrame(result_rows, columns=RESULT_COLUMNS)

    # Write the DataFrame to the Excel file after every iteration, so partial
    # results survive an interrupted run
    results_df.to_excel(output_path, index=False)
    print(f"Results saved to {output_path}")

# Print the best hyperparameter combination and its accuracy
print(f"Best Combination: {best_combination}")
print(f"Best Accuracy: {best_accuracy}")

  super().__init__(name, **kwargs)


数据集中字符最大长度: 180
