In [1]:
import random
import numpy as np
import torch

def set_seed(seed=42):
    random.seed(seed)                      # Python 随机模块
    np.random.seed(seed)                   # Numpy 随机模块
    torch.manual_seed(seed)                # PyTorch CPU 随机种子
    torch.cuda.manual_seed(seed)           # PyTorch 当前 GPU 随机种子
    torch.cuda.manual_seed_all(seed)       # 所有 GPU 随机种子（多卡时）

    # 保证每次返回的卷积算法确定（固定 cudnn 的随机性）
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    print(f"Random seed set as {seed}")

# 调用种子设置
set_seed(42)

Random seed set as 42


In [2]:
import os
import h5py
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

# 数据集路径
dataset_path = "/home/free/prj2/finn/GOLD_XYZ_OSC.0001_1024.hdf5"

class RadioMLDataset(Dataset):
    def __init__(self, split="train", sample_ratio=0.1, target_snr=30):
        """
        新增target_snr参数：筛选指定SNR的样本（如30dB）
        """
        assert os.path.exists(dataset_path), f"数据集不存在：{dataset_path}"
        self.h5_file = h5py.File(dataset_path, 'r')
        
        # 读取所有样本的SNR值（Z维度）
        self.snr_all = self.h5_file['Z'][:].squeeze()  # 形状：(N,)，每个样本的SNR值
        self.target_snr = target_snr
        
        # 步骤1：筛选出30dB SNR的样本索引
        snr_indices = np.where(self.snr_all == self.target_snr)[0]
        print(f"30dB SNR的样本总数：{len(snr_indices)}")
        
        # 步骤2：按比例抽样（如10%），若样本不足则全部使用
        sample_num = min(int(len(snr_indices)*sample_ratio), len(snr_indices))
        sampled_indices = np.random.choice(snr_indices, size=sample_num, replace=False)
        
        # 步骤3：划分训练/测试集（9:1）
        np.random.shuffle(sampled_indices)
        split_idx = int(0.9 * len(sampled_indices))
        self.indices = sampled_indices[:split_idx] if split == "train" else sampled_indices[split_idx:]
        
        # 读取数据和标签（仅筛选后的样本）
        self.data = self.h5_file['X']  # (N, 1024, 2)
        self.labels = np.argmax(self.h5_file['Y'][:], axis=1)  # 0-23

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx):
        real_idx = self.indices[idx]
        # 读取样本并转置为(2, 1024)，归一化
        x = self.data[real_idx].astype(np.float32).transpose(1, 0)
        x = x / np.max(np.abs(x))  # 信号归一化
        y = self.labels[real_idx]
        return torch.from_numpy(x), torch.tensor(y, dtype=torch.long)

    def __del__(self):
        self.h5_file.close()

# 创建30dB SNR的数据集
train_dataset = RadioMLDataset(split="train", sample_ratio=1.0, target_snr=30)  # sample_ratio=1.0：使用所有30dB样本
test_dataset = RadioMLDataset(split="test", sample_ratio=1.0, target_snr=30)

# 数据加载器
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

print(f"30dB SNR训练集大小：{len(train_dataset)} | 测试集大小：{len(test_dataset)}")
print(f"输入形状：{train_dataset[0][0].shape} | 标签示例：{train_dataset[0][1]}")


30dB SNR的样本总数：98304
30dB SNR的样本总数：98304
30dB SNR训练集大小：88473 | 测试集大小：9831
输入形状：torch.Size([2, 1024]) | 标签示例：16


In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class VGG10_1D(nn.Module):
    def __init__(self, num_classes=24, in_channels=2):
        super(VGG10_1D, self).__init__()
        # 卷积块1: 2→64
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels, 64, kernel_size=3, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.Conv1d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        # 卷积块2: 64→128
        self.conv2 = nn.Sequential(
            nn.Conv1d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.Conv1d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        # 卷积块3: 128→256
        self.conv3 = nn.Sequential(
            nn.Conv1d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            nn.Conv1d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        # 卷积块4: 256→512
        self.conv4 = nn.Sequential(
            nn.Conv1d(256, 512, kernel_size=3, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Conv1d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        # 卷积块5: 512→512（VGG10共10层卷积）
        self.conv5 = nn.Sequential(
            nn.Conv1d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Conv1d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        # 全连接层（输入维度计算：1024经过5次池化→1024/(2^5)=32，512*32=16384）
        self.fc_layers = nn.Sequential(
            nn.Linear(512 * 32, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, num_classes)
        )

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = x.view(x.size(0), -1)  # 展平
        x = self.fc_layers(x)
        return x

# 初始化模型
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = VGG10_1D(num_classes=24).to(device)
print(f"Model initialized on {device}")


Model initialized on cuda


In [4]:
import torch.optim as optim

# 损失函数与优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=5e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)  # 学习率衰减
def train_epoch(model, loader, criterion, optimizer, device):
    model.train()
    total_loss = 0.0
    correct = 0
    total = 0
    for batch_idx, (data, target) in enumerate(loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        _, predicted = output.max(1)
        total += target.size(0)
        correct += predicted.eq(target).sum().item()
    
    avg_loss = total_loss / len(loader)
    accuracy = 100. * correct / total
    return avg_loss, accuracy

# 测试函数
def test_epoch(model, loader, criterion, device):
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            total_loss += loss.item()
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()
    
    avg_loss = total_loss / len(loader)
    accuracy = 100. * correct / total
    return avg_loss, accuracy
num_epochs = 20
best_acc = 0.0

for epoch in range(num_epochs):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    test_loss, test_acc = test_epoch(model, test_loader, criterion, device)
    scheduler.step()  # 更新学习率
    
    print(f"Epoch [{epoch+1}/{num_epochs}]")
    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}%")
    print(f"Test Loss: {test_loss:.4f} | Test Acc: {test_acc:.2f}%")
    
    # 保存最优模型
    if test_acc > best_acc:
        best_acc = test_acc
        torch.save(model.state_dict(), "vgg10_1d_best.pth")
        print(f"Best model saved (Acc: {best_acc:.2f}%)")

print(f"Training finished. Best Test Acc: {best_acc:.2f}%")


Epoch [1/20]
Train Loss: 1.1811 | Train Acc: 51.51%
Test Loss: 0.8223 | Test Acc: 62.12%
Best model saved (Acc: 62.12%)
Epoch [2/20]
Train Loss: 0.8869 | Train Acc: 61.27%
Test Loss: 0.7337 | Test Acc: 66.45%
Best model saved (Acc: 66.45%)
Epoch [3/20]
Train Loss: 0.7524 | Train Acc: 66.45%
Test Loss: 0.6213 | Test Acc: 69.21%
Best model saved (Acc: 69.21%)
Epoch [4/20]
Train Loss: 0.6315 | Train Acc: 70.60%
Test Loss: 0.5024 | Test Acc: 75.03%
Best model saved (Acc: 75.03%)
Epoch [5/20]
Train Loss: 0.5646 | Train Acc: 73.15%
Test Loss: 0.4817 | Test Acc: 75.74%
Best model saved (Acc: 75.74%)
Epoch [6/20]
Train Loss: 0.5208 | Train Acc: 74.93%
Test Loss: 0.4424 | Test Acc: 78.67%
Best model saved (Acc: 78.67%)
Epoch [7/20]
Train Loss: 0.4910 | Train Acc: 76.12%
Test Loss: 0.4255 | Test Acc: 78.74%
Best model saved (Acc: 78.74%)
Epoch [8/20]
Train Loss: 0.4708 | Train Acc: 76.98%
Test Loss: 0.3606 | Test Acc: 82.02%
Best model saved (Acc: 82.02%)
Epoch [9/20]
Train Loss: 0.4431 | Train 

In [5]:
import torch
import torch.nn as nn

# 定义VGG10 1D模型结构（需与训练时一致）
class VGG10_1D(nn.Module):
    def __init__(self, num_classes=24, in_channels=2):
        super(VGG10_1D, self).__init__()
        # 卷积块1: 2→64
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels, 64, kernel_size=3, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.Conv1d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        # 卷积块2: 64→128
        self.conv2 = nn.Sequential(
            nn.Conv1d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.Conv1d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        # 卷积块3: 128→256
        self.conv3 = nn.Sequential(
            nn.Conv1d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            nn.Conv1d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        # 卷积块4: 256→512
        self.conv4 = nn.Sequential(
            nn.Conv1d(256, 512, kernel_size=3, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Conv1d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        # 卷积块5: 512→512
        self.conv5 = nn.Sequential(
            nn.Conv1d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Conv1d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        # 全连接层
        self.fc_layers = nn.Sequential(
            nn.Linear(512 * 32, 4096),  # 1024/(2^5)=32
            nn.ReLU(inplace=True),
            nn.Dropout(0.0),  # FINN建议移除Dropout
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.0),
            nn.Linear(4096, num_classes)
        )

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = x.view(x.size(0), -1)  # 展平
        x = self.fc_layers(x)
        return x

# 初始化模型并加载权重
device = torch.device("cpu")  # FINN导出建议用CPU
model = VGG10_1D(num_classes=24).to(device)
model.load_state_dict(torch.load("vgg10_1d_best.pth", map_location=device))
model.eval()  # 评估模式（关闭Dropout/BatchNorm训练态）

# 导出ONNX（FINN要求静态输入形状，batch_size设为1）
input_shape = (1, 2, 1024)  # [batch, in_channels, seq_len]
dummy_input = torch.randn(input_shape).to(device)

# 导出ONNX（关键参数：opset_version=11，去掉动态维度）
torch.onnx.export(
    model,
    dummy_input,
    "vgg10_1d_finn.onnx",  # 输出文件名
    input_names=["input"],
    output_names=["output"],
    opset_version=11,  # FINN兼容的ONNX版本
    do_constant_folding=True,  # 常量折叠优化
    dynamic_axes=None  # 禁用动态维度（FINN要求静态形状）
)

print("ONNX模型导出完成：vgg10_1d_finn.onnx")


ONNX模型导出完成：vgg10_1d_finn.onnx


In [1]:
import warnings

from transformation.base import Transformation
from transformation.general import RemoveUnusedTensors
from transformation.infer_shapes import InferShapes

def get_by_name(container, name, name_field="name"):
    """Return item from container by .name field if it exists, None otherwise.
    Will throw an Exception if multiple items are found, since this violates the
    ONNX standard."""
    names = [getattr(x, name_field) for x in container]

    inds = [i for i, e in enumerate(names) if e == name]
    if len(inds) > 1:
        raise Exception("Found multiple get_by_name matches, undefined behavior")
    elif len(inds) == 0:
        return None
    else:
        ind = inds[0]
        return container[ind]

def _find_invalid_nodes(model):
    """
    Check whether the graph contains any node types that are not supported by the
    Change3Dto4DTensors transformation.

    """
    valid_nodes = [
        "Add",
        "Mul",
        "BatchNormalization",
        "MultiThreshold",
        "Conv",
        "Transpose",
        "LogSoftmax",
        "ArgMax",
        "Div",
        "TopK",
        "MatMul",
        "Flatten",
        "Reshape",
        "MaxPool",
        "Relu",    # 无维度属性，仅逐元素运算，不影响4D转换
        "Constant",# 常量节点，无维度依赖
        "Gemm"     # 矩阵乘法，1D场景下可兼容4D张量
    ]
    invalid_nodes = []
    for n in model.graph.node:
        node_op_type = n.op_type
        if node_op_type in valid_nodes:
            continue
        else:
            invalid_nodes.append(node_op_type)

    return invalid_nodes


class Change3DTo4DTensors(Transformation):
    """
    Replaces 3D tensors with 4D tensors assuming the following format:
    [N, C, H] -> [N, C, H, 1].
    The attributes of a (specific) set of supported nodes are changed accordingly.
    If the graph contains unsupported nodes, a warning is raised and the transformation
    is not applied.
    """

    def apply(self, model):
        graph_modified = False

        invalid_nodes = _find_invalid_nodes(model)
        if len(invalid_nodes) > 0:
            warnings.warn(
                "Transformation is not applied,\
                 found unsupported nodes in the graph: {}.".format(
                    invalid_nodes
                )
            )
            return (model, graph_modified)

        # Infer the shapes of each tensor, remove unused tensors
        # and give each tensor a readable name
        model = model.transform(InferShapes())
        model = model.transform(RemoveUnusedTensors())

        # This list contains all nodes with initializers that need to be converted
        nodes_with_initializers = ["Mul", "Conv", "Add", "Div", "Reshape"]
        # Obtain a list of initializer names (used to filter out only value infos)
        initializers_names = [x.name for x in model.graph.initializer]

        all_tensors = {}
        # Extract the inputs
        all_tensors = {
            **all_tensors,
            **{
                x.name: [x.type.tensor_type.elem_type, model.get_tensor_shape(x.name)]
                for x in model.graph.input
            },
        }
        # Extract only the output tensors
        all_tensors = {
            **all_tensors,
            **{
                x.name: [x.type.tensor_type.elem_type, model.get_tensor_shape(x.name)]
                for x in model.graph.value_info
                if x.name not in initializers_names
            },
        }
        # Extract only initializers from nodes that are relevant for conversion
        all_tensors = {
            **all_tensors,
            **{
                x.name: [x.data_type, x.dims]
                for x in model.graph.initializer
                if model.find_consumer(x.name).op_type in nodes_with_initializers
            },
        }
        # Extract the outputs
        all_tensors = {
            **all_tensors,
            **{
                x.name: [x.type.tensor_type.elem_type, model.get_tensor_shape(x.name)]
                for x in model.graph.output
            },
        }

        # The list below contains tensor names that are the output of nodes that
        # reduce the tensor's dimension. The shape of these tensors also needs
        # to be extended
        tensors_reduced_dimension = []
        for n in model.graph.node:
            node_op_type = n.op_type
            input_shape = model.get_tensor_shape(n.input[0])
            # Find tensors that are the output of nodes that reduce the dimension
            if node_op_type == "ArgMax":
                keep_dims = get_by_name(n.attribute, "keepdims", "name").i
                if len(input_shape) == 3 and keep_dims == 0:
                    node_out = n.output
                    for n_o in node_out:
                        tensors_reduced_dimension.append(n_o)
            # Each node from the list of supported nodes is made compatible
            # with 4D tensors
            if node_op_type == "Transpose":
                perm = get_by_name(n.attribute, "perm", "name").ints
                if (
                    len(perm) == 3
                ):  # Meaning that the transpose operation was on a 3D tensor
                    perm.append(3)  # append 4th dimension
            elif node_op_type in ["ArgMax", "LogSoftMax", "TopK", "Flatten"]:
                axis = get_by_name(n.attribute, "axis", "name")
                if len(input_shape) == 3 and axis.i < 0:
                    axis.i = 3 + axis.i  # count dimensions from the front
            elif node_op_type == "Conv":
                dilations = get_by_name(n.attribute, "dilations", "name").ints
                kernel_shape = get_by_name(n.attribute, "kernel_shape", "name").ints
                pads = get_by_name(n.attribute, "pads", "name").ints
                strides = get_by_name(n.attribute, "strides", "name").ints
                if len(dilations) == 1:  # we must add another dimension to it
                    dilations.append(
                        1
                    )  # only equal dilation value along each spatial axis is supported
                if len(kernel_shape) == 1:  # we must add another dimension to it
                    kernel_shape.append(1)
                if (
                    len(pads) == 2
                ):  # pads = [x1_begin, x1_end] --> [x1_begin, x2_begin, x1_end, x2_end]
                    pads.insert(1, 0)
                    pads.append(0)
                if len(strides) == 1:  # strides = [stride_h, stride_w]
                    strides.append(1)
            elif node_op_type == "MaxPool":
                kernel_shape = get_by_name(n.attribute, "kernel_shape", "name").ints
                pads = get_by_name(n.attribute, "pads", "name").ints
                strides = get_by_name(n.attribute, "strides", "name").ints
                if len(kernel_shape) == 1:  # we must add another dimension to it
                    kernel_shape.append(1)
                if (
                    len(pads) == 2
                ):  # pads = [x1_begin, x1_end] --> [x1_begin, x2_begin, x1_end, x2_end]
                    pads.insert(1, 0)
                    pads.append(0)
                if len(strides) == 1:  # strides = [stride_h, stride_w]
                    strides.append(1)

        # Change format of each input/value_info/output tensor
        for k, v in all_tensors.items():
            tensor_type = v[0]
            shape = v[1]
            # Add extra dimension for tensors that either:
            # 1) Have 3 dimensions ( (N,C,H) -> (N,C,H,1) )
            # 2) Come after operations that reduce their dimension: e.g. {Argmax, ...}
            if len(shape) == 3 or k in tensors_reduced_dimension:
                shape.append(1)
                model.set_tensor_shape(k, shape, tensor_type)

        return (model, graph_modified)

In [2]:
from qonnx.core.modelwrapper import ModelWrapper
from finn.builder.build_dataflow_config import DataflowBuildConfig
from qonnx.transformation.general import GiveUniqueNodeNames
import finn.transformation.streamline.absorb as absorb
import finn.transformation.fpgadataflow.convert_to_hw_layers as to_hw
# 导入自定义的Transformation类


def step_pre_streamline(model: ModelWrapper, cfg: DataflowBuildConfig):
    model = model.transform(Change3DTo4DTensors())
    # 吸收标量运算到TopK层
    model = model.transform(absorb.AbsorbScalarMulAddIntoTopK())
    return model
def step_convert_final_layers(model: ModelWrapper, cfg: DataflowBuildConfig):
    model = model.transform(to_hw.InferChannelwiseLinearLayer())
    model = model.transform(to_hw.InferLabelSelectLayer())
    model = model.transform(GiveUniqueNodeNames())
    return model


In [None]:
# Copyright (c) 2020, Xilinx
# All rights reserved.
# （省略BSD协议声明，实际使用时需保留）

import os
import shutil

import finn.builder.build_dataflow as build
import finn.builder.build_dataflow_config as build_cfg
from finn.util.basic import alveo_default_platform



# 模型名称与平台配置
model_name = "vgg10_1d_finn"
zynq_platforms = ["PYNQ-Z2"]  # 目标平台为PYNQ-Z2
alveo_platforms = []
platforms_to_build = zynq_platforms + alveo_platforms

# 平台到Shell流类型的映射
def platform_to_shell(platform):
    if platform in zynq_platforms:
        return build_cfg.ShellFlowType.VIVADO_ZYNQ
    elif platform in alveo_platforms:
        return build_cfg.ShellFlowType.VITIS_ALVEO
    else:
        raise Exception(f"Unknown platform: {platform}")

# 选择目标时钟周期（PYNQ-Z2推荐7ns~10ns，此处选8ns即125MHz，稳定性优先）
def select_clk_period(platform):
    if platform == "PYNQ-Z2":
        return 8.0  # 125MHz
    else:
        return 4.0  # 其他平台默认250MHz

# 构建步骤选择（适配PYNQ-Z2）
def select_build_steps(platform):
    steps = [
        "step_tidy_up",
        step_pre_streamline,
        "step_streamline",
        "step_convert_to_hw",
        step_convert_final_layers,
        "step_create_dataflow_partition",
        "step_specialize_layers",
        "step_target_fps_parallelization",
        "step_apply_folding_config",
        "step_minimize_bit_width",
        "step_generate_estimate_reports",
        "step_hw_codegen",
        "step_hw_ipgen",
        "step_set_fifo_depths",
        "step_create_stitched_ip",
        "step_measure_rtlsim_performance",  # 可选，若需要RTLSIM性能测试
        "step_out_of_context_synthesis",
        "step_synthesize_bitfile",
        "step_make_pynq_driver",
        "step_deployment_package",
    ]
    return steps

# 创建发布目录
os.makedirs("release", exist_ok=True)

# 遍历目标平台构建
for platform_name in platforms_to_build:
    shell_flow_type = platform_to_shell(platform_name)
    # PYNQ-Z2作为Zynq平台，直接使用板名作为发布目录名
    release_platform_name = platform_name
    platform_dir = f"release/{release_platform_name}"
    os.makedirs(platform_dir, exist_ok=True)

    vitis_opt = build_cfg.VitisOptStrategyCfg.SIZE

    cfg = build_cfg.DataflowBuildConfig(
        steps=select_build_steps(platform_name),
        output_dir=f"output_{model_name}_{release_platform_name}",
        synth_clk_period_ns=select_clk_period(platform_name),
        board=platform_name,
        shell_flow_type=shell_flow_type,
        vitis_platform=None,  # PYNQ-Z2为Vivado Zynq流，无需Vitis平台
    # 层特化配置文件（若未创建，先注释掉这行，避免文件找不到报错）
    # specialize_layers_config_file=f"specialize_layers_config/{platform_name}_specialize_layers.json",
    # 折叠配置文件（必须存在，即使是空文件）
        folding_config_file=f"folding_config/{platform_name}_folding_config.json",
        split_large_fifos=True,  # 拆分大FIFO以适配PYNQ-Z2的BRAM资源
        standalone_thresholds=True,
    # 替换为你版本支持的枚举值（SIZE适配Z2资源）
        vitis_opt_strategy=vitis_opt,
    # 生成的输出产物（重点包含PYNQ驱动和部署包）
        generate_outputs=[
            build_cfg.DataflowOutputType.ESTIMATE_REPORTS,
            build_cfg.DataflowOutputType.STITCHED_IP,
            build_cfg.DataflowOutputType.RTLSIM_PERFORMANCE,
            build_cfg.DataflowOutputType.BITFILE,
            build_cfg.DataflowOutputType.DEPLOYMENT_PACKAGE,
            build_cfg.DataflowOutputType.PYNQ_DRIVER,
    ],
   
)



# 模型文件路径（根目录）
model_file = "vgg10_1d_finn.onnx"
if not os.path.exists(model_file):
        raise FileNotFoundError(f"Model file {model_file} not found!")

    # 执行数据流构建
build.build_dataflow_cfg(model_file, cfg)

    # 复制产物到发布目录
bitfile_gen_dir = os.path.join(cfg.output_dir, "bitfile")
files_to_copy = [
        "finn-accel.bit",    # 比特流文件
        "finn-accel.hwh",    # 硬件描述文件（PYNQ需要）
        "finn-accel.xclbin", # Alveo用，PYNQ-Z2可忽略
    ]
for f in files_to_copy:
        src = os.path.join(bitfile_gen_dir, f)
        if os.path.exists(src):
            dst = os.path.join(platform_dir, f.replace("finn-accel", model_name))
            shutil.copy(src, dst)
            print(f"Copied {f} to {dst}")

    # 复制运行时权重和PYNQ驱动
weight_gen_dir = os.path.join(cfg.output_dir, "driver", "runtime_weights")
if os.path.exists(weight_gen_dir):
        weight_dst_dir = os.path.join(platform_dir, f"{model_name}_runtime_weights")
        if os.path.exists(weight_dst_dir):
            shutil.rmtree(weight_dst_dir)
        shutil.copytree(weight_gen_dir, weight_dst_dir)
        print(f"Copied runtime weights to {weight_dst_dir}")

driver_gen_dir = os.path.join(cfg.output_dir, "driver")
driver_files = ["finn_driver.py", "finn_util.py"]  # PYNQ驱动核心文件
for df in driver_files:
        src = os.path.join(driver_gen_dir, df)
        if os.path.exists(src):
            dst = os.path.join(platform_dir, df.replace("finn", model_name))
            shutil.copy(src, dst)
            print(f"Copied PYNQ driver {df} to {dst}")


Building dataflow accelerator from vgg10_1d_finn.onnx
Intermediate outputs will be generated in /tmp/finn_dev_root
Final outputs will be generated in output_vgg10_1d_finn_PYNQ-Z2
Build log is at output_vgg10_1d_finn_PYNQ-Z2/build_dataflow.log
Running step: step_tidy_up [1/20]
Running step: step_pre_streamline [2/20]
Running step: step_streamline [3/20]
Running step: step_convert_to_hw [4/20]
Running step: step_convert_final_layers [5/20]
Running step: step_create_dataflow_partition [6/20]
> [0;32m/home/free/prj2/finn/deps/qonnx/src/qonnx/transformation/create_generic_partitions.py[0m(119)[0;36mapply[0;34m()[0m
[0;32m    117 [0;31m                    [0;32mif[0m [0mnode[0m [0;32mis[0m [0;32mnot[0m [0;32mNone[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m    118 [0;31m                        assert (
[0m[0;32m--> 119 [0;31m                            [0mself[0m[0;34m.[0m[0mpartitioning[0m[0;34m([0m[0mnode[0m[0;34m)[0m [0;34m!=[0m [0mpartition_id

Traceback (most recent call last):
  File "/home/free/prj2/finn/src/finn/builder/build_dataflow.py", line 158, in build_dataflow_cfg
    model = transform_step(model, cfg)
  File "/home/free/prj2/finn/src/finn/builder/build_dataflow_steps.py", line 372, in step_create_dataflow_partition
    parent_model = model.transform(
  File "/home/free/prj2/finn/deps/qonnx/src/qonnx/core/modelwrapper.py", line 140, in transform
    (transformed_model, model_was_changed) = transformation.apply(transformed_model)
  File "/home/free/prj2/finn/src/finn/transformation/fpgadataflow/create_dataflow_partition.py", line 80, in apply
    parent_model = model.transform(
  File "/home/free/prj2/finn/deps/qonnx/src/qonnx/core/modelwrapper.py", line 140, in transform
    (transformed_model, model_was_changed) = transformation.apply(transformed_model)
  File "/home/free/prj2/finn/deps/qonnx/src/qonnx/transformation/create_generic_partitions.py", line 119, in apply
    self.partitioning(node) != partition_id
Asse