In [14]:
from onnx import TensorProto, helper
import numpy as np

from finn.core.datatype import DataType
from finn.transformation.infer_shapes import InferShapes
from finn.transformation.infer_datatypes import InferDataTypes
from finn.transformation.general import GiveUniqueNodeNames
from finn.transformation.lower_convs_to_matmul import LowerConvsToMatMul

from finn.transformation.fpgadataflow.prepare_ip import PrepareIP
from finn.transformation.fpgadataflow.prepare_rtlsim import PrepareRTLSim
from finn.transformation.fpgadataflow.hlssynth_ip import HLSSynthIP
import finn.core.onnx_exec as oxe
from finn.core.modelwrapper import ModelWrapper
from finn.util.basic import gen_finn_dt_tensor
import finn.transformation.fpgadataflow.convert_to_hls_layers as to_hls

from finn.transformation.fpgadataflow.prepare_cppsim import PrepareCppSim
from finn.transformation.fpgadataflow.compile_cppsim import CompileCppSim
from finn.transformation.fpgadataflow.set_exec_mode import SetExecMode
from finn.custom_op.general.im2col import compute_conv_output_dim
from finn.custom_op.registry import getCustomOp
from finn.analysis.fpgadataflow.exp_cycles_per_layer import exp_cycles_per_layer


def test_convert_to_hls_conv_layer(conv_config, depthwise, exec_mode):
    kernel_size, stride, pad, is_square_input = conv_config
    np.random.seed(0)
    idt = DataType.UINT4

    in_chn = 16
    k_h, k_w = kernel_size
    stride_h, stride_w = stride
    pad_h = pad[0] + pad[2]
    pad_w = pad[1] + pad[3]

    if is_square_input:
        in_feature_dim_h, in_feature_dim_w = [7, 7]
    else:
        in_feature_dim_h, in_feature_dim_w = [9, 7]
    
    if depthwise is True:
        group = out_chn = in_chn
        conv_param_shape = [out_chn, 1, k_h, k_w]
    else:
        group = 1
        out_chn = 20
        conv_param_shape = [out_chn, in_chn, k_h, k_w]

    out_feature_dim_h = compute_conv_output_dim(
        in_feature_dim_h, k_h, stride_h, pad_h
    )
    out_feature_dim_w = compute_conv_output_dim(
        in_feature_dim_w, k_w, stride_w, pad_w
    )

    input_shape = [1, in_chn, in_feature_dim_h, in_feature_dim_w]
    output_shape = [1, out_chn, out_feature_dim_h, out_feature_dim_w]

    conv_weight_dt = DataType.UINT4

    conv_config = {}
    conv_config["dilations"] = [1, 1]
    conv_config["group"] = group
    conv_config["kernel_shape"] = [k_h, k_w]
    conv_config["pads"] = pad
    conv_config["strides"] = [stride_h, stride_w]

    top_in = helper.make_tensor_value_info("top_in", TensorProto.FLOAT, input_shape)
    top_out = helper.make_tensor_value_info("top_out", TensorProto.FLOAT, output_shape)
    value_info = [
        helper.make_tensor_value_info("p1", TensorProto.FLOAT, conv_param_shape)
    ]

    modelproto = helper.make_model(
        helper.make_graph(
            name="conv_test",
            inputs=[top_in],
            outputs=[top_out],
            value_info=value_info,
            nodes=[
                helper.make_node("Conv", ["top_in", "p1"], ["top_out"], **conv_config)
            ],
        )
    )

    model = ModelWrapper(modelproto)
    model.set_tensor_datatype("top_in", idt)
    model.set_tensor_datatype("top_out", idt)
    model.set_tensor_datatype("p1", conv_weight_dt)
    model.set_initializer("p1", gen_finn_dt_tensor(conv_weight_dt, conv_param_shape))

    model = model.transform(InferShapes())
    model = model.transform(InferDataTypes())
    
    model.save("/tmp/test_original_model.onnx")

    new_model = model.transform(LowerConvsToMatMul())
    new_model = new_model.transform(to_hls.InferConvInpGen())
    if depthwise is True:
        new_model = new_model.transform(to_hls.InferVVAU())
    else:
        new_model = new_model.transform(to_hls.InferQuantizedStreamingFCLayer())
        fc_node = new_model.get_nodes_by_op_type("StreamingFCLayer_Batch")[0]
        fc_inst = getCustomOp(fc_node)
        mw = fc_inst.get_nodeattr("MW")
        mh = fc_inst.get_nodeattr("MH")
        pe_cands = list(filter(lambda x: mh % x == 0, range(2, mh + 1)))
        simd_cands = list(filter(lambda x: mw % x == 0, range(2, mw + 1)))
        fc_inst.set_nodeattr("PE", pe_cands[0])
        fc_inst.set_nodeattr("SIMD", simd_cands[0])

    new_model = new_model.transform(GiveUniqueNodeNames())
    new_model = new_model.transform(InferShapes())
    new_model = new_model.transform(InferDataTypes())

    if exec_mode == "cppsim":
        new_model = new_model.transform(PrepareCppSim())
        new_model = new_model.transform(CompileCppSim())
        new_model = new_model.transform(SetExecMode("cppsim"))
    elif exec_mode == "rtlsim":
        new_model = new_model.transform(SetExecMode("rtlsim"))
        new_model = new_model.transform(GiveUniqueNodeNames())
        new_model = new_model.transform(PrepareIP("xc7z020clg400-1", 5))
        new_model = new_model.transform(HLSSynthIP())
        new_model = new_model.transform(PrepareRTLSim())
    else:
        raise Exception("Unknown exec_mode")
        
    x = gen_finn_dt_tensor(idt, input_shape)
    inp_dict = {model.graph.input[0].name: x}
    
    output_model = oxe.execute_onnx(model, inp_dict, return_full_exec_context=False)[model.graph.output[0].name]
    output_newmodel = oxe.execute_onnx(new_model, inp_dict, return_full_exec_context=False)[new_model.graph.output[0].name]
    return output_model, output_newmodel
    #assert oxe.compare_execution(model, new_model, inp_dict)
    
    new_model.save("/tmp/test_transformed_model.onnx")
    
    if (k_h==k_w==1) and (stride_h > 1 or stride_w > 1) and (pad == 0):
        assert new_model.graph.node[1].op_type == "DownSampler"
        if exec_mode == "rtlsim":
            node = new_model.get_nodes_by_op_type("DownSampler")[0]
            inst = getCustomOp(node)
            cycles_rtlsim = inst.get_nodeattr("cycles_rtlsim")
            exp_cycles_dict = new_model.analysis(exp_cycles_per_layer)
            exp_cycles = exp_cycles_dict[node.name]
            assert np.isclose(exp_cycles, cycles_rtlsim, atol=11)
            assert exp_cycles != 0

    if pad_h == 1 and pad_w == 1:
        padding_node = new_model.get_nodes_by_op_type("FMPadding_Batch")[0]
        padding_inst = getCustomOp(padding_node)
        assert padding_inst.get_nodeattr("SIMD") == in_chn

    if depthwise is True and exec_mode == "rtlsim":
        node = new_model.get_nodes_by_op_type("Vector_Vector_Activate_Batch")[0]
        inst = getCustomOp(node)
        cycles_rtlsim = inst.get_nodeattr("cycles_rtlsim")
        exp_cycles_dict = new_model.analysis(exp_cycles_per_layer)
        exp_cycles = exp_cycles_dict[node.name]
        assert np.isclose(exp_cycles, cycles_rtlsim, atol=11)
        assert exp_cycles != 0
    

In [15]:
# conv_config:
#[kernel_size_h, kernel_size_w]
#[stride_h, stride_w]
#[pad_H_begin, pad_W_begin, pad_H_end, pad_W_end]
#is_sqaure_input
conv_config = [([1, 1], [2, 2], [0, 0, 0, 0], True),
               ([1, 1], [3, 3], [0, 0, 0, 0], True),
               ([3, 3], [2, 2], [1, 1, 1, 1], True),
               ([3, 3], [1, 1], [0, 0, 0, 0], True),
               ([3, 3], [1, 1], [1, 1, 1, 1], True),
               ([5, 5], [2, 2], [1, 1, 1, 1], True),
               ([3, 2], [2, 2], [1, 1, 1, 1], False),
               ([3, 3], [1, 1], [0, 0, 0, 0], False)
              ]

depthwise = [False, True]
exec_mode = ["cppsim", "rtlsim"]



output_model, output_newmodel = test_convert_to_hls_conv_layer(conv_config[-1], depthwise[1], exec_mode[0])

In [10]:
from finn.core.modelwrapper import ModelWrapper
from finn.util.basic import get_by_name
from finn.analysis.fpgadataflow.hls_synth_res_estimation import hls_synth_res_estimation
from finn.analysis.fpgadataflow.res_estimation import res_estimation

model = ModelWrapper("/tmp/test_transformed_model.onnx")

cycles_dict = {}
for n in model.graph.node:
    #print(n)
    backend = get_by_name(n.attribute, "backend", "name")
    if backend is not None:
        is_fpgadataflow_node = backend.s.decode('utf-8')=="fpgadataflow"
        if is_fpgadataflow_node:
            inst = getCustomOp(n)
            cycles_rtlsim = inst.get_nodeattr("cycles_rtlsim")
            cycles_dict[n.name] = cycles_rtlsim

print(cycles_dict)
print("-----\n")
            
hls_synth_resources = model.analysis(hls_synth_res_estimation)
expected_resources = model.analysis(res_estimation)

print(hls_synth_resources)
print("-----\n")
print(expected_resources)

{'FMPadding_Batch_0': 86, 'ConvolutionInputGenerator_0': 176, 'Vector_Vector_Activate_Batch_0': 150}
-----

{'FMPadding_Batch_0': {'BRAM_18K': '0', 'FF': '94', 'LUT': '279', 'DSP48E': '0', 'URAM': '0'}, 'ConvolutionInputGenerator_0': {'BRAM_18K': '0', 'FF': '1212', 'LUT': '1754', 'DSP48E': '0', 'URAM': '0'}, 'Vector_Vector_Activate_Batch_0': {'BRAM_18K': '0', 'FF': '982', 'LUT': '1507', 'DSP48E': '0', 'URAM': '0'}}
-----

{'FMPadding_Batch_0': {'BRAM_18K': 0, 'BRAM_efficiency': 1, 'LUT': 0, 'URAM': 0, 'URAM_efficiency': 1, 'DSP': 0}, 'ConvolutionInputGenerator_0': {'BRAM_18K': 0, 'BRAM_efficiency': 1, 'LUT': 620, 'URAM': 0, 'URAM_efficiency': 1, 'DSP': 0}, 'Vector_Vector_Activate_Batch_0': {'BRAM_18K': 0, 'BRAM_efficiency': 1, 'LUT': 997, 'URAM': 0, 'URAM_efficiency': 1, 'DSP': 0}}


In [12]:
from finn.util.visualization import showInNetron

showInNetron("/tmp/test_original_model.onnx")

Serving '/tmp/test_original_model.onnx' at http://0.0.0.0:8081


In [13]:
showInNetron("/tmp/test_transformed_model.onnx")

Stopping http://0.0.0.0:8081
Serving '/tmp/test_transformed_model.onnx' at http://0.0.0.0:8081
