In [41]:
from typing import Type
import pyrtl
from pyrtl import *
import numpy as np
import onnx
import torch
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from enum import IntEnum
from hardware_accelerators.nn import load_model
from hardware_accelerators.dtypes import *
from hardware_accelerators.rtllib import *
from hardware_accelerators.simulation import AcceleratorSimulator
from hardware_accelerators.simulation.matrix_utils import *
from hardware_accelerators.simulation.buffer import WeightFIFOSimulator

# New Accelerator Sim Testing


## GEMM


In [None]:
# GEMM check: feed the activation matrix to the simulator one row per
# instruction and compare the collected outputs with ReLU(activations @ weights).
simulator = AcceleratorSimulator.default_config(array_size=3, num_weight_tiles=2)
simulator.setup()

weights = np.ones((3, 3))
activations = np.array([[1, 2, 3], [-4, -5, -6], [7, 8, 9]])

simulator.load_weights(weights, 0)

# Every data instruction uses the same activation settings.
relu_opts = {"activation_enable": True, "activation_func": "relu"}

# Row 0 also requests the weight load; no accum_addr is passed for it.
simulator.execute_instruction(
    data_vec=activations[0], load_new_weights=True, flush_pipeline=False, **relu_opts
)
simulator.execute_instruction(
    data_vec=activations[1], accum_addr=1, flush_pipeline=False, **relu_opts
)
# Only the last row sets flush_pipeline.
simulator.execute_instruction(
    data_vec=activations[2], accum_addr=2, flush_pipeline=True, **relu_opts
)

n_rows, n_cols = activations.shape[0], weights.shape[1]
results = np.zeros((n_rows, n_cols))

# Read one row of outputs, then issue a no-op before reading the next one.
for i in range(3):
    results[i] = simulator._get_outputs()
    simulator.execute_instruction(nop=True)

# NumPy reference result.
gt = np.maximum(0, (activations @ weights))

assert np.isclose(results, gt).all()

simulator.history

[
 Simulation Step 0
 Input Signals:
 --------------------------------------------------------------------------------
   data_enable: 0
   data_inputs: None
   weight_start: 1
   weight_tile_addr: 0
   accum_addr: 0
   accum_mode: 0
   act_start: 0
   act_func: 0
 
 Systolic Array State:
 --------------------------------------------------------------------------------
 Inputs:
   w_en: 0
   enable: 0
   weights: [0. 0. 0.]
   data: [0. 0. 0.]
 
 Weights Matrix:
 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]
 
 Data Matrix:
 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]
 
 Accumulators:
 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]
 
 Control Registers:
 data_controls: [0, [0, 0]]
 accum_controls: [0, 0, 0]
 control_out: 0
 
 Outputs:
 [0. 0. 0.]
 ----------------------------------------
 
 
 Accumulator State:
 --------------------------------------------------------------------------------
 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]
 
 Outputs

## GEMV


In [None]:
# GEMV run: a single instruction carries the input vector together with the
# weight-load request and the pipeline flush.
simulator = AcceleratorSimulator.default_config(array_size=3, num_weight_tiles=2)
simulator.setup()

w = np.array([[1, 1, 1], [2, 2, 2], [3, 3, 3]]).T
activations = np.array([0, 0.5, 1])

simulator.load_weights(w, 0)

gemv_instruction = dict(
    data_vec=activations,
    load_new_weights=True,
    flush_pipeline=True,
    activation_enable=True,
    activation_func="relu",
)
simulator.execute_instruction(**gemv_instruction)

print(f"Weights:\n{w}")
array_state = simulator.accelerator.inspect_systolic_array_state(simulator.sim)
print(f"Systolic Array (weights loaded):\n{array_state}")

# Two no-op instructions are issued before the outputs are read.
for _nop in range(2):
    simulator.execute_instruction(nop=True)

results = simulator._get_outputs()
results

Weights:
[[1 2 3]
 [1 2 3]
 [1 2 3]]
Systolic Array (weights loaded):
Inputs:
  w_en: 0
  enable: 0
  weights: [0. 0. 0.]
  data: [0. 0. 0.]

Weights Matrix:
[[1. 2. 3.]
 [1. 2. 3.]
 [1. 2. 3.]]

Data Matrix:
[[0.  0.5 1. ]
 [0.5 1.  0. ]
 [1.  0.  0.5]]

Accumulators:
[[0.  1.  3. ]
 [0.5 3.  3. ]
 [0.  0.  0. ]]

Control Registers:
data_controls: [0, [0, 0]]
accum_controls: [0, 0, 1]
control_out: 0

Outputs:
[0. 0. 0.]
----------------------------------------



array([1.5, 3. , 4.5])

In [None]:
# Display the per-step records the simulator has collected so far.
simulator.history

[
 Simulation Step 0
 Input Signals:
 --------------------------------------------------------------------------------
   data_enable: 0
   data_inputs: None
   weight_start: 1
   weight_tile_addr: 0
   accum_addr: 0
   accum_mode: 0
   act_start: 0
   act_func: 0
 
 Systolic Array State:
 --------------------------------------------------------------------------------
 Inputs:
   w_en: 0
   enable: 0
   weights: [0. 0. 0.]
   data: [0. 0. 0.]
 
 Weights Matrix:
 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]
 
 Data Matrix:
 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]
 
 Accumulators:
 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]
 
 Control Registers:
 data_controls: [0, [0, 0]]
 accum_controls: [0, 0, 0]
 control_out: 0
 
 Outputs:
 [0. 0. 0.]
 ----------------------------------------
 
 
 Accumulator State:
 --------------------------------------------------------------------------------
 [[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]
 
 Outputs

## Weight Loading


In [None]:
# Reuses the `simulator` object created in an earlier cell (its construction is
# left commented out here) and calls setup() on it again.
# simulator = AcceleratorSimulator.default_config(array_size=3, num_weight_tiles=2)
simulator.setup()

w = np.array([[1, 1, 1], [2, 2, 2], [3, 3, 3]])
w_neg = w * -1

# Tile 0 holds w, tile 1 holds its negation.
for tile_addr, tile in enumerate((w, w_neg)):
    simulator.load_weights(tile, tile_addr)

# Request tile 0, then tile 1, followed by a single no-op.
for tile_addr in range(2):
    simulator.execute_instruction(load_new_weights=True, weight_tile_addr=tile_addr)
simulator.execute_instruction(nop=True)

# Print the systolic array's weight matrix as recorded at each step.
for step in simulator.history:
    print(f"Step {step.step}\n{step.systolic_state.weights}\n")

Step 0
[[0. 0. 0.]
 [0. 0. 0.]
 [0. 0. 0.]]

Step 1
[[0. 0. 0.]
 [0. 0. 0.]
 [0. 0. 0.]]

Step 2
[[3. 1. 2.]
 [0. 0. 0.]
 [0. 0. 0.]]

Step 3
[[2. 3. 1.]
 [3. 1. 2.]
 [0. 0. 0.]]

Step 4
[[1. 2. 3.]
 [2. 3. 1.]
 [3. 1. 2.]]

Step 5
[[1. 2. 3.]
 [2. 3. 1.]
 [3. 1. 2.]]

Step 6
[[-3. -1. -2.]
 [ 1.  2.  3.]
 [ 2.  3.  1.]]

Step 7
[[-2. -3. -1.]
 [-3. -1. -2.]
 [ 1.  2.  3.]]



# PyTorch Utilities


## Load MNIST Test Data


In [None]:
# Data transformation: convert images to tensors, then normalize them with the
# mean/std constants below.
normalize = transforms.Normalize((0.1307,), (0.3081,))
transform = transforms.Compose([transforms.ToTensor(), normalize])

# MNIST test split (train=False) with download enabled, rooted at ./data.
mnist_kwargs = dict(root="./data", train=False, download=True, transform=transform)
test_dataset = datasets.MNIST(**mnist_kwargs)

# One sample per batch, in dataset order.
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

100.0%
100.0%
100.0%
100.0%


In [54]:
# Take the first (image, label) batch from the loader and flatten the image
# into a 1-D NumPy vector.
image_tensor, label = next(iter(test_loader))
image = image_tensor.numpy().reshape(-1)
image.shape, label

((784,), tensor([7]))

## Initializing the trained PyTorch model


In [34]:
# Load the trained model from models/mlp_mnist.pth using the project's load_model helper.
model = load_model("models/mlp_mnist.pth")
# Put the model in evaluation mode; eval() returns the module, which is shown as the cell output.
model.eval()

MLP(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (fc1): Linear(in_features=784, out_features=128, bias=True)
  (relu): ReLU()
  (fc2): Linear(in_features=128, out_features=10, bias=True)
)

In [38]:
# Only the names are of interest, so iterate the keys of the name -> parameter mapping.
for name in dict(model.named_parameters()):
    print(name)

fc1.weight
fc1.bias
fc2.weight
fc2.bias


In [55]:
# Print every module's qualified name; the first entry is the root module,
# whose name is the empty string.
for name in dict(model.named_modules()):
    print(name)


flatten
fc1
relu
fc2


In [None]:
# Use the flattened test image prepared in an earlier cell as the network input.
inputs = image  # numpy vector representing the image


# numpy softmax
def softmax(x, axis=0):
    """Return the softmax of ``x`` along ``axis``.

    The maximum is subtracted along ``axis`` (not over the whole array) before
    exponentiating. Softmax is invariant to that shift, but shifting by a
    single global maximum lets a slice whose values lie far below it underflow
    to all zeros, which then divides 0 by 0 and yields NaN.

    Args:
        x: Array-like of scores.
        axis: Axis along which the outputs sum to 1. Defaults to 0, which
            matches the previous behaviour for 1-D vectors and for 2-D input
            normalized column-wise. Pass ``axis=-1`` for row-wise batches.

    Returns:
        An ndarray with the same shape as ``x`` whose entries along ``axis``
        are non-negative and sum to 1.
    """
    x = np.asarray(x)
    # keepdims keeps the reduced axis so the result broadcasts back against x.
    shifted = x - np.max(x, axis=axis, keepdims=True)
    e_x = np.exp(shifted)
    return e_x / e_x.sum(axis=axis, keepdims=True)


# Pull the trained parameters of both linear layers out as NumPy arrays.
fc1_weight, fc1_bias = model.fc1.weight.data.numpy(), model.fc1.bias.data.numpy()
fc2_weight, fc2_bias = model.fc2.weight.data.numpy(), model.fc2.bias.data.numpy()

# Manual forward pass: linear -> ReLU -> linear -> softmax.
hidden = np.maximum(0, inputs @ fc1_weight.T + fc1_bias)
logits = hidden @ fc2_weight.T + fc2_bias
x = softmax(logits)

# The index of the largest probability is the predicted digit.
predicted_class = np.argmax(x)
print(f"Predicted class: {predicted_class}")

Predicted class: 7


### Generalized parameter/layer extraction


In [51]:
def get_layer_info(model):
    """Describe the supported layers of ``model`` as a list of plain dicts.

    Modules are visited in ``model.named_modules()`` order. Only ``nn.Linear``,
    ``nn.ReLU`` and ``nn.Flatten`` instances produce an entry; every other
    module (including container modules) is ignored. The root module is
    checked like any other, so a model that is itself a single supported layer
    (for example a bare ``nn.Linear``) is reported instead of yielding an
    empty list.

    Args:
        model: A ``torch.nn.Module``.

    Returns:
        A list of dicts, one per supported layer:
        ``{"type": "linear", "weight": ndarray, "bias": ndarray or None}``,
        ``{"type": "relu"}`` or ``{"type": "flatten"}``.
    """

    def _to_numpy(tensor):
        # detach() drops autograd tracking and cpu() moves device tensors to
        # host memory, so parameters living on an accelerator convert too.
        return tensor.detach().cpu().numpy()

    layers = []
    for _name, module in model.named_modules():
        if isinstance(module, nn.Linear):
            bias = module.bias
            layers.append(
                {
                    "type": "linear",
                    "weight": _to_numpy(module.weight),
                    "bias": _to_numpy(bias) if bias is not None else None,
                }
            )
        elif isinstance(module, nn.ReLU):
            layers.append({"type": "relu"})
        elif isinstance(module, nn.Flatten):
            layers.append({"type": "flatten"})
        # Add more layer types as needed
        # elif isinstance(module, nn.Conv2d):
        #     layers.append(...)

    return layers


def numpy_inference(image, layers):
    """Run ``image`` through ``layers`` using NumPy only.

    Args:
        image: NumPy array fed to the first layer.
        layers: Sequence of layer dicts with a ``"type"`` key. ``"linear"``
            entries also carry ``"weight"`` and ``"bias"`` (``None`` for no
            bias). Entries of any other type than ``"linear"``, ``"relu"`` or
            ``"flatten"`` are passed over without touching the data.

    Returns:
        The array produced after the last layer.
    """
    activation = image

    for spec in layers:
        kind = spec["type"]

        if kind == "flatten":
            # No-op when the input is already 1-D.
            activation = activation.flatten()
            continue

        if kind == "relu":
            activation = np.maximum(0, activation)
            continue

        if kind != "linear":
            continue

        # Weight comes first in the product: np.dot(weight, activation).
        activation = np.dot(spec["weight"], activation)
        bias = spec["bias"]
        if bias is not None:
            activation = activation + bias

    return activation


# Extract model structure
layers = get_layer_info(model)

# Print model structure
for i, layer in enumerate(layers):
    if layer["type"] == "linear":
        # A linear entry may carry bias=None, which has no .shape attribute;
        # report None in that case instead of raising AttributeError.
        bias_shape = layer["bias"].shape if layer["bias"] is not None else None
        print(
            f"Layer {i}: Linear - Weight shape: {layer['weight'].shape}, Bias shape: {bias_shape}"
        )
    else:
        print(f"Layer {i}: {layer['type']}")

# Run inference on the flattened test image; the arg-max of the final layer's
# output is the predicted class.
prediction = numpy_inference(image, layers)
predicted_class = np.argmax(prediction)
predicted_class

Layer 0: flatten
Layer 1: Linear - Weight shape: (128, 784), Bias shape: (128,)
Layer 2: relu
Layer 3: Linear - Weight shape: (10, 128), Bias shape: (10,)


np.int64(7)

# Running model on the accelerator sim


In [None]:
# Explicit accelerator configuration (the earlier cells used
# AcceleratorSimulator.default_config instead): array_size=4, 8 weight tiles,
# BF16 for data, weights and accumulators, float_adder / float_multiplier for
# the arithmetic units, and pipeline disabled.
# NOTE(review): AcceleratorConfig is not imported by name in the import cell;
# presumably it comes from one of the star imports. Confirm.
config = AcceleratorConfig(
    array_size=4,
    num_weight_tiles=8,
    data_type=BF16,
    weight_type=BF16,
    accum_type=BF16,
    pe_adder=float_adder,
    pe_multiplier=float_multiplier,
    pipeline=False,
    accum_adder=float_adder,
    accum_addr_width=8,
)

# Build the simulator directly from the config object.
simulator = AcceleratorSimulator(config)