# 临时测试

In [1]:
from dataclasses import dataclass
import tvm
from tvm import te
import vta
import numpy as np
from tvm import rpc
from vta.testing import simulator


@dataclass
class Workload:
    """Shape parameters of a single 2D convolution layer.

    The constructor arguments describe the logical (unpacked) problem.  The
    spatial extent of the output feature map is derived once in
    ``__post_init__`` and stored as ``fout_height`` / ``fout_width``.  The
    ``data``, ``kernel`` and ``output`` helpers return the corresponding
    6-element tiled shapes for a given VTA environment.
    """

    # Logical input dimensions
    batch_size: int
    height: int
    width: int
    in_channels: int
    out_channels: int
    # Kernel extent
    kernel_h: int
    kernel_w: int
    # Spatial padding applied on both sides of each spatial axis
    pad_h: int
    pad_w: int
    # Convolution strides
    stride_h: int
    stride_w: int

    def __post_init__(self):
        """Compute and store the output feature map height and width."""

        def out_extent(size, pad, kernel, stride):
            # Standard convolution output-size formula (floor division).
            return (size + 2 * pad - kernel) // stride + 1

        self.fout_height = out_extent(self.height, self.pad_h, self.kernel_h, self.stride_h)
        self.fout_width = out_extent(self.width, self.pad_w, self.kernel_w, self.stride_w)

    def data(self, env: vta.environment.Environment):
        """Return the tiled input feature map shape ``(N, IC, H, W, n, ic)``.

        ``n`` and ``ic`` are ``env.BATCH`` and ``env.BLOCK_IN``; ``N`` and
        ``IC`` are the batch size and input channel count floor-divided by
        those tile sizes.
        """
        batch_tiles = self.batch_size // env.BATCH
        in_channel_tiles = self.in_channels // env.BLOCK_IN
        return (
            batch_tiles,
            in_channel_tiles,
            self.height,
            self.width,
            env.BATCH,
            env.BLOCK_IN,
        )

    def kernel(self, env: vta.environment.Environment):
        """Return the tiled kernel shape ``(OC, IC, H, W, oc, ic)``.

        ``oc`` and ``ic`` are ``env.BLOCK_OUT`` and ``env.BLOCK_IN``; ``OC``
        and ``IC`` are the channel counts floor-divided by those tile sizes.
        """
        out_channel_tiles = self.out_channels // env.BLOCK_OUT
        in_channel_tiles = self.in_channels // env.BLOCK_IN
        return (
            out_channel_tiles,
            in_channel_tiles,
            self.kernel_h,
            self.kernel_w,
            env.BLOCK_OUT,
            env.BLOCK_IN,
        )

    def output(self, env: vta.environment.Environment):
        """Return the tiled output feature map shape ``(N, OC, H, W, n, oc)``.

        ``H`` and ``W`` are the derived ``fout_height`` and ``fout_width``.
        """
        batch_tiles = self.batch_size // env.BATCH
        out_channel_tiles = self.out_channels // env.BLOCK_OUT
        return (
            batch_tiles,
            out_channel_tiles,
            self.fout_height,
            self.fout_width,
            env.BATCH,
            env.BLOCK_OUT,
        )

# Load VTA parameters from the 3rdparty/vta-hw/config/vta_config.json file.
# The resulting `env` object supplies the tile sizes (BATCH, BLOCK_IN,
# BLOCK_OUT), data types and buffer scopes referenced by the code below.
env = vta.get_env()

In [2]:
# Workload under test: batch 1, 14x14 input, 256 -> 256 channels,
# 3x3 kernel, padding 1, stride 1.
self = Workload(
    batch_size=1,
    height=14,
    width=14,
    in_channels=256,
    out_channels=256,
    kernel_h=3,
    kernel_w=3,
    pad_h=1,
    pad_w=1,
    stride_h=1,
    stride_w=1,
)

# Each tiled dimension has to be a whole multiple of its VTA tile size,
# because the shape helpers use floor division.
for extent, tile in (
    (self.batch_size, env.BATCH),
    (self.in_channels, env.BLOCK_IN),
    (self.out_channels, env.BLOCK_OUT),
):
    assert extent % tile == 0

In [3]:
# Reduction axes of the convolution: kernel rows, kernel columns,
# input-channel tiles, and the elements inside one input-channel tile.
dy = te.reduce_axis((0, self.kernel_h), name="dy")
dx = te.reduce_axis((0, self.kernel_w), name="dx")
ic = te.reduce_axis((0, self.in_channels // env.BLOCK_IN), name="ic")
ic_tns = te.reduce_axis((0, env.BLOCK_IN), name="ic_tns")

# Tiled tensor shapes, computed once and reused by every stage below
data_shape = self.data(env)
kernel_shape = self.kernel(env)
out_shape = self.output(env)

# Input placeholder tensors
data = te.placeholder(data_shape, dtype=env.inp_dtype, name="data")
kernel = te.placeholder(kernel_shape, dtype=env.wgt_dtype, name="kernel")

# Copy buffers: the input copy also applies the spatial padding,
# the kernel copy is a plain element-wise copy.
data_buf = tvm.topi.nn.pad(data, [0, 0, self.pad_h, self.pad_w, 0, 0], name="data_buf")
kernel_buf = te.compute(kernel_shape, lambda *idx: kernel(*idx), name="kernel_buf")


def _conv_element(bo, co, i, j, bi, ci):
    """Sum of input * weight products for one element of the output tensor."""
    inp = data_buf[bo, ic, i * self.stride_h + dy, j * self.stride_w + dx, bi, ic_tns]
    wgt = kernel_buf[co, ic, dy, dx, ci, ic_tns]
    # Both operands are widened to the accumulator type before multiplying
    return te.sum(
        inp.astype(env.acc_dtype) * wgt.astype(env.acc_dtype),
        axis=[ic, dy, dx, ic_tns],
    )


# Declare the 2D convolution
res_conv = te.compute(out_shape, _conv_element, name="res_conv")

# Shift stage for fixed-point normalization.
# NOTE(review): the shift amount is hard-coded to 8, while the verification
# code further down shifts the reference by env.INP_WIDTH; the two only agree
# when env.INP_WIDTH == 8 -- confirm this is intended.
res_shr = te.compute(out_shape, lambda *idx: res_conv(*idx) >> 8, name="res_shr")

# Clip to the range [0, largest positive input value]
inp_max = (1 << (env.INP_WIDTH - 1)) - 1
res_max = te.compute(out_shape, lambda *idx: te.max(res_shr(*idx), 0), name="res_max")
res_min = te.compute(out_shape, lambda *idx: te.min(res_max(*idx), inp_max), name="res_min")

# Result tensor, cast back to the input data type
res = te.compute(out_shape, lambda *idx: res_min(*idx).astype(env.inp_dtype), name="res")


In [4]:
# Define the tiling sizes used to split the output tensor.
# NOTE(review): 1 // env.BATCH evaluates to 0 whenever env.BATCH > 1 --
# confirm this schedule is only meant to be used with env.BATCH == 1.
b_block = 1 // env.BATCH
oc_block = 1  # alternative: 128 // env.BLOCK_OUT
ic_block = 1  # // env.BLOCK_IN
h_block = 1
# Same value as the output width of the 14x14 workload instantiated earlier --
# presumably chosen so that one tile covers a full output row.
w_block = 14

# Create the schedule for the final result operation
s = te.create_schedule(res.op)
# Tile the output tensor along the spatial and output-channel dimensions
# (since single-batch inference is done by default, splitting along the
# batch dimension has no effect).
b, oc, y, x, b_tns, oc_tns = s[res].op.axis
b_out, b_inn = s[res].split(b, factor=b_block)
oc_out, oc_inn = s[res].split(oc, factor=oc_block)
y_out, y_inn = s[res].split(y, factor=h_block)
x_out, x_inn = s[res].split(x, factor=w_block)
# Place all outer (tile) loops before the inner loops and the tensor-tile axes
s[res].reorder(b_out, oc_out, y_out, x_out, b_inn, oc_inn, y_inn, x_inn, b_tns, oc_tns)

# Move the intermediate computations into each output compute tile
s[res_conv].compute_at(s[res], x_out)
s[res_shr].compute_at(s[res], x_out)
s[res_max].compute_at(s[res], x_out)
s[res_min].compute_at(s[res], x_out)

# Apply an additional loop split along the reduction axis (input channel).
# The names below are rebound to the axes of res_conv, replacing the values
# obtained from the splits of res above.
b_inn, oc_inn, y_inn, x_inn, b_tns, oc_tns = s[res_conv].op.axis
ic_out, ic_inn = s[res_conv].split(ic, factor=ic_block)
# Reorder the axes of the convolution stage
s[res_conv].reorder(ic_out, b_inn, oc_inn, y_inn, ic_inn, dy, dx, x_inn, b_tns, oc_tns, ic_tns)

In [5]:
# Lower the schedule built so far with VTA's lowering entry point and display
# the result, to inspect the loop structure produced by the tiling above.
vta.lower(s, [data, kernel, res], simple_mode=True).show()



In [6]:
# VTA only supports 2 virtual threads
v_threads = 2

# Perform the virtual thread split along the output-channel outer axis
_, tx = s[res].split(oc_out, factor=v_threads)
# Make the virtual-thread axis the outermost loop (ahead of b_out)
s[res].reorder(tx, b_out)
s[res].bind(tx, te.thread_axis("cthread"))
# Set scope of SRAM buffers
s[data_buf].set_scope(env.inp_scope)
s[kernel_buf].set_scope(env.wgt_scope)
s[res_conv].set_scope(env.acc_scope)
s[res_shr].set_scope(env.acc_scope)
s[res_min].set_scope(env.acc_scope)
s[res_max].set_scope(env.acc_scope)

# Block data and kernel cache reads
s[data_buf].compute_at(s[res_conv], ic_out)
s[kernel_buf].compute_at(s[res_conv], ic_out)

# Use DMA copy pragma on DRAM->SRAM operations
s[data_buf].pragma(s[data_buf].op.axis[0], env.dma_copy)
s[kernel_buf].pragma(s[kernel_buf].op.axis[0], env.dma_copy)

# Use the DMA copy pragma on the SRAM->DRAM operation in each result block
# (this implies that these copies should be performed along b_inn, or
# result axis 4)
s[res].pragma(s[res].op.axis[4], env.dma_copy)
# Apply tensorization over the batch tensor tile axis
# (b_tns here is the axis of res_conv, as rebound in the tiling cell)
s[res_conv].tensorize(b_tns, env.gemm)

# Add an ALU pragma over the shift and clipping operations
s[res_shr].pragma(s[res_shr].op.axis[0], env.alu)
s[res_min].pragma(s[res_min].op.axis[0], env.alu)
s[res_max].pragma(s[res_max].op.axis[0], env.alu)

In [7]:
# Reference 2D convolution implementation, used to verify the result later
from tvm.topi.testing import conv2d_nchw_python

# Compile the scheduled convolution into a TVM module, with the
# "tir.CommonSubexprElimTIR" pass disabled for this build.
with vta.build_config(disabled_pass={"tir.CommonSubexprElimTIR"}):
    build_target = tvm.target.Target("ext_dev", host=env.target_host)
    my_conv = vta.build(s, [data, kernel, res], build_target, name="my_conv")



In [8]:

def _pack_tiles(array, outer_tile, inner_tile):
    """Tile the two leading axes of a 4-D array into the packed 6-D layout.

    An array shaped (A, B, H, W) is returned as a view shaped
    (A // outer_tile, B // inner_tile, H, W, outer_tile, inner_tile).
    """
    dim_a, dim_b, rows, cols = array.shape
    tiled = array.reshape(
        dim_a // outer_tile, outer_tile, dim_b // inner_tile, inner_tile, rows, cols
    )
    return tiled.transpose((0, 2, 4, 5, 1, 3))


# Save the compiled module, send it through a local RPC session and load it
remote = rpc.LocalSession()
temp = tvm.contrib.utils.tempdir()
module_path = temp.relpath("conv2d.o")
my_conv.save(module_path)
remote.upload(module_path)
f = remote.load_module("conv2d.o")

# Get the remote device context
ctx = remote.ext_dev(0)

# Initialize the data and kernel arrays randomly with integers in the
# half-open range [-128, 128), in NCHW layout
data_np = np.random.randint(
    -128, 128, size=(self.batch_size, self.in_channels, self.height, self.width)
).astype(data.dtype)
kernel_np = np.random.randint(
    -128, 128, size=(self.out_channels, self.in_channels, self.kernel_h, self.kernel_w)
).astype(kernel.dtype)

# Pack the data and kernel arrays from the 4-D NCHW layout
# into the 6-D NCHWnc tiled layout
data_packed = _pack_tiles(data_np, env.BATCH, env.BLOCK_IN)
kernel_packed = _pack_tiles(kernel_np, env.BLOCK_OUT, env.BLOCK_IN)

# Format the input/output arrays with tvm.nd.array to the DLPack standard
data_nd = tvm.nd.array(data_packed, ctx)
kernel_nd = tvm.nd.array(kernel_packed, ctx)
res_nd = tvm.nd.array(np.zeros(self.output(env), dtype=res.dtype), ctx)

# Simulator statistics are only available on the simulated targets
running_in_simulator = env.TARGET in ["sim", "tsim"]

# Clear stats
if running_in_simulator:
    simulator.clear_stats()

# Invoke the module to perform the computation
f(data_nd, kernel_nd, res_nd)

# Verify against the numpy reference implementation: convolve in the
# accumulator type, shift, clip, cast, then pack like the device output
conv_ref = conv2d_nchw_python(
    data_np.astype(env.acc_dtype),
    kernel_np.astype(env.acc_dtype),
    (self.stride_h, self.stride_w),
    (self.pad_h, self.pad_w),
).astype(env.acc_dtype)
clipped_ref = np.clip(conv_ref >> env.INP_WIDTH, 0, inp_max)
res_ref = _pack_tiles(clipped_ref.astype(res.dtype), env.BATCH, env.BLOCK_OUT)
tvm.testing.assert_allclose(res_ref, res_nd.numpy())

# Print stats
if running_in_simulator:
    sim_stats = simulator.stats()
    print("Execution statistics:")
    for stat_name, stat_value in sim_stats.items():
        print("\t{:<16}: {:>16}".format(stat_name, stat_value))

print("Successful 2D convolution test!")

2023-04-13 19:38:02.262 INFO load_module /tmp/tmp790d4vbd/conv2d.o


Execution statistics:
	inp_load_nbytes :          2293760
	wgt_load_nbytes :          8257536
	acc_load_nbytes :                0
	uop_load_nbytes :              144
	out_store_nbytes:            50176
	gemm_counter    :           451584
	alu_counter     :             9408
Successful 2D convolution test!
