In [1]:
import torch
from torch import nn as nn
from torch import optim as optim
from torch.utils import data as Data
from torchvision.datasets import __file__
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Runtime environment: PyTorch 1.8.0 & Python 3.8.1 & CUDA 11.1 & cuDNN 8.0
# Hardware: CPU i7-10070F, RAM 16 GB (2600), GPU RTX 3070
class attention1D_layer(nn.Module):
    """Attention over the sampling points of a multi-channel 1-D signal.

    The input is averaged over its channel axis, the resulting per-point
    profile is fed through a two-layer bottleneck MLP
    (``features -> features // 4 -> features``, ReLU6 after each layer) and
    the MLP output is averaged over the batch, giving one weight per
    sampling point that is shared by every sample and channel.

    Args:
        features: Number of sampling points, i.e. the size of the last axis
            of the input. The bottleneck width is ``features // 4`` but never
            less than 1, so very small ``features`` still yield
            input-dependent weights.

    Shape:
        - input ``x``: ``[batch_size, channels, points]`` with
          ``points == features``
        - output: tuple ``(b * x, b)`` where ``b * x`` has the shape of ``x``
          and ``b`` is the weight vector of shape ``[features]``
    """

    def __init__(self, features):
        super(attention1D_layer, self).__init__()
        self.features = features
        # Global average pooling down to a single value along the last axis.
        self.GobalPooling = nn.AdaptiveAvgPool1d(1)

        # Bottleneck MLP producing the per-point weights. The width is
        # clamped to 1 because a zero-width hidden layer would make the
        # output independent of the input.
        hidden = max(1, features // 4)
        self.Linear1 = nn.Sequential(
            nn.Linear(features, hidden),
            nn.ReLU6())
        self.Linear2 = nn.Sequential(
            nn.Linear(hidden, features),
            nn.ReLU6())

    def forward(self, x):
        """Return ``(weights * x, weights)`` for ``x`` of shape [batch, channels, points]."""
        # Swap to [batch, points, channels] so the pooling averages over the
        # channels, then drop the pooled axis -> [batch, points].
        pooled = self.GobalPooling(x.transpose(1, 2)).squeeze(-1)
        # Per-sample weights from the MLP, averaged over the batch -> [points].
        weights = torch.mean(self.Linear2(self.Linear1(pooled)), 0)
        # Broadcast the shared weights over batch and channel axes.
        return weights.reshape(1, 1, self.features) * x, weights
        
class Inception_layer(nn.Module):
    """Multi-scale 1-D convolution block.

    The input is processed in parallel by three two-stage convolution
    branches with different kernel sizes (hence different receptive fields)
    and by a 1x1 shortcut convolution. Every convolution uses stride 1 and
    "same" padding, so the signal length is preserved.

    Reference: https://arxiv.org/abs/1512.00567 (Inception v3 / v4, InceptionE).

    Args:
        in_features: Number of input channels.
        out_features: Number of output channels of each of the four paths.
    """

    def __init__(self, in_features, out_features):
        super(Inception_layer, self).__init__()
        self.in_features = in_features

        def make_branch(first_kernel, second_kernel):
            # conv(first_kernel) -> ReLU -> Dropout -> conv(second_kernel) -> ReLU,
            # going through a 10-channel bottleneck. For an odd kernel k,
            # padding k // 2 keeps the length unchanged at stride 1.
            return nn.Sequential(
                nn.Conv1d(in_features, 10, kernel_size=first_kernel,
                          stride=1, padding=first_kernel // 2),
                nn.ReLU(),
                nn.Dropout(0.1),
                nn.Conv1d(10, out_features, kernel_size=second_kernel,
                          stride=1, padding=second_kernel // 2),
                nn.ReLU(),
            )

        self.Conv1x3 = make_branch(1, 3)
        self.Conv1x5 = make_branch(1, 5)
        self.Conv3x5 = make_branch(3, 5)
        # Plain 1x1 projection of the input (no activation).
        self.short_cut = nn.Sequential(
            nn.Conv1d(in_features, out_features, kernel_size=1, stride=1, padding=0)
        )

    def forward(self, x):
        """Return the four paths as ``(conv1x3, conv3x5, conv1x5, shortcut)``."""
        paths = (self.Conv1x3, self.Conv3x5, self.Conv1x5, self.short_cut)
        return tuple(path(x) for path in paths)

In [2]:
class ResPart(nn.Module):
    """Residual block built from conv1 -> conv3 -> conv1.

    Computes ``F(x) = adjust(x) + f(x)`` where ``f`` is three ``Conv1d``
    layers (kernel sizes 1, 3, 1), each followed by ReLU and Dropout(0.1).
    ``adjust`` is a 1x1 convolution when the channel count changes and the
    identity otherwise, so the two terms can always be added.

    Reference: https://arxiv.org/pdf/1512.03385.pdf

    Args:
        in_features: Number of input channels.
        out_features: Number of output channels.
    """

    def __init__(self, in_features, out_features):
        super(ResPart, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        # The same 1-3-1 stack is used whether or not the channel count
        # changes; the middle convolution uses padding 1 so the signal
        # length is preserved.
        self.conv = nn.Sequential(
            nn.Conv1d(in_features, out_features, kernel_size=1, stride=1, padding=0),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Conv1d(out_features, out_features, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Conv1d(out_features, out_features, kernel_size=1, stride=1, padding=0),
            nn.ReLU(),
            nn.Dropout(0.1)
        )
        if in_features != out_features:
            # 1x1 projection so the shortcut matches the output channel count.
            self.adjust_x = nn.Conv1d(in_features, out_features, kernel_size=1, stride=1, padding=0)

    def forward(self, x):
        """Return ``shortcut(x) + conv(x)`` for ``x`` of shape [batch, in_features, points]."""
        if self.in_features != self.out_features:
            short_cut = self.adjust_x(x)
        else:
            short_cut = x
        return short_cut.add(self.conv(x))

In [3]:
class sbd_mode(nn.Module):
    """Full network: ResPart -> Inception_layer -> ResPart -> attention -> 1x1 conv -> MLP.

    The four Inception outputs are concatenated along the channel axis
    before the second residual block. The attention layer and the final MLP
    both operate on the last axis with a fixed size of 100, so the input
    signal must have 100 sampling points.

    Args:
        in_channels: Number of channels of the input signal.
        goal_channel: Number of channels produced by the 1x1 convolution
            that follows the attention layer (default 1).

    Shape:
        - input: ``[batch_size, in_channels, 100]``
        - output: ``[batch_size, 100]`` when ``goal_channel == 1``
    """

    def __init__(self, in_channels, goal_channel=1):
        super(sbd_mode, self).__init__()
        self.in_channels = in_channels
        self.goal_channel = goal_channel

        channels = 100  # channel width used throughout the trunk
        points = 100    # signal length expected by the attention layer and the MLP

        self.ResBody_layer1 = ResPart(in_channels, channels)
        self.InCeption_layer = Inception_layer(channels, channels)
        # The Inception block returns four tensors of `channels` channels each.
        self.ResBody_layer2 = ResPart(4 * channels, channels)
        self.attention_layer = attention1D_layer(points)
        self.conv = nn.Conv1d(channels, self.goal_channel, kernel_size=1)
        self.fully = nn.Sequential(
            nn.Linear(points, 200),
            nn.ReLU(),
            nn.Linear(200, 100),
        )

    def forward(self, x):
        """Run the full pipeline on ``x`` and return the MLP output."""
        trunk = self.ResBody_layer1(x)
        # Stack the three branches and the shortcut along the channel axis.
        multi_scale = torch.cat(self.InCeption_layer(trunk), dim=1)
        fused = self.ResBody_layer2(multi_scale)
        # The attention weights themselves are not needed here.
        attended, _ = self.attention_layer(fused)
        # Drop the channel axis when it has size 1 (goal_channel == 1).
        projected = self.conv(attended).squeeze(1)
        return self.fully(projected)
# Build the network for a 3-channel input signal and move it to the selected device.
model = sbd_mode(in_channels=3).to(device)   

In [4]:
# Show the module tree of the network (sub-module names and their hyper-parameters).
print(model)

sbd_mode(
  (ResBody_layer1): ResPart(
    (conv): Sequential(
      (0): Conv1d(3, 100, kernel_size=(1,), stride=(1,))
      (1): ReLU()
      (2): Dropout(p=0.1, inplace=False)
      (3): Conv1d(100, 100, kernel_size=(3,), stride=(1,), padding=(1,))
      (4): ReLU()
      (5): Dropout(p=0.1, inplace=False)
      (6): Conv1d(100, 100, kernel_size=(1,), stride=(1,))
      (7): ReLU()
      (8): Dropout(p=0.1, inplace=False)
    )
    (adjust_x): Conv1d(3, 100, kernel_size=(1,), stride=(1,))
  )
  (InCeption_layer): Inception_layer(
    (Conv1x3): Sequential(
      (0): Conv1d(100, 10, kernel_size=(1,), stride=(1,))
      (1): ReLU()
      (2): Dropout(p=0.1, inplace=False)
      (3): Conv1d(10, 100, kernel_size=(3,), stride=(1,), padding=(1,))
      (4): ReLU()
    )
    (Conv1x5): Sequential(
      (0): Conv1d(100, 10, kernel_size=(1,), stride=(1,))
      (1): ReLU()
      (2): Dropout(p=0.1, inplace=False)
      (3): Conv1d(10, 100, kernel_size=(5,), stride=(1,), padding=(2,))
     

In [None]:
## Train
import hiddenlayer as h1

# Adam with default hyper-parameters, minimising the mean-squared error.
optimizer = optim.Adam(model.parameters())
loss_func = nn.MSELoss()
# hiddenlayer helpers used to record and live-plot the per-epoch loss.
history1 = h1.History()
canvas1 = h1.Canvas()

# The model contains Dropout layers, so make sure it is in training mode.
model.train()

for epoch in range(500):
    all_loss = 0
    n_batches = 0
    for step, (b_x, b_y) in enumerate(data_gen):
        b_x = b_x.to(device)
        b_y = b_y.to(device)
        pre = model(b_x)
        loss = loss_func(pre, b_y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        all_loss += loss.item()
        n_batches = step + 1
    # Mean loss over the batches actually seen in this epoch (previously a
    # hard-coded division by 10); the guard avoids a division by zero if the
    # loader yields nothing.
    epo_loss = all_loss / max(n_batches, 1)
    history1.log(epoch, train_loss=epo_loss)

    # Redraw the loss curve after every epoch.
    with canvas1:
        canvas1.draw_plot(history1['train_loss'])