1.13 搭建深度学习网络提取CO 

将figure文件夹中的图像按日期转化为张量形式

In [None]:
import os
from PIL import Image
import torch
from torchvision import transforms

"""对图像预处理 转化为张量
   RGBA转化为RGB通道
   图像缩放
   对图像每个通道数据归一化
   张量堆叠 每五个图像一轮输出张量堆叠作为网络输入
   输出张量打印形状检查
   存储于D：/deep learning torch
"""
# Preprocessing applied to every image before it is stacked:
# resize to 224x224, convert to a tensor, then normalize each channel with
# the per-channel mean/std below (these values match the widely used
# ImageNet statistics).
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

def load_and_stack_images(image_folder, group_size=5):
    """Load the PNG images of a folder and stack them in fixed-size groups.

    Files are visited in sorted (lexicographic) file-name order. Every file
    whose name ends with ``.png`` is converted to RGB and passed through the
    module-level ``transform``. Each time ``group_size`` tensors have been
    collected they are combined with ``torch.stack`` into one tensor.

    Args:
        image_folder: Directory that contains the images.
        group_size: Number of images per stacked tensor. Defaults to 5.

    Returns:
        A list with one stacked tensor per complete group. Trailing images
        that do not fill a complete group are dropped.

    Raises:
        ValueError: If ``group_size`` is smaller than 1.
    """
    if group_size < 1:
        raise ValueError("group_size must be at least 1")

    stacked_tensors = []
    pending = []

    for image_file in sorted(os.listdir(image_folder)):
        if not image_file.endswith(".png"):
            continue

        image_path = os.path.join(image_folder, image_file)
        # The context manager must wrap the object returned by Image.open so
        # that the opened file is the one being closed; convert() returns a
        # separate, fully loaded copy that stays usable afterwards.
        with Image.open(image_path) as source_image:
            rgb_image = source_image.convert("RGB")
        pending.append(transform(rgb_image))

        # A group is complete: stack it and start collecting the next one.
        if len(pending) == group_size:
            stacked_tensors.append(torch.stack(pending))
            pending = []

    return stacked_tensors

# Root folder with one sub-folder of images per date, and the directory the
# stacked tensors are written to.
main_directory = "C:\\Users\\HUAWEI\\Desktop\\figure"
save_directory = "D:\\deep learning torch"

# Create the output directory. exist_ok=True replaces the separate
# "exists?" check, so there is no gap between checking and creating.
os.makedirs(save_directory, exist_ok=True)

# Turn every date folder into stacked tensors and save each group to disk.
for date_folder in os.listdir(main_directory):
    date_folder_path = os.path.join(main_directory, date_folder)
    if not os.path.isdir(date_folder_path):
        continue  # skip plain files that sit next to the date folders

    stacked_image_groups = load_and_stack_images(date_folder_path)

    for i, images_tensor in enumerate(stacked_image_groups):
        # Print the shape of each group as a sanity check.
        tensor_shape = images_tensor.shape
        print(f"Date: {date_folder}, Group: {i}, Tensor Shape: {tensor_shape}")

        # One file per group, named "<date folder>_group_<index>.pt".
        tensor_filename = f"{date_folder}_group_{i}.pt"
        tensor_save_path = os.path.join(save_directory, tensor_filename)
        torch.save(images_tensor, tensor_save_path)


遍历 data 文件夹，提取 Excel 文件中指定列的数据，同时提取 CO 列作为标签数据；对 Excel 表中的每一列数据进行归一化，并将数据信息与张量信息进行融合，输出新张量，作为网络的输入。

In [None]:
import os
import pandas as pd
import torch

# Min-max normalization helper
def normalize_dataframe(df):
    """Scale every column of ``df`` to the range [0, 1] in place.

    Each column is transformed as ``(x - min) / (max - min)``. A column whose
    values are all equal has a zero range; it is mapped to 0.0 instead of
    being divided by zero (which would turn the whole column into NaN).

    Args:
        df: DataFrame with numeric columns. It is modified in place.

    Returns:
        The same DataFrame object, with normalized columns.
    """
    for column in df.columns:
        min_value = df[column].min()
        max_value = df[column].max()
        value_range = max_value - min_value
        # Dividing by 1 for a zero range keeps constant columns at 0.0.
        scale = value_range if value_range != 0 else 1
        df[column] = (df[column] - min_value) / scale
    return df

# Spreadsheet columns to read; the last entry, 'CO', is the label column.
columns_of_interest = [
    'Average Heartbeat Interval 3.12 (s)',
    'Heartbeat Period Variability 3.12',
    'R1 AC/DC Component Ratio 3.12',
    'Dicrotic Notch Component to DC Component Ratio 3.12',
    'ln Average DC Component 3.12',
    'R1',
    'R2',
    'R3',
    'R4',
    'PPG Pulse Wave Dynamics Parameters 3.12',
    'CO',
]

# Input locations: saved image tensors and the Excel workbooks.
tensor_directory = "D:\\deep learning torch"
data_directory = "C:\\Users\\HUAWEI\\Desktop\\data"

# Output location of the text summary; the directory is created if missing.
output_directory = "D:\\path_to_save_results"
output_file_name = "results.txt"
os.makedirs(output_directory, exist_ok=True)
output_path = os.path.join(output_directory, output_file_name)

# Write one summary line for every spreadsheet row that has a saved tensor.
with open(output_path, 'w') as output_file:
    for data_file in os.listdir(data_directory):
        if not data_file.endswith(".xlsx"):
            continue

        # The workbook name without its extension identifies the day; tensor
        # files are looked up as "figure<day>_group_<row index>.pt".
        day = os.path.splitext(data_file)[0]
        tensor_folder_name = f"figure{day}"

        data_path = os.path.join(data_directory, data_file)
        df = pd.read_excel(data_path, usecols=columns_of_interest)
        df_normalized = normalize_dataframe(df)

        for index, row in df_normalized.iterrows():
            tensor_file_name = f"{tensor_folder_name}_group_{index}.pt"
            tensor_file_path = os.path.join(tensor_directory, tensor_file_name)
            if not os.path.exists(tensor_file_path):
                continue  # no image group saved for this row

            image_tensor = torch.load(tensor_file_path)
            co_label = row['CO']  # label value of this row
            features = row.drop('CO').to_dict()

            output_file.write(
                f"Date: {tensor_folder_name}, Group: {index}, "
                f"Tensor Shape: {image_tensor.shape}, Data: {features}, "
                f"CO Label: {co_label}\n"
            )

print(f"Results saved to {output_path}")


创建一个数据加载器，加载器在训练过程中将张量数据以及标签提供给网络：创建自定义的 Dataset 类，这个类读取对应的图像张量和数值数据，作为网络输入返回。
DataLoader 在训练循环中批量加载数据：在训练循环中使用 DataLoader 获取数据和对应的 CO 标签，输入到网络中进行训练。

创建Dataset 读取对应图像张量与数值数据

创建并使用 CustomDataset 类：输入网络的张量在每次调用 __getitem__ 方法时被动态地加载和返回。CustomDataset 类的 self.data 属性存储了由张量文件路径、数值数据、CO 标签组成的元组，而实际的图像张量是在调用 __getitem__ 方法时才从文件中加载。

CustomDataset 用于从指定的目录中加载数据，并通过 DataLoader 进行批量处理。在训练循环中，遍历 data_loader 来获取每个批次的数据，并将其输入到模型中进行训练。

In [None]:
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
import torch
import os


class CustomDataset(Dataset):
    """Dataset pairing saved image tensors with normalized spreadsheet rows.

    ``self.data`` holds one ``(tensor_file_path, numeric_row, co_label)``
    tuple per spreadsheet row for which a tensor file exists. The image
    tensor itself is only read from disk when an item is requested.
    """

    def __init__(self, data_directory, tensor_directory, columns_of_interest):
        """Index every ``.xlsx`` workbook found in ``data_directory``.

        Args:
            data_directory: Folder containing the Excel workbooks.
            tensor_directory: Folder containing the saved ``.pt`` tensors.
            columns_of_interest: Columns to read; must include ``'CO'``.
        """
        self.tensor_directory = tensor_directory
        self.data = []

        workbooks = (
            name for name in os.listdir(data_directory) if name.endswith(".xlsx")
        )
        for workbook in workbooks:
            frame = normalize_dataframe(
                pd.read_excel(
                    os.path.join(data_directory, workbook),
                    usecols=columns_of_interest,
                )
            )

            # Tensor files are named "figure<workbook stem>_group_<row>.pt".
            prefix = 'figure' + os.path.splitext(workbook)[0]
            for row_label, record in frame.iterrows():
                candidate = os.path.join(
                    tensor_directory, f"{prefix}_group_{row_label}.pt"
                )
                if not os.path.exists(candidate):
                    continue  # rows without a saved tensor are ignored
                self.data.append((candidate, record.drop('CO'), record['CO']))

    def __len__(self):
        """Return the number of indexed (tensor, row) pairs."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load and return ``(image_tensor, numeric_array, co_label)``.

        The numeric values are returned as a float32 NumPy array; the label
        is returned exactly as it was stored in ``self.data``.
        """
        entry = self.data[idx]
        loaded = torch.load(entry[0])
        features = entry[1].values.astype('float32')
        return loaded, features, entry[2]

    

       
        
        

       




创建 DataLoader：用 DataLoader 加载数据，并由 DataLoader 将数据输入网络进行训练。

输入数值网络结构体

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import KFold



# Custom dataset for the numeric-only network
class CustomDataset(Dataset):
    """Dataset of normalized numeric rows with their CO label.

    Every row of every ``.xlsx`` workbook in the data directory becomes one
    sample; rows are normalized per workbook by ``normalize_dataframe``.
    """

    def __init__(self, data_directory, columns_of_interest):
        """Read all workbooks and keep ``(features, label)`` pairs in memory.

        Args:
            data_directory: Folder containing the Excel workbooks.
            columns_of_interest: Columns to read; must include ``'CO'``.
        """
        self.data = []
        for workbook in os.listdir(data_directory):
            if not workbook.endswith(".xlsx"):
                continue
            frame = pd.read_excel(
                os.path.join(data_directory, workbook),
                usecols=columns_of_interest,
            )
            for _, record in normalize_dataframe(frame).iterrows():
                features = record.drop('CO').astype(float)
                target = record['CO'].astype(float)
                self.data.append((features, target))

    def __len__(self):
        """Return the number of stored samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(numeric_data, co_label)`` as float32 tensors.

        The label is given a leading dimension of size 1 via ``unsqueeze(0)``.
        """
        features, target = self.data[idx]
        feature_tensor = torch.tensor(features.values, dtype=torch.float32)
        target_tensor = torch.tensor(target, dtype=torch.float32).unsqueeze(0)
        return feature_tensor, target_tensor

class NumericNetwork(nn.Module):
    """Fully connected regressor mapping numeric features to a single value.

    Three hidden stages of 64, 32 and 16 units, each built as
    Linear -> BatchNorm1d -> ReLU, are followed by a one-unit linear output.
    """

    def __init__(self, numeric_features):
        """Build the layer stack for inputs with ``numeric_features`` columns."""
        super().__init__()
        widths = (numeric_features, 64, 32, 16)
        stages = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stages.extend(
                (nn.Linear(fan_in, fan_out), nn.BatchNorm1d(fan_out), nn.ReLU())
            )
        stages.append(nn.Linear(widths[-1], 1))
        self.fc = nn.Sequential(*stages)
        self._initialize_weights()

    def forward(self, numeric_data):
        """Run ``numeric_data`` through the layer stack and return the result."""
        return self.fc(numeric_data)

    def _initialize_weights(self):
        """Kaiming-normal weights and zero biases for every linear layer."""
        linear_layers = (m for m in self.modules() if isinstance(m, nn.Linear))
        for layer in linear_layers:
            nn.init.kaiming_normal_(layer.weight, nonlinearity='relu')
            if layer.bias is not None:
                nn.init.constant_(layer.bias, 0)
# Utility helpers used while training the model
def print_gradients(net):
    """Print the L2 norm of the gradient of each trainable parameter.

    Parameters that do not require gradients, or that have no gradient yet,
    are skipped.
    """
    for name, param in net.named_parameters():
        if not param.requires_grad or param.grad is None:
            continue
        grad_norm = param.grad.data.norm(2)
        print(f'{name} gradient: {grad_norm}')


# Build the dataset once; every fold draws its train/validation subsets from it.
dataset = CustomDataset(data_directory, columns_of_interest)
data_loader = DataLoader(dataset, batch_size=20, shuffle=True)
kf = KFold(n_splits=10)

num_epochs = 5000
max_grad_norm = 0.01  # maximum gradient norm for clipping (guards against exploding gradients)

# K-fold cross-validation. The whole training loop runs inside the fold loop
# so that every fold trains, validates and saves its own model; previously
# the epoch loop sat outside and only the last fold's model was trained.
for fold, (train_idx, val_idx) in enumerate(kf.split(dataset)):
    print(f"Starting fold {fold + 1}")
    train_subset = torch.utils.data.Subset(dataset, train_idx)
    val_subset = torch.utils.data.Subset(dataset, val_idx)

    train_loader = DataLoader(train_subset, batch_size=20, shuffle=True)
    val_loader = DataLoader(val_subset, batch_size=20, shuffle=False)

    # A fresh model and optimizer per fold, so folds do not share weights.
    model = NumericNetwork(numeric_features=len(columns_of_interest) - 1)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)

    print(model)

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0.0  # running sum of the batch losses of this epoch
        for batch_idx, (numeric_data, co_label) in tqdm(enumerate(train_loader), total=len(train_loader)):
            optimizer.zero_grad()
            outputs = model(numeric_data)
            loss = criterion(outputs, co_label)
            loss.backward()
            # Apply the clipping that max_grad_norm was defined for.
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
            optimizer.step()
            epoch_loss += loss.item()
            print(f"Fold {fold + 1}, Epoch {epoch + 1}, Batch {batch_idx + 1}, Loss: {loss.item()}")
            print("Input data:", numeric_data)
            print("Labels:", co_label)
        print(f"Fold {fold + 1}, Epoch {epoch + 1}, Average Epoch Loss: {epoch_loss / len(train_loader)}")

    # Evaluate the trained fold on its held-out subset (mean of batch losses).
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for numeric_data, co_label in val_loader:
            val_loss += criterion(model(numeric_data), co_label).item()
    if len(val_loader) > 0:
        print(f"Fold {fold + 1}, Validation Loss: {val_loss / len(val_loader)}")

    # One weights file per fold.
    torch.save(model.state_dict(), f'model_weights_fold_{fold + 1}.pth')


可视化网络结构

In [None]:
from torchviz import make_dot


# Build a throw-away network with 10 input features just for visualization.
model_example = NumericNetwork(numeric_features=10)


# Random input of shape (10, 10); the second dimension matches numeric_features.
example_input = torch.randn(10, 10)

# A forward pass produces the output whose autograd graph is drawn below.
output = model_example(example_input)


# Draw the graph of `output`, labelling nodes with the parameter names plus
# an extra 'input' entry for the example input tensor.
viz_graph = make_dot(output, params=dict(list(model_example.named_parameters()) + [('input', example_input)]))


# Render the graph in PNG format under the base name "numeric_network_graph".
viz_graph.render("numeric_network_graph", format="png")


# Check that the PNG exists; the value is only shown when this is the last
# expression of a notebook cell.
os.path.exists("numeric_network_graph.png")


In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import r2_score

# Use the trained network for prediction: gradients are disabled and the
# whole dataset is fed through a DataLoader in its original order.
all_data_loader = DataLoader(dataset, batch_size=20, shuffle=False)

model.eval()

all_predictions, all_actuals = [], []
with torch.no_grad():
    for numeric_data, co_label in all_data_loader:
        outputs = model(numeric_data)
        all_predictions += outputs.view(-1).tolist()
        all_actuals += co_label.view(-1).tolist()

# Evaluation: coefficient of determination between labels and predictions.
r2 = r2_score(all_actuals, all_predictions)
print("R-squared:", r2)

# Plot: predicted values against actual values.
plt.scatter(all_actuals, all_predictions, alpha=0.5)
plt.title("Predicted vs Actual Values")
plt.xlabel("Actual Values")
plt.ylabel("Predicted Values")
plt.show()
