In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# 1. 定义基础残差块
class BasicBlock(nn.Module):
    """Residual unit made of two 3x3 conv + BatchNorm layers.

    Args:
        in_channels: Channel count of the incoming tensor.
        out_channels: Channel count produced by both convolutions.
        stride: Stride of the first convolution (the second always uses 1).
        downsample: Optional module applied to the input before it is added
            back; needed whenever the main path changes the tensor shape.
    """

    # Output-channel multiplier relative to ``out_channels``
    # (a Bottleneck-style block would use 4).
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        # Settings shared by both 3x3 convolutions.
        conv_kwargs = dict(kernel_size=3, padding=1, bias=False)

        # First conv + norm: may change width and spatial resolution.
        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride, **conv_kwargs)
        self.bn1 = nn.BatchNorm2d(out_channels)
        # Second conv + norm: keeps the shape.
        self.conv2 = nn.Conv2d(out_channels, out_channels, stride=1, **conv_kwargs)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut adapter, used only when input and output shapes differ.
        self.downsample = downsample

    def forward(self, x):
        # Shortcut branch: raw input unless an adapter was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        # Main branch: conv-bn-relu followed by conv-bn.
        residual = F.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))

        # Residual connection, then the final activation.
        residual += shortcut
        return F.relu(residual)


# 2. 定义 ResNet 主体
class ResNet(nn.Module):
    """ResNet backbone: conv stem, four residual stages, pooled linear classifier."""

    def __init__(self, block, layers, num_classes=1000):
        """
        block: residual block class; must expose an ``expansion`` attribute and
            accept ``(in_channels, out_channels, stride, downsample)``
        layers: number of blocks in each of the four stages,
            e.g. [2, 2, 2, 2] for ResNet-18
        num_classes: size of the classifier output
        """
        super().__init__()
        # Channel count fed to the next block; advanced by _make_layer.
        self.in_channels = 64

        # Stem: 7x7 stride-2 conv, BatchNorm, ReLU, 3x3 stride-2 max-pool.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four stages registered as layer1..layer4; every stage after the
        # first starts with a stride-2 block.
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for stage_no, (width, stage_stride) in enumerate(stage_specs, start=1):
            stage = self._make_layer(block, width, layers[stage_no - 1], stride=stage_stride)
            setattr(self, f"layer{stage_no}", stage)

        # Global average pooling followed by the fully connected classifier.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride):
        """
        Assemble one stage as an ``nn.Sequential`` of residual blocks.

        out_channels: base width of the stage (multiplied by ``block.expansion``)
        blocks: number of residual blocks in the stage
        stride: stride of the first block only; later blocks use the default
        """
        stage_width = out_channels * block.expansion

        # The first block needs a 1x1 conv + BatchNorm shortcut whenever it
        # changes the resolution or the channel count.
        shape_changes = stride != 1 or self.in_channels != stage_width
        projection = None
        if shape_changes:
            projection = nn.Sequential(
                nn.Conv2d(self.in_channels, stage_width,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(stage_width),
            )

        head_block = block(self.in_channels, out_channels, stride, projection)
        # Everything after the first block consumes the stage's own width.
        self.in_channels = stage_width
        tail_blocks = [block(self.in_channels, out_channels) for _ in range(1, blocks)]

        return nn.Sequential(head_block, *tail_blocks)

    def forward(self, x):
        feats = self.conv1(x)
        feats = self.bn1(feats)
        feats = self.relu(feats)
        feats = self.maxpool(feats)

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            feats = stage(feats)

        pooled = torch.flatten(self.avgpool(feats), 1)
        return self.fc(pooled)


# 3. 构建 ResNet-18
def resnet18(num_classes=1000):
    """Build a ResNet made of ``BasicBlock`` units, two per stage."""
    blocks_per_stage = [2, 2, 2, 2]
    return ResNet(BasicBlock, blocks_per_stage, num_classes=num_classes)


# 测试网络
if __name__ == "__main__":
    # Smoke test: one random 224x224 RGB image through a 10-class ResNet-18.
    net = resnet18(num_classes=10)
    dummy_image = torch.randn(1, 3, 224, 224)
    logits = net(dummy_image)
    print(logits.shape)  # torch.Size([1, 10])


torch.Size([1, 10])


In [None]:
nc: 1000  # number of classes for the main task

stem:
  - {from: [0], module: Conv, args: {out_channels: 64, kernel_size: 7, stride: 2, padding: 3}}
  - {from: [-1], module: BatchNorm, args: {}}
  - {from: [-1], module: ReLU, args: {}}
  - {from: [-1], module: MaxPool, args: {kernel_size: 3, stride: 2, padding: 1}}

backbone:
  # Stage 1
  - {from: [-1], module: ResBlock, args: {channels: 64, stride: 1, dilation: 1}}
  - {from: [-1], module: ResBlock, args: {channels: 64, stride: 1, dilation: 1}}
  # Stage 2
  - {from: [-1], module: ResBlock, args: {channels: 128, stride: 2, dilation: 1}}
  - {from: [-1], module: ResBlock, args: {channels: 128, stride: 1, dilation: 1}}
  # Stage 3
  - {from: [-1], module: ResBlock, args: {channels: 256, stride: 2, dilation: 1}}
  - {from: [-1], module: ResBlock, args: {channels: 256, stride: 1, dilation: 1}}
  # Stage 4
  - {from: [-1], module: ResBlock, args: {channels: 512, stride: 2, dilation: 1}}
  - {from: [-1], module: ResBlock, args: {channels: 512, stride: 1, dilation: 1}}

head:
  - {from: [-1], module: GlobalAvgPool, args: {}}
  - {from: [-1], module: FC, args: {out_features: 1000}}  # main-task classification head
  - {from: [-2], module: FC, args: {out_features: 10}}    # auxiliary-task classification head


In [None]:
import torch
import torch.nn as nn
import yaml

# ------------------------------
# 基础模块
# ------------------------------
class ResBlock(nn.Module):
    """Residual block: two 3x3 conv + BatchNorm layers plus a skip connection.

    Args:
        channels: Number of output channels (and of input channels unless
            ``in_channels`` is given).
        stride: Stride of the first convolution.
        dilation: Dilation of both convolutions; the padding is set to the
            same value.
        in_channels: Number of input channels. Defaults to ``channels``.

    When the main path changes the tensor shape (``stride != 1`` or
    ``in_channels != channels``) the skip path goes through a 1x1 conv +
    BatchNorm projection stored in ``self.downsample``. Otherwise
    ``self.downsample`` is ``None`` and the input is added unchanged, so
    shape-preserving blocks keep their original parameter names.
    """

    def __init__(self, channels, stride=1, dilation=1, in_channels=None):
        super().__init__()
        if in_channels is None:
            in_channels = channels
        self.conv1 = nn.Conv2d(in_channels, channels, 3, stride=stride, padding=dilation, dilation=dilation, bias=False)
        self.bn1 = nn.BatchNorm2d(channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(channels, channels, 3, stride=1, padding=dilation, dilation=dilation, bias=False)
        self.bn2 = nn.BatchNorm2d(channels)

        # Without a projection the addition in forward() fails as soon as
        # the main path alters resolution or width.
        self.downsample = None
        if stride != 1 or in_channels != channels:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, channels, 1, stride=stride, bias=False),
                nn.BatchNorm2d(channels),
            )

    def forward(self, x):
        identity = x if self.downsample is None else self.downsample(x)
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += identity
        out = self.relu(out)
        return out


class _FlattenLinear(nn.Linear):
    """``nn.Linear`` that flattens every non-batch dimension of its input first.

    Lets a fully connected head consume the (N, C, 1, 1) output of a pooling
    layer directly while keeping the ``weight``/``bias`` parameter names of a
    plain ``nn.Linear``.
    """

    def forward(self, x):
        return super().forward(torch.flatten(x, 1))


# ------------------------------
# Module factory
# ------------------------------
def module_factory(module_name, args, prev_out=None):
    """Instantiate the layer called ``module_name`` from its config ``args``.

    Args:
        module_name: One of 'Conv', 'BatchNorm', 'ReLU', 'MaxPool',
            'GlobalAvgPool', 'FC' or 'ResBlock'.
        args: Keyword arguments taken from the config entry. Not modified.
        prev_out: Optional tensor standing in for the layer input. Only its
            channel dimension (``shape[1]``) is read, to fill in an input
            width that ``args`` does not state explicitly.

    Returns:
        The constructed ``nn.Module``.

    Raises:
        ValueError: If ``module_name`` is unknown, or if a layer needs an
            input width that is neither in ``args`` nor derivable from
            ``prev_out``.
    """
    inferred = None if prev_out is None else prev_out.shape[1]

    def input_width(key):
        # An explicit config value wins; only fall back to the inferred
        # width when the key is absent (and never touch prev_out if None).
        if key in args:
            return args[key]
        if inferred is None:
            raise ValueError(
                f"{module_name}: '{key}' not given and no previous output to infer it from"
            )
        return inferred

    def without(key):
        return {k: v for k, v in args.items() if k != key}

    def res_block_kwargs():
        kwargs = dict(args)
        # With no hint at all ResBlock falls back to in_channels == channels.
        if inferred is not None:
            kwargs.setdefault('in_channels', inferred)
        return kwargs

    modules = {
        'Conv': lambda: nn.Conv2d(input_width('in_channels'), **without('in_channels')),
        'BatchNorm': lambda: nn.BatchNorm2d(input_width('num_features')),
        'ReLU': lambda: nn.ReLU(inplace=True),
        'MaxPool': lambda: nn.MaxPool2d(**args),
        'GlobalAvgPool': lambda: nn.AdaptiveAvgPool2d((1, 1)),
        # The inferred in_features equals the channel count, which matches
        # the flattened size only for 1x1 feature maps (e.g. after
        # GlobalAvgPool); pass 'in_features' explicitly otherwise.
        'FC': lambda: _FlattenLinear(input_width('in_features'), args['out_features']),
        'ResBlock': lambda: ResBlock(**res_block_kwargs()),
    }
    if module_name not in modules:
        raise ValueError(f"Unknown module: {module_name}")
    return modules[module_name]()

# ------------------------------
# 支持多输入多输出的网络
# ------------------------------
class YAMLNet(nn.Module):
    """Network assembled from the 'stem', 'backbone' and 'head' lists of a YAML file.

    Every entry has the form ``{from: [...], module: NAME, args: {...}}``.
    ``from`` indices address the tensors produced so far: ``0`` is the
    network input, ``k`` (k >= 1) is the output of the k-th layer counted
    across all three sections, and a negative value is relative to the
    current layer (``-1`` is the previous layer's output). Several sources
    are concatenated along the channel dimension.

    Args:
        yaml_path: Path of the YAML config file.
        in_channels: Channel count of the network input.
        verbose: Print one summary line per constructed layer.

    Raises:
        ValueError: If an entry has an empty ``from`` list or refers to a
            tensor that does not exist yet.
    """

    def __init__(self, yaml_path, in_channels=3, verbose=True):
        super().__init__()
        # Explicit encoding so non-ASCII text in the config reads the same on
        # every platform.
        with open(yaml_path, 'r', encoding='utf-8') as cfg_file:
            self.cfg = yaml.safe_load(cfg_file)

        stem_cfg = self.cfg.get('stem') or []
        backbone_cfg = self.cfg.get('backbone') or []
        head_cfg = self.cfg.get('head') or []
        layer_cfgs = stem_cfg + backbone_cfg + head_cfg
        # Index of the first head layer; forward() returns outputs from here on.
        self._head_start = len(layer_cfgs) - len(head_cfg)

        self.layers = nn.ModuleList()
        self.from_list = []
        # `from` entries resolved to absolute positions in forward()'s output list.
        self._sources = []
        chs = [in_channels]  # chs[k] = channel count of tensor k

        # Build the layers
        for idx, layer_cfg in enumerate(layer_cfgs):
            f = layer_cfg['from']
            f = list(f) if isinstance(f, (list, tuple)) else [f]
            m_name = layer_cfg['module']
            args = dict(layer_cfg.get('args') or {})

            if not f:
                raise ValueError(f"Layer {idx} ({m_name}) has an empty 'from' list")
            # Tensors 0..idx exist when layer idx runs; -1 maps to idx.
            sources = [j if j >= 0 else idx + 1 + j for j in f]
            for raw, src in zip(f, sources):
                if not 0 <= src <= idx:
                    raise ValueError(
                        f"Layer {idx} ({m_name}): 'from' index {raw} does not refer to an earlier output"
                    )

            # Concatenated sources add up their channels.
            in_ch = sum(chs[src] for src in sources)
            prev_out = torch.zeros(1, in_ch, 1, 1)  # placeholder carrying only the channel count
            layer = module_factory(m_name, args, prev_out)
            self.layers.append(layer)
            self.from_list.append(f)
            self._sources.append(sources)

            # Track the output channel count
            if 'out_channels' in args:
                c2 = args['out_channels']
            elif 'channels' in args:
                c2 = args['channels']
            elif 'out_features' in args:
                c2 = args['out_features']
            else:
                # Width-preserving layer: same channels as its own input.
                c2 = in_ch
            chs.append(c2)

            if verbose:
                print(f"{idx:03}: {m_name}, from {f}, args={args}, out_ch={c2}")

    def forward(self, x):
        """Run the graph.

        Returns the single head output, or a tuple of head outputs when the
        config defines several head layers. Without a 'head' section the
        output of the last layer is returned.
        """
        outputs = [x]  # outputs[k] matches the `from` index k
        for layer, sources in zip(self.layers, self._sources):
            if len(sources) > 1:
                inp = torch.cat([outputs[j] for j in sources], dim=1)
            else:
                inp = outputs[sources[0]]
            outputs.append(layer(inp))

        # Collect the head outputs (layer i's output lives at outputs[i + 1]).
        out_heads = outputs[self._head_start + 1:]
        if not out_heads:
            return outputs[-1]
        return tuple(out_heads) if len(out_heads) > 1 else out_heads[0]

# ------------------------------
# 测试
# ------------------------------
if __name__ == "__main__":
    # Build the multi-task network from its config and push one random image through it.
    net = YAMLNet("resnet18_multi_task.yaml", in_channels=3, verbose=True)
    sample = torch.randn(1, 3, 224, 224)
    result = net(sample)
    # A config with several head layers yields a tuple, otherwise a single tensor.
    if not isinstance(result, tuple):
        print("Output shape:", result.shape)
    else:
        for head_idx, head_out in enumerate(result):
            print(f"Output head {head_idx} shape:", head_out.shape)


In [None]:
nc: 1000  # number of classes

stem:
  - {type: "image", channels: 3, height: 224, width: 224}  # image input
  - {type: "vector", channels: 128}                        # audio feature vector

backbone:
  # Stage 1: image processing
  - {from: [0], module: Conv, args: {out_channels: 64, kernel_size: 7, stride: 2, padding: 3}}
  - {from: [-1], module: BatchNorm, args: {}}
  - {from: [-1], module: ReLU, args: {}}
  - {from: [-1], module: MaxPool, args: {kernel_size: 3, stride: 2, padding: 1}}

  # Stage 2: fuse the audio features
  - {from: [-1, 1], module: ResBlock, args: {channels: 128, stride: 1, dilation: 1}}

head:
  - {from: [-1], module: FC, args: {out_features: 1000}}  # final output layer


In [None]:
nc: 1000  # number of classes

stem:
  - {type: "image", channels: 3, height: 224, width: 224}  # image input
  - {type: "vector", channels: 128}                        # audio feature vector

backbone:
  # Stage 1: image processing
  - {from: [0], module: Conv, args: {out_channels: 64, kernel_size: 7, stride: 2, padding: 3}}
  - {from: [-1], module: BatchNorm, args: {}}
  - {from: [-1], module: ReLU, args: {}}
  - {from: [-1], module: MaxPool, args: {kernel_size: 3, stride: 2, padding: 1}}

  # Stage 2: fuse the audio features
  - {from: [-1, 1], module: ResBlock, args: {channels: 128, stride: 1, dilation: 1}}

head:
  - {from: [-1], module: FC, args: {out_features: 1000}}  # final output


In [None]:
import torch
import torch.nn as nn
import yaml

# ----------------------------
# 简单 ResBlock 示例
# ----------------------------
class ResBlock(nn.Module):
    """Residual block: two 3x3 conv + BatchNorm layers plus a skip connection.

    Args:
        channels: Number of output channels (and of input channels unless
            ``in_channels`` is given).
        stride: Stride of the first convolution.
        dilation: Dilation of both convolutions; the padding is set to the
            same value.
        in_channels: Number of input channels. Defaults to ``channels``.

    If the main path changes the tensor shape (``stride != 1`` or
    ``in_channels != channels``), the input is projected with a 1x1 conv +
    BatchNorm (``self.downsample``) before the addition. Otherwise
    ``self.downsample`` is ``None`` and the input is added unchanged.
    """

    def __init__(self, channels, stride=1, dilation=1, in_channels=None):
        super().__init__()
        if in_channels is None:
            in_channels = channels
        self.conv1 = nn.Conv2d(in_channels, channels, kernel_size=3, stride=stride,
                               padding=dilation, dilation=dilation, bias=False)
        self.bn1 = nn.BatchNorm2d(channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, stride=1,
                               padding=dilation, dilation=dilation, bias=False)
        self.bn2 = nn.BatchNorm2d(channels)

        # The skip addition needs matching shapes; project the input
        # whenever the main path changes resolution or width.
        self.downsample = None
        if stride != 1 or in_channels != channels:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(channels),
            )

    def forward(self, x):
        identity = x if self.downsample is None else self.downsample(x)
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += identity
        out = self.relu(out)
        return out

# ----------------------------
# 简单 FC 层，可包含 flatten
# ----------------------------
class FC(nn.Module):
    """Fully connected layer that flattens its input and infers ``in_features``.

    The wrapped ``nn.LazyLinear`` fixes its input width on the first forward
    pass, so the parameters live in the same registered submodule from
    construction onwards. Run one forward pass before inspecting the
    weights or handing the parameters to an optimizer.

    Args:
        out_features: Size of the output vector.
    """

    def __init__(self, out_features):
        super().__init__()
        # nn.Linear cannot be built with in_features=None; the lazy variant
        # defers the weight shape until it has seen an input.
        self.fc = nn.LazyLinear(out_features)
        self.out_features = out_features

    def forward(self, x):
        # Collapse everything but the batch dimension for non-vector inputs.
        if x.ndim > 2:
            x = torch.flatten(x, 1)
        return self.fc(x)

# ----------------------------
# 模型构建函数
# ----------------------------
class DynamicNet(nn.Module):
    """Multi-input network built from the 'backbone' and 'head' lists of a config dict.

    Every entry has the form ``{'from': [...], 'module': NAME, 'args': {...}}``.
    ``from`` indices address a running list that starts with the input
    tensors (in the order passed to ``forward``) and grows by one tensor per
    layer; negative indices count from the end of that list, so ``-1`` is
    the most recent output. Several sources are concatenated along dim 1.

    Module names are resolved, in order, against the names defined in this
    file, the shorthand table below, and ``torch.nn``.

    Args:
        cfg: Parsed config containing 'backbone' and 'head' lists.

    Raises:
        KeyError: If a module name cannot be resolved.
    """

    # Shorthand names used in the configs. The lazy variants infer their
    # input width on the first forward pass, so entries may omit it.
    _ALIASES = {
        'Conv': nn.LazyConv2d,
        'BatchNorm': nn.LazyBatchNorm2d,
        'MaxPool': nn.MaxPool2d,
    }

    def __init__(self, cfg):
        super().__init__()
        layers_cfg = cfg['backbone'] + cfg['head']
        self.from_list = [entry['from'] for entry in layers_cfg]
        self.layers = nn.ModuleList(
            self._resolve(entry['module'])(**entry['args']) for entry in layers_cfg
        )

    @classmethod
    def _resolve(cls, name):
        """Map a config module name to the class that implements it."""
        file_scope = globals()
        if name in file_scope:
            return file_scope[name]
        if name in cls._ALIASES:
            return cls._ALIASES[name]
        candidate = getattr(nn, name, None)
        if isinstance(candidate, type) and issubclass(candidate, nn.Module):
            return candidate
        raise KeyError(f"Unknown module: {name}")

    def forward(self, inputs):
        # inputs: iterable of input tensors, one per stem entry.
        # A list (not a dict) so negative `from` indices resolve naturally.
        outputs = list(inputs)
        for layer, f in zip(self.layers, self.from_list):
            f = f if isinstance(f, list) else [f]
            x = torch.cat([outputs[j] for j in f], dim=1) if len(f) > 1 else outputs[f[0]]
            outputs.append(layer(x))
        return outputs[-1]  # only the final head output is returned

# ----------------------------
# 测试
# ----------------------------
if __name__ == "__main__":
    # Read the YAML config
    with open("resnet_multiinput.yaml") as cfg_file:
        config = yaml.safe_load(cfg_file)

    # Instantiate the network
    net = DynamicNet(config)

    # Example inputs: an image batch plus an audio feature tensor
    image_batch = torch.randn(2, 3, 224, 224)
    audio_batch = torch.randn(2, 128, 56, 56)  # laid out at the image feature-map size
    prediction = net([image_batch, audio_batch])

    print("输出 shape:", prediction.shape)  # expected: [2, 1000]


In [None]:
nc: [10, 5]  # number of classes for each of the two tasks

stem:
  - {type: "image", channels: 3, height: 32, width: 32}  # input image 1
  - {type: "image", channels: 3, height: 32, width: 32}  # input image 2

backbone:
  # Branch for image 1
  - {from: [0], module: Conv2d, args: {in_channels: 3, out_channels: 16, kernel_size: 3, stride: 1, padding: 1}}
  - {from: [-1], module: ReLU, args: {}}
  - {from: [-1], module: MaxPool2d, args: {kernel_size: 2, stride: 2}}

  # Branch for image 2
  - {from: [1], module: Conv2d, args: {in_channels: 3, out_channels: 16, kernel_size: 3, stride: 1, padding: 1}}
  - {from: [-1], module: ReLU, args: {}}
  - {from: [-1], module: MaxPool2d, args: {kernel_size: 2, stride: 2}}

# NOTE(review): LinearDyn is not defined in the visible code - TODO confirm where it comes from.
# NOTE(review): indices 2 and 5 presumably target the two MaxPool2d outputs; verify against
# the loader's indexing scheme (whether the stem inputs occupy indices 0 and 1).
head:
  - {from: [2], module: LinearDyn, args: {out_features: 10}}  # output 1
  - {from: [5], module: LinearDyn, args: {out_features: 5}}   # output 2


In [None]:
# `from` indices count the stem inputs first (0 and 1), then every
# backbone/head layer output in order; -1 is the previous layer's output.
stem:
  - {type: "image", channels: 3, height: 32, width: 32}  # input image 1 -> index 0
  - {type: "image", channels: 3, height: 32, width: 32}  # input image 2 -> index 1

backbone:
  - {from: [0], module: Conv2d, args: {in_channels: 3, out_channels: 16, kernel_size: 3, stride: 1, padding: 1}}  # -> index 2
  - {from: [-1], module: ReLU, args: {}}                                                                          # -> index 3
  - {from: [-1], module: MaxPool2d, args: {kernel_size: 2, stride: 2}}                                            # -> index 4

  - {from: [1], module: Conv2d, args: {in_channels: 3, out_channels: 16, kernel_size: 3, stride: 1, padding: 1}}  # -> index 5
  - {from: [-1], module: ReLU, args: {}}                                                                          # -> index 6
  - {from: [-1], module: MaxPool2d, args: {kernel_size: 2, stride: 2}}                                            # -> index 7

head:
  # The heads read the pooled 16 x 16 x 16 feature maps (indices 4 and 7).
  # in_features is written as a plain integer because YAML does not
  # evaluate arithmetic: `16*16*16` would be loaded as a string.
  - {from: [4], module: Flatten, args: {}}
  - {from: [-1], module: Linear, args: {in_features: 4096, out_features: 10}}  # output 1
  - {from: [7], module: Flatten, args: {}}
  - {from: [-1], module: Linear, args: {in_features: 4096, out_features: 5}}   # output 2



In [None]:
import torch
import torch.nn as nn
import yaml

def build_model(cfg):
    """Build a multi-input, multi-output network from ``torch.nn`` layer names.

    Every entry of ``cfg['backbone'] + cfg['head']`` has the form
    ``{'from': [...], 'module': NAME, 'args': {...}}`` where ``NAME`` is the
    name of a class in ``torch.nn`` and ``args`` are its constructor
    keyword arguments.

    ``from`` indices address a running list that starts with the input
    tensors and grows by one tensor per layer; negative indices count from
    the end of that list, so ``-1`` is the most recent output. Several
    sources are concatenated along dim 1.

    Args:
        cfg: Parsed config containing 'backbone' and 'head' lists.

    Returns:
        An ``nn.Module`` whose ``layers`` attribute holds the constructed
        modules. Calling it with a list of input tensors returns the list
        of outputs of all ``nn.Linear`` layers, in config order.
    """
    layers_cfg = cfg['backbone'] + cfg['head']

    class _GraphNet(nn.Module):
        """Runs the configured layers. ``forward`` is a real method reading
        ``self.layers``, so copies and replicas use their own modules."""

        def __init__(self):
            super().__init__()
            self.from_list = [entry['from'] for entry in layers_cfg]
            self.layers = nn.ModuleList(
                getattr(nn, entry['module'])(**entry['args']) for entry in layers_cfg
            )

        def forward(self, inputs):
            # A list (not a dict) so negative `from` indices resolve naturally.
            outputs = list(inputs)
            results = []
            for layer, f in zip(self.layers, self.from_list):
                f = f if isinstance(f, list) else [f]
                x = torch.cat([outputs[j] for j in f], dim=1) if len(f) > 1 else outputs[f[0]]
                out = layer(x)
                outputs.append(out)
                # Every Linear layer is treated as a head output.
                if isinstance(layer, nn.Linear):
                    results.append(out)
            return results

    return _GraphNet()

# ----------------------------
# 测试流程
# ----------------------------
if __name__ == "__main__":
    with open("simple_twoinput_minimal.yaml") as cfg_file:
        config = yaml.safe_load(cfg_file)

    net = build_model(config)
    # Two random 32x32 RGB batches, one per input branch.
    first_img, second_img = torch.randn(2, 3, 32, 32), torch.randn(2, 3, 32, 32)
    head_a, head_b = net([first_img, second_img])

    print("输出1 shape:", head_a.shape)  # [2, 10]
    print("输出2 shape:", head_b.shape)  # [2, 5]


In [None]:
nc: [10, 5]  # number of classes for each of the two tasks

# `from` indices count the stem inputs first (0 and 1), then every
# backbone/head layer output in order; negative values are relative to the
# current layer (-1 is the previous layer's output).

# stem declares the inputs
stem:
  - {type: "image", channels: 3, height: 32, width: 32}  # input 1 -> index 0
  - {type: "image", channels: 3, height: 32, width: 32}  # input 2 -> index 1

# backbone declares the intermediate layers
backbone:
  # Branch for input 1
  - {from: [0], module: Conv, args: {out_channels: 16, kernel_size: 3, stride: 1, padding: 1}}  # -> index 2
  - {from: [-1], module: ReLU, args: {}}                                                        # -> index 3
  - {from: [-1], module: MaxPool, args: {kernel_size: 2, stride: 2}}                            # -> index 4

  # Branch for input 2
  - {from: [1], module: Conv, args: {out_channels: 16, kernel_size: 3, stride: 1, padding: 1}}  # -> index 5
  - {from: [-1], module: ReLU, args: {}}                                                        # -> index 6
  - {from: [-1], module: MaxPool, args: {kernel_size: 2, stride: 2}}                            # -> index 7

  # Concatenate the two pooled branches (indices 4 and 7) along the channel dim
  - {from: [4,7], module: "Concat", args: {dim: 1}}                                             # -> index 8

head:
  - {from: [-1], module: FC, args: {out_features: 10}}  # output 1, reads the Concat result
  - {from: [-2], module: FC, args: {out_features: 5}}   # output 2, also reads the Concat result
  
import torch
import torch.nn as nn
import yaml

# ----------------------------
# 工厂函数
# ----------------------------
def module_factory(module_name, args, prev_out=None):
    """Create the layer named ``module_name``; 'Concat' is not handled here.

    Args:
        module_name: 'Conv', 'BatchNorm', 'ReLU', 'MaxPool', 'GlobalAvgPool',
            'FC' or 'ResBlock'.
        args: Keyword arguments from the config entry.
        prev_out: Optional tensor that will be fed to the layer. When given,
            it determines the input width (channels for Conv/BatchNorm,
            flattened per-sample size for FC) and takes precedence over any
            explicit value in ``args``.

    Raises:
        ValueError: If ``module_name`` is not one of the supported names.
    """
    has_input = prev_out is not None

    if module_name == "Conv":
        conv_kwargs = {key: val for key, val in args.items() if key != 'in_channels'}
        if has_input:
            in_ch = prev_out.shape[1]
        else:
            in_ch = args.get('in_channels')
        return nn.Conv2d(in_channels=in_ch, **conv_kwargs)

    if module_name == "BatchNorm":
        return nn.BatchNorm2d(prev_out.shape[1] if has_input else args['num_features'])

    if module_name == "ReLU":
        return nn.ReLU(inplace=True)

    if module_name == "MaxPool":
        return nn.MaxPool2d(**args)

    if module_name == "GlobalAvgPool":
        return nn.AdaptiveAvgPool2d((1, 1))

    if module_name == "FC":
        if has_input:
            # Flattened size of one sample.
            in_features = prev_out.numel() // prev_out.shape[0]
        else:
            in_features = args['in_features']
        flatten = nn.Flatten()
        linear = nn.Linear(in_features, args['out_features'])
        return nn.Sequential(flatten, linear)

    if module_name == "ResBlock":
        return ResBlock(**args)  # custom block defined elsewhere

    raise ValueError(f"Unknown module: {module_name}")

# ----------------------------
# 构建模型
# ----------------------------
def build_model(cfg):
    """Build a multi-input, multi-output network with explicit 'Concat' layers.

    Every entry of ``cfg['backbone'] + cfg['head']`` has the form
    ``{'from': [...], 'module': NAME, 'args': {...}}``. ``from`` indices
    address a running list that starts with the input tensors and grows by
    one tensor per layer; negative indices count from the end of that list,
    so ``-1`` is the most recent output.

    Trainable layers are created by ``module_factory`` the first time data
    reaches them (their input width is taken from that data) and are then
    stored in ``layers`` and reused. Run one forward pass with representative
    inputs before creating an optimizer or loading weights, because the
    parameters do not exist earlier.

    Args:
        cfg: Parsed config containing 'backbone' and 'head' lists.

    Returns:
        An ``nn.Module``. Calling it with a list of input tensors returns
        the list of outputs of all 'FC' layers, in config order.
    """
    layers_cfg = cfg['backbone'] + cfg['head']
    from_list = [entry['from'] for entry in layers_cfg]

    class Net(nn.Module):
        def __init__(self):
            super().__init__()
            self.layers_cfg = layers_cfg
            self.from_list = from_list
            self.layers = nn.ModuleList()
            for layer_cfg in layers_cfg:
                if layer_cfg['module'] != 'Concat':
                    # Identity placeholder, swapped for the real layer on
                    # the first forward pass.
                    self.layers.append(nn.Identity())
                else:
                    # Concat has no module; keep the slot so indices line up.
                    self.layers.append(None)
            # Positions in self.layers that already hold their real layer.
            self._built = set()

        def forward(self, inputs):
            # A list (not a dict) so negative `from` indices resolve naturally.
            outputs = list(inputs)
            results = []
            for i, layer_cfg in enumerate(self.layers_cfg):
                f = self.from_list[i]
                f = f if isinstance(f, list) else [f]

                # Explicit multi-input concatenation
                if layer_cfg['module'] == 'Concat':
                    out = torch.cat([outputs[j] for j in f], dim=layer_cfg['args'].get('dim', 1))
                else:
                    x = torch.cat([outputs[j] for j in f], dim=1) if len(f) > 1 else outputs[f[0]]
                    if i not in self._built:
                        # Build once and register, so the weights persist
                        # across calls and show up in parameters().
                        layer = module_factory(layer_cfg['module'], layer_cfg['args'], prev_out=x)
                        self.layers[i] = layer.to(x.device)
                        self._built.add(i)
                    out = self.layers[i](x)

                outputs.append(out)

                # Every FC layer is treated as a head output.
                if layer_cfg['module'] == 'FC':
                    results.append(out)
            return results

    return Net()

# ----------------------------
# 测试
# ----------------------------
if __name__ == "__main__":
    with open("multiinput_multioutput_concat.yaml") as cfg_file:
        config = yaml.safe_load(cfg_file)

    net = build_model(config)
    # Two random 32x32 RGB batches, one per input branch.
    first_img, second_img = torch.randn(2, 3, 32, 32), torch.randn(2, 3, 32, 32)
    head_a, head_b = net([first_img, second_img])

    print("输出1 shape:", head_a.shape)  # [2, 10]
    print("输出2 shape:", head_b.shape)  # [2, 5]
