# 1D Convolution on GPU

In [1]:
!pip install tlcpack-nightly-cu102 -f https://tlcpack.ai/wheels
!pip install "numpy<2.0.0"

Looking in links: https://tlcpack.ai/wheels


## 3. Implement `make_conv1d_gpu_scheduler_func` function in `src.ops`

In that function, you are required to implement 1D convolution and use TVM to optimize it.
Let $x \in \mathbb{R}^m$ and $y \in \mathbb{R}^n$, then
$$
\operatorname{conv1d}(x, y)_i = \sum_{j=-\infty}^{\infty} x[j]y[i-j], \forall i \in \{0, 1, \dots, m + n - 2\}
$$

Please use zero padding and unit stride. Please see the numpy convolution function for more detail: [link](https://numpy.org/doc/stable/reference/generated/numpy.convolve.html).

The `make_conv1d_gpu_scheduler_func` takes $m$ and $n$, which are the sizes of the two 1D input arrays.
You should return both the TVM scheduler and the TVM operators for
1. Input $x$
2. Input $y$
3. Output $out$

The scheduler should be usable to build a function with signature $func(x, y, out)$.
Please see the following cells for usage.

In [2]:
import time
import tvm
import numpy as np
from tvm import te

# benchmark for tvm implementation
def benchmark_conv1d_tvm(schedule_func, M, N, device, a_np, w_np, num_runs=30, repeat=20):
    s, A, W, B = schedule_func(M, N)
    func = tvm.build(s, [A, W, B], target="cuda")

    a_tvm = tvm.nd.array(a_np, device)
    w_tvm = tvm.nd.array(w_np, device)
    out_tvm = tvm.nd.array(np.zeros((M + N - 1,), dtype=a_np.dtype), device)

    evaluator = func.time_evaluator(func.entry_name, device, number=num_runs, repeat=repeat)
    cost = evaluator(a_tvm, w_tvm, out_tvm).mean  # average time in seconds
    return cost, out_tvm.asnumpy(), func, (s, A, W, B)

# benchmark for numpy
def benchmark_conv1d_numpy(a_np, w_np, num_runs=10):
    t0 = time.time()
    out = None
    for _ in range(num_runs):
        out = np.convolve(a_np, w_np)
    t1 = time.time()
    return (t1 - t0) / num_runs, out


In [3]:
# np baseline
M = 16384
N = 32
dtype = 'float32'
a_np = np.random.rand(M).astype(dtype)
w_np = np.random.rand(N).astype(dtype)
ref = np.convolve(a_np, w_np)


# naive baseline
def make_conv1d_gpu_scheduler_naive(M, N, dtype="float32", verbose=True):
    A = te.placeholder((M,), name="A", dtype=dtype)
    W = te.placeholder((N,), name="W", dtype=dtype)
    k = te.reduce_axis((0, M + N - 1), "k")   # k in [0, M+N-1)
    B = te.compute(
        (M + N - 1,),   # output shape, n from (0, M + N - 1)
        # if_then_else: if satisfy "any" condition, return 0 else A[k] * W[n - k]
        lambda n: te.sum(tvm.tir.if_then_else(
            tvm.tir.any(k < 0, k >= M, n - k < 0, n - k >= N),
            tvm.tir.const(0.0, "float32"),
            A[k] * W[n - k]), axis=k),
        name="B",
    )
    s = te.create_schedule(B.op)
    i = B.op.axis[0]
    s[B].bind(i, te.thread_axis("blockIdx.x"))
    if verbose:
        print("=" * 100)
        print(tvm.lower(s, [A, W, B], simple_mode=True))
        print("=" * 100)

    return s, A, W, B

In [None]:
# naive TVM
dev = tvm.cuda()
naive_time, naive_res, naive_func, naive_comp = benchmark_conv1d_tvm(
    make_conv1d_gpu_scheduler_naive, M, N, dev, a_np, w_np
)
np.testing.assert_allclose(naive_res, ref, rtol=1e-4)
print(f"[TVM Naive] time: {naive_time*1e3:.4f} ms")

# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((16384,), "float32"), W: T.Buffer((32,), "float32"), B: T.Buffer((16415,), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
        blockIdx_x = T.launch_thread("blockIdx.x", 16415)
        B[blockIdx_x] = T.float32(0)
        for k in range(16415):
            B[blockIdx_x] = B[blockIdx_x] + T.if_then_else(16384 <= k or blockIdx_x - k < 0 or 32 <= blockIdx_x - k, T.float32(0), A[k] * W[blockIdx_x - k])
[TVM Naive] time: 18.2860 ms


In [None]:
# optimize v1, compute refactor
def make_conv1d_gpu_scheduler_v1(M, N, dtype="float32", verbose=True):
    A = te.placeholder((M,), name="A", dtype=dtype)
    W = te.placeholder((N,), name="W", dtype=dtype)
    r = te.reduce_axis((0, N), name="r")
    B = te.compute(
        (M + N - 1,),
        lambda i: te.sum(
            tvm.tir.if_then_else(
                tvm.tir.all(i - r >= 0, i - r < M),
                A[i - r],
                tvm.tir.const(0, dtype)
            ) * W[r],
            axis=r
        ),
        name="B"
    )

    s = te.create_schedule(B.op)
    i = B.op.axis[0]
    s[B].bind(i, te.thread_axis("blockIdx.x"))
    if verbose:
        print("=" * 100)
        print(tvm.lower(s, [A, W, B], simple_mode=True))
        print("=" * 100)

    return s, A, W, B

# optimize v1: less if-else
dev = tvm.cuda()
naive_time, naive_res, naive_func, naive_comp = benchmark_conv1d_tvm(
    make_conv1d_gpu_scheduler_v1, M, N, dev, a_np, w_np
)
np.testing.assert_allclose(naive_res, ref, rtol=1e-4)
print(f"[TVM Opt v1] time: {naive_time*1e3:.4f} ms")

# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((16384,), "float32"), W: T.Buffer((32,), "float32"), B: T.Buffer((16415,), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
        blockIdx_x = T.launch_thread("blockIdx.x", 16415)
        B[blockIdx_x] = T.float32(0)
        for r in range(32):
            B[blockIdx_x] = B[blockIdx_x] + T.if_then_else(0 <= blockIdx_x - r and blockIdx_x - r < 16384, A[blockIdx_x - r], T.float32(0)) * W[r]
[TVM Opt v1] time: 0.1070 ms


In [None]:
# optimize v2: v1 + basic threads
def make_conv1d_gpu_scheduler_v2(M, N, dtype="float32", verbose=True):
    s, A, W, B = make_conv1d_gpu_scheduler_v1(M, N, dtype, False)

    # out axis
    i = B.op.axis[0]
    block_i, thread_i = s[B].split(i, factor=8)

    # bind to block and thread
    s[B].bind(block_i, te.thread_axis("blockIdx.x"))
    s[B].bind(thread_i, te.thread_axis("threadIdx.x"))

    if verbose:
        print("=" * 100)
        print(tvm.lower(s, [A, W, B], simple_mode=True))
        print("=" * 100)

    return s, A, W, B


# optimize v2
dev = tvm.cuda()
naive_time, naive_res, naive_func, naive_comp = benchmark_conv1d_tvm(
    make_conv1d_gpu_scheduler_v2, M, N, dev, a_np, w_np
)
np.testing.assert_allclose(naive_res, ref, rtol=1e-4)
print(f"[TVM Opt v2] time: {naive_time*1e3:.4f} ms")

# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((16384,), "float32"), W: T.Buffer((32,), "float32"), B: T.Buffer((16415,), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
        blockIdx_x = T.launch_thread("blockIdx.x", 2052)
        threadIdx_x = T.launch_thread("threadIdx.x", 8)
        if T.likely(blockIdx_x * 8 + threadIdx_x < 16415):
            B[blockIdx_x * 8 + threadIdx_x] = T.float32(0)
        for r in range(32):
            if T.likely(blockIdx_x * 8 + threadIdx_x < 16415):
                B[blockIdx_x * 8 + threadIdx_x] = B[blockIdx_x * 8 + threadIdx_x] + T.if_then_else(0 <= blockIdx_x * 8 + threadIdx_x - r and blockIdx_x * 8 + threadIdx_x - r < 16384, A[blockIdx_x * 8 + threadIdx_x - r], T.float32(0)) * W[r]
[TVM Opt v2] time: 0.0251 ms


In [None]:
# optimize v3: v1 + 2D threads
def make_conv1d_gpu_scheduler_v3(M, N, dtype="float32", verbose=True):
    s, A, W, B = make_conv1d_gpu_scheduler_v1(M, N, dtype, False)

    i = B.op.axis[0]
    block_i, thread_i = s[B].split(i, factor=16)
    warp_i, lane_i = s[B].split(thread_i, factor=4)

    s[B].bind(block_i, te.thread_axis("blockIdx.x"))
    s[B].bind(warp_i, te.thread_axis("threadIdx.y"))
    s[B].bind(lane_i, te.thread_axis("threadIdx.x"))

    if verbose:
        print("=" * 100)
        print(tvm.lower(s, [A, W, B], simple_mode=True))
        print("=" * 100)

    return s, A, W, B

dev = tvm.cuda()
naive_time, naive_res, naive_func, naive_comp = benchmark_conv1d_tvm(
    make_conv1d_gpu_scheduler_v3, M, N, dev, a_np, w_np
)
np.testing.assert_allclose(naive_res, ref, rtol=1e-4)
print(f"[TVM Opt v3] time: {naive_time*1e3:.4f} ms")

# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((16384,), "float32"), W: T.Buffer((32,), "float32"), B: T.Buffer((16415,), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
        blockIdx_x = T.launch_thread("blockIdx.x", 1026)
        threadIdx_y = T.launch_thread("threadIdx.y", 4)
        threadIdx_x = T.launch_thread("threadIdx.x", 4)
        if T.likely(blockIdx_x * 16 + threadIdx_y * 4 + threadIdx_x < 16415):
            B[blockIdx_x * 16 + threadIdx_y * 4 + threadIdx_x] = T.float32(0)
        for r in range(32):
            if T.likely(blockIdx_x * 16 + threadIdx_y * 4 + threadIdx_x < 16415):
                B[blockIdx_x * 16 + threadIdx_y * 4 + threadIdx_x] = B[blockIdx_x * 16 + threadIdx_y * 4 + threadIdx_x] + T.if_then_else(0 <= blockIdx_x * 16 + threadIdx_y * 4 + threadIdx_x - r and blockIdx_x * 16 + threadIdx_y * 4 + threadIdx_x - 

In [None]:
# optimize v4: v1 + 1D thread + cache + split reduce
def make_conv1d_gpu_scheduler_v4(M, N, dtype="float32", verbose=True):
    s, A, W, B = make_conv1d_gpu_scheduler_v1(M, N, dtype, False)

    # IMPORTANT: create caches BEFORE thread binding
    C_local = s.cache_write(B, "local")
    W_shared = s.cache_read(W, "shared", [C_local])

    i = B.op.axis[0]
    block_i, thread_i = s[B].split(i, factor=32)
    s[B].bind(block_i, te.thread_axis("blockIdx.x"))
    s[B].bind(thread_i, te.thread_axis("threadIdx.x"))

    # schedule the local cache
    s[C_local].compute_at(s[B], thread_i)

    i_local = C_local.op.axis[0]
    rx = C_local.op.reduce_axis[0]
    # split the reduction axis
    rxo, rxi = s[C_local].split(rx, factor=4)

    # schedule shared memory
    s[W_shared].compute_at(s[C_local], rxo)

    if verbose:
        print("=" * 100)
        print(tvm.lower(s, [A, W, B], simple_mode=True))
        print("=" * 100)

    return s, A, W, B

dev = tvm.cuda()
naive_time, naive_res, naive_func, naive_comp = benchmark_conv1d_tvm(
    make_conv1d_gpu_scheduler_v4, M, N, dev, a_np, w_np
)
np.testing.assert_allclose(naive_res, ref, rtol=1e-4)
print(f"[TVM Opt v4] time: {naive_time*1e3:.4f} ms")

# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((16384,), "float32"), W: T.Buffer((32,), "float32"), B: T.Buffer((16415,), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
        blockIdx_x = T.launch_thread("blockIdx.x", 513)
        B_local = T.allocate([1], "float32", "local")
        W_shared = T.allocate([4], "float32", "shared")
        threadIdx_x = T.launch_thread("threadIdx.x", 32)
        B_local_1 = T.Buffer((1,), data=B_local, scope="local", align=4)
        B_local_1[0] = T.float32(0)
        for r_outer in range(8):
            W_shared_1 = T.Buffer((4,), data=W_shared, scope="shared", align=16)
            for ax0 in range(4):
                W_shared_1[ax0] = W[r_outer * 4 + ax0]
            for r_inner in range(4):
                if T.likely(blockIdx_x * 32 + threadIdx_x < 16415):
                    B_local_1[0] = B_local_1

In [None]:
!nvidia-smi

Sun Mar 23 19:26:52 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   46C    P0             26W /   70W |     102MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [None]:
# optimize v5: v4 + 2D threads + unroll
def make_conv1d_gpu_scheduler_v5(M, N, dtype="float32", verbose=True):
    s, A, W, B = make_conv1d_gpu_scheduler_v1(M, N, dtype, False)

    C_local = s.cache_write(B, "local")
    W_shared = s.cache_read(W, "shared", [C_local])

    i = B.op.axis[0]
    block_i, thread_i = s[B].split(i, factor=32)
    # split 2D threads
    warp_i, lane_i = s[B].split(thread_i, factor=4)
    s[B].bind(block_i, te.thread_axis("blockIdx.x"))
    s[B].bind(warp_i, te.thread_axis("threadIdx.y"))
    s[B].bind(lane_i, te.thread_axis("threadIdx.x"))

    s[C_local].compute_at(s[B], lane_i)

    rx = C_local.op.reduce_axis[0]

    # split the reduce axis
    rxo, rxi = s[C_local].split(rx, factor=8)

    s[W_shared].compute_at(s[C_local], rxo)

    # unroll
    s[C_local].unroll(rxi)

    if verbose:
        print("=" * 100)
        print(tvm.lower(s, [A, W, B], simple_mode=True))
        print("=" * 100)

    return s, A, W, B

dev = tvm.cuda()
naive_time, naive_res, naive_func, naive_comp = benchmark_conv1d_tvm(
    make_conv1d_gpu_scheduler_v5, M, N, dev, a_np, w_np
)
np.testing.assert_allclose(naive_res, ref, rtol=1e-4)
print(f"[TVM Opt v5] time: {naive_time*1e3:.4f} ms")

# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((16384,), "float32"), W: T.Buffer((32,), "float32"), B: T.Buffer((16415,), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
        blockIdx_x = T.launch_thread("blockIdx.x", 513)
        B_local = T.allocate([1], "float32", "local")
        W_shared = T.allocate([8], "float32", "shared")
        threadIdx_y = T.launch_thread("threadIdx.y", 8)
        threadIdx_x = T.launch_thread("threadIdx.x", 4)
        B_local_1 = T.Buffer((1,), data=B_local, scope="local", align=4)
        B_local_1[0] = T.float32(0)
        for r_outer in range(4):
            W_shared_1 = T.Buffer((8,), data=W_shared, scope="shared", align=32)
            for ax0 in range(8):
                W_shared_1[ax0] = W[r_outer * 8 + ax0]
            if T.likely(blockIdx_x * 32 + threadIdx_y * 4 + threadIdx_x < 16415):
            

In [5]:
import tvm
from tvm import te
import numpy as np
from tvm import autotvm
from tvm.autotvm.tuner import XGBTuner, RandomTuner
import time

@autotvm.template("conv1d_gpu")
def conv1d_gpu_template_simple(M, N, dtype="float32"):
    A = te.placeholder((M,), name="A", dtype=dtype)
    W = te.placeholder((N,), name="W", dtype=dtype)
    r = te.reduce_axis((0, N), name="r")

    B = te.compute(
        (M + N - 1,),
        lambda i: te.sum(
            tvm.tir.if_then_else(
                tvm.tir.all(i - r >= 0, i - r < M),
                A[i - r],
                tvm.tir.const(0, dtype)
            ) * W[r],
            axis=r
        ),
        name="B"
    )

    s = te.create_schedule(B.op)
    cfg = autotvm.get_config()

    cfg.define_knob("use_cache", [0, 1])
    cfg.define_knob("thread_x", [1,2,4,8,16])
    i = B.op.axis[0]

    if cfg["use_cache"].val:
        C_local = s.cache_write(B, "local")
        W_shared = s.cache_read(W, "shared", [C_local])

    block_i, thread_i = s[B].split(i, factor=cfg["thread_x"].val)
    s[B].bind(block_i, te.thread_axis("blockIdx.x"))
    s[B].bind(thread_i, te.thread_axis("threadIdx.x"))

    if cfg["use_cache"].val:
        C_local = s.cache_write(B, "local")
        W_shared = s.cache_read(W, "shared", [C_local])

        s[C_local].compute_at(s[B], thread_i)
        rx = C_local.op.reduce_axis[0]

        cfg.define_knob("split_reduction", [0, 1])
        if cfg["split_reduction"].val:
            rxo, rxi = s[C_local].split(rx, factor=8)
            s[W_shared].compute_at(s[C_local], rxo)
        else:
            s[W_shared].compute_at(s[C_local], rx)

    return s, [A, W, B]

def tune_conv1d_gpu_simple(M, N, dtype="float32",
                          log_file="conv1d_gpu_simple.log",
                          tuning_rounds=100):
    task = autotvm.task.create("conv1d_gpu_simple", args=(M, N, dtype), target="cuda")
    measure_option = autotvm.measure_option(
        builder=autotvm.LocalBuilder(),
        runner=autotvm.LocalRunner(number=5, repeat=3, min_repeat_ms=100, timeout=4)
    )

    tuner = RandomTuner(task)

    tuner.tune(
        n_trial=tuning_rounds,
        measure_option=measure_option,
        callbacks=[autotvm.callback.log_to_file(log_file)]
    )

    with autotvm.apply_history_best(log_file):
        with tvm.target.Target("cuda"):
            s, args = conv1d_gpu_template_simple(M, N, dtype)
            func = tvm.build(s, args)

    a_np = np.random.rand(M).astype(dtype)
    w_np = np.random.rand(N).astype(dtype)

    dev = tvm.cuda()
    a_tvm = tvm.nd.array(a_np, dev)
    w_tvm = tvm.nd.array(w_np, dev)
    b_tvm = tvm.nd.array(np.zeros((M + N - 1,), dtype=dtype), dev)

    func(a_tvm, w_tvm, b_tvm)

    evaluator = func.time_evaluator(func.entry_name, dev, number=10, repeat=3)
    time_cost = evaluator(a_tvm, w_tvm, b_tvm).mean

    ref = np.convolve(a_np, w_np)

    np.testing.assert_allclose(b_tvm.asnumpy(), ref, rtol=1e-4)

    print(f"best config time usage: {time_cost * 1e3:.4f} ms")
    return func, (time_cost * 1e3)

def make_conv1d_gpu_scheduler_autotvm_simple(M, N, dtype="float32", verbose=True, log_file="conv1d_gpu_simple.log"):
    with autotvm.apply_history_best(log_file):
        with tvm.target.Target("cuda"):
            s, args = conv1d_gpu_template_simple(M, N, dtype)
            if verbose:
                print("=" * 100)
                print(tvm.lower(s, args, simple_mode=True))
                print("=" * 100)
            return s, args[0], args[1], args[2]

def benchmark_conv1d_tvm(schedule_func, M, N, device, a_np, w_np, num_runs=10, repeat=3, log_file=None):
    if log_file and 'autotvm' in schedule_func.__name__:
        s, A, W, B = schedule_func(M, N, log_file=log_file)
    else:
        s, A, W, B = schedule_func(M, N)

    func = tvm.build(s, [A, W, B], target="cuda")
    a_tvm = tvm.nd.array(a_np, device)
    w_tvm = tvm.nd.array(w_np, device)
    out_tvm = tvm.nd.array(np.zeros((M + N - 1,), dtype=a_np.dtype), device)

    evaluator = func.time_evaluator(func.entry_name, device, number=num_runs, repeat=repeat)
    cost = evaluator(a_tvm, w_tvm, out_tvm).mean

    return cost, out_tvm.asnumpy(), func, (s, A, W, B)



In [7]:
M = 16384
N = 32
dtype = 'float32'
log_file = "conv1d_gpu_simple.log"

tuning_rounds = 50

print("===== autotvm =====")
tune_conv1d_gpu_simple(M, N, dtype, log_file, tuning_rounds)

a_np = np.random.rand(M).astype(dtype)
w_np = np.random.rand(N).astype(dtype)
ref = np.convolve(a_np, w_np)

dev = tvm.cuda()
autotvm_time, autotvm_res, _, _ = benchmark_conv1d_tvm(
    make_conv1d_gpu_scheduler_autotvm_simple, M, N, dev, a_np, w_np,
    log_file=log_file
)
np.testing.assert_allclose(autotvm_res, ref, rtol=1e-4)

print(f"[TVM Opt Auto] time: {autotvm_time*1e3:.4f} ms")

===== autotvm =====
best config time usage: 0.0404 ms
# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((16384,), "float32"), W: T.Buffer((32,), "float32"), B: T.Buffer((16415,), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
        blockIdx_x = T.launch_thread("blockIdx.x", 16415)
        threadIdx_x = T.launch_thread("threadIdx.x", 1)
        B[blockIdx_x] = T.float32(0)
        for r in range(32):
            B[blockIdx_x] = B[blockIdx_x] + T.if_then_else(0 <= blockIdx_x - r and blockIdx_x - r < 16384, A[blockIdx_x - r], T.float32(0)) * W[r]
[TVM Opt Auto] time: 0.0405 ms


In [None]:
# numPy baseline
numpy_time, numpy_out = benchmark_conv1d_numpy(a_np, w_np, num_runs=10)
print(f"[NumPy]   time: {numpy_time*1e3:.4f} ms")
np.testing.assert_allclose(numpy_out, ref, rtol=1e-4)


[NumPy]   time: 0.2369 ms


In [None]:
import torch
import torch.nn.functional as F

def benchmark_conv1d_torch(a_np, w_np, num_runs=10, device='cuda'):

    a_torch = torch.tensor(a_np, device=device, dtype=torch.float32).unsqueeze(0).unsqueeze(0)
    w_torch = torch.tensor(w_np, device=device, dtype=torch.float32).unsqueeze(0).unsqueeze(0)
    w_torch = w_torch.flip(-1)

    for _ in range(5):
        _ = F.conv1d(a_torch, w_torch, padding=(w_np.shape[0] - 1))

    torch.cuda.synchronize()
    t0 = time.time()
    for _ in range(num_runs):
        out_torch = F.conv1d(a_torch, w_torch, padding=(w_np.shape[0] - 1))
    torch.cuda.synchronize()
    t1 = time.time()

    avg_time = (t1 - t0) / num_runs
    out_arr = out_torch.view(-1).cpu().detach().numpy()
    return avg_time, out_arr

torch_time, torch_res = benchmark_conv1d_torch(a_np, w_np, num_runs=10, device='cuda')
np.testing.assert_allclose(torch_res, ref, rtol=1e-4)
print(f"[Torch] time: {torch_time*1e3:.4f} ms")


[Torch] time: 0.1491 ms


In [None]:
# pytest
%pip install ipytest
import ipytest
ipytest.autoconfig()


Collecting ipytest
  Downloading ipytest-0.14.2-py3-none-any.whl.metadata (17 kB)
Collecting jedi>=0.16 (from ipython->ipytest)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading ipytest-0.14.2-py3-none-any.whl (18 kB)
Downloading jedi-0.19.2-py2.py3-none-any.whl (1.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m73.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: jedi, ipytest
Successfully installed ipytest-0.14.2 jedi-0.19.2


In [None]:
%%ipytest
import tvm
import torch
import pytest
import timeit
import numpy as np
import torch.nn.functional as F

dev = tvm.cuda(0)

make_conv1d_gpu_scheduler = make_conv1d_gpu_scheduler_v5

def ans_np(a_np, w_np):
    a_np = a_np.flatten()
    w_np = w_np.flatten()
    return np.convolve(a_np, w_np)

def make_conv1d_gpu_func(M, N):
    s, A, W, O = make_conv1d_gpu_scheduler(M, N)
    func = tvm.build(s, [A, W, O], "cuda")
    return func


def ans_torch(a_torch, w_torch):
    M, N = a_torch.size(0), w_torch.size(0)
    torch.cuda.synchronize()
    b_torch = F.conv1d(a_torch, w_torch, bias=None, stride=1,
                       padding=(N - 1), dilation=1, groups=1)
    torch.cuda.synchronize()
    return b_torch


@pytest.mark.parametrize('execution_number', range(5))
def test1_M1_N1(execution_number):
    # Define dimension
    M = 1
    N = 1
    func = make_conv1d_gpu_func(M, N)

    # Create random test data
    np.random.seed(seed=execution_number)
    a_np = np.random.rand(M).astype(np.float32)
    w_np = np.random.rand(N).astype(np.float32)
    b_np = ans_np(a_np, w_np)

    a = tvm.nd.array(a_np, dev)
    w = tvm.nd.array(w_np, dev)
    b = tvm.nd.array(np.zeros((M + N - 1), dtype='float32'), dev)
    func(a, w, b)
    b_out = b.numpy()

    assert b_np.shape == b_out.shape, \
        "Shape mismatch: " + str(b_np.shape) + "\t" + str(b_out.shape)
    assert np.allclose(b_np, b_out), "Value mismatch: %s %s" % (b_np, b_out)


@pytest.mark.parametrize('execution_number', [1, 10, 100, 1000, 10000])
def test1_Mvar_N1024(execution_number):
    # Define dimension
    M = execution_number
    N = 1024
    func = make_conv1d_gpu_func(M, N)

    # Create random test data
    np.random.seed(seed=1024)
    a_np = np.random.rand(M).astype(np.float32)
    w_np = np.random.rand(N).astype(np.float32)
    b_np = ans_np(a_np, w_np)

    a = tvm.nd.array(a_np, dev)
    w = tvm.nd.array(w_np, dev)
    b = tvm.nd.array(np.zeros((M + N - 1), dtype='float32'), dev)
    func(a, w, b)
    b_out = b.numpy()

    assert b_np.shape == b_out.shape, \
        "Shape mismatch: " + str(b_np.shape) + "\t" + str(b_out.shape)
    assert np.allclose(b_np, b_out), "Value mismatch: %s %s" % (b_np, b_out)


@pytest.mark.parametrize('execution_number', [1, 10, 100, 1000, 10000])
def test1_M1024_Nvar(execution_number):
    # Define dimension
    M = 1024
    N = execution_number
    func = make_conv1d_gpu_func(M, N)

    # Create random test data
    np.random.seed(seed=1024)
    a_np = np.random.rand(M).astype(np.float32)
    w_np = np.random.rand(N).astype(np.float32)
    b_np = ans_np(a_np, w_np)

    a = tvm.nd.array(a_np, dev)
    w = tvm.nd.array(w_np, dev)
    b = tvm.nd.array(np.zeros((M + N - 1), dtype='float32'), dev)
    func(a, w, b)
    b_out = b.numpy()

    assert b_np.shape == b_out.shape, \
        "Shape mismatch: " + str(b_np.shape) + "\t" + str(b_out.shape)
    assert np.allclose(b_np, b_out), "Value mismatch: %s %s" % (b_np, b_out)


[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                                                              [100%][0m
[32m[32m[1m15 passed[0m[32m in 9.11s[0m[0m
