109 changes: 109 additions & 0 deletions compute/bulk_upload_test.go
@@ -0,0 +1,109 @@
package compute

import (
"math"
"testing"

"github.com/zerfoo/ztensor/internal/cuda"
"github.com/zerfoo/ztensor/numeric"
"github.com/zerfoo/ztensor/tensor"
)

// TestGPUEngine_UploadWeights_BulkPath verifies that a many-tensor upload
// (above the bulk threshold) collapses into a single allocation, that data
// is preserved across all uploaded tensors, and that the engine retains
// ownership of the underlying device buffer (zerfoo/ztensor#103).
func TestGPUEngine_UploadWeights_BulkPath(t *testing.T) {
if !cuda.Available() {
t.Skip("CUDA not available")
}

ops := numeric.Float32Ops{}
gpuEng, err := NewGPUEngine[float32](ops)
if err != nil {
t.Fatalf("NewGPUEngine: %v", err)
}
defer func() { _ = gpuEng.Close() }()

// N must exceed bulkUploadF32MinTensors (64) to exercise the bulk path.
const N = 128
const elemsPer = 17

tensors := make([]*tensor.TensorNumeric[float32], N)
for i := range N {
data := make([]float32, elemsPer)
for j := range elemsPer {
data[j] = float32(i*1000 + j)
}
tt, _ := tensor.New[float32]([]int{elemsPer}, data)
tensors[i] = tt
}

if got := len(gpuEng.bulkUploadBuffers); got != 0 {
t.Fatalf("bulkUploadBuffers before upload = %d, want 0", got)
}

if err := gpuEng.UploadWeights(tensors); err != nil {
t.Fatalf("UploadWeights: %v", err)
}

// One bulk buffer should now back all N tensors.
if got := len(gpuEng.bulkUploadBuffers); got != 1 {
t.Fatalf("bulkUploadBuffers after upload = %d, want 1 (bulk path collapses to one allocation)", got)
}

// Every tensor must now be GPU-resident.
for i, tt := range tensors {
if _, ok := tt.GetStorage().(*tensor.GPUStorage[float32]); !ok {
t.Fatalf("tensor[%d] storage = %T, want *GPUStorage[float32]", i, tt.GetStorage())
}
}

// Round-trip a sample to verify data preservation across the bulk copy.
for _, i := range []int{0, 1, N / 2, N - 1} {
got := tensors[i].Data()
want := float32(i*1000 + (elemsPer - 1))
if math.Abs(float64(got[elemsPer-1]-want)) > 1e-6 {
t.Errorf("tensor[%d][%d] = %f, want %f", i, elemsPer-1, got[elemsPer-1], want)
}
}
}

// TestGPUEngine_UploadWeights_BelowBulkThreshold verifies that small inputs
// stay on the per-tensor path and the bulk allocation slice remains empty.
func TestGPUEngine_UploadWeights_BelowBulkThreshold(t *testing.T) {
if !cuda.Available() {
t.Skip("CUDA not available")
}

ops := numeric.Float32Ops{}
gpuEng, err := NewGPUEngine[float32](ops)
if err != nil {
t.Fatalf("NewGPUEngine: %v", err)
}
defer func() { _ = gpuEng.Close() }()

// Below bulkUploadF32MinTensors=64.
const N = 8
tensors := make([]*tensor.TensorNumeric[float32], N)
for i := range N {
data := make([]float32, 4)
for j := range 4 {
data[j] = float32(i + j)
}
tt, _ := tensor.New[float32]([]int{4}, data)
tensors[i] = tt
}

if err := gpuEng.UploadWeights(tensors); err != nil {
t.Fatalf("UploadWeights: %v", err)
}
if got := len(gpuEng.bulkUploadBuffers); got != 0 {
t.Errorf("bulkUploadBuffers = %d, want 0 (below threshold should use per-tensor path)", got)
}
for i, tt := range tensors {
if _, ok := tt.GetStorage().(*tensor.GPUStorage[float32]); !ok {
t.Fatalf("tensor[%d] storage = %T, want *GPUStorage[float32]", i, tt.GetStorage())
}
}
}
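
A quick sketch of the packing arithmetic the bulk path relies on, separate from the PR itself: each eligible tensor is assigned a byte offset equal to the running total, so for the test above (128 tensors of 17 float32 values) the single allocation is 128 * 17 * 4 = 8704 bytes and tensor i starts at byte offset i * 68. The snippet below is illustrative only and uses no ztensor APIs.

package main

import "fmt"

// Illustrative only: mirrors how bulkUploadF32 assigns each eligible tensor
// a byte offset into one device buffer (offset = running byte total).
func main() {
	const n, elemsPer, f32Size = 128, 17, 4
	offsets := make([]int, n)
	total := 0
	for i := range offsets {
		offsets[i] = total
		total += elemsPer * f32Size
	}
	fmt.Println(offsets[0], offsets[1], offsets[n-1], total) // 0 68 8636 8704
}
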
121 changes: 120 additions & 1 deletion compute/gpu_engine.go
@@ -133,6 +133,13 @@ type GPUEngine[T tensor.Numeric] struct {
// Incremented atomically in allocWeight when capture is detected;
// checked and reset in EndCapture.
captureAllocCount atomic.Int64

// bulkUploadBuffers retains the device pointers allocated by the
// UploadWeights bulk path. Each pointer backs the non-owning view tensors
// created by that call; their Free is a no-op, so the engine owns the
// underlying allocation and frees it in Close. Empty on engines that
// never receive a large-N UploadWeights call.
bulkUploadBuffers []unsafe.Pointer
}

// NewGPUEngine creates a new GPUEngine backed by CUDA via the GRAL abstraction.
@@ -347,9 +354,111 @@ func (e *GPUEngine[T]) checkVRAMBounds(op string, allocBytes int) error {
// On devices with managed memory support (e.g., GB10), weights are allocated
// with cudaMallocManaged and populated via direct CPU memcpy. The GPU can
// then access them without any explicit H2D transfer.
// bulkUploadF32MinTensors is the threshold above which UploadWeights
// collapses many small per-tensor cudaMalloc + cudaMemcpy round-trips into
// a single allocation and a single H2D copy. The per-tensor pattern wedges
// the CUDA driver on GB10 Blackwell when N is in the tens-of-thousands
// (zerfoo/ztensor#103); the bulk path keeps a constant number of driver
// round-trips regardless of input size.
const bulkUploadF32MinTensors = 64

// bulkUploadF32 fast-paths the F32 weight upload by allocating one device
// buffer for all eligible tensors and performing one H2D copy. Each tensor
// receives a non-owning GPUStorage view into the bulk buffer; the engine
// retains the bulk pointer and frees it in Close.
//
// Returns the number of tensors that were bulk-uploaded. The caller's
// per-tensor loop will naturally skip these because their storage is now
// *tensor.GPUStorage[float32].
//
// The bulk path is skipped when:
//   - CUDA graph capture is active (the per-tensor path emits async ops as
//     graph nodes; bulk would offer no benefit at the typical capture-time
//     tensor counts).
//   - Fewer than bulkUploadF32MinTensors tensors are eligible.
func (e *GPUEngine[T]) bulkUploadF32(tensors []*tensor.TensorNumeric[float32]) (int, error) {
if cap, ok := e.pool.(gpuapi.CaptureAwareAllocator); ok && cap.IsCapturing() {
return 0, nil
}

type entry struct {
t *tensor.TensorNumeric[float32]
offset int
nelem int
}
eligible := make([]entry, 0, len(tensors))
total := 0
for _, t := range tensors {
if t == nil {
continue
}
switch any(t.GetStorage()).(type) {
case *tensor.GPUStorage[float32], *tensor.Float16Storage,
*tensor.Q4Storage, *tensor.Q4KStorage, *tensor.Q5_0Storage,
*tensor.Q5KStorage, *tensor.Q6KStorage, *tensor.Q8Storage,
*tensor.MmapStorage, *tensor.FP8E4M3Storage,
*tensor.BFloat16Storage:
continue
}
n := len(t.Data())
if n == 0 {
continue
}
eligible = append(eligible, entry{t: t, offset: total, nelem: n})
total += n * f32Size
}
if len(eligible) < bulkUploadF32MinTensors {
return 0, nil
}
if err := e.ensureNotCapturing(); err != nil {
return 0, err
}

var devPtr unsafe.Pointer
var err error
if e.managedMem {
devPtr, err = mallocManagedFn(total)
} else {
devPtr, err = e.runtime.Malloc(total)
}
if err != nil {
return 0, fmt.Errorf("bulk alloc f32 (%d tensors, %d bytes): %w",
len(eligible), total, err)
}

if e.managedMem {
dst := unsafe.Slice((*byte)(devPtr), total)
for _, en := range eligible {
src := unsafe.Slice((*byte)(unsafe.Pointer(&en.t.Data()[0])), en.nelem*f32Size)
copy(dst[en.offset:en.offset+en.nelem*f32Size], src)
}
} else {
host := make([]byte, total)
for _, en := range eligible {
src := unsafe.Slice((*byte)(unsafe.Pointer(&en.t.Data()[0])), en.nelem*f32Size)
copy(host[en.offset:en.offset+en.nelem*f32Size], src)
}
if err := e.runtime.Memcpy(devPtr, unsafe.Pointer(&host[0]), total, gpuapi.MemcpyHostToDevice); err != nil {
_ = e.runtime.Free(devPtr)
return 0, fmt.Errorf("bulk H2D f32 (%d bytes): %w", total, err)
}
}

e.bulkUploadBuffers = append(e.bulkUploadBuffers, devPtr)
for _, en := range eligible {
sub := unsafe.Add(devPtr, en.offset)
view := tensor.NewGPUStorageViewFromPtr[float32](sub, en.nelem, e.deviceID)
en.t.SetStorage(view)
}
return len(eligible), nil
}

func (e *GPUEngine[T]) UploadWeights(tensors []*tensor.TensorNumeric[float32]) error {
e.setDevice()
uploaded := 0
uploaded, err := e.bulkUploadF32(tensors)
if err != nil {
return err
}
q4Uploaded := 0
for _, t := range tensors {
if t == nil {
@@ -838,6 +947,16 @@ func (e *GPUEngine[T]) DestroyGraph(handle GraphHandle) error {
func (e *GPUEngine[T]) Close() error {
var firstErr error

// Free bulk-upload buffers (one per UploadWeights call that hit the
// bulk path). Per-tensor views into these buffers have a no-op Free,
// so the engine owns the lifetime.
for _, ptr := range e.bulkUploadBuffers {
if err := e.runtime.Free(ptr); err != nil && firstErr == nil {
firstErr = err
}
}
e.bulkUploadBuffers = nil

// Free FP8 scratch buffers before draining the pool.
if e.fp8Scratch != nil {
e.fp8Scratch.free(e.pool, e.deviceID)
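
For context on the ownership model referenced throughout the diff, here is a hedged sketch of what a non-owning view storage could look like. The type and field names below are hypothetical and are not the actual tensor.GPUStorage / NewGPUStorageViewFromPtr implementation, which is not part of this diff; the only grounded property is the one the comments state, namely that a view's Free is a no-op while the engine releases each retained bulk pointer exactly once in Close.

package sketch

import "unsafe"

// gpuViewStorage is a hypothetical stand-in for the non-owning view that
// bulkUploadF32 hands to each tensor. It records where its data lives but
// never frees it.
type gpuViewStorage struct {
	ptr      unsafe.Pointer // points into the engine-owned bulk buffer
	nelem    int            // number of float32 elements in this view
	deviceID int
}

// Free is deliberately a no-op: the engine keeps the bulk pointer in
// bulkUploadBuffers and frees it once in Close.
func (s *gpuViewStorage) Free() {}
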