In [None]:
from qonnx.core.modelwrapper import ModelWrapper

from qonnx.transformation.fold_constants import FoldConstants

from qonnx.transformation.general import (
    ConvertSubToAdd,
    ConvertDivToMul,
    GiveReadableTensorNames,
    GiveUniqueNodeNames,
    SortGraph,
    RemoveUnusedTensors,
    GiveUniqueParameterTensors,
    RemoveStaticGraphInputs,
    ApplyConfig,
)

from finn.transformation.streamline.absorb import (
    AbsorbScalarMulAddIntoTopK,
    AbsorbAddIntoMultiThreshold,
    AbsorbMulIntoMultiThreshold,
    FactorOutMulSignMagnitude,
    Absorb1BitMulIntoMatMul,
    Absorb1BitMulIntoConv,
    AbsorbConsecutiveTransposes,
    AbsorbTransposeIntoMultiThreshold,
    AbsorbSignBiasIntoMultiThreshold
)

from finn.transformation.streamline.collapse_repeated import (
    CollapseRepeatedAdd,
    CollapseRepeatedMul,
)

from finn.transformation.streamline.reorder import (
    MoveAddPastMul,
    MoveScalarMulPastMatMul,
    MoveScalarAddPastMatMul,
    MoveAddPastConv,
    MoveScalarMulPastConv,
    MoveScalarLinearPastInvariants,
    MoveMaxPoolPastMultiThreshold,
    MakeScaleResizeNHWC,
    MoveMulPastMaxPool,
    MakeMaxPoolNHWC,
)

from finn.transformation.streamline.round_thresholds import RoundAndClipThresholds
from finn.transformation.streamline.sign_to_thres import ConvertSignToThres
from qonnx.transformation.batchnorm_to_affine import BatchNormToAffine

from qonnx.transformation.make_input_chanlast import MakeInputChannelsLast

# used only by the non-linear streamlining step
from finn.transformation.streamline.reorder import (
    MoveLinearPastEltwiseAdd,
    MoveLinearPastFork,
)

from qonnx.transformation.double_to_single_float import DoubleToSingleFloat
from qonnx.transformation.remove import RemoveIdentityOps
from qonnx.core.datatype import DataType

from qonnx.transformation.infer_shapes import InferShapes
from qonnx.transformation.infer_datatypes import InferDataTypes
from qonnx.transformation.infer_data_layouts import InferDataLayouts
from qonnx.transformation.insert_topk import InsertTopK
import finn.transformation.fpgadataflow.convert_to_hls_layers as to_hls
from qonnx.transformation.lower_convs_to_matmul import LowerConvsToMatMul

from finn.builder.build_dataflow_config import (
    DataflowBuildConfig,
    ShellFlowType,
)

from finn.transformation.fpgadataflow.prepare_ip import PrepareIP
from finn.transformation.fpgadataflow.hlssynth_ip import HLSSynthIP
from finn.transformation.fpgadataflow.replace_verilog_relpaths import (
    ReplaceVerilogRelPaths,
)

from finn.transformation.move_reshape import RemoveCNVtoFCFlatten

from qonnx.util.config import extract_model_config_to_json
from finn.transformation.fpgadataflow.set_fifo_depths import (
    InsertAndSetFIFODepths,
    RemoveShallowFIFOs,
    SplitLargeFIFOs,
)
from finn.transformation.fpgadataflow.insert_dwc import InsertDWC
from finn.transformation.fpgadataflow.insert_fifo import InsertFIFO
from finn.transformation.fpgadataflow.create_dataflow_partition import CreateDataflowPartition
from finn.util.visualization import showSrc, showInNetron
from finn.custom_op.fpgadataflow.matrixvectoractivation import MatrixVectorActivation as MVAU
from finn.custom_op.fpgadataflow.thresholding_batch import Thresholding_Batch

In [None]:
import os
import onnx
import torch
import brevitas

# Use CUDA when PyTorch reports it as available, otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Target device: {device}")
# Print the PyTorch and Brevitas versions, one per line.
print(torch.__version__, brevitas.__version__, sep="\n")

In [None]:
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import brevitas.nn as qnn
from brevitas.quant import Int8Bias
from brevitas.core.quant import QuantType
from brevitas.quant import (Int8ActPerTensorFloat, 
                            SignedBinaryWeightPerTensorConst, 
                            SignedTernaryWeightPerTensorConst,
                            SignedBinaryActPerTensorConst)

# Bit width passed as `weight_bit_width` to the QuantConv2d layers defined below.
set_weight_bit_width = 4
# Bit width passed as `bit_width` to the QuantReLU layers and the decoder QuantIdentity layers below.
set_activation_bit_width = 4

class DoubleConv(nn.Module):
    """Two consecutive (3x3 quantized convolution -> quantized ReLU) stages.

    Both convolutions use ``padding=1`` and no bias, with weights quantized to
    ``set_weight_bit_width`` bits; both ReLUs quantize activations to
    ``set_activation_bit_width`` bits. The first convolution maps
    ``in_channels`` to ``out_channels``; the second keeps ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        stages = []
        # Stage 1 consumes in_channels, stage 2 consumes the output of stage 1.
        for stage_in_channels in (in_channels, out_channels):
            stages.append(
                qnn.QuantConv2d(
                    stage_in_channels,
                    out_channels,
                    kernel_size=3,
                    padding=1,
                    bias=False,
                    weight_bit_width=set_weight_bit_width,
                    return_quant_tensor=True,
                )
            )
            stages.append(
                qnn.QuantReLU(bit_width=set_activation_bit_width, return_quant_tensor=True)
            )

        # Resulting order inside the Sequential: conv, relu, conv, relu.
        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through both conv/ReLU stages and return the result."""
        return self.double_conv(x)


class Encoder_1(nn.Module):
    """Down-sampling stage: ``nn.MaxPool2d(2)`` followed by a ``DoubleConv``."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        pooling = nn.MaxPool2d(2)
        convolutions = DoubleConv(in_channels, out_channels)
        # Pooling sits at index 0 of the Sequential, the convolutions at index 1.
        self.maxpool_conv = nn.Sequential(pooling, convolutions)

    def forward(self, x):
        """Pool ``x`` and pass the pooled tensor through the double convolution."""
        return self.maxpool_conv(x)

class Encoder_2(nn.Module):
    """Down-sampling stage: ``nn.MaxPool2d(3)`` followed by a ``DoubleConv``."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        pooling = nn.MaxPool2d(3)
        convolutions = DoubleConv(in_channels, out_channels)
        # Pooling sits at index 0 of the Sequential, the convolutions at index 1.
        self.maxpool_conv = nn.Sequential(pooling, convolutions)

    def forward(self, x):
        """Pool ``x`` and pass the pooled tensor through the double convolution."""
        return self.maxpool_conv(x)


class Decoder_1(nn.Module):
    """Up-sampling stage that merges a skip connection by element-wise addition.

    ``up`` applies a 1x1 quantized convolution that halves the channel count,
    batch normalisation, a quantized ReLU and nearest-neighbour up-sampling by
    a factor of 3. The up-sampled tensor and the skip tensor are each passed
    through the same ``quant_inp`` quantizer, added, re-quantized with that
    quantizer again, and finally fed to a ``DoubleConv``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        reduced_channels = in_channels // 2

        up_layers = [
            qnn.QuantConv2d(
                in_channels,
                reduced_channels,
                kernel_size=1,
                bias=False,
                weight_bit_width=set_weight_bit_width,
                return_quant_tensor=True,
            ),
            nn.BatchNorm2d(reduced_channels),
            qnn.QuantReLU(bit_width=set_activation_bit_width, return_quant_tensor=True),
            qnn.QuantUpsamplingNearest2d(scale_factor=3, return_quant_tensor=True),
        ]
        self.up = nn.Sequential(*up_layers)
        self.conv = DoubleConv(reduced_channels, out_channels)
        # One quantizer instance is reused for both addends and for their sum.
        self.quant_inp = qnn.QuantIdentity(
            act_quant=Int8ActPerTensorFloat,
            bit_width=set_activation_bit_width,
            return_quant_tensor=True,
        )

    def forward(self, x1, x2):
        """Up-sample ``x1``, add the skip tensor ``x2``, and convolve the sum.

        ``x1`` is the tensor that gets up-sampled by ``self.up``; ``x2`` is the
        tensor it is added to (not concatenated with).
        """
        upsampled = self.quant_inp(self.up(x1))
        skip = self.quant_inp(x2)
        merged = self.quant_inp(upsampled + skip)
        return self.conv(merged)


class Decoder_2(nn.Module):
    """Up-sampling stage that merges a skip connection by element-wise addition.

    ``up`` applies a 1x1 quantized convolution that keeps the channel count,
    batch normalisation, a quantized ReLU and nearest-neighbour up-sampling by
    a factor of 2. The up-sampled tensor and the skip tensor are each passed
    through the same ``quant_inp`` quantizer, added, re-quantized with that
    quantizer again, and finally fed to a ``DoubleConv``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        up_layers = [
            qnn.QuantConv2d(
                in_channels,
                in_channels,
                kernel_size=1,
                bias=False,
                weight_bit_width=set_weight_bit_width,
                return_quant_tensor=True,
            ),
            nn.BatchNorm2d(in_channels),
            qnn.QuantReLU(bit_width=set_activation_bit_width, return_quant_tensor=True),
            qnn.QuantUpsamplingNearest2d(scale_factor=2, return_quant_tensor=True),
        ]
        self.up = nn.Sequential(*up_layers)
        self.conv = DoubleConv(in_channels, out_channels)
        # One quantizer instance is reused for both addends and for their sum.
        self.quant_inp = qnn.QuantIdentity(
            act_quant=Int8ActPerTensorFloat,
            bit_width=set_activation_bit_width,
            return_quant_tensor=True,
        )

    def forward(self, x1, x2):
        """Up-sample ``x1``, add the skip tensor ``x2``, and convolve the sum.

        ``x1`` is the tensor that gets up-sampled by ``self.up``; ``x2`` is the
        tensor it is added to (not concatenated with).
        """
        upsampled = self.quant_inp(self.up(x1))
        skip = self.quant_inp(x2)
        merged = self.quant_inp(upsampled + skip)
        return self.conv(merged)


class OutConv(nn.Module):
    """Output head: a single bias-free 1x1 quantized convolution."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Weights are quantized to set_weight_bit_width bits; return_quant_tensor
        # is left at its default here.
        self.conv = qnn.QuantConv2d(
            in_channels,
            out_channels,
            kernel_size=1,
            bias=False,
            weight_bit_width=set_weight_bit_width,
        )

    def forward(self, x):
        """Project ``x`` to ``out_channels`` channels."""
        projected = self.conv(x)
        return projected


In [None]:
class QuantWeightActUNet_one4all(nn.Module):
    """Quantized U-Net with two down-sampling and two up-sampling stages.

    Data flow: input quantizer -> DoubleConv (16 ch) -> Encoder_1 (32 ch)
    -> Encoder_2 (64 ch) -> Decoder_1 (16 ch, merged with the Encoder_1 output)
    -> Decoder_2 (8 ch, merged with the first DoubleConv output) -> OutConv.
    """

    def __init__(self, input_channels, output_channels):
        super().__init__()
        self.input_channels, self.output_channels = input_channels, output_channels

        # 4-bit quantizer applied to the raw network input.
        self.quant_inp = qnn.QuantIdentity(bit_width=4, return_quant_tensor=True)

        # Contracting path.
        self.input_double_conv = DoubleConv(input_channels, 16)
        self.encoder_block1 = Encoder_1(16, 32)
        self.encoder_block2 = Encoder_2(32, 64)

        # Expanding path.
        self.decoder_block3 = Decoder_1(64, 16)
        self.decoder_block4 = Decoder_2(16, 8)

        self.output_conv = OutConv(8, output_channels)

    def forward(self, x):
        """Return the output-convolution result for the input batch ``x``."""
        quantized = self.quant_inp(x)

        skip_first = self.input_double_conv(quantized)
        skip_second = self.encoder_block1(skip_first)
        bottleneck = self.encoder_block2(skip_second)

        # Each decoder receives the deeper tensor first and the matching skip tensor second.
        decoded = self.decoder_block3(bottleneck, skip_second)
        decoded = self.decoder_block4(decoded, skip_first)

        return self.output_conv(decoded)

In [None]:
# Build a network instance with 2 input channels and 4 output channels.
model = QuantWeightActUNet_one4all(input_channels=2, output_channels=4)

In [None]:
import numpy as np
from torch.utils.data import TensorDataset

# Training set: one "data" file and one "ancilla" file per sample index.
num_samples = 1000
save_dir = "../rotSC_sample4D_01p/"

data_list, ancilla_list = [], []

for i in range(num_samples):
    filename_data = os.path.join(save_dir, f'data_sample4D_{i}.npy')
    filename_ancilla = os.path.join(save_dir, f'ancilla_sample4D_{i}.npy')

    data = np.load(filename_data)
    ancilla = np.load(filename_ancilla)

    # The data array loses its size-1 axis 1 before being collected.
    data_list.append(data.squeeze(1))
    ancilla_list.append(ancilla)

In [None]:
# Test set: one "data" file and one "ancilla" file per sample index.
num_samples_test = 100
save_dir_test = "../rotSC_sample4D_test_01p/"

test_data_list, test_ancilla_list = [], []

for i in range(num_samples_test):
    filename_data = os.path.join(save_dir_test, f'data_sample4D_test_{i}.npy')
    filename_ancilla = os.path.join(save_dir_test, f'ancilla_sample4D_test_{i}.npy')

    data = np.load(filename_data)
    ancilla = np.load(filename_ancilla)

    # The data array loses its size-1 axis 1 before being collected.
    test_data_list.append(data.squeeze(1))
    test_ancilla_list.append(ancilla)

In [None]:
import torch
import torch.nn.functional as F

# The ancilla arrays are the network input; the data arrays are the target.

# --- training split ---
ancilla_np = np.concatenate(ancilla_list, axis=0)
ancilla_tensor = torch.tensor(ancilla_np, dtype=torch.float32)

data_np = np.concatenate(data_list, axis=0)
data_tensor = torch.tensor(data_np, dtype=torch.long)
# Append one zero at the end of each of the last two dimensions of the target.
padded_data_tensor = F.pad(data_tensor, pad=(0, 1, 0, 1), mode="constant", value=0)

train_quantized_dataset = TensorDataset(ancilla_tensor, padded_data_tensor)

# --- test split (same processing) ---
test_ancilla_np = np.concatenate(test_ancilla_list, axis=0)
test_ancilla_tensor = torch.tensor(test_ancilla_np, dtype=torch.float32)

test_data_np = np.concatenate(test_data_list, axis=0)
test_data_tensor = torch.tensor(test_data_np, dtype=torch.long)
padded_test_data_tensor = F.pad(test_data_tensor, pad=(0, 1, 0, 1), mode="constant", value=0)

test_quantized_dataset = TensorDataset(test_ancilla_tensor, padded_test_data_tensor)

In [None]:
from torch.utils.data import DataLoader, Dataset, Subset

# Number of samples per mini-batch for both loaders.
batch_size = 64

# Training batches are reshuffled on every pass; test batches keep dataset order.
train_quantized_loader = DataLoader(
    dataset=train_quantized_dataset,
    batch_size=batch_size,
    shuffle=True,
)
test_quantized_loader = DataLoader(
    dataset=test_quantized_dataset,
    batch_size=batch_size,
    shuffle=False,
)

In [None]:
def train(model, train_loader, optimizer, criterion):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: ``torch.nn.Module`` to optimise; it is switched to training mode.
        train_loader: Iterable yielding ``(input, target)`` tensor pairs.
        optimizer: Optimizer whose ``zero_grad``/``step`` are called once per batch.
        criterion: Callable ``criterion(output, target)`` returning a scalar loss tensor.

    Returns:
        list: One 0-d NumPy array per batch holding that batch's loss value.
    """
    # Send batches to the device the model's parameters live on instead of
    # reading a notebook-global `device`, which goes stale as soon as the model
    # is moved (e.g. by `model.cpu()` before export).
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    losses = []
    # ensure model is in training mode
    model.train()

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(target_device), targets.to(target_device)
        optimizer.zero_grad()

        # forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # backward pass + run optimizer to update weights
        loss.backward()
        optimizer.step()

        # keep track of loss value (detached from the autograd graph)
        losses.append(loss.detach().cpu().numpy())

    return losses

In [None]:
def validate(model, test_loader, criterion):
    """Evaluate ``model`` on ``test_loader`` without updating any weights.

    A sample counts as correct only when the arg-max prediction along dim 1
    matches the target at every position of the two remaining dimensions.

    Args:
        model: ``torch.nn.Module`` to evaluate; it is switched to eval mode.
        test_loader: Iterable yielding ``(input, target)`` tensor pairs.
        criterion: Callable ``criterion(output, target)`` returning a scalar loss tensor.

    Returns:
        tuple: ``(losses, accuracy)`` where ``losses`` is a list with one 0-d
        NumPy array per batch and ``accuracy`` is the fraction of fully
        correct samples (``0.0`` when the loader yields no samples).
    """
    # Send batches to the device the model's parameters live on instead of
    # reading a notebook-global `device`, which goes stale as soon as the model
    # is moved (e.g. by `model.cpu()` before export).
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    losses = []
    correct = 0
    total = 0

    # ensure model is in evaluation mode
    model.eval()

    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(target_device), targets.to(target_device)

            # forward pass
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            # keep track of loss value
            losses.append(loss.detach().cpu().numpy())

            # test accuracy: class with the highest score along dim 1
            _, predicted = outputs.max(dim=1)
            # reduce the two remaining non-batch dimensions one after the other
            rows_correct = (predicted == targets).all(dim=1)
            samples_correct = rows_correct.all(dim=1)
            correct += samples_correct.sum().item()
            total += targets.size(0)

    # An empty loader would otherwise raise ZeroDivisionError.
    accuracy = correct / total if total else 0.0

    return losses, accuracy

In [None]:
# Seed PyTorch before the network is constructed: the layer weights are
# initialised during construction, so seeding only later (in the training
# cell) leaves the starting weights different on every run.
torch.manual_seed(0)

# Network to train: 2 input channels, 4 output channels.
U_net_model = QuantWeightActUNet_one4all(2, 4)
U_net_model.to(device)
num_epochs = 50
# Per-epoch histories filled by the training loop.
running_loss_train = []
running_loss_validate = []
running_test_acc = []

In [None]:
from torch.optim.lr_scheduler import StepLR

# Learning rate: size of the steps the optimizer takes along the gradient
# while minimizing the loss function.
lr = 0.01

# Loss criterion, moved to the training device.
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)

# Adam over all network parameters.
optimizer = torch.optim.Adam(
    params=U_net_model.parameters(),
    lr=lr,
    betas=(0.9, 0.999),
)

# With gamma=1 the learning rate is multiplied by 1 every 25 epochs, i.e. it stays at `lr`.
scheduler = StepLR(optimizer=optimizer, step_size=25, gamma=1)

In [None]:
import numpy as np
from tqdm import tqdm, trange
# Fix the PyTorch and NumPy random seeds before the training loop starts.
torch.manual_seed(0)
np.random.seed(0)

# Progress bar whose description is rewritten after every epoch.
t = trange(num_epochs, desc="Training loss", leave=True)

for epoch in t:
    # One pass over the training data, then one evaluation pass, then the LR schedule.
    loss_epoch_train = train(U_net_model, train_quantized_loader, optimizer, criterion)
    loss_epoch_validation, accuracy = validate(U_net_model, test_quantized_loader, criterion)
    scheduler.step()

    # Record this epoch's per-batch losses and its accuracy.
    running_loss_train.append(loss_epoch_train)
    running_loss_validate.append(loss_epoch_validation)
    running_test_acc.append(accuracy)

    # Show the epoch's mean losses and accuracy in the progress bar right away.
    t.set_description(
        "Training loss = %f, Validation loss = %f, accuracy = %f"
        % (np.mean(loss_epoch_train), np.mean(loss_epoch_validation), accuracy)
    )
    t.refresh()

  return super().rename(names)
Training loss = 0.002266, Validation loss = 0.030074, accuracy = 0.920000: 100%|██████████| 50/50 [00:31<00:00,  1.57it/s]


In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

def display_loss_plot(losses_1, losses_2, title, xlabel="Iterations", ylabel="Loss", ylim=(0, 0.03)):
    """Plot a training-loss and a validation-loss curve in one figure.

    Args:
        losses_1: Sequence of training-loss values.
        losses_2: Sequence of validation-loss values, plotted against the
            indices of ``losses_1``.
        title: Figure title.
        xlabel: Label of the x axis.
        ylabel: Label of the y axis.
        ylim: ``(lower, upper)`` limits of the y axis. Defaults to the
            previously hard-coded ``(0, 0.03)``; pass ``None`` to let
            Matplotlib pick the range so larger losses are not clipped.
    """
    x_axis = range(len(losses_1))

    plt.figure(figsize=(8, 5))
    plt.plot(x_axis, losses_1, label="Training loss")
    plt.plot(x_axis, losses_2, label="Validation loss")
    plt.title(title)
    if ylim is not None:
        plt.ylim(*ylim)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.legend()
    plt.show()
    
# Reduce each epoch's list of per-batch losses to its mean.
train_loss_per_epoch = [np.mean(batch_losses) for batch_losses in running_loss_train]
validate_loss_per_epoch = [np.mean(batch_losses) for batch_losses in running_loss_validate]
display_loss_plot(
    train_loss_per_epoch,
    validate_loss_per_epoch,
    title="Training and Validation Loss Over Epochs",
)

In [None]:
def display_accu_plot(accu, losses_1, losses_2, title, xlabel="Iterations", ylabel1="Accuracy", ylabel2="Loss",
                      loss_ylim=(0, 0.03)):
    """Plot accuracy (left y axis) together with two loss curves (right y axis).

    The first maximum of ``accu`` is marked with a dot and labelled with its
    value rounded to two decimals.

    Args:
        accu: Indexable sequence of accuracy values (list, tuple or array).
        losses_1: Sequence of training-loss values, plotted against the indices of ``accu``.
        losses_2: Sequence of validation-loss values, plotted against the indices of ``accu``.
        title: Figure title.
        xlabel: Label of the shared x axis.
        ylabel1: Label of the accuracy axis.
        ylabel2: Label of the loss axis.
        loss_ylim: ``(lower, upper)`` limits of the loss axis. Defaults to the
            previously hard-coded ``(0, 0.03)``; pass ``None`` to let
            Matplotlib pick the range.
    """
    fig, ax1 = plt.subplots(figsize=(8, 5))
    x_axis = range(len(accu))
    ax1.plot(x_axis, accu, color="green", label="Accuracy")
    ax1.set_xlabel(xlabel)
    ax1.set_ylabel(ylabel1, color='green')
    ax1.tick_params(axis='y', labelcolor='green')

    # Find and mark the maximum accuracy. Looking the position up through
    # plain indexing works for any sequence type (not only lists), and an
    # empty sequence simply gets no marker instead of raising in max().
    if len(accu) > 0:
        max_index = max(x_axis, key=lambda idx: accu[idx])
        max_accu = accu[max_index]
        ax1.scatter(max_index, max_accu, color='green')
        ax1.text(max_index, max_accu, f'{max_accu:.2f}', ha='left', color='green')

    # Create a second y-axis for the loss
    ax2 = ax1.twinx()
    ax2.plot(x_axis, losses_1, color='red', label="Training loss")
    ax2.plot(x_axis, losses_2, color='orange', label="Validation loss")
    ax2.set_ylabel(ylabel2, color='red')
    ax2.tick_params(axis='y', labelcolor='red')
    if loss_ylim is not None:
        ax2.set_ylim(*loss_ylim)

    ax1.legend(loc='lower left')
    ax2.legend(loc='lower right')

    plt.title(title)
    fig.tight_layout()  # Adjust layout to make room for both y-labels
    plt.show()
    
# One value per epoch: the recorded accuracy and the mean of the per-batch losses.
accu_per_epoch = [np.mean(epoch_accuracy) for epoch_accuracy in running_test_acc]
train_loss_per_epoch = [np.mean(batch_losses) for batch_losses in running_loss_train]
validate_loss_per_epoch = [np.mean(batch_losses) for batch_losses in running_loss_validate]
display_accu_plot(
    accu_per_epoch,
    train_loss_per_epoch,
    validate_loss_per_epoch,
    title="Accuracy and Loss Over Epochs",
)

In [None]:
from finn.util.test import get_test_model_trained
from brevitas.export import export_qonnx
from qonnx.util.cleanup import cleanup as qonnx_cleanup
from qonnx.core.modelwrapper import ModelWrapper
from finn.transformation.qonnx.convert_qonnx_to_finn import ConvertQONNXtoFINN
from qonnx.transformation.infer_shapes import InferShapes
from qonnx.transformation.fold_constants import FoldConstants
from qonnx.transformation.general import GiveReadableTensorNames, GiveUniqueNodeNames, RemoveStaticGraphInputs
from qonnx.transformation.infer_datatypes import InferDataTypes
from qonnx.transformation.infer_data_layouts import InferDataLayouts
from qonnx.transformation.general import (
    ConvertSubToAdd,
    ConvertDivToMul,
    GiveReadableTensorNames,
    GiveUniqueNodeNames,
    SortGraph,
    RemoveUnusedTensors,
    GiveUniqueParameterTensors,
    RemoveStaticGraphInputs,
    ApplyConfig,
)

# Directory for all exported / transformed ONNX models; requires the FINN_ROOT environment variable to be set.
model_dir = os.environ['FINN_ROOT'] + "/notebooks/HLS_unet/onnx_model_01ER3LUnet"

In [None]:
# Move the trained network to the CPU; the dummy export input below is a CPU tensor.
U_net_model.cpu()

# The export writes straight into model_dir, so make sure the directory exists
# (a fresh checkout would otherwise fail on the first file write).
os.makedirs(model_dir, exist_ok=True)

# Export to QONNX using a random 1x2x6x6 input, then clean the file up in place.
export_onnx_path = model_dir + "/unet_export.onnx"
export_qonnx(U_net_model, torch.randn(1, 2, 6, 6), export_onnx_path)
qonnx_cleanup(export_onnx_path, out_file=export_onnx_path)
model = ModelWrapper(export_onnx_path)

# Convert to the FINN dialect and tidy the graph. The naming / shape / datatype
# passes are applied twice, in the same order as before; InsertTopK(k=1, axis=1)
# is intentionally not applied between the two rounds.
tidy_up_transformations = [
    ConvertQONNXtoFINN(),
    GiveUniqueParameterTensors(),
    InferShapes(),
    FoldConstants(),
    RemoveStaticGraphInputs(),
    GiveUniqueNodeNames(),
    GiveReadableTensorNames(),
    InferDataTypes(),
    InferShapes(),
    GiveUniqueNodeNames(),
    GiveReadableTensorNames(),
    InferDataTypes(),
    InferDataLayouts(),
]
for tidy_up_transformation in tidy_up_transformations:
    model = model.transform(tidy_up_transformation)

model.save(model_dir + "/unet.onnx")

In [None]:
# Open the tidied FINN model in the Netron viewer.
showInNetron(model_dir + "/unet.onnx")

In [None]:
def step_resnet50_streamline_linear(model: ModelWrapper):
    """Apply the linear streamlining passes to ``model`` one after the other.

    After every pass the node names are made unique again with
    ``GiveUniqueNodeNames``. Returns the transformed model.
    """
    linear_passes = (
        MoveLinearPastFork(),
        MoveMulPastMaxPool(),
        AbsorbSignBiasIntoMultiThreshold(),  # Absorb add into MultiThreshold
        AbsorbScalarMulAddIntoTopK(),  # before MoveAddPastMul to avoid int->float
        ConvertSubToAdd(),
        ConvertDivToMul(),
        RemoveIdentityOps(),
        CollapseRepeatedMul(),
        BatchNormToAffine(),  # BatchNorm to mul + add (B <1, C, 1, 1>)
        ConvertSignToThres(),
        MoveAddPastMul(),
        MoveScalarAddPastMatMul(),
        MoveAddPastConv(),
        MoveScalarMulPastMatMul(),
        MoveScalarMulPastConv(),
        MoveScalarLinearPastInvariants(),
        MoveAddPastMul(),
        CollapseRepeatedAdd(),
        CollapseRepeatedMul(),
        AbsorbAddIntoMultiThreshold(),
        FactorOutMulSignMagnitude(),
        MoveMaxPoolPastMultiThreshold(),
        AbsorbMulIntoMultiThreshold(),
        Absorb1BitMulIntoMatMul(),
        Absorb1BitMulIntoConv(),
        RoundAndClipThresholds(),
    )
    for streamline_pass in linear_passes:
        # Run the pass, then immediately re-establish unique node names.
        model = model.transform(streamline_pass).transform(GiveUniqueNodeNames())
    return model


def step_resnet50_streamline_nonlinear(model: ModelWrapper):
    """Move linear ops past element-wise adds and past forks.

    After each of the two passes the node names are made unique again with
    ``GiveUniqueNodeNames``. Returns the transformed model.
    """
    nonlinear_passes = (
        MoveLinearPastEltwiseAdd(),
        MoveLinearPastFork(),
    )
    for streamline_pass in nonlinear_passes:
        model = model.transform(streamline_pass).transform(GiveUniqueNodeNames())
    return model

def step_resnet50_streamline(model: ModelWrapper, *, num_rounds: int = 4):
    """Streamline ``model`` by alternating the linear and non-linear steps.

    Each round runs ``step_resnet50_streamline_linear`` followed by
    ``step_resnet50_streamline_nonlinear`` and then tidies the graph (unused
    tensors removed, tensors renamed, datatypes re-inferred, graph sorted).
    After the last round ``DoubleToSingleFloat`` is applied once.

    Args:
        model: The model to streamline.
        num_rounds: Number of linear/non-linear rounds. Keyword-only so the
            positional interface is unchanged; defaults to the previously
            hard-coded 4.

    Returns:
        The streamlined model.
    """
    for _ in range(num_rounds):
        model = step_resnet50_streamline_linear(model)
        model = step_resnet50_streamline_nonlinear(model)

        # big loop tidy up
        model = model.transform(RemoveUnusedTensors())
        model = model.transform(GiveReadableTensorNames())
        model = model.transform(InferDataTypes())
        model = model.transform(SortGraph())

    model = model.transform(DoubleToSingleFloat())

    return model

# Reload the tidied model from disk, streamline it, and store the result next to it.
model = ModelWrapper(model_dir + "/unet.onnx")
model = step_resnet50_streamline(model)
model.save(model_dir + "/unet_streamline.onnx")

In [None]:
# Open the streamlined model in the Netron viewer.
showInNetron(model_dir + "/unet_streamline.onnx")

In [None]:
def step_resnet50_convert_to_hls(model: ModelWrapper):
    """Convert a streamlined model into HLS layers.

    The first graph input is annotated as ``UINT4`` on the passed-in model,
    then layouts, float precision, datatypes and node order are normalised.
    Each conversion pass is followed by layout inference, unique node naming
    and datatype inference. A final clean-up removes the CNV-to-FC flatten,
    renames tensors, drops unused tensors and sorts the graph.

    Returns the converted model.
    """
    # Annotate the first graph input as UINT4 (done in place on `model`).
    first_input_name = model.graph.input[0].name
    model.set_tensor_datatype(first_input_name, DataType["UINT4"])

    # Normalise the graph before converting anything.
    for preparation in (InferDataLayouts(), DoubleToSingleFloat(), InferDataTypes(), SortGraph()):
        model = model.transform(preparation)

    # Transformation classes, instantiated one at a time inside the loop.
    conversion_classes = (
        to_hls.InferAddStreamsLayer,
        LowerConvsToMatMul,
        to_hls.InferChannelwiseLinearLayer,
        MakeMaxPoolNHWC,
        to_hls.InferPool_Batch,
        AbsorbTransposeIntoMultiThreshold,
        RoundAndClipThresholds,
        to_hls.InferQuantizedMatrixVectorActivation,
        to_hls.InferThresholdingLayer,
        AbsorbConsecutiveTransposes,
        to_hls.InferConcatLayer,
        MakeScaleResizeNHWC,
        to_hls.InferUpsample,
        to_hls.InferConvInpGen,
        to_hls.InferDuplicateStreamsLayer,
        AbsorbConsecutiveTransposes,
        InferDataLayouts,
    )
    for conversion_cls in conversion_classes:
        model = model.transform(conversion_cls())
        # Refresh layouts, node names and datatypes after every conversion.
        model = (
            model.transform(InferDataLayouts())
            .transform(GiveUniqueNodeNames())
            .transform(InferDataTypes())
        )

    # Final clean-up.
    for cleanup in (RemoveCNVtoFCFlatten(), GiveReadableTensorNames(), RemoveUnusedTensors(), SortGraph()):
        model = model.transform(cleanup)

    return model

In [None]:
# Reload the streamlined model from disk and convert it to HLS layers.
model = ModelWrapper(model_dir + "/unet_streamline.onnx")
model = step_resnet50_convert_to_hls(model)

In [None]:
# Store the HLS-layer model; uncomment the next line to inspect it in Netron.
model.save(model_dir + "/unet_hls.onnx")
# showInNetron(model_dir + "/unet_hls.onnx")

In [None]:
# Apply CreateDataflowPartition and store the resulting parent model;
# uncomment the last line to inspect it in Netron.
parent_model = model.transform(CreateDataflowPartition())
parent_model.save(model_dir + "/unet_dataflow_parent.onnx")
#showInNetron(model_dir + "/unet_dataflow_parent.onnx")

In [None]:
from qonnx.custom_op.registry import getCustomOp
# Wrap the first StreamingDataflowPartition node of the parent model as a custom op.
sdp_node = getCustomOp(
    parent_model.get_nodes_by_op_type("StreamingDataflowPartition")[0]
)
# Its "model" attribute holds the file name of the partition's own ONNX model.
dataflow_model_filename = sdp_node.get_nodeattr("model")
# Save the dataflow partition under a fixed name inside model_dir for easier access.
dataflow_model = ModelWrapper(dataflow_model_filename)
dataflow_model.save(model_dir + "/unet_dataflow_model.onnx")

In [None]:
# Generate MVAU params.h to save weights

from qonnx.core.datatype import DataType
from qonnx.util.basic import (
    calculate_matvec_accumulator_range,
    interleave_matrix_outer_dim_from_partitions,
    roundup_to_integer_multiple,
)
from finn.util.data_packing import (
    npy_to_rtlsim_input,
    numpy_to_hls_code,
    pack_innermost_dim_as_hex_string,
    rtlsim_output_to_npy,
)

# set up loaded model and save_path
model = ModelWrapper(model_dir + "/unet_dataflow_model.onnx")
save_path = os.path.join(model_dir, "MVAU_params_for_dataflow")
save_path_dat = os.path.join(model_dir, "MVAU_memstream_dat")
save_path_npy = os.path.join(model_dir, "MVAU_weights_npy")
# Create the output directories if they do not exist yet
for out_dir in (save_path, save_path_npy, save_path_dat):
    os.makedirs(out_dir, exist_ok=True)
fc_layers = model.get_nodes_by_op_type("MatrixVectorActivation")

# Per-layer folding parameters, one (simd, pe, wmem, is_padding) tuple per
# MatrixVectorActivation node, in graph order. zip() below stops at the shorter
# of this list and fc_layers, so the two must line up.
weights_shape = [
    (2, 16, 9, False),
    (16, 16, 9, False),

    (16, 32, 9, False),
    (32, 32, 9, False),

    (32, 32, 18, False),# wmem doubled
    (64, 32, 18, False),# wmem doubled

    (64, 32, 1, False),

    (32, 16, 9, False),
    (16, 16, 9, False),

    (16, 16, 1, False),

    (16, 8, 9, False),
    (8, 8, 9, False),

    (8, 4, 1, False)
] # For the hw, the max PE is set to 32, so the output channels larger than 32 should be set to 32 and the according wmem should be doubled.

# Same layers expressed for a fixed hardware folding of ws_simd x ws_pe.
weights_shape_dat = []
ws_simd = 8
ws_pe = 32

for (simd, pe, wmem, is_padding) in weights_shape:
    if is_padding:
        new_weights_shape = (ws_simd, ws_pe, wmem, is_padding)
    else:
        # Keep simd * pe * wmem constant while switching to ws_simd x ws_pe.
        new_weights_shape = (ws_simd, ws_pe, int(wmem*simd/ws_simd*pe/ws_pe), is_padding)

    weights_shape_dat.append(new_weights_shape)

# weight_file_mode can be selected from {hls_header, decoupled_npy, decoupled_verilog_dat}
weight_file_mode = "hls_header"

for index, (fcl, (simd, pe, wmem, is_padding), (new_simd, new_pe, new_wmem, _new_is_padding)) in enumerate(zip(fc_layers, weights_shape, weights_shape_dat)):
    fcl_inst = MVAU(fcl)
    fcl_inst.set_nodeattr("SIMD", simd)
    fcl_inst.set_nodeattr("PE", pe)
    fcl_inst.set_nodeattr("mem_mode", "decoupled")
    fcl_inst.set_nodeattr("runtime_writeable_weights", 0)
    fcl_inst.set_nodeattr("ram_style", "block")

    weights = model.get_initializer(fcl_inst.onnx_node.input[1])
    weight_tensor = weights.T
    if fcl_inst.get_weight_datatype() == DataType["BIPOLAR"]:
        # convert bipolar to binary
        weight_tensor = (weight_tensor + 1) / 2

    # Both the padded and the unpadded path start from the PE-interleaved
    # tensor reshaped to (1, pe, wmem, simd).
    weight_tensor = interleave_matrix_outer_dim_from_partitions(weight_tensor, pe)
    weight_tensor = weight_tensor.reshape(1, pe, wmem, simd) #set pe, wmem, simd accordingly
    if is_padding:
        if (pe < new_pe) and (simd <= new_simd):
            # zero-pad both the PE and the SIMD axis
            new_shape = (1, new_pe, wmem, new_simd)
            new_weights = np.zeros(new_shape)
            new_weights[:, :weight_tensor.shape[1], :, :weight_tensor.shape[3]] = weight_tensor
        elif (pe < new_pe) and (simd > new_simd):
            # zero-pad the PE axis, then refold the SIMD axis into wmem
            new_shape = (1, new_pe, wmem, simd)
            new_weights = np.zeros(new_shape)
            new_weights[:, :weight_tensor.shape[1], :, :] = weight_tensor
            new_weights = new_weights.reshape(1, new_pe, int(wmem*simd/new_simd), new_simd)
        elif (pe > new_pe) and (simd < new_simd):
            # zero-pad the SIMD axis, then refold the PE axis into wmem
            new_shape = (1, pe, wmem, new_simd)
            new_weights = np.zeros(new_shape)
            new_weights[:, :, :, :weight_tensor.shape[3]] = weight_tensor
            new_weights = new_weights.reshape(1, new_pe, int(wmem*pe/new_pe), new_simd)
        else:
            # Without this branch `new_weights` would silently keep the value
            # computed for the previous layer (or be undefined for layer 0).
            raise ValueError(
                "Unsupported padding combination for layer {}: pe={}, simd={}, target pe={}, target simd={}".format(
                    index, pe, simd, new_pe, new_simd
                )
            )
    else:
        new_weights = weight_tensor

    # Reverse the innermost (SIMD) axis before generating any weight file.
    weight_tensor = np.flip(new_weights, axis=-1)
    export_wdt = DataType[fcl_inst.get_nodeattr("weightDataType")]
    if export_wdt == DataType["BIPOLAR"]:
        export_wdt = DataType["BINARY"]
    if weight_file_mode == "hls_header":
        weight_hls_code = numpy_to_hls_code(weight_tensor, export_wdt, "weights", True, True)
        weight_h_filename = "{}/params{}.h".format(save_path, index)
        weight_cpp_filename = "{}/params{}.cpp".format(save_path, index)

        # params<index>.cpp: definition of the weight array
        with open(weight_cpp_filename, "w") as f_weights:
            f_weights.write("#include \"params{}.h\"\n".format(index))
            f_weights.write("\n")
            if export_wdt.bitwidth() != 1:
                f_weights.write(
                    "const FixedPointWeights<{},{},{},{}> weights{} = \n".format(
                        simd,
                        export_wdt.get_hls_datatype_str(),
                        pe,
                        wmem,
                        index
                    )
                )
            else:
                f_weights.write(
                    "const BinaryWeights<{},{},{}> mvau_weights{} = \n".format(
                        new_simd,
                        new_pe,
                        new_wmem,
                        index
                    )
                )
            f_weights.write(weight_hls_code)

        # params<index>.h: matching extern declaration. The file is now closed
        # (and therefore flushed) by the with-block; it was left open before.
        # NOTE(review): for 1-bit weights the .cpp defines
        # `BinaryWeights ... mvau_weights<index>` while this header always
        # declares `FixedPointWeights ... weights<index>` - confirm which form
        # the consuming HLS code expects before exporting 1-bit layers.
        with open(weight_h_filename, "w") as f_weights:
            f_weights.write("#ifndef PARAMS{}_H\n".format(index))
            f_weights.write("#define PARAMS{}_H\n".format(index))
            f_weights.write("\n")
            f_weights.write("#include \"../weights.hpp\"\n")
            f_weights.write("\n")
            f_weights.write(
                "extern const FixedPointWeights<{},{},{},{}> weights{};\n".format(
                    simd,
                    export_wdt.get_hls_datatype_str(),
                    pe,
                    wmem,
                    index
                )
            )
            f_weights.write("\n")
            f_weights.write("#endif")
    elif "decoupled" in weight_file_mode:
        # create a weight stream for various flavors of decoupled mode:
        # transpose weight tensor from (1, PE, WMEM, SIMD) to (1, WMEM, PE, SIMD)
        weight_tensor_unflipped = np.transpose(weight_tensor, (0, 2, 1, 3))
        # reverse SIMD flip for saving weights in .npy
        weight_tensor_simd_flipped = np.flip(weight_tensor_unflipped, axis=-1)
        # PE flip for saving weights in .dat
        weight_tensor_pe_flipped = np.flip(weight_tensor_unflipped, axis=-2)
        # reshape weight tensor (simd_flipped and pe_flipped) to desired shape
        # simd_flipped
        weight_tensor_simd_flipped = weight_tensor_simd_flipped.reshape(1, -1, new_pe * new_simd)
        weight_tensor_simd_flipped = weight_tensor_simd_flipped.copy()
        # flipped
        weight_tensor_pe_flipped = weight_tensor_pe_flipped.reshape(1, -1, new_pe * new_simd)
        weight_tensor_pe_flipped = weight_tensor_pe_flipped.copy()
        weight_file_name_npy = "{}/weights_{}.npy".format(save_path_npy, index)
        weight_file_name_dat = "{}/memstream_{}.dat".format(save_path_dat, index)
        if weight_file_mode == "decoupled_npy":
            # save weight stream into npy for cppsim
            np.save(weight_file_name_npy, weight_tensor_simd_flipped)
        elif weight_file_mode == "decoupled_verilog_dat":
            # convert weight values into hexstring
            weight_width = fcl_inst.get_weightstream_width()
            # pad to nearest 4 bits to get hex strings
            weight_width_padded = roundup_to_integer_multiple(weight_width, 4)
            weight_tensor_pe_flipped = pack_innermost_dim_as_hex_string(
                weight_tensor_pe_flipped, export_wdt, weight_width_padded, prefix=""
            )
            # one hex string per line; the file is not padded to a fixed number of entries
            weight_stream = weight_tensor_pe_flipped.flatten()
            weight_stream = weight_stream.copy()
            with open(weight_file_name_dat, "w") as f:
                for val in weight_stream:
                    f.write(val + "\n")
print("Complete.")
# Persist the node attributes (SIMD, PE, mem_mode, ...) set in the loop above.
model.save(model_dir + "/unet_dataflow_model_01.onnx")

In [None]:
# Generate MVAU_thresh<index>.h / MVAU_thresh<index>.cpp holding the activation
# thresholds of each MatrixVectorActivation node that carries a threshold input.

# set up loaded model and save_path
model = ModelWrapper(model_dir + "/unet_dataflow_model_01.onnx")
save_path = os.path.join(model_dir, "MVAU_params")
# exist_ok=True creates the directory only when it is missing
os.makedirs(save_path, exist_ok=True)
fc_layers = model.get_nodes_by_op_type("MatrixVectorActivation")

# Hand-written (pe, mh, tmem) per MVAU layer, matched to fc_layers by position.
# In every entry tmem == mh // pe.
# NOTE: zip() below stops at the shorter sequence, so any layer beyond the end
# of this list is skipped without an error.
thresh_shape = [
    (16, 16, 1),
    (16, 16, 1),
    (32, 32, 1),
    (32, 32, 1),
    (32, 64, 2),
    (32, 64, 2),
    (32, 32, 1),
    (16, 16, 1),
    (16, 16, 1),
    (16, 16, 1),
    (8, 8, 1),
    (8, 8, 1),
]
for index, (fcl, (pe, mh, tmem)) in enumerate(zip(fc_layers, thresh_shape)):
    fcl_inst = MVAU(fcl)
    # input[2] is read as the threshold tensor; nodes without a third input
    # or without an initializer for it produce no files.
    if len(fcl_inst.onnx_node.input) <= 2:
        continue
    thresholds = model.get_initializer(fcl_inst.onnx_node.input[2])
    if thresholds is None:
        continue

    # Fold the threshold matrix into shape (1, pe, tmem, n_thres_steps).
    n_thres_steps = thresholds.shape[1]
    threshold_tensor = interleave_matrix_outer_dim_from_partitions(thresholds, pe)
    threshold_tensor = threshold_tensor.reshape(1, pe, tmem, n_thres_steps)

    tdt = DataType[fcl_inst.get_nodeattr("accDataType")]
    print(tdt)
    thresholds_hls_code = numpy_to_hls_code(threshold_tensor, tdt, "thresholds", False, True)
    # tdt_hls is the type string derived from accDataType; the emitted
    # declaration uses the fixed tdt_hls_usrset instead.
    # NOTE(review): confirm every threshold value fits into ap_int<12>.
    tdt_hls = tdt.get_hls_datatype_str()
    tdt_hls_usrset = "ap_int<12>"
    export_odt = fcl_inst.get_output_datatype()
    odt_hls = export_odt.get_hls_datatype_str()

    # Template arguments are built once and shared by the definition (.cpp)
    # and the extern declaration (.h) so the two cannot drift apart.
    thresh_template_args = "{},{},{},{},{},{},{}".format(
        tmem,
        pe,
        threshold_tensor.shape[-1],
        tdt_hls_usrset,
        odt_hls,
        fcl_inst.get_nodeattr("ActVal"),
        "comp::less_equal<%s, %s>" % (tdt_hls_usrset, tdt_hls_usrset),
    )

    thresh_h_filename = "{}/MVAU_thresh{}.h".format(save_path, index)
    thresh_cpp_filename = "{}/MVAU_thresh{}.cpp".format(save_path, index)

    # cpp file: definition followed by the threshold data.
    # `with` closes the file even if a write fails.
    with open(thresh_cpp_filename, "w") as f_thresh:
        f_thresh.write("#include \"MVAU_thresh{}.h\"\n".format(index))
        f_thresh.write("\n")
        f_thresh.write(
            "ThresholdsActivation<{}> mvau_threshs{} = \n".format(thresh_template_args, index)
        )
        f_thresh.write(thresholds_hls_code)

    # header file: include guard plus the matching extern declaration
    with open(thresh_h_filename, "w") as f_thresh:
        f_thresh.write("#ifndef MVAU_THRESH{}_H\n".format(index))
        f_thresh.write("#define MVAU_THRESH{}_H\n".format(index))
        f_thresh.write("\n")
        f_thresh.write("#include \"../activations.hpp\"\n")
        f_thresh.write("\n")
        f_thresh.write(
            "extern ThresholdsActivation<{}> mvau_threshs{};\n".format(thresh_template_args, index)
        )
        f_thresh.write("\n")
        f_thresh.write("#endif\n")

model.save(model_dir + "/unet_dataflow_model_02.onnx")

In [None]:
# Generate Thresholding_thresh<index>.h / Thresholding_thresh<index>.cpp holding
# the thresholds of each Thresholding_Batch node.

model = ModelWrapper(model_dir + "/unet_dataflow_model_02.onnx")
save_path = os.path.join(model_dir, "Thresholding_params")
# exist_ok=True creates the directory only when it is missing
os.makedirs(save_path, exist_ok=True)
Thresholding_layers = model.get_nodes_by_op_type("Thresholding_Batch")

# Hand-written (pe, mh, tmem) per Thresholding_Batch layer, matched to
# Thresholding_layers by position. In every entry tmem == mh // pe.
# NOTE: zip() below stops at the shorter sequence, so any layer beyond the end
# of this list is skipped without an error.
thresh_shape = [
    (2, 2, 1),
    (16, 16, 1),
    (32, 32, 1),
    (32, 32, 1),
    (32, 32, 1),
    (16, 16, 1),
    (16, 16, 1),
]
for index, (fcl, (pe, mh, tmem)) in enumerate(zip(Thresholding_layers, thresh_shape)):
    fcl_inst = Thresholding_Batch(fcl)
    # Overwrite the node's folding and threshold datatype with the values used for export.
    fcl_inst.set_nodeattr("PE", pe)
    fcl_inst.set_nodeattr("NumChannels", mh)
    fcl_inst.set_nodeattr("weightDataType", "INT32")

    thresholds = model.get_initializer(fcl_inst.onnx_node.input[1])
    # Fail with a readable message instead of an AttributeError on `.shape`.
    assert thresholds is not None, "Missing threshold initializer for Thresholding_Batch node"
    n_thres_steps = thresholds.shape[1]
    assert n_thres_steps == fcl_inst.get_nodeattr("numSteps"), "Mismatch in threshold steps"
    if not fcl_inst.get_input_datatype().signed():
        # ensure all thresholds are nonnegative
        assert (thresholds >= 0).all()
    # ensure all thresholds are integer
    assert np.equal(np.mod(thresholds, 1), 0).all(), "Need int threshold tensor"

    # A single threshold row is replicated for all mh channels, then the matrix
    # is folded into shape (1, pe, tmem, n_thres_steps).
    threshold_tensor = thresholds
    if threshold_tensor.shape[0] == 1:
        threshold_tensor = np.tile(threshold_tensor, (mh, 1))
    threshold_tensor = interleave_matrix_outer_dim_from_partitions(threshold_tensor, pe)
    threshold_tensor = threshold_tensor.reshape(1, pe, tmem, n_thres_steps)

    tdt = fcl_inst.get_weight_datatype()
    print(tdt)
    thresholds_hls_code = numpy_to_hls_code(threshold_tensor, tdt, "thresholds", False, True)
    # The declared threshold type is narrowed from ap_int<32> to ap_int<8>.
    # NOTE(review): confirm every threshold value fits into 8 bits.
    tdt_hls = tdt.get_hls_datatype_str()
    if tdt_hls == "ap_int<32>":
        tdt_hls = "ap_int<8>"
    tdt_hls_usrset = "ap_int<8>"  # not read by the templates below
    export_odt = fcl_inst.get_output_datatype()
    odt_hls = export_odt.get_hls_datatype_str()

    # Template arguments are built once and shared by the definition (.cpp)
    # and the extern declaration (.h) so the two cannot drift apart.
    thresh_template_args = "{},{},{},{},{},{},{}".format(
        tmem,
        pe,
        threshold_tensor.shape[-1],
        tdt_hls,
        odt_hls,
        fcl_inst.get_nodeattr("ActVal"),
        "comp::less_equal<%s, %s>" % (tdt_hls, tdt_hls),
    )

    thresh_h_filename = "{}/Thresholding_thresh{}.h".format(save_path, index)
    thresh_cpp_filename = "{}/Thresholding_thresh{}.cpp".format(save_path, index)

    # cpp file: definition followed by the threshold data.
    # `with` closes the file even if a write fails.
    with open(thresh_cpp_filename, "w") as f_thresh:
        f_thresh.write("#include \"Thresholding_thresh{}.h\"\n".format(index))
        f_thresh.write("\n")
        f_thresh.write(
            "ThresholdsActivation<{}> thresB_threshs{} = \n".format(thresh_template_args, index)
        )
        f_thresh.write(thresholds_hls_code)

    # header file: include guard plus the matching extern declaration
    with open(thresh_h_filename, "w") as f_thresh:
        f_thresh.write("#ifndef THRESHOLDING_THRESH{}_H\n".format(index))
        f_thresh.write("#define THRESHOLDING_THRESH{}_H\n".format(index))
        f_thresh.write("\n")
        f_thresh.write("#include \"../activations.hpp\"\n")
        f_thresh.write("\n")
        f_thresh.write(
            "extern ThresholdsActivation<{}> thresB_threshs{};\n".format(thresh_template_args, index)
        )
        f_thresh.write("\n")
        f_thresh.write("#endif\n")

model.save(model_dir + "/unet_dataflow_model_03.onnx")

In [None]:
from finn.util.basic import pynq_part_map

# Build target settings: the clock period (ns) later passed to ZynqBuild as
# period_ns, the board name, and the FPGA part pynq_part_map lists for that board.
target_clk_ns = 10
pynq_board = "RFSoC4x2"
fpga_part = pynq_part_map[pynq_board]

In [None]:
from finn.transformation.fpgadataflow.make_zynq_proj import ZynqBuild

# Step 1: apply InsertAndSetFIFODepths for the selected part and checkpoint the result.
model = ModelWrapper(model_dir + "/unet_dataflow_model_03.onnx").transform(
    InsertAndSetFIFODepths(fpga_part)
)
model.save(model_dir + "/unet_dataflow_model_03_withfifo.onnx")

# Step 2: reload that checkpoint from disk, run ZynqBuild for the board and
# clock period chosen above, and save the built model.
model = ModelWrapper(model_dir + "/unet_dataflow_model_03_withfifo.onnx").transform(
    ZynqBuild(platform=pynq_board, period_ns=target_clk_ns)
)
model.save(model_dir + "/unet_dataflow_model_03_ZynqBuild.onnx")

In [None]:
from finn.transformation.fpgadataflow.make_pynq_driver import MakePYNQDriver

# Reload the ZynqBuild output, apply MakePYNQDriver with the "zynq-iodma"
# platform string, and store the result as the final model file.
model = ModelWrapper(model_dir + "/unet_dataflow_model_03_ZynqBuild.onnx").transform(
    MakePYNQDriver("zynq-iodma")
)
model.save(model_dir + "/unet_synth.onnx")

In [None]:
# Open the final saved model in Netron. Kept as the cell's last expression so
# that whatever showInNetron returns is rendered as the cell output.
showInNetron(model_dir + "/unet_synth.onnx")