In [11]:
import pandas as pd
import tensorflow as tf

# 读取xlsx文件
file_path = "C:\\Users\\13067\\Desktop\\GSM1664928_YeastCorePromoterLibrary_Exp.xlsx"  # 替换为你的文件路径
#yan="文件地址"
df = pd.read_excel(file_path, header=0)
# 初始化列表来存储序列和强度分类
sequences_list = []
strengths_list = []
# 遍历每一行数据
for index, row in df.iterrows():
    strength_value = float(row[2])  # 第二列的值
    sequence = row[1]  # 第三列的序列
    
    #处理第二列的数字
    if strength_value <= 10:
        category = 0  # low
    else:
        category = 1  # high
    # 截取第三列的序列并添加空格
    processed_sequence = sequence[0:119]  # 从索引131到220共90bp
    processed_sequence_with_spaces = ' '.join(processed_sequence)
    
    # 将结果添加到列表中
    sequences_list.append(processed_sequence_with_spaces.encode('utf-8'))
    strengths_list.append(category)
# 将列表转换为 NumPy 数组
sequences_array = tf.constant(sequences_list)
strengths_array = tf.constant(strengths_list)
# 创建 tf.data.Dataset 对象
dataset = tf.data.Dataset.from_tensor_slices((sequences_array, strengths_array))
# 如果需要预取优化，可以使用 prefetch 方法
dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
dataset = dataset.shuffle(buffer_size=len(sequences_list))
# 如果需要批量优化，可以使用 batch 方法
dataset = dataset.batch(batch_size=32)
# 计算数据集的总长度
dataset_size = sum(1 for _ in dataset)

# 计算每个子集的大小
train_size = int(0.8 * dataset_size)
val_size = int(0.1 * dataset_size)
test_size = dataset_size - train_size - val_size

# 分割数据集
data1 = dataset.take(train_size)
data2 = dataset.skip(train_size).take(val_size)
data3 = dataset.skip(train_size + val_size)

    

  strength_value = float(row[2])  # 第二列的值
  sequence = row[1]  # 第三列的序列


In [12]:
from tensorflow.keras import layers

max_length = 201
max_tokens = 6
text_vectorization = layers.TextVectorization(
    max_tokens=max_tokens,
    output_mode="int",
    output_sequence_length=max_length,
)
dataset_=[
    "C",
    "T",
    "A",
    "G"
]
text_vectorization.adapt(dataset_)
int_train_ds = data1.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)
int_val_ds = data2.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)
int_test_ds = data3.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)
##毕赤酵母序列部分
new_data_path = "C:\\Users\\13067\\Documents\\WeChat Files\\wxid_uosns6j7qbzm12\\FileStorage\\File\\2025-04\\新建 XLSX 工作表.xlsx"
new_df = pd.read_excel(new_data_path,header=0)
new_sequences = [" ".join(seq[:200]) for seq in new_df["sequence"]]
new_sequences_vectorized = text_vectorization(new_sequences).numpy()
new_ds = tf.data.Dataset.from_tensor_slices(new_sequences_vectorized)
new_ds = new_ds.batch(32)

In [13]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(
            inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

In [14]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim,**kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions


    def get_config(self):
        config = super().get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

In [15]:
vocab_size = 6
sequence_length = 201
embed_dim = 128
num_heads = 4
dense_dim = 128

inputs = keras.Input(shape=(None,), dtype="int64")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(2, activation="softmax")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.summary()

callbacks = [
    keras.callbacks.ModelCheckpoint("full_transformer_encoder.keras",
                                    save_best_only=True)
]
history=model.fit(int_train_ds, validation_data=int_val_ds, epochs=30, callbacks=callbacks)
model = keras.models.load_model(
    "full_transformer_encoder.keras",
    custom_objects={"TransformerEncoder": TransformerEncoder,
                    "PositionalEmbedding": PositionalEmbedding})
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")
predictions = model.predict(new_ds)
# 输出预测结果（类别概率）
predicted_classes = predictions.argmax(axis=-1)  # 获取类别（0或1）
predicted_strengths = ["Low" if c == 0 else "High" for c in predicted_classes]
# 将预测结果添加到DataFrame
new_df["predicted_strength"] = predicted_strengths
new_df["probability_high"] = predictions[:, 1]  # 高表达的概率
new_df.to_csv("predictions_results.csv", index=False)

Model: "model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_3 (InputLayer)        [(None, None)]            0         
                                                                 
 positional_embedding_2 (Pos  (None, None, 128)        26496     
 itionalEmbedding)                                               
                                                                 
 transformer_encoder_2 (Tran  (None, None, 128)        297344    
 sformerEncoder)                                                 
                                                                 
 global_max_pooling1d_2 (Glo  (None, 128)              0         
 balMaxPooling1D)                                                
                                                                 
 dropout_2 (Dropout)         (None, 128)               0         
                                                           