In [1]:
import pandas as pd
import torch.nn as nn
import torch
import time
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch.nn as nn
from torch.nn import BatchNorm1d
from torch.utils.data import TensorDataset
from tqdm import tqdm
import torch.nn.functional as F
from sklearn.model_selection import KFold, train_test_split
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import StandardScaler
import torch.optim as optim
from tqdm.auto import tqdm

In [2]:
class LayerNorm(nn.Module):
    def __init__(self, hidden_size, eps=1e-12):
        """Construct a layernorm module in the TF style (epsilon inside the square root).
        """
        super(LayerNorm, self).__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.bias = nn.Parameter(torch.zeros(hidden_size))
        self.variance_epsilon = eps

    def forward(self, x):
        u = x.mean(-1, keepdim=True)
        s = (x - u).pow(2).mean(-1, keepdim=True)
        x = (x - u) / torch.sqrt(s + self.variance_epsilon)
        return self.weight * x + self.bias

In [3]:

class SelfAttention(nn.Module):
    def __init__(self, num_attention_heads, input_size, hidden_size, output_dim=1, kernel_size=3,
                 hidden_dropout_prob=0.5, attention_probs_dropout_prob=0.5):
        super(SelfAttention, self).__init__()
        input_size = input_size - kernel_size + 1

        if hidden_size % num_attention_heads != 0:
            raise ValueError(
                "The hidden size (%d) is not a multiple of the number of attention "
                "heads (%d)" % (hidden_size, num_attention_heads))


        self.num_attention_heads = num_attention_heads
        self.attention_head_size = int(hidden_size / num_attention_heads)
        self.all_head_size = hidden_size

        self.query = nn.Linear(input_size, self.all_head_size)
        self.key = nn.Linear(input_size, self.all_head_size)
        self.value = nn.Linear(input_size, self.all_head_size)


        self.attn_dropout = nn.Dropout(attention_probs_dropout_prob)
        self.out_dropout = nn.Dropout(hidden_dropout_prob)

        self.dense = nn.Linear(hidden_size, input_size)

        self.LayerNorm = LayerNorm(input_size, eps=1e-12)

        self.out = nn.Linear(input_size, output_dim)

        self.cnn = nn.Conv1d(1, 1, kernel_size, stride=1)
        self.relu = nn.ReLU()
        self.LeakyReLU = nn.LeakyReLU()
        self.dropout = nn.Dropout(0.6)

    def transpose_for_scores(self, x):
        new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        x = x.view(*new_x_shape)
        return x.permute(0, 2, 1, 3)

    def forward(self, input_tensor): #input tensor (128,10000)
        cnn_hidden = self.cnn(input_tensor.view(input_tensor.size(0), 1, -1))#(128,1,9998)
        cnn_hidden = self.LeakyReLU(cnn_hidden)

        cnn_hidden = self.dropout(cnn_hidden)

        input_tensor = cnn_hidden
        self.cnn_result = input_tensor

        mixed_query_layer = self.query(input_tensor)
        mixed_key_layer = self.key(input_tensor)
        mixed_value_layer = self.value(input_tensor)

        self.mixed_query_layer_ = mixed_query_layer
        self.mixed_key_layer_ = mixed_key_layer
        self.mixed_value_layer_ = mixed_value_layer

        query_layer = self.transpose_for_scores(mixed_query_layer)
        key_layer = self.transpose_for_scores(mixed_key_layer)
        value_layer = self.transpose_for_scores(mixed_value_layer)

        self.query_layer_ = query_layer
        self.key_layer_ = key_layer
        self.value_layer_ = value_layer

        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
        attention_scores = attention_scores / np.sqrt(self.attention_head_size)
        self.attention_scores_ = attention_scores
        attention_probs = nn.Softmax(dim=-1)(attention_scores)
        self.attention_probs_ = attention_probs
        attention_probs = self.attn_dropout(attention_probs)
        self.attention_probs__ = attention_probs

        context_layer = torch.matmul(attention_probs, value_layer)
        self.context_layer_ = context_layer

        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        self.context_layer__ = context_layer

        new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,)
        self.new_context_layer_shape_ = new_context_layer_shape

        context_layer = context_layer.view(*new_context_layer_shape)
        self.context_layer3 = context_layer



        hidden_states = self.dense(context_layer)
        self.h1 = hidden_states
        # hidden_states = self.LeakyReLU(hidden_states)

        hidden_states = self.out_dropout(hidden_states) #128，1，9998
        self.h2 = hidden_states

        hidden_states = self.LayerNorm(hidden_states + input_tensor) #128，1，9998
        self.h3 = hidden_states

        output = self.out(self.LeakyReLU(hidden_states.view(hidden_states.size(0), -1)))
        self.o = output
        return output


In [4]:
label = pd.read_csv("./culm_length_label.csv")
data = pd.read_csv("./culm_length_data.csv")

In [5]:
X_train_reduction, X_test_reduction, Y_train, Y_test = train_test_split(data, label,test_size=0.2, random_state=2)

In [6]:
Y_train = pd.DataFrame(Y_train)
Y_test = pd.DataFrame(Y_test)
Y_train = Y_train.values.astype(float)
Y_test = Y_test.values.astype(float)
X_train_reduction = pd.DataFrame(X_train_reduction)
X_test_reduction = pd.DataFrame(X_test_reduction)
X_train_reduction = X_train_reduction.values.astype(float)
X_test_reduction = X_test_reduction.values.astype(float)

In [7]:
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Device used: {DEVICE}')
is_scaler = False
batch_size = 128  
hidden_dim = 128  
output_dim = 1  
num_attention_heads = 8
kernel_size = 3
best_corr_coef = -1
LR = 0.0001
epochs = 100

hidden_dropout_prob=0.5
attention_probs_dropout_prob = 0.5

Device used: cuda


In [8]:
np.random.seed(42)
torch.manual_seed(42)

<torch._C.Generator at 0x7f60791fe7b0>

In [9]:
save_path = './cn_at_culm_len.pth'

In [10]:
kf = KFold(n_splits=5, shuffle=True, random_state=42)

best_corr_coefs = []

In [None]:
start_time = time.time()
x = X_train_reduction
y = Y_train
for fold, (train_index, test_index) in enumerate(kf.split(x)):
    x_train, x_test = x[train_index], x[test_index]
    y_train, y_test = y[train_index], y[test_index]


    scaler = StandardScaler()
    x_train = scaler.fit_transform(x_train)
    x_test = scaler.transform(x_test)

    x_train_tensor = torch.from_numpy(x_train).float().to(DEVICE)
    y_train_tensor = torch.from_numpy(y_train).float().to(DEVICE)
    x_test_tensor = torch.from_numpy(x_test).float().to(DEVICE)
    y_test_tensor = torch.from_numpy(y_test).float().to(DEVICE)

    train_data = TensorDataset(x_train_tensor, y_train_tensor)
    test_data = TensorDataset(x_test_tensor, y_test_tensor)

    train_loader = DataLoader(train_data, batch_size, shuffle=True)
    test_loader = DataLoader(test_data, batch_size, shuffle=False)

    model = SelfAttention(num_attention_heads, x_train.shape[1], hidden_dim, output_dim,
                          hidden_dropout_prob=hidden_dropout_prob, kernel_size=kernel_size,
                          attention_probs_dropout_prob=attention_probs_dropout_prob).to(DEVICE)
    loss_function = torch.nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=LR)

    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10, verbose=True)

    best_corr_coef = -1

    for epoch in range(epochs):
        model.train()
        running_loss = 0
        train_bar = tqdm(train_loader, desc=f'第 {epoch + 1}/{epochs} 轮')
        for data in train_bar:
            x_batch, y_batch = data
            optimizer.zero_grad()
            y_pred = model(x_batch)
            loss = loss_function(y_pred, y_batch.reshape(-1, 1))
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

    model.eval()
    y_test_preds, y_test_trues = [], []
    with torch.no_grad():
        for x_batch, y_batch in test_loader:
            y_test_pred = model(x_batch)
            y_test_preds.extend(y_test_pred.cpu().numpy().reshape(-1).tolist())
            y_test_trues.extend(y_batch.cpu().numpy().reshape(-1).tolist())

    corr_coef = np.corrcoef(y_test_preds, y_test_trues)[0, 1]
    if corr_coef > best_corr_coef:
        best_corr_coef = corr_coef
        torch.save(model.state_dict(), save_path)
    best_corr_coefs.append(best_corr_coef)

    print(f'Fold {fold+1} - Epoch {epoch+1}: Best Correlation Coefficient: {best_corr_coef:.4f}')

end_time = time.time()
execution_time = end_time - start_time
print(f"time: {execution_time} 秒")

In [12]:
print(f"time: {execution_time} 秒")

代码执行时间: 46.30194592475891 秒


In [12]:
average = np.mean(best_corr_coefs)
print(best_corr_coefs)
print(f"corr: {average:.4f}")
import numpy as np
performance_scores = np.array(best_corr_coefs)
mean_score = np.mean(performance_scores)
sample_variance = np.var(performance_scores, ddof=1)
sample_std_dev = np.sqrt(sample_variance)
standard_error = sample_std_dev / np.sqrt(len(performance_scores))
print(standard_error)

各折最佳相关系数: [0.7476666516718471, 0.7230705099991172, 0.732883567349856, 0.7332355810875453, 0.7704683366504448]
平均最佳相关系数: 0.7415
0.008244318057459954


In [13]:
### load_model
model_a = SelfAttention(num_attention_heads, 
                      X_train_reduction.shape[1], 
                      hidden_dim, 
                      output_dim,
                      hidden_dropout_prob=hidden_dropout_prob,
                      kernel_size = kernel_size,
                      attention_probs_dropout_prob=attention_probs_dropout_prob).to(DEVICE)
model_a.load_state_dict(torch.load("./cn_at_culm_len.pth"))

<All keys matched successfully>

In [14]:

data_test_tensor = torch.from_numpy(X_test_reduction).to(torch.float32).to(DEVICE)
label_test_tensor = torch.from_numpy(Y_test).to(torch.float32).to(DEVICE)

In [15]:
model_a.eval()
with torch.no_grad():
    a_output = model_a(data_test_tensor)
pre = a_output.cpu().detach().numpy().reshape(-1).tolist()
tru = label_test_tensor.cpu().detach().numpy().reshape(-1).tolist()
test_corr_coef = np.corrcoef(pre, tru)[0,1]
print("test_corr",test_corr_coef)

test_corr 0.7388878509424767
