In [1]:
import os
import struct
import subprocess
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
import numpy as np

# Convenience: device choice
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

import subprocess

#for isolating the verilog
#new_dir = "../mnistmlptest/LSTM_Verilog"
#os.chdir(new_dir)
notebook_dir = os.getcwd()  
print("Notebook folder:", notebook_dir)

Device: cpu
Notebook folder: /workspace/mnistmlptest/LSTM_Verilog


In [2]:
def lmul_bits(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
    #same LMUL bits
    a = a.to(torch.float32)
    b = b.to(torch.float32)

    a_bits = a.view(torch.int32)
    b_bits = b.view(torch.int32)

    a_bf16 = (a_bits >> 16) & 0xFFFF
    b_bf16 = (b_bits >> 16) & 0xFFFF

    a_sign = (a_bf16 >> 15) & 0x1
    b_sign = (b_bf16 >> 15) & 0x1

    a_field = a_bf16 & 0x7FFF
    b_field = b_bf16 & 0x7FFF

    a_exp = (a_field >> 7) & 0xFF
    b_exp = (b_field >> 7) & 0xFF
    zero_mask = (a_exp == 0) | (b_exp == 0)

    OFFSET_MOD = 0x4080  
    sum_full = a_field.to(torch.int32) + b_field.to(torch.int32) + OFFSET_MOD

    carry2 = (sum_full >> 15) & 0x3

    mask_underflow = (carry2 == 0)
    mask_normal    = (carry2 == 1)
    mask_overflow  = (carry2 >= 2)

    field_sel = torch.zeros_like(sum_full)
    field_sel = torch.where(mask_normal, sum_full & 0x7FFF, field_sel)
    field_sel = torch.where(mask_overflow, torch.tensor(0x7FFF, dtype=torch.int32, device=sum_full.device), field_sel)

    s_result = (a_sign ^ b_sign).to(torch.int32)
    s_result = torch.where(field_sel == 0, torch.tensor(0, device=sum_full.device, dtype=torch.int32), s_result)

    result_bits_bf16 = ((s_result << 15) | field_sel).to(torch.int32)
    result_bits_f32 = result_bits_bf16 << 16
    result = result_bits_f32.view(torch.float32)

    # bias correction you added
    result = result + (result / (1 << 5)) + (result / (1 << 6))
    result = torch.where(zero_mask, torch.zeros_like(result), result)

    return result


def lmul(a, b, M=7):
    return lmul_bits(a, b)


In [3]:
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('.', train=True, download=True, transform=transforms.ToTensor()),
    batch_size=128, shuffle=True
)

test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('.', train=False, transform=transforms.ToTensor()),
    batch_size=1000
)

In [4]:
# Cell 3: Data loaders + LSTM code (identical to yours)
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('.', train=True, download=True, transform=transforms.ToTensor()),
    batch_size=128, shuffle=True
)
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('.', train=False, transform=transforms.ToTensor()),
    batch_size=1000
)

class LSTMLayer(nn.Module):
    def __init__(self, input_size, hidden_size, use_lmul=False, M=7):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.use_lmul = use_lmul
        self.M = M
        self.W = nn.Linear(input_size + hidden_size, 4 * hidden_size)

    def forward(self, x_t, h_prev, c_prev):
        combined = torch.cat((x_t, h_prev), dim=1)
        if self.use_lmul:
            W = self.W.weight
            b = self.W.bias
            x_exp = combined.unsqueeze(1)
            W_exp = W.unsqueeze(0)
            prod = lmul(x_exp, W_exp, M=self.M)
            gates = prod.sum(dim=2) + b
        else:
            gates = self.W(combined)
        i, f, g, o = torch.chunk(gates, 4, dim=1)
        i = torch.sigmoid(i); f = torch.sigmoid(f); o = torch.sigmoid(o); g = torch.tanh(g)
        if self.use_lmul:
            c_t = lmul(f, c_prev, M=self.M) + lmul(i, g, M=self.M)
            h_t = lmul(o, torch.tanh(c_t), M=self.M)
        else:
            c_t = f * c_prev + i * g
            h_t = o * torch.tanh(c_t)
        return h_t, c_t

class LSTMClassifier(nn.Module):
    def __init__(self, input_size=28, hidden_size=128, use_lmul=False, M=7):
        super().__init__()
        self.hidden_size = hidden_size
        self.use_lmul = use_lmul
        self.lstm_cell = LSTMLayer(input_size, hidden_size, use_lmul, M)
        self.fc = nn.Linear(hidden_size, 10)

    def forward(self, x):
        B = x.size(0)
        x = x.squeeze(1)
        h = torch.zeros(B, self.hidden_size)
        c = torch.zeros(B, self.hidden_size)
        for t in range(28):
            x_t = x[:, t, :]
            h, c = self.lstm_cell(x_t, h, c)
        out = self.fc(h)
        return F.log_softmax(out, dim=1)

def train_model(model, optimizer, loader, epochs=2):
    model.train()
    for epoch in range(epochs):
        for data, target in loader:
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()

def test_acc(model, loader):
    model.eval()
    correct = total = 0
    with torch.no_grad():
        for data, target in loader:
            pred = model(data).argmax(dim=1)
            correct += (pred == target).sum().item()
            total += len(target)
    return 100 * correct / total


In [5]:
# Cell 4: train baseline and extract weights + one sample input vector
model = LSTMClassifier(use_lmul=False).to(device)
opt = torch.optim.Adam(model.parameters(), lr=1e-3)

print("Training baseline model (this may take a bit)...")
train_model(model, opt, train_loader, epochs=3)  # reduce epochs if you want
print("Done training.")

# Test baseline accuracy (optional)
baseline_acc = test_acc(model, test_loader)
print(f"Baseline accuracy: {baseline_acc:.2f}%")

# Extract W matrix from the LSTM layer (shape [4*H, I+H])
W = model.lstm_cell.W.weight.detach().cpu().clone()  # torch.Tensor [O, I]
O, I_plus_H = W.shape
print("W shape:", W.shape)  # expect [512, 156] for H=128, input_size=28

# Prepare a single input vector `combined` = [x_row, h_prev] for a single timestep
# We'll just pick the first test batch and first timestep's combined vector (x_row concatenated with zeros h_prev)
# Build combined vector: x_t (28 dims) and h_prev zeros (128 dims) => length 156
sample_data, _ = next(iter(test_loader))  # batch of size 1000
sample_data = sample_data[0:1]  # pick first example
x_rows = sample_data.squeeze(1)  # [28, 28]
x_row0 = x_rows[0]               # shape [28]
x_row0 = x_row0.flatten()        # ensure shape [28]

h0 = torch.zeros(128)            # shape [128]

combined = torch.cat((x_row0, h0), dim=0)  # shape [156]
combined = combined.unsqueeze(0)           # [1, 156]
print("Combined vector shape:", combined.shape)



Training baseline model (this may take a bit)...
Done training.
Baseline accuracy: 96.71%
W shape: torch.Size([512, 156])
Combined vector shape: torch.Size([1, 912])


In [6]:
# Cell 5: write memory files for Verilog
def float_to_bf16_upper16_bits(f):
    # f: python float or numpy float
    i = struct.unpack(">I", struct.pack(">f", float(f)))[0]
    return (i >> 16) & 0xFFFF  # uint16

def float_to_u32bits(f):
    return struct.unpack(">I", struct.pack(">f", float(f)))[0]

# Prepare directories/files
open("W_bf16.txt", "w").close()
open("x_bf16.txt", "w").close()
open("fp32_products.txt", "w").close()

# Flatten W row-major and write BF16 upper16 hex per line
W_np = W.numpy()
rows, cols = W_np.shape
with open("W_bf16.txt", "w") as fW:
    for r in range(rows):
        for c in range(cols):
            val = W_np[r, c]
            bf16 = float_to_bf16_upper16_bits(val)
            fW.write(f"{bf16:04x}\n")

# Write x vector BF16 upper16
x_np = combined.squeeze(0).numpy()
with open("x_bf16.txt", "w") as fx:
    for k in range(len(x_np)):
        bf16 = float_to_bf16_upper16_bits(x_np[k])
        fx.write(f"{bf16:04x}\n")

# Precompute FP32 products for FP32 path: for each W_ij * x_j
# (we will write them in the same multiplication order: for row r in 0..rows-1, for col c in 0..cols-1)
with open("fp32_products.txt", "w") as fprod:
    for r in range(rows):
        for c in range(cols):
            prod = float(W_np[r, c]) * float(x_np[c])
            prod_bits = float_to_u32bits(prod)
            fprod.write(f"{prod_bits:08x}\n")

print("W_bf16.txt, x_bf16.txt, fp32_products.txt written.")
print(f"Total multiplies = {rows*cols}")


W_bf16.txt, x_bf16.txt, fp32_products.txt written.
Total multiplies = 79872


In [None]:
## Construct full paths to your Verilog files
lmul_file = "LMUL_LSTM.v"
tb_file = "TB_WX.v"
fp32 = "FP32.v"
sim_out = os.path.join(notebook_dir, "sim.out")


print("Compiling Verilog...")
proc = subprocess.run(["iverilog", "-g2012", "-o", sim_out, lmul_file, fp32, tb_file],capture_output=True, text=True)
if proc.returncode != 0:
    print("iverilog compilation failed:")
    print(proc.stderr)
else:
    print("Running simulation (vvp)...")
    proc = subprocess.run(["vvp", "sim.out"], capture_output=True, text=True)
    print(proc.stdout)
    if proc.returncode != 0:
        print("Simulation failed:")
        print(proc.stderr)
    else:
        print("Simulation finished, outputs written.")


Compiling Verilog...
Running simulation (vvp)...


In [4]:
import os

for f in os.listdir("."):
    print(f)

.ipynb_checkpoints
fp32_products.txt
lmul_bf16.v
lmul_tester.py
MNIST
py_lmul.py
sim.out
simple_function.v
top_lmul.v
W_bf16.txt
x_bf16.txt
__pycache__


Notebook folder: /workspace/mnistmlptest/LSTM_Verilog


In [7]:
import os

folder = "../mnistmlptest/LSTM_Verilog"
print("Folder exists?", os.path.exists(folder))
print("Contents of folder:")
for f in os.listdir(folder):
    print(f)

Folder exists? True
Contents of folder:
.ipynb_checkpoints
fp32_products.txt
LMUL_LSTM.v
lstm_verilog_tester.ipynb
sim.out
TB_WX.v
verilog_checksums.txt
W_bf16.txt
x_bf16.txt
