In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import sys
sys.path.append("/home/ljq/code/Ringdown_gap_filling/Proj/")
from model.QTranTimeMixerMod import *

# Smoke test: push a random batch through QTransformModule and then through
# TimeMixerEncoder, printing the tensor shape after each stage.
# Both classes are presumably provided by the star import of
# model.QTranTimeMixerMod above — TODO confirm.
signal = torch.randn(16, 1056)  # assume a batch size of 16 and a signal length of 1056
qt=QTransformModule()
spec=qt(signal)
print(f"spec shape: {spec.shape}")
# NOTE(review): the output recorded for this cell ends in
# "RuntimeError: mat1 and mat2 shapes cannot be multiplied (128x1056 and 264x264)"
# and reports a leading dimension of 32 although the batch created here is 16;
# the recorded output may be stale or the encoder arguments may not match the
# spectrogram layout — re-run on a fresh kernel and confirm.
model = TimeMixerEncoder(signal_length=1056, num_token=32, token_dim=64)
output = model(spec)
print(f"Output shape: {output.shape}")



spec shape: torch.Size([32, 1, 4, 1056])
input shape: torch.Size([32, 1, 4, 1056])


RuntimeError: mat1 and mat2 shapes cannot be multiplied (128x1056 and 264x264)

In [None]:
import torch
import torch.nn as nn
from config.config import Config
import math


#Embedding
import torch
import math
# Module-level default device: CUDA when torch reports it as available, otherwise CPU.
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
class SinusoidalPositionEmbedding(nn.Module):
    """Sinusoidal position table projected to the embedding width by a linear layer.

    A fixed (non-trainable) table of shape ``(num_token, token_dim)`` is built
    once and registered as the buffer ``pe``. ``forward`` passes that table
    through a trainable ``nn.Linear(token_dim, embedding_dim)`` and broadcasts
    the result over the batch and channel axes of the input.

    Args:
        num_token: Number of positions (rows) in the table.
        token_dim: Width of the raw sinusoidal table; odd values are supported.
        embedding_dim: Output width of the linear projection.
    """

    def __init__(self, num_token=Config.num_token_IMR, token_dim=Config.segment_length_IMR, embedding_dim=Config.EMBEDDING_dim):
        super(SinusoidalPositionEmbedding, self).__init__()
        self.num_token = num_token
        self.embedding_dim = embedding_dim
        self.token_dim = token_dim

        # Angle of every (position, frequency) pair.
        position = torch.arange(0, self.num_token, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.token_dim, 2).float() * (-math.log(10000.0) / self.token_dim))
        angles = position * div_term

        # Even columns hold sines, odd columns hold cosines.
        pe = torch.zeros(self.num_token, self.token_dim)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd token_dim there is one cosine column fewer than sine
        # columns, so the angle matrix is trimmed to avoid a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : self.token_dim // 2])

        # Buffer: follows the module across devices and is saved in the
        # state dict, but is not a trainable parameter.
        self.register_buffer('pe', pe)

        # Trainable projection of the table to embedding_dim.
        self.linear = nn.Linear(self.token_dim, self.embedding_dim)

    def forward(self, x):
        """Return the projected position embedding broadcast to ``x``'s leading axes.

        Args:
            x: 4-D tensor. Only its shape is read (axes 0, 1 and 2); its
                values are not used.

        Returns:
            Tensor of shape ``(x.shape[0], x.shape[1], n, embedding_dim)`` with
            ``n = min(x.shape[2], num_token)``. The result is an expanded
            (broadcast) view, so it must not be modified in place.

        Raises:
            ValueError: If ``x`` is not 4-D (shape unpacking fails).
        """
        batch_size, channels, num_token, _ = x.shape

        # Take only as many positions as the input carries, then project the
        # table once; every batch/channel entry shares the same values, so
        # projecting batch * channels copies would be redundant work.
        projected = self.linear(self.pe[:num_token, :])

        return projected.unsqueeze(0).unsqueeze(0).expand(batch_size, channels, -1, -1)

class TokenEmbedding(nn.Module):
    """Linear token embedding: maps the last axis from ``token_dim`` to ``embedding_dim``."""

    def __init__(self, token_dim=Config.segment_length_IMR, embedding_dim=Config.EMBEDDING_dim):
        super().__init__()
        self.token_dim, self.embedding_dim = token_dim, embedding_dim
        # One affine map shared by every token.
        self.linear = nn.Linear(in_features=self.token_dim, out_features=self.embedding_dim)

    def forward(self, x):
        """Apply the linear map to the last axis of ``x``.

        ``x`` is expected to be (batch_size, channels, num_token, token_dim);
        the result keeps the leading axes and ends in ``embedding_dim``.
        """
        return self.linear(x)



class ConvEmbeddingWithLinear(nn.Module):
    """Convolutional embedding: Conv2d -> ReLU -> Linear on the last axis.

    Args:
        channels: Input channel count of the 2-D convolution.
        conv_out_channels: Output channel count of the 2-D convolution.
        kernel_size: Kernel size handed to ``nn.Conv2d``.
        padding: Padding handed to ``nn.Conv2d``.
        token_dim: Input width of the linear layer (size of the last axis
            after the convolution).
        embedding_dim: Output width of the linear layer.
    """

    def __init__(self, channels=Config.channels, conv_out_channels=Config.CEout_channels, kernel_size=Config.CEkernel_size, padding=Config.CEpadding, token_dim=Config.segment_length_IMR,embedding_dim=Config.EMBEDDING_dim):
        super().__init__()
        self.channels = channels
        self.token_dim, self.embedding_dim = token_dim, embedding_dim

        # 2-D convolution over the (num_tokens, token_dim) plane.
        self.conv = nn.Conv2d(
            in_channels=channels,
            out_channels=conv_out_channels,
            kernel_size=kernel_size,
            padding=padding,
        )
        # Projection of the last axis to the final embedding width.
        self.linear = nn.Linear(self.token_dim, self.embedding_dim)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Embed ``x``, expected as (batch_size, channels, num_tokens, token_dim).

        Returns the tensor produced by convolution, ReLU and the linear map
        on the last axis.
        """
        activated = self.relu(self.conv(x))
        # nn.Linear only touches the last axis, so no reshaping is required.
        return self.linear(activated)

class ConditionMLP(nn.Module):
    """Two-layer perceptron: ``input_dim`` -> ``Config.segment_length_IMR`` -> ``output_dim``."""

    def __init__(self, input_dim, output_dim=Config.EMBEDDING_dim):
        super().__init__()
        hidden_dim = Config.segment_length_IMR
        # Linear -> ReLU -> Linear; the hidden width comes from Config.
        stages = [
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim),
        ]
        self.mlp = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through the perceptron and return the result."""
        out = self.mlp(x)
        return out



class ConditionConv1D(nn.Module):
    """Expand a vector into ``output_channels`` feature rows with a 1-D convolution.

    The convolution uses ``kernel_size=3`` and ``padding=1``, so the length of
    the last axis is preserved.
    """

    def __init__(self, input_channels=1, output_channels=Config.num_token_IMR):
        super().__init__()
        self.conv1d = nn.Conv1d(input_channels, output_channels, 3, padding=1)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Insert a channel axis at position 1, convolve, and apply ReLU.

        ``x`` is expected to be (batch_size, length); the result is then
        (batch_size, output_channels, length).
        """
        with_channel_axis = x.unsqueeze(1)
        return self.relu(self.conv1d(with_channel_axis))



class Conv2DEMbedding(nn.Module):
    """Lift a 3-D tensor to multiple channels with Conv2d, then project the last axis.

    Args:
        in_channels: Input channels of the 2-D convolution.
        out_channels: Output channels of the 2-D convolution.
        token_dim: Input width of the final linear layer.
        embedding_dim: Output width of the final linear layer.
        kernel_size: Kernel size handed to ``nn.Conv2d``.
        padding: Padding handed to ``nn.Conv2d``.
    """

    def __init__(self, in_channels=1, out_channels=Config.channels, token_dim=Config.segment_length_IMR, embedding_dim=Config.EMBEDDING_dim, kernel_size=Config.ConEkernel_size, padding=Config.ConEpadding):
        super().__init__()
        self.in_channels, self.out_channels = in_channels, out_channels
        self.token_dim, self.embedding_dim = token_dim, embedding_dim
        self.kernel_size, self.padding = kernel_size, padding

        self.conv2d = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            padding=padding,
        )
        self.relu = nn.ReLU()
        self.linear = nn.Linear(token_dim, embedding_dim)

    def forward(self, x):
        """Add a channel axis at position 1, then apply Conv2d -> ReLU -> Linear.

        ``x`` is expected to be (batch_size, rows, cols); it becomes
        (batch_size, 1, rows, cols) before the convolution.
        """
        feature_map = self.conv2d(x.unsqueeze(1))
        return self.linear(self.relu(feature_map))

class ConditionEmbedding(nn.Module):
    """Chain two ConditionConv1D stages and a Conv2DEMbedding stage.

    This class is not instantiated anywhere in the visible code; its only
    use in TotalEmbedding is commented out.

    NOTE(review): despite its name, ``self.mlp`` is a ConditionConv1D, not a
    ConditionMLP. ConditionConv1D.forward always inserts a new axis before its
    Conv1d, so for a 2-D input the second ConditionConv1D (``self.conv1d``)
    would appear to receive a 4-D tensor, which Conv1d is not expected to
    accept — confirm whether ``self.mlp`` was meant to be a ConditionMLP.
    """
    def __init__(self):
        super(ConditionEmbedding, self).__init__()
        # First stage (named "mlp" but built as a 1-D convolution block).
        self.mlp = ConditionConv1D()
        # Second 1-D convolution block, followed by the 2-D convolution embedding.
        self.conv1d = ConditionConv1D()
        self.conv2dembedding = Conv2DEMbedding()
    def forward(self, x):
        # Apply the three stages in sequence: mlp -> conv1d -> conv2dembedding.
        x = self.mlp(x)

        x = self.conv1d(x)
        x = self.conv2dembedding(x)

        return x    
class TotalEmbedding(nn.Module):
    """Sum of token, convolutional and positional embeddings with a conv residual.

    ``forward`` computes ``DF = TE + CE + PE``, refines it with
    ``DF + GELU(Conv2d(DF))`` and applies dropout.

    Args:
        token_dim: Size of the last axis of the input.
        embedding_dim: Width of every embedding branch and of the output.
        conv_channels: Channel count of the input and of the residual conv.
        conv_out_channels: Output channels of the convolutional embedding.
        num_token: Number of positions in the positional table.
        kernel_size: Kernel size of the residual convolution.
        dropout_p: Dropout probability applied to the result.
    """

    def __init__(self, token_dim=Config.segment_length_IMR, embedding_dim=Config.EMBEDDING_dim, conv_channels=Config.channels, conv_out_channels=Config.channels,num_token=Config.num_token_IMR,kernel_size=Config.CEkernel_size_IMR, dropout_p=Config.dropout):
        super(TotalEmbedding, self).__init__()
        self.token_dim = token_dim
        self.embedding_dim = embedding_dim
        self.conv_channels = conv_channels
        self.conv_out_channels = conv_out_channels
        self.num_token = num_token
        self.kernel_size = kernel_size

        self.dropout_p = dropout_p

        # The three embedding branches. token_dim is forwarded explicitly to
        # the conv branch so that its linear layer matches the input width
        # even when token_dim differs from the Config default.
        self.token_embedding = TokenEmbedding(self.token_dim, self.embedding_dim)
        self.conv_embedding = ConvEmbeddingWithLinear(
            self.conv_channels,
            self.conv_out_channels,
            token_dim=self.token_dim,
            embedding_dim=self.embedding_dim,
        )
        # Keywords are used because the positional signature is
        # (num_token, token_dim, embedding_dim): passing embedding_dim as the
        # second positional argument left the projection's output width at
        # its Config default. The sinusoidal table keeps width embedding_dim,
        # so parameter shapes are unchanged for the default configuration.
        self.position_embedding = SinusoidalPositionEmbedding(
            num_token=self.num_token,
            token_dim=self.embedding_dim,
            embedding_dim=self.embedding_dim,
        )
        #self.condition_embedding=ConditionEmbedding()

        # Residual convolution. Its output must have the same spatial size as
        # its input for the residual add, so the padding is derived from the
        # kernel size (exact for odd kernels; equals 1 for a kernel of 3).
        if isinstance(self.kernel_size, int):
            same_padding = self.kernel_size // 2
        else:
            same_padding = tuple(k // 2 for k in self.kernel_size)
        self.conv2d = nn.Conv2d(in_channels=self.conv_channels, out_channels=self.conv_channels, kernel_size=self.kernel_size, padding=same_padding)

        # GELU activation for the residual branch.
        self.gelu = nn.GELU()

        # Dropout on the final features.
        self.dropout = nn.Dropout(dropout_p)

    def forward(self, x):
        """Embed ``x`` and return the regularised features.

        Args:
            x: Expected to be (batch_size, conv_channels, num_token, token_dim).

        Returns:
            Tensor whose last axis has size ``embedding_dim``.
        """
        # Token, convolutional and positional embeddings.
        token_embed = self.token_embedding(x)
        conv_embed = self.conv_embedding(x)
        position_embed = self.position_embedding(x)
        #condition_embed=self.condition_embedding(x)

        # DF = TE + CE + PE
        DF = token_embed + conv_embed + position_embed#+condition_embed

        # Convolution + GELU, added back as a residual.
        conv_out = self.gelu(self.conv2d(DF))
        DF_with_conv = DF + conv_out

        RF = self.dropout(DF_with_conv)

        return RF
class TransformerEncoderLayerWithChannels(nn.Module):
    """Post-norm transformer encoder layer applied independently to every channel.

    The same attention and feed-forward weights are shared by all channels;
    attention only mixes information along the token axis of one channel.

    Args:
        embedding_dim: Feature width of the tokens.
        num_heads: Number of attention heads.
        dim_feedforward: Hidden width of the feed-forward network.
        dropout: Dropout probability, used both inside the attention module
            and on the two residual branches.
    """

    def __init__(self, embedding_dim=Config.EMBEDDING_dim, num_heads=Config.num_heads, dim_feedforward=Config.FF_dim, dropout=Config.dropout):
        super(TransformerEncoderLayerWithChannels, self).__init__()
        self.embedding_dim = embedding_dim
        self.num_heads = num_heads
        self.dim_feedforward = dim_feedforward

        # Multi-head self-attention (sequence-first layout).
        self.self_attn = nn.MultiheadAttention(embed_dim=self.embedding_dim, num_heads=self.num_heads, dropout=dropout)

        # Position-wise feed-forward network.
        self.feedforward = nn.Sequential(
            nn.Linear(self.embedding_dim, self.dim_feedforward),
            nn.ReLU(),
            nn.Linear(self.dim_feedforward, self.embedding_dim),
        )

        # Layer normalisation after each residual branch.
        self.norm1 = nn.LayerNorm(self.embedding_dim)
        self.norm2 = nn.LayerNorm(self.embedding_dim)

        # Residual dropout; ``self.dropout`` is the module, not the float.
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Apply attention and feed-forward blocks to every channel of ``src``.

        Args:
            src: Tensor of shape (batch_size, channels, num_tokens, embedding_dim).

        Returns:
            Contiguous tensor with the same shape as ``src``.
        """
        batch_size, channels, num_tokens, embedding_dim = src.shape

        # Every channel is processed with the same weights and never interacts
        # with other channels, so the channel axis is folded into the batch
        # axis and all channels are handled in a single call instead of a
        # Python loop. The attention module expects (num_tokens, batch, dim).
        tokens = src.reshape(batch_size * channels, num_tokens, embedding_dim).transpose(0, 1)

        # Self-attention, residual connection and layer normalisation.
        attn_output, _ = self.self_attn(tokens, tokens, tokens)
        tokens = self.norm1(tokens + self.dropout(attn_output))

        # Feed-forward, residual connection and layer normalisation. These
        # operate on the last axis only, so the sequence-first layout is kept.
        tokens = self.norm2(tokens + self.dropout(self.feedforward(tokens)))

        # Back to batch-first and unfold the channel axis. ``contiguous`` keeps
        # the result safe for callers that use ``.view`` on it.
        return tokens.transpose(0, 1).contiguous().view(batch_size, channels, num_tokens, embedding_dim)

class TransformerEncoder(nn.Module):
    """Stack of ``num_layers`` TransformerEncoderLayerWithChannels applied in order."""

    def __init__(self, embedding_dim=Config.EMBEDDING_dim, num_heads=Config.num_heads, num_layers=Config.num_layers_T, dim_feedforward=Config.FF_dim, dropout=Config.dropout):
        super().__init__()
        self.embedding_dim, self.num_heads = embedding_dim, num_heads
        self.num_layers, self.dim_feedforward = num_layers, dim_feedforward
        self.dropout = dropout

        # Every layer is built from the same hyper-parameters.
        self.layers = nn.ModuleList()
        for _ in range(self.num_layers):
            self.layers.append(
                TransformerEncoderLayerWithChannels(
                    self.embedding_dim, self.num_heads, self.dim_feedforward, self.dropout
                )
            )

    def forward(self, x):
        """Feed ``x`` through each layer in turn and return the final output."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden)
        return hidden

class Encoder(nn.Module):
    """Embedding stage followed by the transformer stack.

    The constructor arguments are forwarded to the submodules; previously
    they were stored but the submodules were always built from their Config
    defaults, so non-default values were silently ignored.

    NOTE(review): other parts of this file (e.g. the Decoder, which builds
    ``MLP()`` and ``ChannelMerger()`` without arguments) still rely on Config
    defaults, so non-default values here need matching changes there.

    Args:
        channels: Channel count of the input.
        token_dim: Size of the last axis of the input.
        embedding_dim: Feature width produced by the embedding stage.
        num_heads: Attention heads per transformer layer.
        num_layers: Number of transformer layers.
        dropout: Dropout probability used by the embedding and transformer.
    """

    def __init__(self, channels=Config.channels, token_dim=Config.segment_length_IMR, embedding_dim=Config.EMBEDDING_dim, num_heads=Config.num_heads, num_layers=Config.num_layers_T, dropout=Config.dropout):
        super(Encoder, self).__init__()
        self.channels = channels
        self.token_dim = token_dim
        self.embedding_dim = embedding_dim
        self.num_heads = num_heads
        self.num_layers = num_layers

        # Token + convolutional + positional embedding.
        self.totalembedding = TotalEmbedding(
            token_dim=token_dim,
            embedding_dim=embedding_dim,
            conv_channels=channels,
            conv_out_channels=channels,
            dropout_p=dropout,
        )
        # Channel-wise transformer stack.
        self.transformer = TransformerEncoder(
            embedding_dim=embedding_dim,
            num_heads=num_heads,
            num_layers=num_layers,
            dropout=dropout,
        )
        # Not used by forward; kept so the module's attributes stay the same.
        self.dropout = nn.Dropout(dropout)
        self.gelu = nn.GELU()

    def forward(self, x):
        """Embed ``x`` and run it through the transformer stack.

        Returns:
            Tensor expected to be (batch_size, channels, num_tokens, embedding_dim).
        """
        RF = self.totalembedding(x)

        output = self.transformer(RF)

        return output

class MLP(nn.Module):
    """Two-layer perceptron applied to the last axis: Linear -> ReLU -> Linear.

    Args:
        input_dim: Size of the last axis of the input.
        hidden_dim: Width of the hidden layer.
        output_dim: Size of the last axis of the output.
    """

    def __init__(self, input_dim=Config.EMBEDDING_dim, hidden_dim=Config.h_dim_MLP, output_dim=Config.EMBEDDING_dim):
        super(MLP, self).__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim

        self.fc1 = nn.Linear(self.input_dim, self.hidden_dim)  # first layer
        self.relu = nn.ReLU()  # activation
        self.fc2 = nn.Linear(self.hidden_dim, self.output_dim)  # second layer

    def forward(self, x):
        """Transform the last axis of ``x`` from ``input_dim`` to ``output_dim``.

        Args:
            x: Tensor whose last axis has size ``input_dim``, typically
                (batch_size, channels, num_tokens, embedding_dim).

        Returns:
            Tensor with the same leading axes and a last axis of ``output_dim``.
        """
        # nn.Linear acts on the last axis and keeps all leading axes, so the
        # input does not need to be flattened first. This also avoids the
        # RuntimeError that ``.view`` raises for non-contiguous inputs.
        return self.fc2(self.relu(self.fc1(x)))
class InverseTokenEmbedding(nn.Module):
    """Bias-free linear layer whose weight is the transpose of a token embedding's weight.

    It maps features of width ``embedding_dim`` back to width ``token_dim``.

    Args:
        token_embedding_layer: Object exposing ``linear.weight`` of shape
            (embedding_dim, token_dim), e.g. a TokenEmbedding instance.
    """

    def __init__(self, token_embedding_layer):
        super(InverseTokenEmbedding, self).__init__()
        # Forward embedding weight, shape (embedding_dim, token_dim).
        weight = token_embedding_layer.linear.weight
        embedding_dim, token_dim = weight.size(0), weight.size(1)
        # The inverse layer consumes embedding_dim features and emits
        # token_dim features; declaring it this way keeps in_features /
        # out_features (and the module repr) consistent with the transposed
        # weight installed below.
        self.inverse_linear = nn.Linear(embedding_dim, token_dim, bias=False)
        # NOTE(review): the parameter wraps a transposed view of the forward
        # weight, so the two parameters presumably share storage while being
        # registered (and optimised) as separate parameters — confirm that
        # this form of weight tying is intended.
        self.inverse_linear.weight = nn.Parameter(weight.T)

    def forward(self, x):
        """Map the last axis of ``x`` from ``embedding_dim`` to ``token_dim``."""
        return self.inverse_linear(x)
def combine_segments(segments, segment_length=Config.segment_length_IMR, signal_length=Config.signal_length_IMR, overlap=Config.overlap):
    """
    Reassemble overlapping segments into full-length signals (overlap-add with averaging).

    Segment ``i`` is added at offset ``i * step`` with
    ``step = int(segment_length * (1 - overlap))``; every output position is
    then divided by the number of segments that covered it.

    Args:
        segments: Tensor of shape (batch_size, channels, num_segments, segment_length).
        segment_length: Length of each segment.
        signal_length: Length of the reassembled signal.
        overlap: Fractional overlap between consecutive segments (0.5 = 50%).

    Returns:
        Tensor of shape (batch_size, channels, signal_length) on the same
        device as ``segments``. Positions not covered by any segment are 0.

    Raises:
        RuntimeError: If a segment does not fit inside ``signal_length`` or
            its last axis does not match ``segment_length``.
    """
    batch_size, channels, num_segments, _ = segments.shape
    step_size = int(segment_length * (1 - overlap))  # hop between segment starts

    # Allocate on the input's own device: the result then stays with the
    # model's tensors instead of being forced onto a global default device,
    # and no per-iteration device transfers are needed.
    output = torch.zeros((batch_size, channels, signal_length), dtype=segments.dtype, device=segments.device)

    # Coverage count per time step. It is identical for every batch entry and
    # channel, so a 1-D tensor broadcast over the leading axes is sufficient.
    counter = torch.zeros(signal_length, dtype=segments.dtype, device=segments.device)

    for i in range(num_segments):
        start = i * step_size
        end = start + segment_length
        # Accumulate the segment and record the coverage.
        output[:, :, start:end] += segments[:, :, i, :]
        counter[start:end] += 1

    # Average the overlapping regions. The count is clamped to 1 so that
    # uncovered positions stay 0 instead of becoming 0/0 = NaN.
    return output / counter.clamp(min=1)
class ChannelMerger(nn.Module):
    """Merge ``input_channels`` signals into ``output_channels`` with a 1-D convolution.

    The convolution uses ``kernel_size=3`` and ``padding=1``, so the signal
    length is preserved.
    """

    def __init__(self, input_channels=Config.channels, output_channels=1, signal_length=Config.signal_length_IMR):
        super().__init__()
        self.input_channels, self.output_channels = input_channels, output_channels
        self.signal_length = signal_length
        self.conv1d = nn.Conv1d(input_channels, output_channels, 3, padding=1)

    def forward(self, x):
        """Convolve ``x``, expected as (batch_size, input_channels, signal_length).

        The channel axis is kept: the result is
        (batch_size, output_channels, signal_length); nothing is squeezed.
        """
        merged = self.conv1d(x)
        return merged

from ssqueezepy import icwt, Wavelet
import torch
import numpy as np




import torch
import torch.nn as nn

class Decoder(nn.Module):
    """Turn encoder features back into a signal.

    Pipeline: MLP -> inverse token embedding -> overlap-add of the segments
    -> channel merge.
    """

    def __init__(self, inverse_token_embedding):
        super().__init__()
        self.mlp = MLP()
        # Module mapping embedding features back to segment samples.
        self.inverse_token_embedding = inverse_token_embedding
        self.CM = ChannelMerger()

    def forward(self, encoder_output):
        """
        :param encoder_output: encoder output, expected as
                               (batch_size, channels, num_tokens, embedding_dim)
        :return: reconstructed signal, expected as (batch_size, 1, signal_length)
        """
        # Steps 1-2: refine the features, then map every token back to a segment.
        segments = self.inverse_token_embedding(self.mlp(encoder_output))

        # Step 3: stitch the segments into one signal per channel.
        # NOTE(review): the overlap is fixed at 0.5 here rather than read from
        # Config.overlap — confirm the two agree.
        stitched = combine_segments(
            segments,
            segment_length=Config.segment_length_IMR,
            signal_length=Config.signal_length_IMR,
            overlap=0.5,
        )

        # Step 4: merge the channels into the output signal.
        return self.CM(stitched)


class IMRGapsFiller(nn.Module):
    """Full model: Encoder followed by a Decoder built on the encoder's token embedding."""

    def __init__(self, channels=Config.channels, token_dim=Config.segment_length_IMR, embedding_dim=Config.EMBEDDING_dim, num_heads=Config.num_heads, num_layers=Config.num_layers_T, dropout=Config.dropout):
        super().__init__()
        self.encoder = Encoder(
            channels=channels,
            token_dim=token_dim,
            embedding_dim=embedding_dim,
            num_heads=num_heads,
            num_layers=num_layers,
            dropout=dropout,
        )
        # The inverse embedding is derived from the encoder's token embedding
        # and is handed to the decoder as well as stored on this module.
        self.inverse_token_embedding = InverseTokenEmbedding(
            self.encoder.totalembedding.token_embedding
        )
        self.decoder = Decoder(self.inverse_token_embedding)

    def forward(self, x):
        """Encode ``x`` and decode the result into the output signal."""
        return self.decoder(self.encoder(x))










