From f7ab43b02cd349b0af33040280d26673f100a778 Mon Sep 17 00:00:00 2001
From: David Ndungu
Date: Tue, 28 Apr 2026 20:44:57 -0700
Subject: [PATCH] feat(compute): bulk-upload F32 weights to one device buffer
 (#103)

UploadWeights previously issued one cudaMalloc + cudaMemcpy per input
tensor. Wolf's CrossAsset training pre-uploads ~106K sample tensors
plus ~50 graph parameters in a single call, producing ~213K
back-to-back synchronous driver round-trips. On GB10 Blackwell with
unified memory, the driver's allocation-table lock contends with the
default-stream queue and wedges the CUDA context: nvidia-smi hangs,
podman cannot tear down the container, and no error surfaces because
cudaMalloc never returns.

Add a bulk path that activates when >= 64 eligible F32 tensors are
queued. It computes the total byte size, issues one device allocation,
copies all source data into the resulting buffer (one cudaMemcpy H2D,
or one host-side memcpy on managed-memory hosts), and creates a
non-owning GPUStorage view per tensor. The engine retains the bulk
pointer in bulkUploadBuffers and frees it in Close.

Capture-time uploads bypass the bulk path: the per-tensor route
already emits async ops as graph nodes, and capture-time tensor counts
are small.

Tests cover both the bulk activation (N=128) and the below-threshold
fallback (N=8) on real CUDA. The package builds and vets clean on
aarch64 and amd64; the existing compute test suite is unchanged.

Closes zerfoo/ztensor#103.
---
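Note for reviewers (text below the "---" is not applied by git am): a
minimal sketch of the call pattern this patch optimizes, assuming the
exported UploadWeights API exercised by the tests. collectSampleTensors
is a hypothetical stand-in for Wolf's pre-upload loop.

    eng, err := compute.NewGPUEngine[float32](numeric.Float32Ops{})
    if err != nil {
        log.Fatal(err)
    }
    defer func() { _ = eng.Close() }()

    // ~106K sample tensors + ~50 graph parameters in one slice: formerly
    // ~213K synchronous driver calls, now one allocation and one copy
    // once >= 64 eligible F32 tensors are queued.
    tensors := collectSampleTensors()
    if err := eng.UploadWeights(tensors); err != nil {
        log.Fatal(err)
    }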
 compute/bulk_upload_test.go | 109 ++++++++++++++++++++++++++++++++
 compute/gpu_engine.go       | 121 +++++++++++++++++++++++++++++++++++-
 2 files changed, 229 insertions(+), 1 deletion(-)
 create mode 100644 compute/bulk_upload_test.go

diff --git a/compute/bulk_upload_test.go b/compute/bulk_upload_test.go
new file mode 100644
index 0000000..d164704
--- /dev/null
+++ b/compute/bulk_upload_test.go
@@ -0,0 +1,109 @@
+package compute
+
+import (
+	"math"
+	"testing"
+
+	"github.com/zerfoo/ztensor/internal/cuda"
+	"github.com/zerfoo/ztensor/numeric"
+	"github.com/zerfoo/ztensor/tensor"
+)
+
+// TestGPUEngine_UploadWeights_BulkPath verifies that a many-tensor upload
+// (above the bulk threshold) collapses into a single allocation, that data
+// is preserved across all uploaded tensors, and that the engine retains
+// ownership of the underlying device buffer (zerfoo/ztensor#103).
+func TestGPUEngine_UploadWeights_BulkPath(t *testing.T) {
+	if !cuda.Available() {
+		t.Skip("CUDA not available")
+	}
+
+	ops := numeric.Float32Ops{}
+	gpuEng, err := NewGPUEngine[float32](ops)
+	if err != nil {
+		t.Fatalf("NewGPUEngine: %v", err)
+	}
+	defer func() { _ = gpuEng.Close() }()
+
+	// N must meet or exceed bulkUploadF32MinTensors (64) to exercise the
+	// bulk path.
+	const N = 128
+	const elemsPer = 17
+
+	tensors := make([]*tensor.TensorNumeric[float32], N)
+	for i := range N {
+		data := make([]float32, elemsPer)
+		for j := range elemsPer {
+			data[j] = float32(i*1000 + j)
+		}
+		tt, _ := tensor.New[float32]([]int{elemsPer}, data)
+		tensors[i] = tt
+	}
+
+	if got := len(gpuEng.bulkUploadBuffers); got != 0 {
+		t.Fatalf("bulkUploadBuffers before upload = %d, want 0", got)
+	}
+
+	if err := gpuEng.UploadWeights(tensors); err != nil {
+		t.Fatalf("UploadWeights: %v", err)
+	}
+
+	// One bulk buffer should now back all N tensors.
+	if got := len(gpuEng.bulkUploadBuffers); got != 1 {
+		t.Fatalf("bulkUploadBuffers after upload = %d, want 1 (bulk path collapses to one allocation)", got)
+	}
+
+	// Every tensor must now be GPU-resident.
+	for i, tt := range tensors {
+		if _, ok := tt.GetStorage().(*tensor.GPUStorage[float32]); !ok {
+			t.Fatalf("tensor[%d] storage = %T, want *GPUStorage[float32]", i, tt.GetStorage())
+		}
+	}
+
+	// Round-trip a sample to verify data preservation across the bulk copy.
+	for _, i := range []int{0, 1, N / 2, N - 1} {
+		got := tensors[i].Data()
+		want := float32(i*1000 + (elemsPer - 1))
+		if math.Abs(float64(got[elemsPer-1]-want)) > 1e-6 {
+			t.Errorf("tensor[%d][%d] = %f, want %f", i, elemsPer-1, got[elemsPer-1], want)
+		}
+	}
+}
+
+// TestGPUEngine_UploadWeights_BelowBulkThreshold verifies that small inputs
+// stay on the per-tensor path and the bulk allocation slice remains empty.
+func TestGPUEngine_UploadWeights_BelowBulkThreshold(t *testing.T) {
+	if !cuda.Available() {
+		t.Skip("CUDA not available")
+	}
+
+	ops := numeric.Float32Ops{}
+	gpuEng, err := NewGPUEngine[float32](ops)
+	if err != nil {
+		t.Fatalf("NewGPUEngine: %v", err)
+	}
+	defer func() { _ = gpuEng.Close() }()
+
+	// Below bulkUploadF32MinTensors = 64.
+	const N = 8
+	tensors := make([]*tensor.TensorNumeric[float32], N)
+	for i := range N {
+		data := make([]float32, 4)
+		for j := range 4 {
+			data[j] = float32(i + j)
+		}
+		tt, _ := tensor.New[float32]([]int{4}, data)
+		tensors[i] = tt
+	}
+
+	if err := gpuEng.UploadWeights(tensors); err != nil {
+		t.Fatalf("UploadWeights: %v", err)
+	}
+	if got := len(gpuEng.bulkUploadBuffers); got != 0 {
+		t.Errorf("bulkUploadBuffers = %d, want 0 (below threshold should use per-tensor path)", got)
+	}
+	for i, tt := range tensors {
+		if _, ok := tt.GetStorage().(*tensor.GPUStorage[float32]); !ok {
+			t.Fatalf("tensor[%d] storage = %T, want *GPUStorage[float32]", i, tt.GetStorage())
+		}
+	}
+}
diff --git a/compute/gpu_engine.go b/compute/gpu_engine.go
index 68d8a19..0c3f1cb 100644
--- a/compute/gpu_engine.go
+++ b/compute/gpu_engine.go
@@ -133,6 +133,13 @@ type GPUEngine[T tensor.Numeric] struct {
 	// Incremented atomically in allocWeight when capture is detected;
 	// checked and reset in EndCapture.
 	captureAllocCount atomic.Int64
+
+	// bulkUploadBuffers retains the device pointers allocated by the
+	// UploadWeights bulk path. Each one backs N non-owning view tensors
+	// whose Free is a no-op, so the engine owns the underlying allocation
+	// and frees it in Close. Empty on engines that never receive a
+	// large-N UploadWeights call.
+	bulkUploadBuffers []unsafe.Pointer
 }
 
 // NewGPUEngine creates a new GPUEngine backed by CUDA via the GRAL abstraction.
@@ -347,9 +354,111 @@ func (e *GPUEngine[T]) checkVRAMBounds(op string, allocBytes int) error {
+// bulkUploadF32MinTensors is the threshold at or above which UploadWeights
+// collapses many small per-tensor cudaMalloc + cudaMemcpy round-trips into
+// a single allocation and a single H2D copy. The per-tensor pattern wedges
+// the CUDA driver on GB10 Blackwell when N is in the tens of thousands
+// (zerfoo/ztensor#103); the bulk path keeps the number of driver
+// round-trips constant regardless of input size.
+const bulkUploadF32MinTensors = 64
+
+// bulkUploadF32 fast-paths the F32 weight upload by allocating one device
+// buffer for all eligible tensors and performing one H2D copy. Each tensor
+// receives a non-owning GPUStorage view into the bulk buffer; the engine
+// retains the bulk pointer and frees it in Close.
+//
+// It returns the number of tensors that were bulk-uploaded. The caller's
+// per-tensor loop naturally skips these because their storage is now
+// *tensor.GPUStorage[float32].
+//
+// The bulk path is skipped when:
+//   - CUDA graph capture is active (the per-tensor path emits async ops
+//     as graph nodes; bulk would offer no benefit at typical capture-time
+//     tensor counts), or
+//   - fewer than bulkUploadF32MinTensors tensors are eligible.
+func (e *GPUEngine[T]) bulkUploadF32(tensors []*tensor.TensorNumeric[float32]) (int, error) {
+	if ca, ok := e.pool.(gpuapi.CaptureAwareAllocator); ok && ca.IsCapturing() {
+		return 0, nil
+	}
+
+	type entry struct {
+		t      *tensor.TensorNumeric[float32]
+		offset int
+		nelem  int
+	}
+	eligible := make([]entry, 0, len(tensors))
+	total := 0
+	for _, t := range tensors {
+		if t == nil {
+			continue
+		}
+		switch any(t.GetStorage()).(type) {
+		case *tensor.GPUStorage[float32], *tensor.Float16Storage,
+			*tensor.Q4Storage, *tensor.Q4KStorage, *tensor.Q5_0Storage,
+			*tensor.Q5KStorage, *tensor.Q6KStorage, *tensor.Q8Storage,
+			*tensor.MmapStorage, *tensor.FP8E4M3Storage,
+			*tensor.BFloat16Storage:
+			continue
+		}
+		n := len(t.Data())
+		if n == 0 {
+			continue
+		}
+		eligible = append(eligible, entry{t: t, offset: total, nelem: n})
+		total += n * f32Size
+	}
+	if len(eligible) < bulkUploadF32MinTensors {
+		return 0, nil
+	}
+	if err := e.ensureNotCapturing(); err != nil {
+		return 0, err
+	}
+
+	var devPtr unsafe.Pointer
+	var err error
+	if e.managedMem {
+		devPtr, err = mallocManagedFn(total)
+	} else {
+		devPtr, err = e.runtime.Malloc(total)
+	}
+	if err != nil {
+		return 0, fmt.Errorf("bulk alloc f32 (%d tensors, %d bytes): %w",
+			len(eligible), total, err)
+	}
+
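+	// Pack layout: eligible tensors are laid end-to-end in queue order at
+	// the byte offsets accumulated above. Every offset is a multiple of
+	// f32Size, so each view remains float32-aligned relative to the
+	// (aligned) base allocation; no padding is needed between tensors.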
+	if e.managedMem {
+		dst := unsafe.Slice((*byte)(devPtr), total)
+		for _, en := range eligible {
+			src := unsafe.Slice((*byte)(unsafe.Pointer(&en.t.Data()[0])), en.nelem*f32Size)
+			copy(dst[en.offset:en.offset+en.nelem*f32Size], src)
+		}
+	} else {
+		host := make([]byte, total)
+		for _, en := range eligible {
+			src := unsafe.Slice((*byte)(unsafe.Pointer(&en.t.Data()[0])), en.nelem*f32Size)
+			copy(host[en.offset:en.offset+en.nelem*f32Size], src)
+		}
+		if err := e.runtime.Memcpy(devPtr, unsafe.Pointer(&host[0]), total, gpuapi.MemcpyHostToDevice); err != nil {
+			_ = e.runtime.Free(devPtr)
+			return 0, fmt.Errorf("bulk H2D f32 (%d bytes): %w", total, err)
+		}
+	}
+
+	e.bulkUploadBuffers = append(e.bulkUploadBuffers, devPtr)
+	for _, en := range eligible {
+		sub := unsafe.Add(devPtr, en.offset)
+		view := tensor.NewGPUStorageViewFromPtr[float32](sub, en.nelem, e.deviceID)
+		en.t.SetStorage(view)
+	}
+	return len(eligible), nil
+}
+
 // On devices with managed memory support (e.g., GB10), weights are allocated
 // with cudaMallocManaged and populated via direct CPU memcpy. The GPU can
 // then access them without any explicit H2D transfer.
 func (e *GPUEngine[T]) UploadWeights(tensors []*tensor.TensorNumeric[float32]) error {
 	e.setDevice()
-	uploaded := 0
+	uploaded, err := e.bulkUploadF32(tensors)
+	if err != nil {
+		return err
+	}
 	q4Uploaded := 0
 	for _, t := range tensors {
 		if t == nil {
@@ -838,6 +947,16 @@ func (e *GPUEngine[T]) DestroyGraph(handle GraphHandle) error {
 func (e *GPUEngine[T]) Close() error {
 	var firstErr error
 
+	// Free bulk-upload buffers (one per UploadWeights call that hit the
+	// bulk path). Per-tensor views into these buffers have a no-op Free,
+	// so the engine owns the lifetime.
+	for _, ptr := range e.bulkUploadBuffers {
+		if err := e.runtime.Free(ptr); err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+	e.bulkUploadBuffers = nil
+
 	// Free FP8 scratch buffers before draining the pool.
 	if e.fp8Scratch != nil {
 		e.fp8Scratch.free(e.pool, e.deviceID)