In [4]:
import torch
import torch.nn as nn
import os
from torch.utils.data import Dataset, DataLoader
import numpy as np
import scipy.io as sio
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import pandas as pd
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
import torch.nn.functional as F

labels_map = {
    1: "定子匝间短路\正常电机",
    2: "定子匝间短路\短路0.1",
    3: "定子匝间短路\短路0.2",
    4: "定子匝间短路\短路0.5",
    5: "定子匝间短路\短路1",
    6: "定子匝间短路\短路2",
    7: "定子匝间短路\短路5",
    8: "定子匝间短路\短路10",
    9: "定子匝间短路\短路15",
    10: "定子匝间短路\短路20",
}

file_path1="定子匝间短路\正常电机电流.csv"
file_path1_="定子匝间短路\正常电机转矩.csv"
file_path2="定子匝间短路\短路0.1电流.csv"
file_path2_="定子匝间短路\短路0.1转矩.csv"
file_path3="定子匝间短路\短路0.2电流.csv"
file_path3_="定子匝间短路\短路0.2转矩.csv"
file_path4="定子匝间短路\短路0.5电流.csv"
file_path4_="定子匝间短路\短路0.5转矩.csv"
file_path5="定子匝间短路\短路1电流.csv"
file_path5_="定子匝间短路\短路1转矩.csv"
file_path6="定子匝间短路\短路2电流.csv"
file_path6_="定子匝间短路\短路2转矩.csv"
file_path7="定子匝间短路\短路5电流.csv"
file_path7_="定子匝间短路\短路5转矩.csv"
file_path8="定子匝间短路\短路10电流.csv"
file_path8_="定子匝间短路\短路10转矩.csv"
file_path9="定子匝间短路\短路15电流.csv"
file_path9_="定子匝间短路\短路15转矩.csv"
file_path10="定子匝间短路\短路20电流.csv"
file_path10_="定子匝间短路\短路20转矩.csv"

file_paths = [
    file_path1, file_path1_,
    file_path2, file_path2_,
    file_path3, file_path3_,
    file_path4, file_path4_,
    file_path5, file_path5_,
    file_path6, file_path6_,
    file_path7, file_path7_,
    file_path8, file_path8_,
    file_path9, file_path9_,
    file_path10, file_path10_
]

# 创建一个空列表来存储 DataFrame
dfs = []

def read_df(df,df_):
    df = df.iloc[:,1:].T
    df_=df_.iloc[:,1:].T
    df_append = pd.concat([df, df_], ignore_index=True, axis=0)
    return  df_append

# 循环遍历文件路径,读取 CSV 文件并创建 DataFrame
for i in range(0, len(file_paths), 2):
    df = pd.read_csv(file_paths[i])
    df_ = pd.read_csv(file_paths[i+1])
    data = read_df(df,df_)
    dfs.append(data)


# 
dfs = np.array(dfs)
dfs = dfs.squeeze()
dfs.shape    
# dfs['label'] = labels_map.keys()
x = torch.tensor(dfs,device='cuda',dtype=torch.float32)
keys_list = list(labels_map.keys())
labels = torch.tensor(np.array(keys_list), device='cuda', dtype=torch.float32)  # Use int64 for class labels
x.shape, labels.shape


class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, padding=1)
        self.relu = nn.ReLU()
        
        # To match dimensions for residual connection
        self.shortcut = nn.Conv1d(in_channels, out_channels, kernel_size=1) if in_channels != out_channels else None

    def forward(self, x):
        identity = x if self.shortcut is None else self.shortcut(x)
        out = self.conv1(x)
        out = self.relu(out)
        out = self.conv2(out)
        out += identity
        return self.relu(out)

class SelfAttention(nn.Module):
    def __init__(self, in_dim):
        super(SelfAttention, self).__init__()
        self.query_layer = nn.Linear(in_dim, in_dim)
        self.key_layer = nn.Linear(in_dim, in_dim)
        self.value_layer = nn.Linear(in_dim, in_dim)

    def forward(self, x):
        Q = self.query_layer(x)  # Shape: [batch_size, seq_length, in_dim]
        K = self.key_layer(x)
        V = self.value_layer(x)

        attention_scores = torch.softmax(Q @ K.transpose(-2, -1), dim=-1)  # Shape: [batch_size, seq_length, seq_length]
        output = attention_scores @ V  # Shape: [batch_size, seq_length, in_dim]
        return output

class ResSAModel(nn.Module):
    def __init__(self, num_classes):
        super(ResSAModel, self).__init__()
        self.res_block1 = ResidualBlock(4, 256)  # Input channels set to 1
        self.res_block2 = ResidualBlock(256, 64)  # Add another residual block
        self.attention1 = SelfAttention(64)  # Attention over features after second block
        self.fc = nn.Linear(64, num_classes)  # Output layer for classification

    def forward(self, x):
        x = self.res_block1(x)  # Shape: [batch_size, 16 (channels), seq_length]
        x = self.res_block2(x)  # Shape: [batch_size, 32 (channels), seq_length]
        
        x = x.permute(0, 2, 1)   # Change shape to [batch_size, seq_length (1000), features (32)]
        
        x = self.attention1(x)   # Apply self-attention
        
        x = x.mean(dim=1)       # Global Average Pooling across sequence length
        x = self.fc(x)
        return x
class MyDataset(Dataset):
    def __init__(self, x,labels,transform=None):
        self.labels = labels  # Load labels from CSV
        self.data_tensor = x
        self.transform = transform
    def __len__(self):
        return self.data_tensor.shape[0]

    def __getitem__(self, idx):
        # Return the sample and its corresponding label (if applicable)
        # Here we assume labels are just indices for demonstration purposes
        sample = self.data_tensor[idx]
        label = self.labels[idx]  # Replace with actual label if available
        return sample, label
    
# Example usage:
num_classes = 10  # Define number of output classes
dataset = MyDataset(x,labels,transform=None)
data_loader = DataLoader(dataset, batch_size=2, shuffle=True)
model = ResSAModel(num_classes)
criterion = nn.CrossEntropyLoss()  # Suitable for multi-class classification
optimizer = optim.Adam(model.parameters(), lr=0.01)
x.shape

RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [2]:
num_epochs = 1000  # Number of epochs to train

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
writer = SummaryWriter("runs/test_2")

for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0
    
    for features, labels in data_loader:
        features = features.float().to(device)
        labels = labels.long().to(device)  # 确保标签是长整型
        
        # Zero the gradients
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(features)  # Add channel dimension for Conv1d
        
        # Compute loss
        loss = criterion(outputs, labels)
        
        # Backward pass and optimization step
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    avg_loss = running_loss / len(data_loader)    
    writer.add_scalar('Loss/train', avg_loss, epoch) 
    
    

RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [None]:

def test_model(model, input_data):
    # 将输入数据转换为 PyTorch 张量
    input_tensor = torch.tensor(input_data, dtype=torch.float32).unsqueeze(0)  # 添加批次维度

    # 将张量移动到相应的设备（CPU 或 GPU）
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    input_tensor = input_tensor.to(device)

    # 设置模型为评估模式
    model.eval()

    with torch.no_grad():  # 不需要计算梯度
        # 进行前向传播以获得输出
        outputs = model(input_tensor.unsqueeze(1))  # 添加通道维度

        # 获取预测结果
        _, predicted = torch.max(outputs, 1)  # 获取最大值的索引作为预测标签

    return predicted.item()  # 返回预测的标签
# 假设您已经训练了模型并且它被称为 'model'

# 输入一个长度为4004的列表
input_data = dfs.iloc[4,:-1].values 

# 调用测试函数
predicted_label = test_model(model, input_data)

print(f'Predicted Label: {predicted_label}')