In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import torchvision
from torchvision import transforms
import scipy.io as scio
import random
from torch.utils import data
from sklearn.model_selection import train_test_split

# 数据加载切割

In [3]:
def LoadMat(path):
    mat = scio.loadmat(path)
    emg = mat['emg']
    label = []
    try:
        label = mat['restimulus']
    except:
        label = mat['stimulus']
    return emg, label


def cut(emg, label):
    ans_emg = []
    ans_label = []
    i = 0
    while(i < len(label)):
        if(i + 199 >= len(label) - 1):
            break
        if(label[i] == label[i+199]):
            start = i
            end = i + 200
            temp_emg = emg[start : end]
            temp_label = label[i]
            ans_emg.append(temp_emg)
            ans_label.append(temp_label)
            i += 50
        else:
            i += 50
    return ans_emg, ans_label




def count_unique_labels(labels):
    label_count = {}  # 用字典来存储不同标签及其对应的个数

    for label_list in labels:
        label = label_list[0]  # 提取单个元素的标签
        if label in label_count:
            label_count[label] += 1
        else:
            label_count[label] = 1

    for label, count in label_count.items():
        print(f"标签 {label} 的个数：{count}")

def one_hot(x, class_count=41):
    return torch.eye(class_count)[x, :].squeeze()




# 读取矩阵，并存储emg和label
E1_emg, E1_label = LoadMat("D:/research\EMG project\DB2_s1\S1_E1_A1.mat")



# 将同一志愿者的所有肌电信号数据进行切割后合并
emgs, labels = cut(E1_emg, E1_label)
# E2_emg, E2_label = cut(E2_emg, E2_label)
# E3_emg, E3_label = cut(E3_emg, E3_label)
# emgs.extend(E2_emg)
# emgs.extend(E3_emg)
# labels.extend(E2_label)
# labels.extend(E3_label)

# 查看数据集大小
print('处理前样本数量:'+str(len(emgs)))  # 肌电信号数据集长度为102430

# 发现标签为0的样本数量过多，剔除一部分
# 找到所有标签为 0 的样本的索引
label_0_indices = [i for i, label in enumerate(labels) if label == 0]

# 随机选择 100 个标签为 0 的样本的索引
selected_indices = random.sample(label_0_indices, min(53600, len(label_0_indices)))

# 保留不在选定索引中的样本和标签
emgs = [emgs[i] for i in range(len(emgs)) if i not in selected_indices]
labels = [labels[i] for i in range(len(labels)) if i not in selected_indices]

count_unique_labels(labels)  # 统计不同标签的数据样本量
print('处理后样本数量:'+str(len(emgs)))

# 切割训练集和测试集(6:4)
train_emgs, test_emgs, train_labels, test_labels = train_test_split(emgs, labels, test_size=0.4, random_state=42)

print('训练集大小为:'+str(len(train_labels)))
print('测试集大小为:'+str(len(test_labels)))



处理前样本数量:35351
标签 1 的个数：1551
标签 2 的个数：1156
标签 3 的个数：997
标签 4 的个数：877
标签 5 的个数：777
标签 6 的个数：489
标签 7 的个数：1138
标签 8 的个数：542
标签 9 的个数：774
标签 10 的个数：878
标签 11 的个数：846
标签 12 的个数：858
标签 13 的个数：762
标签 14 的个数：718
标签 15 的个数：448
标签 16 的个数：861
标签 17 的个数：794
处理后样本数量:14466
训练集大小为:8679
测试集大小为:5787


In [4]:
# 将数据集转换为tensor
#* 列表转array 转tensor
train_emgs = np.array(train_emgs)
train_labels = np.array(train_labels)
test_emgs = np.array(test_emgs)
test_labels = np.array(test_labels)

train_emgs = torch.tensor(train_emgs)
train_labels = torch.tensor(train_labels, dtype=torch.int64)   
test_emgs = torch.tensor(test_emgs)
test_labels = torch.tensor(test_labels, dtype=torch.int64)


# 查看训练集和测试集的形状和数据类型
print('训练集样本的形状为:'+str(train_emgs.shape))      #* 总共有多少段，每段长200，12个通道
print('测试集样本的形状为:'+str(test_emgs.shape))
print('训练集标签的形状为:'+str(train_labels.shape))
print('测试集标签的形状为:'+str(test_labels.shape))

print('样本的数据类型为:'+str(train_emgs.dtype))  # emg信号的数据类型为float32
print('标签的数据类型为:'+str(train_labels.dtype))  # label的数据类型为int64

训练集样本的形状为:torch.Size([8679, 200, 12])
测试集样本的形状为:torch.Size([5787, 200, 12])
训练集标签的形状为:torch.Size([8679, 1])
测试集标签的形状为:torch.Size([5787, 1])
样本的数据类型为:torch.float32
标签的数据类型为:torch.int64


In [5]:
class EMG_dataset(data.Dataset):
    def __init__(self, emgs, labels):
        self.emgs = emgs
        self.labels = labels

    def __getitem__(self, index):
        emg = emgs[index]
        label = labels[index]
        label = one_hot(label)     #* 标签为onehot
        return emg, label

    def __len__(self):
        return len(self.emgs)


# 创建训练集的 EMG_dataset 和 DataLoader
train_dataset = EMG_dataset(train_emgs, train_labels)

#todo :
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)      #!这里处理一个批次32

# 创建测试集的 EMG_dataset 和 DataLoader
test_dataset = EMG_dataset(test_emgs, test_labels)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)


# 取出一个批次的数据查看其形状
emg, label = next(iter(train_dataloader))
print('一个批次的样本的形状:'+str(emg.shape))  # batch, 200, 12
print('一个批次的标签的形状:'+str(label.shape))  # batch, 1    #* 标签长度为41维onehot向量

一个批次的样本的形状:torch.Size([32, 200, 12])
一个批次的标签的形状:torch.Size([32, 41])


# 生成器

encoder
decoder
- 调用attention 解码encoder的内容

## attention part

- more to add:
- location function --  calculate location info in seq

In [9]:
#input

import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.autograd import Variable
import numpy as np

import torch
import torch.nn as nn
import random  # 你可能需要导入 random 模块

In [16]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import torchvision
from torchvision import transforms
import scipy.io as scio
import random
from torch.utils import data
from sklearn.model_selection import train_test_split

In [10]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.autograd import Variable
import torch.nn.init as init   #* 根据语音生成模型，选用xavier初始化

class Attention(nn.Module):
	def __init__(self, hidden_dim) :
		super(Attention,self).__init__()
		self.hidden_dim = hidden_dim
		
		self.attention = nn.Linear(hidden_dim * 2, hidden_dim)    #*连接编码器和解码器的隐藏状态后，调整维度
		self. value = nn.Parameter(init.xavier_normal_(torch.empty(hidden_dim)))

    #todo : 前向传播
	def forward(self, hidden, encoder_outputs):
		seq_length = encoder_outputs.shape[0]   #* 编码器输出序列长度为序列长度
		attention_energy = torch.zeros(seq_length)   #*初始化注意力能量--0

		for i in range(seq_length):
			attention_energy[i] = self.score(hidden, encoder_outputs[i])
		
		attention_weight = torch.softmax(attention_energy, dim=0)   
		#* 计算权重

		context = context = torch.sum(attention_weight * encoder_outputs, dim=0)
        
		return context, attention_weight
	

	#todo: score
	def score(self, hidden , encoder_output) :

		#* 需要 hidden隐藏层和 编码器输出为维度相同的向量 -- 相乘
		energy = torch.dot(hidden, encoder_output)
		return energy
	



        


Encoder

In [13]:

class Encoder(nn.Module):
	def __init__(self, input_dim, hidden_dim, num_layers,dropout):
		super(Encoder, self).__init__()
		self.hidden_dim = hidden_dim
		self.num_layers = num_layers
		self.embedding = nn.Embedding(input_dim, hidden_dim)
		self.rnn = nn.LSTM( hidden_dim, num_layers,dropout = dropout)
		self.dropout = nn.Dropout(dropout)

	# todo : to imporve !!!
	#* directly use RNN in forward
	def forward (self,src):
		outputs, (hidden, cell) = self.rnn(src)
		return hidden, cell



    

## decoder

In [15]:
class Decoder(nn.Module):
	def __init__(self, output_dim, emb_dim, hidden_dim, num_layers, dropout) :
		super(Decoder,self).__init__()
		self.output_dim = output_dim
		self.embedding = nn.Embedding(output_dim, emb_dim)
		self.rnn = nn.LSTM(emb_dim, hidden_dim, num_layers, dropout=dropout)
		self.dropout = nn.Dropout(dropout)
		
		#* out = in * W(t) + bias
		self.function_output = nn.Linear(hidden_dim, output_dim)

	def forward(self, input, hidden, cell):
		input = input.unsqueeze(0)
		embedded = self.dropout(self.embedding(input))
		output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
		prediction = self.function_output(output.squeeze(0))
		return prediction, hidden, cell




In [23]:
class Generator(nn.Module) :
	def __init__(self, encoder,decoder) :
		super(Generator,self).__init__()
		self.encoder = encoder
		self.decoder = decoder
		
	
	
	#* input tensor的维度(seq 长度，batch，输入特征维度)
	def forward(self,input_tensor,target , teacher_forcing_ratio=0.5):
		#* initialize  bianliang
		target_length, batch_size ,  = target.shape
		target_ini_size = self.decoder.output_dim

		outputs = torch.zeros(target_length, batch_size, target_ini_size).to(self.device)
		hidden, cell = self.encoder(input_tensor)
		input = input_tensor[0, :, : ].unsqueeze(0)    #* adjust input dim


		for time in range (1, target_length) :
			output ,hidden,cell = self.decoder(input, hidden, cell)
			outputs[time] = output
			teacher_force = random.random() < teacher_forcing_ratio
			top1 = output.argmax(2) 
			input = target[time:time+1, :, :] if teacher_force else top1

		return outputs







## to be continue...



ValueError: too many values to unpack (expected 2)