In [8]:
import torch
import torch.nn as nn

# 定义神经网络
class SingleLayerNet(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(SingleLayerNet, self).__init__()
        # 定义单个线性层
        self.linear = nn.Linear(in_features=input_dim, 
                               out_features=output_dim)
    
    def forward(self, x):
        # 前向传播：通过线性层
        out = self.linear(x)
        return out

# --------------------------
# 示例用法
# --------------------------

if __name__ == "__main__":
    # 设置参数
    input_dim = 10   # 输入特征维度
    output_dim = 5   # 输出特征维度
    batch_size = 3   # 批大小

    # 创建模型实例
    model = SingleLayerNet(20, output_dim)
    
    # 打印模型结构
    print("模型结构:")
    print(model)
    
    # 生成随机输入数据
    x = torch.randn(batch_size, input_dim, 20)  # shape: [3, 10]
    
    # 前向传播
    output = model(x)
    
    # 打印输出结果
    print("\n输入形状:", x.shape)
    print("输出形状:", output.shape)
    print("\n前3个样本的输出示例:")
    print(output)

模型结构:
SingleLayerNet(
  (linear): Linear(in_features=20, out_features=5, bias=True)
)

输入形状: torch.Size([3, 10, 20])
输出形状: torch.Size([3, 10, 5])

前3个样本的输出示例:
tensor([[[-0.0756,  0.7473,  1.0564,  0.2725,  0.8614],
         [ 0.1039,  0.8008,  0.9195,  1.0703,  0.9789],
         [ 0.3596, -0.3920,  0.0724, -0.3199, -0.2100],
         [ 0.2259,  0.1826,  0.3812, -0.0281,  0.2861],
         [-0.1274,  0.5550,  0.4859,  0.1172,  0.3532],
         [-0.4267,  0.5544,  0.3043,  0.0673,  0.6002],
         [ 0.3817,  0.4111,  0.0077, -0.1533, -0.0500],
         [ 0.3110,  0.3331, -0.3116,  0.4077,  0.9141],
         [ 0.1817,  0.9595,  0.2959,  0.2309,  0.6635],
         [-0.7728, -1.2180,  0.2332, -0.1493,  0.1157]],

        [[-0.3624, -0.1314,  1.9791, -0.0159,  0.6907],
         [-0.3848,  0.4734,  1.5797, -0.3050,  0.6885],
         [-0.5546, -0.7310,  0.0978,  0.4279, -0.2886],
         [-0.1615, -0.2032, -0.1288,  1.0454,  0.0314],
         [ 0.1704,  0.5333,  1.6015,  0.4394,  0.5292],

In [1]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split, TensorDataset
from Models.SeEANet import SeEANet
import torch.optim as optim
import torch.nn as nn
from datetime import datetime
from utils.tools import make_datasets,make_dir
from torchsummary import summary

# 生成含趋势、周期和噪声的合成数据
def generate_finance_data(seq_length=200, noise_level=0.5):
    t = np.linspace(0, 20, seq_length)
    trend = 0.05 * t ** 2  # 二次趋势项
    seasonality = 2 * np.sin(0.5 * t) + 1.5 * np.cos(1.2 * t)  # 多频率周期项
    y_true = trend + seasonality
    noise = noise_level * np.random.randn(seq_length)
    y_noisy = y_true + noise
    return t, y_true, y_noisy
 
t, y_true, y_noisy = generate_finance_data()

def kalman_filter_1d(y_noisy, Q=1e-4, R=0.1):
    n = len(y_noisy)
    x_est = np.zeros(n)  # 状态估计序列
    P = np.zeros(n)  # 协方差序列
    x_est[0] = y_noisy[0]  # 初始状态：第一个观测值
    P[0] = 1.0  # 初始协方差：假设初始估计不确定性大
    
    for k in range(1, n):
        # 预测阶段：基于前一时刻估计值外推
        x_pred = x_est[k-1]  # 状态预测（假设A=1，无控制输入）
        P_pred = P[k-1] + Q  # 协方差预测：累积过程噪声Q
        
        # 更新阶段：用当前观测值修正预测
        K = P_pred / (P_pred + R)  # 卡尔曼增益：噪声越小，K越接近1
        x_est[k] = x_pred + K * (y_noisy[k] - x_pred)  # 状态更新：融合预测与观测
        P[k] = (1 - K) * P_pred  # 协方差更新：观测降低不确定性
    return x_est
 
y_kalman = kalman_filter_1d(y_noisy)

class TransformerPredictor(nn.Module):
    def __init__(self, input_dim=1, d_model=64, nhead=4, num_layers=2, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.embedding = nn.Linear(input_dim, d_model)  # 输入特征映射
        self.pos_encoder = PositionalEncoding(d_model, dropout)  # 位置编码
        # Transformer编码器层（含因果掩码，避免未来信息泄露）
        encoder_layer = nn.TransformerEncoderLayer(
            d_model, nhead, dim_feedforward=256, dropout=dropout, batch_first=True,
            norm_first=True  # 先归一化，再残差连接，提升稳定性
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(d_model, 1)  # 输出层
        
    def forward(self, x):
        # 输入形状：(batch_size, seq_length, input_dim)
        x = self.embedding(x) * np.sqrt(self.d_model)  # 缩放嵌入，稳定梯度
        x = self.pos_encoder(x)  # 注入位置信息
        # 因果掩码：确保第i步只能看到前i步的信息
        seq_len = x.size(1)
        mask = torch.triu(torch.ones(seq_len, seq_len, dtype=torch.bool), diagonal=1)
        x = self.transformer_encoder(x, src_key_padding_mask=None, mask=mask)
        output = self.fc(x)  # 输出形状：(batch_size, seq_length, 1)
        return output
 
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        # 预计算位置编码，避免重复计算
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        # x形状：(batch_size, seq_length, d_model)
        x = x + self.pe[:x.size(1), :]  # 叠加位置编码
        return self.dropout(x)
    

def create_dataset(y, lookback=15):
    """将序列转换为滑动窗口样本：输入为前lookback步，标签为第lookback+1步"""
    X, y_label = [], []
    for i in range(lookback, len(y)):
        X.append(y[i-lookback:i])
        y_label.append(y[i])
    return (torch.tensor(X).float().unsqueeze(-1),  # 形状：(样本数, lookback, 1)
            torch.tensor(y_label).float())
 
X_train, y_train = create_dataset(y_kalman)  # 输入为卡尔曼滤波后的数据
dataset = TensorDataset(X_train, y_train)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)
 
model = TransformerPredictor()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
 
def train_model(model, dataloader, epochs=80):
    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        for X_batch, y_batch in dataloader:
            print(X_batch.shape)
            optimizer.zero_grad()
            output = model(X_batch)[:, -1, :].squeeze()  # 取最后一步预测
            print(output.shape)
            loss = criterion(output, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1:03d}, Loss: {total_loss/len(dataloader):.4f}")
print(model)
train_model(model, dataloader)

  return (torch.tensor(X).float().unsqueeze(-1),  # 形状：(样本数, lookback, 1)


TransformerPredictor(
  (embedding): Linear(in_features=1, out_features=64, bias=True)
  (pos_encoder): PositionalEncoding(
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=64, out_features=64, bias=True)
        )
        (linear1): Linear(in_features=64, out_features=256, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=256, out_features=64, bias=True)
        (norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (fc): Linear(in_features=64, out_features=1, bias=True)
)
torch.Size([16, 15, 1])
torch.Size([16])
torch.Size([16,

KeyboardInterrupt: 

  return (torch.tensor(X).float().unsqueeze(-1),  # 形状：(样本数, lookback, 1)


TransformerPredictor(
  (embedding): Linear(in_features=1, out_features=64, bias=True)
  (pos_encoder): PositionalEncoding(
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=64, out_features=64, bias=True)
        )
        (linear1): Linear(in_features=64, out_features=256, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=256, out_features=64, bias=True)
        (norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (fc): Linear(in_features=64, out_features=1, bias=True)
)
torch.Size([16])
torch.Size([16])
torch.Size([16])
torch

In [None]:
def visualize_results():
    fig, axes = plt.subplots(2, 2, figsize=(18, 12))
    
    # 原始数据与真实信号：噪声显著干扰观测
    axes[0, 0].plot(t, y_noisy, 'r.', alpha=0.6, label='Noisy Data')
    axes[0, 0].plot(t, y_true, 'g-', label='True Signal', linewidth=2)
    axes[0, 0].set_title('1. 原始数据：噪声掩盖真实趋势与周期')
    
    # 卡尔曼滤波结果：噪声被有效抑制
    axes[0, 1].plot(t, y_kalman, 'b-', linewidth=2, label='Kalman Filtered')
    axes[0, 1].set_title('2. 卡尔曼滤波：去除高频噪声，保留低频趋势')
    
    # Transformer预测结果：基于去噪数据的长程依赖建模
    with torch.no_grad():
        y_pred = model(X_train).squeeze().numpy()
    predict_t = t[15:]  # lookback=15，预测从第15步开始
    axes[1, 0].plot(predict_t, y_pred, 'm-', label='Transformer Prediction')
    axes[1, 0].set_title('3. Transformer预测：捕捉多尺度依赖（趋势+周期）')
    
    # 真实值与预测值对比：融合后误差显著降低
    axes[1, 1].plot(predict_t, y_true[15:], 'g-', label='True Signal')
    axes[1, 1].plot(predict_t, y_pred, 'm--', label='Prediction', alpha=0.8)
    axes[1, 1].set_title('4. 融合效果：预测曲线紧密贴合真实信号')
    
    for ax in axes.flatten():
        ax.legend()
        ax.grid(alpha=0.3)
    plt.tight_layout()
    plt.show()
 
visualize_results()

In [2]:
import torch
from tensorboardX import SummaryWriter
from Models.SeEANet import SeEANet
from datetime import datetime
import os

In [4]:
modelName = "SAT"
formatted_time = datetime.now().strftime("%m-%d-%H:%M:%S")
checkpoint_save_path = os.path.join(os.getcwd(), "Run", modelName,formatted_time + ".pth")
checkpoint_save_path

'/home/admin123/SATNet/Run/SAT/04-12-16:51:47.pth'

In [8]:
import matplotlib.pyplot as plt
import torch

# 可视化卷积核
from Models.SeEANet import SeEANet

# 定义模型
model = SeEANet()
# 创建虚拟输入数据
dummy_input = torch.randn(1, 6, 256)  # 根据你的模型输入形状调整

# 前向传播以初始化模型参数
model(dummy_input)

# 现在可以访问卷积核了
filters = model.conv1.weight.detach().numpy()

# 可视化卷积核
for i in range(filters.shape[0]):
    # 确保卷积核是 2D 的
    kernel = filters[i, 0]
    if kernel.ndim == 1:
        # 如果是 1D，可以将其转换为 2D
        kernel = kernel.reshape(1, -1)
    plt.imshow(kernel, cmap="gray")
    plt.axis("off")
    plt.savefig(f"filter_{i}.png", bbox_inches="tight", pad_inches=0)
    plt.close()

In [5]:
import torch
import torch.nn as nn

# 假设输出形状为 (batch_size, 3)
batch_size = 32
outputs = torch.randn(batch_size, 3)  # 网络输出
labels = torch.randn(batch_size, 3)   # 真实值

# 定义权重
weights = torch.tensor([0.5, 1.0, 2.0], dtype=torch.float32)  # 权重可以是任意正数

# 计算加权的均方误差
print((weights *(outputs - labels) ** 2).shape)
mse_loss = torch.mean(weights * (outputs - labels) ** 2)

print(f"加权MSE损失: {mse_loss.item()}")

torch.Size([32, 3])
加权MSE损失: 2.647390127182007


In [4]:
import torch
from torchviz import make_dot
from Models.SeEANet import SeEANet

# 定义模型
model = SeEANet()

# 创建虚拟输入
x = torch.randn(1, 10)

# 生成计算图
dot = make_dot(model(x), params=dict(model.named_parameters()))
dot.render("model_visualization", format="pdf")  # 保存为 PDF 文件

IndexError: too many indices for tensor of dimension 2

In [3]:
writer = SummaryWriter()

model = SeEANet()
model = model.cuda()
inputs = torch.randn(24, 6, 256)
inputs = inputs.cuda()
# summary(model, input_size=(6, 256))
outputs = model(inputs)
print("outpus's shape :", outputs.shape)

writer.add_graph(model, input_to_model=(24, 6, 256))

writer.close()

outpus's shape : torch.Size([24, 1])
Type 'Tuple[int, int, int]' cannot be traced. Only Tensors and (possibly nested) Lists, Dicts, and Tuples of Tensors can be traced
Error occurs, No graph saved


RuntimeError: Type 'Tuple[int, int, int]' cannot be traced. Only Tensors and (possibly nested) Lists, Dicts, and Tuples of Tensors can be traced