### 猜想1：每个特征单独做全局和局部

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiFeatureAFF(nn.Module):
    def __init__(self, num_features, channels=64, r=4):
        super(MultiFeatureAFF, self).__init__()
        self.num_features = num_features
        inter_channels = int(channels // r)
        
        # 对每个特征定义独立的局部和全局注意力机制
        self.local_atts = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(channels, inter_channels, kernel_size=1, stride=1, padding=0),
                nn.BatchNorm2d(inter_channels),
                nn.ReLU(inplace=True),
                nn.Conv2d(inter_channels, 1, kernel_size=1, stride=1, padding=0),
            ) for _ in range(num_features)
        ])
        
        self.global_atts = nn.ModuleList([
            nn.Sequential(
                nn.AdaptiveAvgPool2d(1),
                nn.Conv2d(channels, inter_channels, kernel_size=1, stride=1, padding=0),
                nn.BatchNorm2d(inter_channels),
                nn.ReLU(inplace=True),
                nn.Conv2d(inter_channels, 1, kernel_size=1, stride=1, padding=0),
            ) for _ in range(num_features)
        ])

    def forward(self, features):
        # 独立计算每个特征的局部和全局注意力权重
        att_weights = []
        for i, feature in enumerate(features):
            local_weight = self.local_atts[i](feature)
            global_weight = self.global_atts[i](feature)
            att_weights.append(local_weight + global_weight)
        
        # 对所有特征的注意力权重进行softmax归一化
        att_weights = torch.cat(att_weights, dim=1)
        att_weights = F.softmax(att_weights, dim=1)
        
        # 独立加权每个特征，并计算加权和
        weighted_features = []
        start_idx = 0
        for i, feature in enumerate(features):
            end_idx = start_idx + 1
            weights = att_weights[:, start_idx:end_idx, :, :]
            weighted_feature = feature * weights
            weighted_features.append(weighted_feature)
            start_idx = end_idx
        
        output = sum(weighted_features)
        return output


### 猜想2：迭代两个特征的fusion做n次：

In [None]:
import torch
import torch.nn as nn

class AFF(nn.Module):
    '''
    多特征融合 AFF
    '''
    def __init__(self, channels=64, r=4):
        super(AFF, self).__init__()
        inter_channels = int(channels // r)

        self.local_att = nn.Sequential(
            nn.Conv2d(channels, inter_channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(inter_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(inter_channels, channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(channels),
        )

        self.global_att = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(channels, inter_channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(inter_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(inter_channels, channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(channels),
        )
    
        self.sigmoid = nn.Sigmoid()

    def forward(self, x, residual):
        xa = x + residual
        xl = self.local_att(xa)
        xg = self.global_att(xa)
        xlg = xl + xg
        wei = self.sigmoid(xlg)

        xo = x * wei + residual * (1 - wei)
        return xo

def iterative_feature_fusion_with_aff(features, aff_module):
    """
    使用AFF模块逐步融合多个特征的函数。
    
    :param features: 特征列表，每个特征是一个张量。
    :param aff_module: 用于融合两个特征的AFF模块实例。
    :return: 融合后的特征。
    """
    if len(features) < 2:
        raise ValueError("需要至少两个特征进行融合")
    
    fused_feature = features[0]  # 初始化融合特征为列表中的第一个特征
    for feature in features[1:]:
        fused_feature = aff_module(fused_feature, feature)
    
    return fused_feature

# 示例使用
features = [torch.randn(1, 64, 32, 32) for _ in range(4)]  # 假设有4个特征
aff_module = AFF(channels=64, r=4)

fused_feature = iterative_feature_fusion_with_aff(features, aff_module)


## 猜想3：只对单个特征做全局特征

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiAFF(nn.Module):
    def __init__(self, channels=64, r=4):
        super(MultiAFF, self).__init__()
        inter_channels = int(channels // r)
        
        # 只保留全局注意力机制
        self.global_att = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(channels, inter_channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(inter_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(inter_channels, 1, kernel_size=1, stride=1, padding=0),  # 输出通道数为1，用于计算权重
        )

    def forward(self, features):
        # features 应该是一个特征列表
        weights = []
        for feature in features:
            weights.append(self.global_att(feature))
        
        # 使用softmax计算权重
        weights = torch.cat(weights, dim=1)  # 将权重沿通道维度拼接
        weights = F.softmax(weights, dim=1)  # 计算softmax
        
        # 使用计算出的权重对特征进行加权求和
        weighted_features = [features[i] * weights[:, i:i+1, :, :] for i in range(len(features))]
        output = torch.sum(torch.stack(weighted_features), dim=0)
        
        return output


## 猜想4：concat方法

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ModifiedAFFWithSoftmax(nn.Module):
    def __init__(self, channels=64, r=4, num_features=2):
        super(ModifiedAFFWithSoftmax, self).__init__()
        self.num_features = num_features
        inter_channels = int(channels // r * num_features)  # 调整以适应拼接后的特征
        
        self.local_att = nn.Sequential(
            nn.Conv2d(channels * num_features, inter_channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(inter_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(inter_channels, channels * num_features, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(channels * num_features),
        )
        
        self.global_att = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(channels * num_features, inter_channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(inter_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(inter_channels, channels * num_features, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(channels * num_features),
        )

    def forward(self, features):
        x = torch.cat(features, dim=1)  # 拼接特征
        
        # 局部和全局注意力
        xl = self.local_att(x)
        xg = self.global_att(x)
        xlg = xl + xg
        
        # 应用softmax而非sigmoid
        # 注意：需要reshape以适应softmax沿特定维度
        weights = xlg.view(xlg.size(0), self.num_features, -1)
        weights = F.softmax(weights, dim=1)
        weights = weights.view_as(xlg)
        
        # 分离权重并应用于对应的特征
        separated_weights = torch.chunk(weights, self.num_features, dim=1)
        weighted_features = [features[i] * separated_weights[i] for i in range(self.num_features)]
        
        # 加权特征求和
        output = torch.sum(torch.stack(weighted_features, dim=0), dim=0)

        return output


### 测试concat部分

In [32]:
import pickle
import numpy as np
with open('train.pkl', 'rb') as file:
    data = pickle.load(file)

In [33]:
train=data["train_data"]

In [34]:
train[0]

array([[2.0120000e+03, 1.0000000e+00, 1.0000000e+00, ..., 9.2000000e+00,
        0.0000000e+00, 0.0000000e+00],
       [2.0120000e+03, 1.0000000e+00, 1.0000000e+00, ..., 1.5100000e+01,
        0.0000000e+00, 0.0000000e+00],
       [2.0120000e+03, 1.0000000e+00, 1.0000000e+00, ..., 1.8317552e+01,
        0.0000000e+00, 0.0000000e+00],
       ...,
       [2.0120000e+03, 1.0000000e+00, 1.0000000e+00, ..., 0.0000000e+00,
        0.0000000e+00, 0.0000000e+00],
       [2.0120000e+03, 1.0000000e+00, 1.0000000e+00, ..., 0.0000000e+00,
        0.0000000e+00, 0.0000000e+00],
       [2.0120000e+03, 1.0000000e+00, 1.0000000e+00, ..., 0.0000000e+00,
        0.0000000e+00, 0.0000000e+00]])

In [35]:
train=data["train_data"]
train_data_expanded = np.expand_dims(train, axis=-1)
train_data_expanded.shape

(8784, 63, 18, 1)

In [37]:
train_data_expanded = np.transpose(train_data_expanded, (0, 2, 1,3))
train_data_expanded.shape

(8784, 18, 63, 1)

In [38]:
train_data=train_data_expanded[:,5:,:,:]

### 无敌实验

In [44]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ModifiedAFFWithSoftmax(nn.Module):
    def __init__(self, channels=13, r=4, num_features=13):
        super(ModifiedAFFWithSoftmax, self).__init__()
        self.num_features = num_features
        inter_channels = int(channels // r * num_features)
        
        self.local_att = nn.Sequential(
            nn.Conv2d(channels, inter_channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(inter_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(inter_channels, channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(channels),
        )
        
        self.global_att = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(channels, inter_channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(inter_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(inter_channels, channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(channels),
        )

    def forward(self, x):
        xl = self.local_att(x)
        xg = self.global_att(x)
        xlg = xl + xg
        
        weights = xlg.view(xlg.size(0), self.num_features, -1)
        weights = F.softmax(weights, dim=1)
        weights = weights.view_as(xlg)
        
        separated_weights = torch.chunk(weights, self.num_features, dim=1)
        weighted_features = [x[:, i:i+1, :, :] * separated_weights[i] for i in range(self.num_features)]
        
        output = torch.sum(torch.stack(weighted_features, dim=0), dim=0)

        return output.squeeze(1)

# 初始化模型
model = ModifiedAFFWithSoftmax()

# 生成模拟输入数据
train_tensor = torch.from_numpy(train_data)  # Batch size 8784, 18 features, 41 sites, width 1
train_tensor = train_tensor.float()

# 前向传播
output_tensor = model(train_tensor)

# 打印输出形状
print("Output shape:", output_tensor.shape)


Output shape: torch.Size([8784, 63, 1])
