From f099bfdec77d0332d06c194e59b01372eb876afa Mon Sep 17 00:00:00 2001 From: Jonathan Wang <31040440+jonathanpwang@users.noreply.github.com> Date: Tue, 25 Nov 2025 16:51:12 +0000 Subject: [PATCH 01/20] Proper erroring --- crates/cuda-common/src/memory_manager/mod.rs | 27 ++++++++--- .../cuda-common/src/memory_manager/vm_pool.rs | 45 ++++++++++++++----- 2 files changed, 54 insertions(+), 18 deletions(-) diff --git a/crates/cuda-common/src/memory_manager/mod.rs b/crates/cuda-common/src/memory_manager/mod.rs index a51927cf..ae76b754 100644 --- a/crates/cuda-common/src/memory_manager/mod.rs +++ b/crates/cuda-common/src/memory_manager/mod.rs @@ -59,14 +59,18 @@ impl MemoryManager { let mut tracked_size = size; let ptr = if size < self.pool.page_size { let mut ptr: *mut c_void = std::ptr::null_mut(); - check(unsafe { cudaMallocAsync(&mut ptr, size, cudaStreamPerThread) })?; + check(unsafe { cudaMallocAsync(&mut ptr, size, cudaStreamPerThread) }) + .expect("Failed to allocate small memory block via cudaMallocAsync"); self.allocated_ptrs .insert(NonNull::new(ptr).expect("cudaMalloc returned null"), size); Ok(ptr) } else { tracked_size = size.next_multiple_of(self.pool.page_size); - self.pool - .malloc_internal(tracked_size, current_stream_id()?) + self.pool.malloc_internal( + tracked_size, + current_stream_id() + .expect("Failed to get current CUDA stream ID for large allocation"), + ) }; self.current_size += tracked_size; @@ -80,13 +84,24 @@ impl MemoryManager { /// The pointer `ptr` must be a valid, previously allocated device pointer. /// The caller must ensure that `ptr` is not used after this function is called. unsafe fn d_free(&mut self, ptr: *mut c_void) -> Result<(), MemoryError> { - let nn = NonNull::new(ptr).ok_or(MemoryError::NullPointer)?; + let nn = NonNull::new(ptr) + .ok_or(MemoryError::NullPointer) + .expect("Attempted to free null pointer"); if let Some(size) = self.allocated_ptrs.remove(&nn) { self.current_size -= size; - check(unsafe { cudaFreeAsync(ptr, cudaStreamPerThread) })?; + check(unsafe { cudaFreeAsync(ptr, cudaStreamPerThread) }) + .expect("Failed to free small memory block via cudaFreeAsync"); } else { - self.current_size -= self.pool.free_internal(ptr, current_stream_id()?)?; + self.current_size -= self + .pool + .free_internal( + ptr, + current_stream_id().expect( + "Failed to get current CUDA stream ID for freeing large allocation", + ), + ) + .expect("Failed to free large memory block from virtual memory pool"); } Ok(()) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index 2ea60574..0fecb365 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -37,7 +37,7 @@ struct FreeRegion { id: usize, } -const VIRTUAL_POOL_SIZE: usize = 1usize << 45; // 32 TB +const VIRTUAL_POOL_SIZE: usize = 8usize << 40; // 8 TB /// Virtual memory pool implementation. 
pub(super) struct VirtualMemoryPool { @@ -133,7 +133,9 @@ impl VirtualMemoryPool { if best_region.is_none() { // Phase 2: Defragmentation - best_region = self.defragment_or_create_new_pages(requested, stream_id)?; + best_region = self + .defragment_or_create_new_pages(requested, stream_id) + .expect("Failed to defragment or create new pages in virtual memory pool"); } if let Some(ptr) = best_region { @@ -209,7 +211,8 @@ impl VirtualMemoryPool { let size = self .used_regions .remove(&ptr) - .ok_or(MemoryError::InvalidPointer)?; + .ok_or(MemoryError::InvalidPointer) + .expect("Pointer not found in used_regions - invalid pointer freed"); self.free_region_insert(ptr, size, stream_id); @@ -316,7 +319,8 @@ impl VirtualMemoryPool { to_defrag.push(ptr); accumulated_size += size; if accumulated_size >= requested { - self.remap_regions(to_defrag, stream_id)?; + self.remap_regions(to_defrag, stream_id) + .expect("Failed to remap regions during defragmentation"); return Ok(Some(defrag_start)); } } @@ -357,7 +361,8 @@ impl VirtualMemoryPool { let mut events_to_wait = Vec::new(); for (ptr, size, event, _) in other_streams_to_defrag { if !event.completed() { - default_stream_wait(&event)?; + default_stream_wait(&event) + .expect("Failed to wait for CUDA event from another stream"); events_to_wait.push(event); } to_defrag.push(ptr); @@ -387,7 +392,8 @@ impl VirtualMemoryPool { }; } - self.remap_regions(to_defrag, stream_id)?; + self.remap_regions(to_defrag, stream_id) + .expect("Failed to remap regions after waiting for events"); if accumulated_size >= requested { return Ok(Some(defrag_start)); } @@ -411,7 +417,10 @@ impl VirtualMemoryPool { } } }; - unsafe { vpmm_map(self.curr_end, self.page_size, handle)? }; + unsafe { + vpmm_map(self.curr_end, self.page_size, handle) + .expect("Failed to map physical memory page to virtual address"); + } self.active_pages.insert(self.curr_end, handle); self.curr_end += self.page_size as u64; allocated_size += self.page_size; @@ -423,7 +432,11 @@ impl VirtualMemoryPool { stream_id, ByteSize::b(self.memory_usage() as u64) ); - unsafe { vpmm_set_access(allocate_start, allocated_size, self.device_id)? }; + unsafe { + vpmm_set_access(allocate_start, allocated_size, self.device_id).expect( + "Failed to set access permissions for newly allocated virtual memory region", + ); + } self.free_region_insert(allocate_start, allocated_size, stream_id); Ok(Some(defrag_start)) @@ -442,7 +455,8 @@ impl VirtualMemoryPool { let region = self .free_regions .remove(®ion_addr) - .ok_or(MemoryError::InvalidPointer)?; + .ok_or(MemoryError::InvalidPointer) + .expect("Free region not found during remapping - invalid region address"); let num_pages = region.size / self.page_size; for i in 0..num_pages { @@ -450,15 +464,22 @@ impl VirtualMemoryPool { let handle = self .active_pages .remove(&page) - .ok_or(MemoryError::InvalidPointer)?; - unsafe { vpmm_map(self.curr_end, self.page_size, handle)? }; + .ok_or(MemoryError::InvalidPointer) + .expect("Active page not found during remapping - page handle missing"); + unsafe { + vpmm_map(self.curr_end, self.page_size, handle) + .expect("Failed to remap physical memory page to new virtual address"); + } self.active_pages.insert(self.curr_end, handle); self.curr_end += self.page_size as u64; } } let new_region_size = (self.curr_end - new_region_start) as usize; - unsafe { vpmm_set_access(new_region_start, new_region_size, self.device_id)? 
}; + unsafe { + vpmm_set_access(new_region_start, new_region_size, self.device_id) + .expect("Failed to set access permissions for remapped virtual memory region"); + } self.free_region_insert(new_region_start, new_region_size, stream_id); Ok(()) } From 2290451245ceb35ba7792b7dd02a7fc718518071 Mon Sep 17 00:00:00 2001 From: Jonathan Wang <31040440+jonathanpwang@users.noreply.github.com> Date: Tue, 25 Nov 2025 16:53:11 +0000 Subject: [PATCH 02/20] track remaining va --- crates/cuda-common/src/memory_manager/vm_pool.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index 0fecb365..29024403 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -475,6 +475,12 @@ impl VirtualMemoryPool { } } let new_region_size = (self.curr_end - new_region_start) as usize; + tracing::debug!( + "VPMM: Remapped regions: new_region_start={:?}, size={} (remaining: {})", + new_region_start, + new_region_size, + ByteSize::b((self.root + VIRTUAL_POOL_SIZE - self.curr_end) as u64) + ); unsafe { vpmm_set_access(new_region_start, new_region_size, self.device_id) From 21fd8ce053b5eba3ea8cf401324d450e81d39cd3 Mon Sep 17 00:00:00 2001 From: Jonathan Wang <31040440+jonathanpwang@users.noreply.github.com> Date: Tue, 25 Nov 2025 21:31:01 +0000 Subject: [PATCH 03/20] assert page size --- crates/cuda-common/src/memory_manager/vm_pool.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index 29024403..6e7507a8 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -93,6 +93,10 @@ impl VirtualMemoryPool { } Err(_) => gran, }; + assert!( + VIRTUAL_POOL_SIZE % page_size == 0, + "Virtual pool size must be a multiple of page size" + ); let va_base = vpmm_reserve(VIRTUAL_POOL_SIZE, page_size).unwrap(); (va_base, page_size) } From 30cb42bf0f5514b0dbee02bd9268a5b7f28091e3 Mon Sep 17 00:00:00 2001 From: gaxiom Date: Wed, 26 Nov 2025 13:45:22 +0000 Subject: [PATCH 04/20] synced defragmentation --- crates/cuda-common/src/error.rs | 3 + .../cuda-common/src/memory_manager/vm_pool.rs | 410 +++++++++--------- crates/cuda-common/src/stream.rs | 4 + 3 files changed, 204 insertions(+), 213 deletions(-) diff --git a/crates/cuda-common/src/error.rs b/crates/cuda-common/src/error.rs index bb5dfa87..ff02c4fa 100644 --- a/crates/cuda-common/src/error.rs +++ b/crates/cuda-common/src/error.rs @@ -95,6 +95,9 @@ pub enum MemoryError { #[error("Invalid pointer: pointer not found in allocation table")] InvalidPointer, + + #[error("Failed to reserve virtual address space (bytes: {size}, page size: {page_size})")] + ReserveFailed { size: usize, page_size: usize }, } #[derive(Error, Debug)] diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index 6e7507a8..147dca02 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -1,7 +1,6 @@ #![allow(non_camel_case_types, non_upper_case_globals, non_snake_case)] use std::{ - cmp::Reverse, collections::{BTreeMap, HashMap}, ffi::c_void, sync::{Arc, Mutex}, @@ -14,19 +13,13 @@ use crate::{ common::set_device, error::MemoryError, stream::{ - cudaStreamPerThread, cudaStream_t, current_stream_id, default_stream_wait, CudaEvent, - CudaStreamId, + 
current_stream_id, current_stream_sync, CudaEvent, CudaStreamId, }, }; #[link(name = "cudart")] extern "C" { fn cudaMemGetInfo(free_bytes: *mut usize, total_bytes: *mut usize) -> i32; - fn cudaLaunchHostFunc( - stream: cudaStream_t, - fn_: Option, - user_data: *mut c_void, - ) -> i32; } #[derive(Debug, Clone)] @@ -37,15 +30,12 @@ struct FreeRegion { id: usize, } -const VIRTUAL_POOL_SIZE: usize = 8usize << 40; // 8 TB +const DEFAULT_VA_SIZE: usize = 8usize << 40; // 8 TB /// Virtual memory pool implementation. pub(super) struct VirtualMemoryPool { - // Virtual address space root - root: CUdeviceptr, - - // Current end of active address space - curr_end: CUdeviceptr, + // Virtual address space roots + roots: Vec, // Map for all active pages active_pages: HashMap, @@ -53,19 +43,21 @@ pub(super) struct VirtualMemoryPool { // Free regions in virtual space (sorted by address) free_regions: BTreeMap, - // Number of free calls (used to assign ids to free regions) - free_num: usize, + // Active allocations: (ptr, size) + malloc_regions: HashMap, - // Pending events to destroy after "wait for event" is finished - // key: free_num as VMPool free state; value: non-completed events from other streams - pending_events: Arc>>>>, + // Unmapped regions: (ptr, size) + unmapped_regions: BTreeMap, - // Active allocations: (ptr, size) - used_regions: HashMap, + // Number of free calls (used to assign ids to free regions) + free_num: usize, // Granularity size: (page % 2MB must be 0) pub(super) page_size: usize, + // Reserved virtual address span in bytes + va_size: usize, + // Device ordinal pub(super) device_id: i32, } @@ -76,7 +68,7 @@ unsafe impl Sync for VirtualMemoryPool {} impl VirtualMemoryPool { fn new() -> Self { let device_id = set_device().unwrap(); - let (root, page_size) = unsafe { + let (root, page_size, va_size) = unsafe { match vpmm_check_support(device_id) { Ok(_) => { let gran = vpmm_min_granularity(device_id).unwrap(); @@ -93,29 +85,42 @@ impl VirtualMemoryPool { } Err(_) => gran, }; + let va_size = match std::env::var("VPMM_VA_SIZE") { + Ok(val) => val.parse().expect("VPMM_VA_SIZE must be a valid number"), + Err(_) => DEFAULT_VA_SIZE, + }; assert!( - VIRTUAL_POOL_SIZE % page_size == 0, + va_size > 0 && va_size % page_size == 0, "Virtual pool size must be a multiple of page size" ); - let va_base = vpmm_reserve(VIRTUAL_POOL_SIZE, page_size).unwrap(); - (va_base, page_size) + let va_base = vpmm_reserve(va_size, page_size).unwrap(); + tracing::info!( + "VPMM: Reserved virtual address space {} with page size {}", + ByteSize::b(va_size as u64), + ByteSize::b(page_size as u64) + ); + (va_base, page_size, va_size) } Err(_) => { tracing::warn!("VPMM not supported, falling back to cudaMallocAsync"); - (0, usize::MAX) + (0, usize::MAX, 0) } } }; Self { - root, - curr_end: root, + roots: vec![root], active_pages: HashMap::new(), free_regions: BTreeMap::new(), + malloc_regions: HashMap::new(), + unmapped_regions: if va_size > 0 { + BTreeMap::from_iter([(root, va_size)]) + } else { + BTreeMap::new() + }, free_num: 0, - pending_events: Arc::new(Mutex::new(HashMap::new())), - used_regions: HashMap::new(), page_size, + va_size, device_id, } } @@ -159,7 +164,7 @@ impl VirtualMemoryPool { ); } - self.used_regions.insert(ptr, requested); + self.malloc_regions.insert(ptr, requested); return Ok(ptr as *mut c_void); } @@ -192,12 +197,15 @@ impl VirtualMemoryPool { return Some(*addr); } - // 1b. Try completed from other streams (smallest completed fit) + // 1b. 
Try the oldest from other streams candidates .iter_mut() - .filter(|(_, region)| region.event.completed()) - .min_by_key(|(_, region)| region.size) + .min_by_key(|(_, region)| region.id) .map(|(addr, region)| { + region + .event + .synchronize() + .expect("Failed to synchronize CUDA event during allocation"); region.stream_id = stream_id; *addr }) @@ -210,13 +218,12 @@ impl VirtualMemoryPool { ptr: *mut c_void, stream_id: CudaStreamId, ) -> Result { - debug_assert!(self.curr_end != self.root, "VM pool is empty"); let ptr = ptr as CUdeviceptr; let size = self - .used_regions + .malloc_regions .remove(&ptr) .ok_or(MemoryError::InvalidPointer) - .expect("Pointer not found in used_regions - invalid pointer freed"); + .expect("Pointer not found in malloc_regions - invalid pointer freed"); self.free_region_insert(ptr, size, stream_id); @@ -262,6 +269,58 @@ impl VirtualMemoryPool { ); } + fn take_unmapped_region(&mut self, requested: usize) -> Result { + debug_assert!(requested != 0); + debug_assert_eq!(requested % self.page_size, 0); + + if let Some((&addr, &size)) = self + .unmapped_regions + .iter() + .filter(|(_, region_size)| **region_size >= requested) + .min_by_key(|(_, region_size)| *region_size) + { + self.unmapped_regions.remove(&addr); + if size > requested { + self.unmapped_regions + .insert(addr + requested as u64, size - requested); + } + return Ok(addr); + } + + let addr = unsafe { + vpmm_reserve(self.va_size, self.page_size).map_err(|_| MemoryError::ReserveFailed { + size: self.va_size, + page_size: self.page_size, + })? + }; + self.roots.push(addr); + self.insert_unmapped_region(addr + requested as u64, self.va_size - requested); + Ok(addr) + } + + fn insert_unmapped_region(&mut self, mut addr: CUdeviceptr, mut size: usize) { + if size == 0 { + return; + } + + if let Some((&prev_addr, &prev_size)) = self.unmapped_regions.range(..=addr).next_back() { + if prev_addr + prev_size as u64 == addr { + self.unmapped_regions.remove(&prev_addr); + addr = prev_addr; + size += prev_size; + } + } + + if let Some((&next_addr, &next_size)) = self.unmapped_regions.range(addr + 1..).next() { + if addr + size as u64 == next_addr { + self.unmapped_regions.remove(&next_addr); + size += next_size; + } + } + + self.unmapped_regions.insert(addr, size); + } + /// Defragments the pool by consolidating free regions at the end of virtual address space. /// Remaps pages to create one large contiguous free region. fn defragment_or_create_new_pages( @@ -273,147 +332,29 @@ impl VirtualMemoryPool { return Ok(None); } + let total_free_size = self.free_regions.values().map(|r| r.size).sum::(); tracing::debug!( - "VPMM: Defragging or creating new pages: requested={}, stream_id={}", + "VPMM: Defragging or creating new pages: requested={}, free={} stream_id={}", ByteSize::b(requested as u64), + ByteSize::b(total_free_size as u64), stream_id ); - let mut to_defrag = Vec::new(); - - // 2.a. 
Defrag current stream (newest first) - let mut current_stream_to_defrag: Vec<(CUdeviceptr, usize, usize)> = self - .free_regions - .iter() - .filter(|(_, r)| r.stream_id == stream_id) - .map(|(addr, r)| (*addr, r.size, r.id)) - .collect(); - - // If last free region is at the virtual end, use it as base for defragmentation - let (mut defrag_start, mut accumulated_size) = if current_stream_to_defrag - .last() - .is_some_and(|(addr, size, _)| *addr + *size as u64 == self.curr_end) - { - let (addr, size, _) = current_stream_to_defrag.pop().unwrap(); - (addr, size) - } else { - (self.curr_end, 0) - }; - - tracing::debug!( - "VPMM: Current stream {} trying to defragment from {:?}", - stream_id, - current_stream_to_defrag - .iter() - .map(|(_, size, _)| ByteSize::b(*size as u64)) - .collect::>() - ); - - let current_stream_to_defrag_size = current_stream_to_defrag - .iter() - .map(|(_, size, _)| size) - .sum::(); - if current_stream_to_defrag_size + accumulated_size < requested { - to_defrag.extend(current_stream_to_defrag.iter().map(|(ptr, _, _)| *ptr)); - accumulated_size += current_stream_to_defrag_size; - } else { - current_stream_to_defrag.sort_by_key(|(_, _, free_id)| Reverse(*free_id)); - - for (ptr, size, _) in current_stream_to_defrag { - to_defrag.push(ptr); - accumulated_size += size; - if accumulated_size >= requested { - self.remap_regions(to_defrag, stream_id) - .expect("Failed to remap regions during defragmentation"); - return Ok(Some(defrag_start)); - } - } - } - - // 2.b. Defrag other streams (completed, oldest first) - let mut other_streams_to_defrag: Vec<(CUdeviceptr, usize, Arc, usize)> = self - .free_regions - .iter() - .filter(|(_, r)| r.stream_id != stream_id) - .map(|(addr, r)| (*addr, r.size, r.event.clone(), r.id)) - .collect(); - - // If last free region is at the virtual end, take it and use as base for defragmentation - if accumulated_size == 0 - && other_streams_to_defrag - .last() - .is_some_and(|(addr, size, event, _)| { - event.completed() && *addr + *size as u64 == self.curr_end - }) - { - (defrag_start, accumulated_size, _, _) = other_streams_to_defrag.pop().unwrap(); - self.free_regions.get_mut(&defrag_start).unwrap().stream_id = stream_id; - } - - tracing::debug!( - "VPMM: Defragmented {} bytes from stream {}, other streams ready to borrow {:?}", - ByteSize::b(accumulated_size as u64), - stream_id, - other_streams_to_defrag - .iter() - .map(|(_, size, event, id)| (ByteSize::b(*size as u64), event.status(), id)) - .collect::>() - ); - - other_streams_to_defrag.sort_by_key(|(_, _, event, free_id)| (event.status(), *free_id)); - - let mut events_to_wait = Vec::new(); - for (ptr, size, event, _) in other_streams_to_defrag { - if !event.completed() { - default_stream_wait(&event) - .expect("Failed to wait for CUDA event from another stream"); - events_to_wait.push(event); - } - to_defrag.push(ptr); - accumulated_size += size; - if accumulated_size >= requested { - break; - } - } - if !events_to_wait.is_empty() { - // Destroy events only when "wait for event" is finished - tracing::debug!( - "VPMM: Async call {} events to destroy on stream {}", - events_to_wait.len(), - stream_id - ); - self.pending_events - .lock() - .unwrap() - .insert(self.free_num, events_to_wait); - unsafe { - cudaLaunchHostFunc( - cudaStreamPerThread, - Some(pending_events_destroy_callback), - Box::into_raw(Box::new((self.free_num, self.pending_events.clone()))) - as *mut c_void, - ) - }; - } - - self.remap_regions(to_defrag, stream_id) - .expect("Failed to remap regions after waiting for 
events"); - if accumulated_size >= requested { - return Ok(Some(defrag_start)); - } - - // 2.c. Create new pages - let allocate_start = self.curr_end; - let mut allocated_size = 0; - while accumulated_size < requested { + // Find a best fit unmapped region + let dst = self.take_unmapped_region(requested).expect("Failed to take unmapped region"); + + // Allocate new pages if we don't have enough free regions + let mut allocated_dst = dst; + let allocate_size = if total_free_size < requested { requested - total_free_size } else { 0 }; + while allocated_dst < dst + allocate_size as u64 { let handle = unsafe { match vpmm_create_physical(self.device_id, self.page_size) { Ok(handle) => handle, Err(e) => { if e.is_out_of_memory() { return Err(MemoryError::OutOfMemory { - requested, - available: accumulated_size, + requested: allocate_size, + available: (allocated_dst - dst) as usize , }); } else { return Err(MemoryError::from(e)); @@ -422,75 +363,115 @@ impl VirtualMemoryPool { } }; unsafe { - vpmm_map(self.curr_end, self.page_size, handle) + vpmm_map(allocated_dst, self.page_size, handle) .expect("Failed to map physical memory page to virtual address"); } - self.active_pages.insert(self.curr_end, handle); - self.curr_end += self.page_size as u64; - allocated_size += self.page_size; - accumulated_size += self.page_size; + self.active_pages.insert( allocated_dst, handle); + allocated_dst += self.page_size as u64; } - tracing::info!( - "GPU mem: VPMM allocated {} bytes on stream {}. Total allocated: {}", - ByteSize::b(allocated_size as u64), - stream_id, - ByteSize::b(self.memory_usage() as u64) - ); - unsafe { - vpmm_set_access(allocate_start, allocated_size, self.device_id).expect( - "Failed to set access permissions for newly allocated virtual memory region", + if allocate_size > 0 { + tracing::debug!( + "VPMM: Allocated {} bytes on stream {}. 
Total allocated: {}", + ByteSize::b(allocate_size as u64), + stream_id, + ByteSize::b(self.memory_usage() as u64) ); + unsafe { + vpmm_set_access(dst, allocate_size, self.device_id).expect( + "Failed to set access permissions for newly allocated virtual memory region", + ); + } + self.free_region_insert(dst, allocate_size, stream_id); + } + let mut remaining = requested - allocate_size; + if remaining == 0 { + return Ok(Some(dst)); } - self.free_region_insert(allocate_start, allocated_size, stream_id); - Ok(Some(defrag_start)) + // Defragment free regions from oldest to newest + let mut to_defrag: Vec<(CUdeviceptr, usize)> = Vec::new(); + let mut oldest_free_regions: Vec<_> = self.free_regions.iter().map(|(addr, region)| (region.id, *addr)).collect(); + oldest_free_regions.sort_by_key(|(id, _)| *id); + for (_, addr) in oldest_free_regions { + if remaining == 0 { + break; + } + + let region = self + .free_regions + .remove(&addr) + .unwrap(); + region + .event + .synchronize() + .expect("Failed to synchronize event"); + + let take = remaining.min(region.size); + to_defrag.push((addr, take)); + remaining -= take; + + if region.size > take { + // stash the leftover right away + let leftover_addr = addr + take as u64; + let leftover_size = region.size - take; + self.free_region_insert(leftover_addr, leftover_size, region.stream_id); + } + } + self.remap_regions(to_defrag, allocated_dst, stream_id)?; + + Ok(Some(dst)) } fn remap_regions( &mut self, - regions: Vec, + regions: Vec<(CUdeviceptr, usize)>, + dst: CUdeviceptr, stream_id: CudaStreamId, ) -> Result<(), MemoryError> { if regions.is_empty() { return Ok(()); } - let new_region_start = self.curr_end; - for region_addr in regions { - let region = self - .free_regions - .remove(®ion_addr) - .ok_or(MemoryError::InvalidPointer) - .expect("Free region not found during remapping - invalid region address"); - let num_pages = region.size / self.page_size; + let bytes_to_remap = regions.iter().map(|(_, size)| *size).sum::(); + tracing::debug!( + "VPMM: Remapping {} regions. 
Total size = {}", + regions.len(), + ByteSize::b(bytes_to_remap as u64) + ); + + let mut curr_dst = dst; + regions.into_iter().for_each(|(region_addr, region_size)| { + // Unmap the region + unsafe { + vpmm_unmap(region_addr, region_size).expect("Failed to unmap region"); + } + self.insert_unmapped_region(region_addr, region_size); + + // Remap the region + let num_pages = region_size / self.page_size; for i in 0..num_pages { let page = region_addr + (i * self.page_size) as u64; let handle = self .active_pages .remove(&page) - .ok_or(MemoryError::InvalidPointer) .expect("Active page not found during remapping - page handle missing"); unsafe { - vpmm_map(self.curr_end, self.page_size, handle) + vpmm_map(curr_dst, self.page_size, handle) .expect("Failed to remap physical memory page to new virtual address"); } - self.active_pages.insert(self.curr_end, handle); - self.curr_end += self.page_size as u64; + self.active_pages.insert(curr_dst, handle); + curr_dst += self.page_size as u64; } - } - let new_region_size = (self.curr_end - new_region_start) as usize; - tracing::debug!( - "VPMM: Remapped regions: new_region_start={:?}, size={} (remaining: {})", - new_region_start, - new_region_size, - ByteSize::b((self.root + VIRTUAL_POOL_SIZE - self.curr_end) as u64) - ); + }); + debug_assert_eq!(curr_dst - dst, bytes_to_remap as u64); + + // Set access permissions for the remapped region unsafe { - vpmm_set_access(new_region_start, new_region_size, self.device_id) + vpmm_set_access(dst, bytes_to_remap, self.device_id) .expect("Failed to set access permissions for remapped virtual memory region"); } - self.free_region_insert(new_region_start, new_region_size, stream_id); + self.free_region_insert(dst, bytes_to_remap, stream_id); Ok(()) } @@ -502,16 +483,18 @@ impl VirtualMemoryPool { impl Drop for VirtualMemoryPool { fn drop(&mut self) { - if self.root != self.curr_end { + current_stream_sync().unwrap(); + for (ptr, handle) in self.active_pages.drain() { unsafe { - vpmm_unmap(self.root, (self.curr_end - self.root) as usize).unwrap(); - for (_, handle) in self.active_pages.drain() { - vpmm_release(handle).unwrap(); - } + vpmm_unmap(ptr, self.page_size).unwrap(); + vpmm_release(handle).unwrap(); } } - unsafe { - vpmm_release_va(self.root, VIRTUAL_POOL_SIZE).unwrap(); + + for root in self.roots.drain(..) { + unsafe { + vpmm_release_va(root, self.va_size).unwrap(); + } } } } @@ -552,6 +535,7 @@ impl Default for VirtualMemoryPool { } } +#[allow(unused)] unsafe extern "C" fn pending_events_destroy_callback(data: *mut c_void) { let boxed = Box::from_raw(data as *mut (usize, Arc>>>>)); diff --git a/crates/cuda-common/src/stream.rs b/crates/cuda-common/src/stream.rs index bd542ff5..6eba0eaa 100644 --- a/crates/cuda-common/src/stream.rs +++ b/crates/cuda-common/src/stream.rs @@ -150,6 +150,10 @@ impl CudaEvent { check(unsafe { cudaEventRecord(self.event, cudaStreamPerThread) }) } + pub fn synchronize(&self) -> Result<(), CudaError> { + check(unsafe { cudaEventSynchronize(self.event) }) + } + /// # Safety /// The caller must ensure that `stream` is a valid stream. 
pub unsafe fn record_and_wait(&self, stream: cudaStream_t) -> Result<(), CudaError> { From f44ad83d38313d26942ff5fa53718a4c8b4d47ff Mon Sep 17 00:00:00 2001 From: gaxiom Date: Wed, 26 Nov 2025 13:47:25 +0000 Subject: [PATCH 05/20] format --- .../cuda-common/src/memory_manager/vm_pool.rs | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index 147dca02..5554a862 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -12,9 +12,7 @@ use super::cuda::*; use crate::{ common::set_device, error::MemoryError, - stream::{ - current_stream_id, current_stream_sync, CudaEvent, CudaStreamId, - }, + stream::{current_stream_id, current_stream_sync, CudaEvent, CudaStreamId}, }; #[link(name = "cudart")] @@ -341,11 +339,13 @@ impl VirtualMemoryPool { ); // Find a best fit unmapped region - let dst = self.take_unmapped_region(requested).expect("Failed to take unmapped region"); - + let dst = self + .take_unmapped_region(requested) + .expect("Failed to take unmapped region"); + // Allocate new pages if we don't have enough free regions let mut allocated_dst = dst; - let allocate_size = if total_free_size < requested { requested - total_free_size } else { 0 }; + let allocate_size = requested.saturating_sub(total_free_size); while allocated_dst < dst + allocate_size as u64 { let handle = unsafe { match vpmm_create_physical(self.device_id, self.page_size) { @@ -354,7 +354,7 @@ impl VirtualMemoryPool { if e.is_out_of_memory() { return Err(MemoryError::OutOfMemory { requested: allocate_size, - available: (allocated_dst - dst) as usize , + available: (allocated_dst - dst) as usize, }); } else { return Err(MemoryError::from(e)); @@ -366,7 +366,7 @@ impl VirtualMemoryPool { vpmm_map(allocated_dst, self.page_size, handle) .expect("Failed to map physical memory page to virtual address"); } - self.active_pages.insert( allocated_dst, handle); + self.active_pages.insert(allocated_dst, handle); allocated_dst += self.page_size as u64; } if allocate_size > 0 { @@ -390,17 +390,18 @@ impl VirtualMemoryPool { // Defragment free regions from oldest to newest let mut to_defrag: Vec<(CUdeviceptr, usize)> = Vec::new(); - let mut oldest_free_regions: Vec<_> = self.free_regions.iter().map(|(addr, region)| (region.id, *addr)).collect(); + let mut oldest_free_regions: Vec<_> = self + .free_regions + .iter() + .map(|(addr, region)| (region.id, *addr)) + .collect(); oldest_free_regions.sort_by_key(|(id, _)| *id); for (_, addr) in oldest_free_regions { if remaining == 0 { break; } - let region = self - .free_regions - .remove(&addr) - .unwrap(); + let region = self.free_regions.remove(&addr).unwrap(); region .event .synchronize() From fa7e6cb7cb4b12a0bf2eaec8ab5330d94622599b Mon Sep 17 00:00:00 2001 From: gaxiom Date: Wed, 26 Nov 2025 14:14:54 +0000 Subject: [PATCH 06/20] debug print --- .../cuda-common/src/memory_manager/vm_pool.rs | 67 ++++++++++++++++--- 1 file changed, 57 insertions(+), 10 deletions(-) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index 5554a862..4fb9ac12 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -3,7 +3,7 @@ use std::{ collections::{BTreeMap, HashMap}, ffi::c_void, - sync::{Arc, Mutex}, + sync::Arc, }; use bytesize::ByteSize; @@ -267,6 +267,8 @@ impl VirtualMemoryPool { ); } + 
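// Note: `free_region_insert` above merges a freed block with an adjacent free
// region only when both were freed by the same stream and the ranges are exactly
// contiguous; regions freed by different streams are kept separate so each one
// retains its own CUDA event for cross-stream synchronization.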
+ /// Return the base address of a virtual hole large enough for `requested` bytes. fn take_unmapped_region(&mut self, requested: usize) -> Result { debug_assert!(requested != 0); debug_assert_eq!(requested % self.page_size, 0); @@ -296,6 +298,7 @@ impl VirtualMemoryPool { Ok(addr) } + /// Insert a hole back into the unmapped set, coalescing with neighbors. fn insert_unmapped_region(&mut self, mut addr: CUdeviceptr, mut size: usize) { if size == 0 { return; @@ -319,8 +322,8 @@ impl VirtualMemoryPool { self.unmapped_regions.insert(addr, size); } - /// Defragments the pool by consolidating free regions at the end of virtual address space. - /// Remaps pages to create one large contiguous free region. + /// Defragments the pool by reusing existing holes and, if needed, reserving more VA space. + /// Moves just enough pages to satisfy `requested`, keeping the remainder of each region in place. fn defragment_or_create_new_pages( &mut self, requested: usize, @@ -388,7 +391,7 @@ impl VirtualMemoryPool { return Ok(Some(dst)); } - // Defragment free regions from oldest to newest + // Pull free regions (oldest first) until we've gathered enough pages. let mut to_defrag: Vec<(CUdeviceptr, usize)> = Vec::new(); let mut oldest_free_regions: Vec<_> = self .free_regions @@ -412,7 +415,7 @@ impl VirtualMemoryPool { remaining -= take; if region.size > take { - // stash the leftover right away + // Return the unused tail to the free list so it stays available. let leftover_addr = addr + take as u64; let leftover_size = region.size - take; self.free_region_insert(leftover_addr, leftover_size, region.stream_id); @@ -423,6 +426,8 @@ impl VirtualMemoryPool { Ok(Some(dst)) } + /// Remap a list of regions to a new base address. + /// The regions already dropped from free regions fn remap_regions( &mut self, regions: Vec<(CUdeviceptr, usize)>, @@ -537,9 +542,51 @@ impl Default for VirtualMemoryPool { } #[allow(unused)] -unsafe extern "C" fn pending_events_destroy_callback(data: *mut c_void) { - let boxed = - Box::from_raw(data as *mut (usize, Arc>>>>)); - let (key, pending) = *boxed; - pending.lock().unwrap().remove(&key); +impl std::fmt::Debug for VirtualMemoryPool { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!( + f, + "VMPool (VA_SIZE={}, PAGE_SIZE={})", + ByteSize::b(self.va_size as u64), + ByteSize::b(self.page_size as u64) + )?; + + let reserved = self.roots.len() * self.va_size; + let allocated = self.memory_usage(); + let free_bytes: usize = self.free_regions.values().map(|r| r.size).sum(); + let malloc_bytes: usize = self.malloc_regions.values().sum(); + let unmapped_bytes: usize = self.unmapped_regions.values().sum(); + + writeln!( + f, + "Total: reserved={}, allocated={}, free={}, malloc={}, unmapped={}", + ByteSize::b(reserved as u64), + ByteSize::b(allocated as u64), + ByteSize::b(free_bytes as u64), + ByteSize::b(malloc_bytes as u64), + ByteSize::b(unmapped_bytes as u64) + )?; + + let mut regions: Vec<(CUdeviceptr, usize, String)> = Vec::new(); + for (addr, region) in &self.free_regions { + regions.push(( + *addr, + region.size, + format!("free (s {})", region.stream_id), + )); + } + for (addr, size) in &self.malloc_regions { + regions.push((*addr, *size, "malloc".to_string())); + } + for (addr, size) in &self.unmapped_regions { + regions.push((*addr, *size, "unmapped".to_string())); + } + regions.sort_by_key(|(addr, _, _)| *addr); + + write!(f, "Regions: ")?; + for (_, size, label) in regions.iter() { + write!(f, "[{} {}]", label, ByteSize::b(*size as u64))?; + } 
+ Ok(()) + } } From 79313ec8d3dd4c326d466afb023d3e0d3dd58951 Mon Sep 17 00:00:00 2001 From: gaxiom Date: Wed, 26 Nov 2025 14:23:27 +0000 Subject: [PATCH 07/20] lint fix --- crates/cuda-common/src/memory_manager/vm_pool.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index 4fb9ac12..d45f5a16 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -267,7 +267,6 @@ impl VirtualMemoryPool { ); } - /// Return the base address of a virtual hole large enough for `requested` bytes. fn take_unmapped_region(&mut self, requested: usize) -> Result { debug_assert!(requested != 0); @@ -323,7 +322,7 @@ impl VirtualMemoryPool { } /// Defragments the pool by reusing existing holes and, if needed, reserving more VA space. - /// Moves just enough pages to satisfy `requested`, keeping the remainder of each region in place. + /// Moves just enough pages to satisfy `requested`, keeping the remainder in place. fn defragment_or_create_new_pages( &mut self, requested: usize, @@ -569,11 +568,7 @@ impl std::fmt::Debug for VirtualMemoryPool { let mut regions: Vec<(CUdeviceptr, usize, String)> = Vec::new(); for (addr, region) in &self.free_regions { - regions.push(( - *addr, - region.size, - format!("free (s {})", region.stream_id), - )); + regions.push((*addr, region.size, format!("free (s {})", region.stream_id))); } for (addr, size) in &self.malloc_regions { regions.push((*addr, *size, "malloc".to_string())); @@ -584,8 +579,8 @@ impl std::fmt::Debug for VirtualMemoryPool { regions.sort_by_key(|(addr, _, _)| *addr); write!(f, "Regions: ")?; - for (_, size, label) in regions.iter() { - write!(f, "[{} {}]", label, ByteSize::b(*size as u64))?; + for (_, size, label) in regions.iter() { + write!(f, "[{} {}]", label, ByteSize::b(*size as u64))?; } Ok(()) } From 129cd1e9943e911c884c2c00da025573db96f1e2 Mon Sep 17 00:00:00 2001 From: gaxiom Date: Wed, 26 Nov 2025 14:58:42 +0000 Subject: [PATCH 08/20] docs update --- docs/vpmm_spec.md | 125 ++++++++++++++++++++++------------------------ 1 file changed, 59 insertions(+), 66 deletions(-) diff --git a/docs/vpmm_spec.md b/docs/vpmm_spec.md index 95fc0f72..0dbdd8d7 100644 --- a/docs/vpmm_spec.md +++ b/docs/vpmm_spec.md @@ -23,25 +23,23 @@ The **Virtual Memory Pool Memory Manager (VPMM)** replaces this with a design ba ## Concepts & Notation -* **Virtual address (VA) space** — Per‑process range of GPU‑visible addresses. Reserved VA has no storage until mapped. -* **Active space** — The prefixed portion of the reserved VA that currently has mappings; grows as needed. -* **Page** — Fixed‑size chunk of physical GPU memory, the basic mapping unit. A page may be mapped at multiple VAs. -* **Region** — Consecutive VA range with a single state: - - * `[+X]` allocated region returned by the **current** `malloc` call - * `[X]` allocated region from a **previous** call - * `[-X]` region marked as **free** - * `[*X]` region marked as **dead** (free but already remapped) - * `[#X]` region marked as **new** (free but created by current allocation) +* **Virtual address (VA) space** — Per‑process GPU-visible address range. Reserving VA does **not** allocate physical memory. +* **Page** — Fixed-size physical GPU chunk (≥ CUDA VMM granularity). All mappings and allocations are rounded up to page size. 
+* **Region** — Consecutive VA range tracked with symbolic states: + * `[+X]` – current allocation returned by `malloc` + * `[X]` – previously allocated user region + * `[-X]` – mapped free region with an associated stream/event + * `[*X]` – **unmapped** region (hole created after remapping) + * `[#X]` – brand new region created while satisfying the current allocation ## High‑Level Design -1. **Reserve** a sufficiently large VA range once at initialization. -2. **Pre‑allocate** a configurable number of physical pages. -3. **Map** pages contiguously at the **end of active space** for new allocations. -4. **Find‑or‑defragment**: if no free region fits, **remap** pages from earlier freed regions to the end (no data copy, just page table updates). -5. **Cross-stream synchronization**: Use CUDA events to track when freed memory becomes safe to reuse across streams. -6. **Grow**: if still insufficient, **allocate more pages** and map them at the end. +1. **Reserve** a large VA chunk once (size configurable via `VPMM_VA_SIZE`). Additional chunks are reserved on demand. +2. **Track** three disjoint maps: `malloc_regions`, `free_regions` (with CUDA events/stream ids), and `unmapped_regions` (holes). +3. **Allocate** by finding the best-fit free region in the current stream; otherwise reuse a completed region; otherwise wait on the oldest event. +4. **Defragment** only when necessary: grab a hole, detach just enough pages from the oldest free regions (split + reinsert leftovers), unmap their old VA, and remap into the hole. +5. **Grow** by allocating the shortfall in physical pages and mapping them into the same hole when free regions still aren’t enough. +6. **Observe** all activity via maps (no implicit “active end”). Debug output lists every region in address order. Small allocations (< page size) bypass the pool and use `cudaMallocAsync` for simplicity and backward compatibility. @@ -106,47 +104,37 @@ Similar to Case C, except `+4` in step 4 cannot fit the third region, so layout // cuda-common/src/memory_manager/vm_pool.rs struct FreeRegion { size: usize, - event: CudaEvent, // Event marking when this region was freed + event: Arc, // Event marking when this region was freed stream_id: CudaStreamId, // Stream that freed this region id: usize, // Creation order for temporal tracking } pub(super) struct VirtualMemoryPool { - device_id: i32, - root: CUdeviceptr, // Virtual address space root (shared across streams) - curr_end: CUdeviceptr, // Current end of active address space - pub(super) page_size: usize, // Mapping granularity (multiple of 2MB) - - // Map for all active pages (last mapping only; keys and values are unique) + roots: Vec, // Every reserved VA chunk active_pages: HashMap, - - // Free regions in virtual space (sorted by address; non-adjacent) free_regions: BTreeMap, - - // Number of free calls (used to assign ids to free regions) + malloc_regions: HashMap, + unmapped_regions: BTreeMap, free_num: usize, - - // Pending events to destroy after "wait for event" is finished - pending_events: Arc>>>>, - - // Active allocations - used_regions: HashMap, + pub(super) page_size: usize, + va_size: usize, + device_id: i32, } ``` **Invariants** -* `root` is returned by VA reservation and satisfies: `root ≤ key < curr_end` for all keys in the collections. -* `active_pages` tracks the **current** mapping address for each page. -* `free_regions` cover only the **active** VA range and are always coalesced (neighbors merged on free). 
-* Each `FreeRegion` tracks which stream freed it and records a CUDA event for cross-stream synchronization. +* Every `roots` entry corresponds to a reserved VA chunk of size `va_size`. We only map/unmap within these chunks. +* `active_pages` tracks the current virtual address for every mapped page; keys move when we remap. +* `free_regions`, `malloc_regions`, and `unmapped_regions` partition the reserved VA space with no overlap; `free_regions` are coalesced by stream/event when possible. +* Each `FreeRegion` retains the CUDA event recorded at free time plus the originating `CudaStreamId`; we block on this event before stealing the region from another stream. ## Initialization -* **VA Reservation:** Reserve a large VA range at startup. **Current value:** 32 TB (constant). -* **Page Size:** Configurable via `VPMM_PAGE_SIZE`. Must be a multiple of CUDA’s minimum allocation granularity (typically 2 MB). Larger pages reduce mapping overhead; all allocations are rounded up to page size. -* **Initial Pages:** Count is configurable via `VPMM_PAGES`. Defaults to 0 (allocate on-demand). More pages improve performance but increase baseline memory footprint. -* **Mapping Unit:** All mappings are performed **by page**. +* **VA Reservation:** Reserve a `VPMM_VA_SIZE` chunk (default 8 TB) at startup. When every hole is consumed, reserve another chunk and append it to `roots`. +* **Page Size:** Configurable via `VPMM_PAGE_SIZE` (≥ CUDA’s VMM granularity, typically 2 MB). All requests are rounded up to this size. +* **Initial Pages:** `VPMM_PAGES` controls how many pages are eagerly mapped. Defaults to 0 (purely on-demand). +* **Mapping Unit:** Always page-sized; the pool never subdivides a page. ## Allocation & Growth @@ -159,27 +147,30 @@ Allocations occur during initialization (preallocation) and on‑demand when the ## Defragmentation -Triggered when no free region can satisfy a request. Defragmentation proceeds in phases to minimize cross-stream synchronization: +Triggered when Phase 1 fails. The new defragmentation flow is deterministic and bounded: -* **Phase 2a - Current stream (latest first):** Remap free regions from the requesting stream, prioritizing latest freed regions (newest events) to minimize wasted work. -* **Phase 2b - Wait on other streams (oldest first):** Wait for the oldest events from other streams first (most likely to have completed) and remap those regions. Uses `cudaStreamWaitEvent` to introduce cross-stream dependencies, with `cudaLaunchHostFunc` to manage event lifetimes asynchronously. -* **Phase 2c - Allocate new pages:** If still insufficient after waiting on all available regions, allocate new physical pages. +1. **Take a hole** — `take_unmapped_region` returns the smallest unmapped interval ≥ `requested`. If none exists, we reserve another VA chunk and carve the hole out of it. +2. **Allocate shortfall** — compute the page shortfall (`requested - free_bytes`). Allocate that many physical pages and map them into the beginning of the hole. +3. **Harvest oldest frees** — iterate `free_regions` ordered by `free_id`. For each region: + * Block on its CUDA event via `CudaEvent::synchronize()`. + * Detach only the portion we need (`take = min(remaining, region.size)`), unmap it, and push `(addr, take)` to a work list. + * Reinsert the leftover tail (if any) back into `free_regions` with its original stream/event. + Stop once the total `take` covers the remainder of the request. +4. 
**Remap into the hole** — unmap each harvested chunk, add its VA back to `unmapped_regions`, and remap the chunk (page by page) contiguously into the hole immediately after the newly allocated portion. The combined span becomes the new free region for the requesting stream. -* **Remapping:** Select free regions and **remap their pages** to the end of active space. The original region becomes a **dead zone** and is not reused. -* **Event tracking:** Each free region records a CUDA event. Before reusing memory from another stream, VPMM either checks event completion or waits. -* **Event lifetime management:** Events are wrapped in `Arc` for shared ownership. When waiting on events from other streams, VPMM clones the Arc and stores it in `pending_events` (keyed by `free_num`). After issuing `cudaStreamWaitEvent`, it schedules a `cudaLaunchHostFunc` callback that removes the Arc clones once the stream has processed the wait. This ensures events are not destroyed while GPU streams are waiting on them, avoiding segmentation faults. -* **Access permissions:** After remapping, `cuMemSetAccess` is called to grant access to the new VA range. ## malloc(size) — Allocation Policy **Phase 1: Zero-cost attempts** -1. **Find on current stream:** Search `free_regions` for a region from the current stream large enough to satisfy the request (**BestFit**: smallest that fits). -2. **Find completed from other streams:** Search for a completed region (event already done) from other streams. +1. **Best fit on current stream** — smallest region from the caller’s stream that fits `requested`. +2. **Completed from other streams** — any region whose event has already completed (no blocking). +3. **Oldest in-flight** — take the lowest `free_id`, call `CudaEvent::synchronize()`, and hand that region to the caller. This avoids full defrag if one region can satisfy the request. -**Phase 2: Defragmentation (if Phase 1 fails)** -3. **Defragment current stream:** Remap current stream's free regions (newest first) until enough space. -4. **Wait on other streams:** Issue `cudaStreamWaitEvent` on events from other streams (oldest first, most likely completed). Use `cudaLaunchHostFunc` to schedule async cleanup of event references. -5. **Grow:** Allocate new pages and map them after `curr_end`. +**Phase 2: Hole-based defragmentation** +4. **Reserve hole** — via `take_unmapped_region`. +5. **Allocate shortfall** — map the missing number of pages into the hole. +6. **Harvest oldest frees** — synchronously detach just enough pages from free regions (splitting leftovers). +7. **Remap into hole** — unmap the detached slices, add their old VA back to `unmapped_regions`, and remap them contiguously into the hole. The combined span (new pages + remapped slices) becomes the new free region for the caller. Additional rules: @@ -188,22 +179,24 @@ Additional rules: ## free(ptr) -* Look up `ptr` in `used_regions` to obtain the region size. -* Record a CUDA event on the current stream to mark when this memory becomes safe to reuse. -* Mark the region free in `free_regions` with the event and stream ID, and **coalesce** with adjacent free neighbors from the **same stream** (keeping the newest event). +* Look up `ptr` in `malloc_regions` to obtain the aligned size. +* Record a CUDA event on the calling stream (always `cudaStreamPerThread`) and store the stream id/event pair in `free_regions`. +* Attempt to coalesce with adjacent regions from the same stream that have matching completion guarantees. +* Remove the entry from `malloc_regions`. 
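To make the policy above concrete, the sketch below models Phase 1 with deliberately simplified stand-ins: `Addr` replaces `CUdeviceptr`, a plain `completed` flag replaces the recorded `CudaEvent`, and `find_best_fit` only *selects* a region (splitting, removal from the map, and the actual `event.synchronize()` call are omitted). It follows the policy described in this spec rather than reproducing the pool's exact code.

```rust
use std::collections::BTreeMap;

type Addr = u64; // stand-in for CUdeviceptr

struct FreeRegion {
    size: usize,
    stream_id: usize, // stand-in for CudaStreamId
    completed: bool,  // stand-in for "the CUDA event recorded at free time has completed"
    id: usize,        // free order; lower = freed earlier
}

/// Phase 1 of `malloc`: pick a free region without remapping anything.
fn find_best_fit(
    free_regions: &mut BTreeMap<Addr, FreeRegion>,
    requested: usize,
    stream_id: usize,
) -> Option<Addr> {
    let fits: Vec<Addr> = free_regions
        .iter()
        .filter(|(_, r)| r.size >= requested)
        .map(|(a, _)| *a)
        .collect();

    // 1. Smallest fitting region already owned by the calling stream (no sync needed).
    let same_stream = fits
        .iter()
        .filter(|a| free_regions[*a].stream_id == stream_id)
        .min_by_key(|a| free_regions[*a].size)
        .copied();
    // 2. Otherwise, a fitting region from another stream whose free-event already completed.
    let completed = fits
        .iter()
        .filter(|a| free_regions[*a].completed)
        .min_by_key(|a| free_regions[*a].size)
        .copied();
    // 3. Otherwise, the oldest in-flight region; the real pool blocks on its CUDA event here.
    let oldest = fits.iter().min_by_key(|a| free_regions[*a].id).copied();

    let chosen = same_stream.or(completed).or(oldest)?;
    // The chosen region is handed over to the calling stream.
    free_regions.get_mut(&chosen).unwrap().stream_id = stream_id;
    Some(chosen)
}

fn main() {
    let mut free = BTreeMap::new();
    free.insert(0x10_0000, FreeRegion { size: 8 << 20, stream_id: 1, completed: true, id: 0 });
    free.insert(0x90_0000, FreeRegion { size: 2 << 20, stream_id: 2, completed: false, id: 1 });
    // A 2 MiB request on stream 2 reuses stream 2's own region, even though the larger one fits.
    assert_eq!(find_best_fit(&mut free, 2 << 20, 2), Some(0x90_0000));
    // A 4 MiB request on stream 2 falls through to the completed region freed by stream 1.
    assert_eq!(find_best_fit(&mut free, 4 << 20, 2), Some(0x10_0000));
}
```

Keeping the selection keyed on stream, size, and age is what lets most allocations complete without any cross-stream synchronization; only step 3 ever blocks.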
## Status & Observability -(All tracking is implemented in the outer **MemoryManager**, not inside `VirtualMemoryPool`, since small buffers bypass the pool.) +(All tracking is implemented in the outer **MemoryManager**, but the pool exposes enough state for debug dumps.) -* **Total GPU memory used by pages:** `pool.active_pages.len() * pool.page_size` -* **Active VA extent:** `pool.curr_end - pool.base` -* **Currently allocated (live) bytes:** `sum(pool.used_regions.values())` -* **Currently freed (reusable) bytes:** `sum(pool.free_regions.values().map(|r| r.size))` +* **Total GPU memory mapped:** `pool.active_pages.len() * pool.page_size` +* **Reserved VA:** `pool.roots.len() * pool.va_size` +* **Currently allocated (live) bytes:** `sum(pool.malloc_regions.values())` +* **Currently reusable bytes:** `sum(pool.free_regions.values().map(|r| r.size))` +* **Holes:** `sum(pool.unmapped_regions.values())` +* `Debug` output prints the metrics above plus every region in ascending VA order. ## Asynchrony & Streams * VPMM supports **multi-stream workloads** using `cudaStreamPerThread`. -* A single shared `VirtualMemoryPool` serves all streams, using CUDA events to track cross-stream dependencies. -* Memory freed by one stream can be reused by another stream with minimal synchronization overhead (opportunistic reuse of completed regions, forced async waits only if needed). -* **Access permissions:** Set via `cuMemSetAccess` after remapping to ensure memory is accessible on the current device. \ No newline at end of file +* A single shared `VirtualMemoryPool` serves all streams. Each free region carries the stream id plus a CUDA event. We reuse a region immediately if its event has completed; otherwise we call `event.synchronize()` right before detaching it. Events are dropped as soon as the synchronous wait completes. +* **Access permissions:** After remapping (or mapping newly allocated pages) we call `cuMemSetAccess` on the destination hole to ensure the caller’s device has read/write permission. 
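The hole bookkeeping used throughout Phase 2 (take the smallest unmapped interval that fits, return unmapped spans, and coalesce adjacent ones) can be modeled with a plain `BTreeMap<u64, usize>`. The sketch below is a standalone illustration under that simplification: `take_hole` and `insert_hole` are hypothetical names mirroring `take_unmapped_region` and `insert_unmapped_region`, and no CUDA driver calls are involved.

```rust
use std::collections::BTreeMap;

/// Holes in the reserved VA span, keyed by base address; value = size in bytes.
type Holes = BTreeMap<u64, usize>;

/// Best fit: take the smallest hole that can hold `requested` bytes,
/// leaving any unused tail behind as a smaller hole.
fn take_hole(holes: &mut Holes, requested: usize) -> Option<u64> {
    let (&addr, &size) = holes
        .iter()
        .filter(|(_, sz)| **sz >= requested)
        .min_by_key(|(_, sz)| **sz)?;
    holes.remove(&addr);
    if size > requested {
        holes.insert(addr + requested as u64, size - requested);
    }
    Some(addr)
}

/// Give a span back to the hole set, coalescing with the previous and next holes
/// whenever the ranges touch exactly.
fn insert_hole(holes: &mut Holes, mut addr: u64, mut size: usize) {
    if size == 0 {
        return;
    }
    if let Some((&prev, &prev_size)) = holes.range(..addr).next_back() {
        if prev + prev_size as u64 == addr {
            holes.remove(&prev);
            addr = prev;
            size += prev_size;
        }
    }
    if let Some((&next, &next_size)) = holes.range(addr + 1..).next() {
        if addr + size as u64 == next {
            holes.remove(&next);
            size += next_size;
        }
    }
    holes.insert(addr, size);
}

fn main() {
    let page = 2 << 20; // assume 2 MiB granularity
    let mut holes: Holes = BTreeMap::from([(0, 8 * page)]);
    let a = take_hole(&mut holes, 2 * page).unwrap();
    let b = take_hole(&mut holes, 2 * page).unwrap();
    insert_hole(&mut holes, a, 2 * page);
    insert_hole(&mut holes, b, 2 * page); // coalesces back into a single 8-page hole
    assert_eq!(holes, BTreeMap::from([(0, 8 * page)]));
}
```

Best-fit selection keeps large holes intact for future large requests, while coalescing on insert keeps the reserved VA span from fragmenting into many small, hard-to-reuse holes.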
\ No newline at end of file From 90b53d67ee497dfe9435032038636d3ea674d00f Mon Sep 17 00:00:00 2001 From: gaxiom Date: Wed, 26 Nov 2025 21:30:13 +0000 Subject: [PATCH 09/20] fix left neighbour merge on defrag --- .../cuda-common/src/memory_manager/vm_pool.rs | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index d45f5a16..df659286 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -92,7 +92,7 @@ impl VirtualMemoryPool { "Virtual pool size must be a multiple of page size" ); let va_base = vpmm_reserve(va_size, page_size).unwrap(); - tracing::info!( + tracing::debug!( "VPMM: Reserved virtual address space {} with page size {}", ByteSize::b(va_size as u64), ByteSize::b(page_size as u64) @@ -146,7 +146,10 @@ impl VirtualMemoryPool { } if let Some(ptr) = best_region { - let region = self.free_regions.remove(&ptr).unwrap(); + let region = self + .free_regions + .remove(&ptr) + .expect("Incorrect free region address"); debug_assert_eq!(region.stream_id, stream_id); // If region is larger, return remainder to free list @@ -223,7 +226,7 @@ impl VirtualMemoryPool { .ok_or(MemoryError::InvalidPointer) .expect("Pointer not found in malloc_regions - invalid pointer freed"); - self.free_region_insert(ptr, size, stream_id); + let _ = self.free_region_insert(ptr, size, stream_id); Ok(size) } @@ -233,7 +236,7 @@ impl VirtualMemoryPool { mut ptr: CUdeviceptr, mut size: usize, stream_id: CudaStreamId, - ) { + ) -> CUdeviceptr { // Potential merge with next neighbor if let Some((&next_ptr, next_region)) = self.free_regions.range(ptr + 1..).next() { if next_region.stream_id == stream_id && ptr + size as u64 == next_ptr { @@ -265,6 +268,7 @@ impl VirtualMemoryPool { id, }, ); + ptr } /// Return the base address of a virtual hole large enough for `requested` bytes. @@ -344,6 +348,7 @@ impl VirtualMemoryPool { let dst = self .take_unmapped_region(requested) .expect("Failed to take unmapped region"); + let mut returned_ptr = dst; // Allocate new pages if we don't have enough free regions let mut allocated_dst = dst; @@ -383,11 +388,11 @@ impl VirtualMemoryPool { "Failed to set access permissions for newly allocated virtual memory region", ); } - self.free_region_insert(dst, allocate_size, stream_id); + returned_ptr = self.free_region_insert(dst, allocate_size, stream_id); } let mut remaining = requested - allocate_size; if remaining == 0 { - return Ok(Some(dst)); + return Ok(Some(returned_ptr)); } // Pull free regions (oldest first) until we've gathered enough pages. @@ -417,12 +422,14 @@ impl VirtualMemoryPool { // Return the unused tail to the free list so it stays available. let leftover_addr = addr + take as u64; let leftover_size = region.size - take; - self.free_region_insert(leftover_addr, leftover_size, region.stream_id); + let _ = self.free_region_insert(leftover_addr, leftover_size, region.stream_id); } } - self.remap_regions(to_defrag, allocated_dst, stream_id)?; - - Ok(Some(dst)) + Ok(Some(std::cmp::min( + returned_ptr, + self.remap_regions(to_defrag, allocated_dst, stream_id) + .unwrap(), + ))) } /// Remap a list of regions to a new base address. 
@@ -432,9 +439,9 @@ impl VirtualMemoryPool { regions: Vec<(CUdeviceptr, usize)>, dst: CUdeviceptr, stream_id: CudaStreamId, - ) -> Result<(), MemoryError> { + ) -> Result { if regions.is_empty() { - return Ok(()); + return Ok(dst); } let bytes_to_remap = regions.iter().map(|(_, size)| *size).sum::(); @@ -476,8 +483,7 @@ impl VirtualMemoryPool { vpmm_set_access(dst, bytes_to_remap, self.device_id) .expect("Failed to set access permissions for remapped virtual memory region"); } - self.free_region_insert(dst, bytes_to_remap, stream_id); - Ok(()) + Ok(self.free_region_insert(dst, bytes_to_remap, stream_id)) } /// Returns the total physical memory currently mapped in this pool (in bytes). From 08add1367fa2199c665483fab69ced7bafd837e9 Mon Sep 17 00:00:00 2001 From: gaxiom Date: Wed, 26 Nov 2025 21:51:39 +0000 Subject: [PATCH 10/20] eclude allocated ptr from defrag --- crates/cuda-common/src/memory_manager/vm_pool.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index df659286..a7367f6a 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -400,7 +400,8 @@ impl VirtualMemoryPool { let mut oldest_free_regions: Vec<_> = self .free_regions .iter() - .map(|(addr, region)| (region.id, *addr)) + .filter(|(&addr, _)| allocate_size == 0 || addr != returned_ptr) + .map(|(&addr, region)| (region.id, addr)) .collect(); oldest_free_regions.sort_by_key(|(id, _)| *id); for (_, addr) in oldest_free_regions { From d9eb79f17743f80904d26344995e4b4cab158a7c Mon Sep 17 00:00:00 2001 From: gaxiom Date: Wed, 26 Nov 2025 22:46:14 +0000 Subject: [PATCH 11/20] fixing error handling --- crates/cuda-common/src/memory_manager/cuda.rs | 9 +- crates/cuda-common/src/memory_manager/mod.rs | 60 ++++---- .../cuda-common/src/memory_manager/vm_pool.rs | 134 +++++++++++------- 3 files changed, 115 insertions(+), 88 deletions(-) diff --git a/crates/cuda-common/src/memory_manager/cuda.rs b/crates/cuda-common/src/memory_manager/cuda.rs index 82dd8a26..8d67efe6 100644 --- a/crates/cuda-common/src/memory_manager/cuda.rs +++ b/crates/cuda-common/src/memory_manager/cuda.rs @@ -21,13 +21,8 @@ extern "C" { fn _vpmm_release(h: CUmemGenericAllocationHandle) -> i32; } -pub(super) unsafe fn vpmm_check_support(device_ordinal: i32) -> Result { - let status = _vpmm_check_support(device_ordinal); - if status == 0 { - Ok(true) - } else { - Err(CudaError::new(status)) - } +pub(super) unsafe fn vpmm_check_support(device_ordinal: i32) -> Result<(), CudaError> { + CudaError::from_result(_vpmm_check_support(device_ordinal)) } pub(super) unsafe fn vpmm_min_granularity(device_ordinal: i32) -> Result { diff --git a/crates/cuda-common/src/memory_manager/mod.rs b/crates/cuda-common/src/memory_manager/mod.rs index ae76b754..0456f0bd 100644 --- a/crates/cuda-common/src/memory_manager/mod.rs +++ b/crates/cuda-common/src/memory_manager/mod.rs @@ -37,6 +37,9 @@ pub struct MemoryManager { max_used_size: usize, } +/// # Safety +/// `MemoryManager` is not internally synchronized. These impls are safe because +/// the singleton instance is wrapped in `Mutex` via `MEMORY_MANAGER`. 
unsafe impl Send for MemoryManager {} unsafe impl Sync for MemoryManager {} @@ -59,49 +62,46 @@ impl MemoryManager { let mut tracked_size = size; let ptr = if size < self.pool.page_size { let mut ptr: *mut c_void = std::ptr::null_mut(); - check(unsafe { cudaMallocAsync(&mut ptr, size, cudaStreamPerThread) }) - .expect("Failed to allocate small memory block via cudaMallocAsync"); - self.allocated_ptrs - .insert(NonNull::new(ptr).expect("cudaMalloc returned null"), size); - Ok(ptr) + check(unsafe { cudaMallocAsync(&mut ptr, size, cudaStreamPerThread) }).map_err( + |e| { + tracing::error!("cudaMallocAsync failed: size={}: {:?}", size, e); + MemoryError::from(e) + }, + )?; + self.allocated_ptrs.insert( + NonNull::new(ptr).expect("BUG: cudaMallocAsync returned null"), + size, + ); + ptr } else { tracked_size = size.next_multiple_of(self.pool.page_size); - self.pool.malloc_internal( - tracked_size, - current_stream_id() - .expect("Failed to get current CUDA stream ID for large allocation"), - ) + let stream_id = current_stream_id()?; + self.pool.malloc_internal(tracked_size, stream_id)? }; self.current_size += tracked_size; if self.current_size > self.max_used_size { self.max_used_size = self.current_size; } - ptr + Ok(ptr) } /// # Safety /// The pointer `ptr` must be a valid, previously allocated device pointer. /// The caller must ensure that `ptr` is not used after this function is called. unsafe fn d_free(&mut self, ptr: *mut c_void) -> Result<(), MemoryError> { - let nn = NonNull::new(ptr) - .ok_or(MemoryError::NullPointer) - .expect("Attempted to free null pointer"); + let nn = NonNull::new(ptr).ok_or(MemoryError::NullPointer)?; if let Some(size) = self.allocated_ptrs.remove(&nn) { self.current_size -= size; - check(unsafe { cudaFreeAsync(ptr, cudaStreamPerThread) }) - .expect("Failed to free small memory block via cudaFreeAsync"); + check(unsafe { cudaFreeAsync(ptr, cudaStreamPerThread) }).map_err(|e| { + tracing::error!("cudaFreeAsync failed: ptr={:p}: {:?}", ptr, e); + MemoryError::from(e) + })?; } else { - self.current_size -= self - .pool - .free_internal( - ptr, - current_stream_id().expect( - "Failed to get current CUDA stream ID for freeing large allocation", - ), - ) - .expect("Failed to free large memory block from virtual memory pool"); + let stream_id = current_stream_id()?; + let freed_size = self.pool.free_internal(ptr, stream_id)?; + self.current_size -= freed_size; } Ok(()) @@ -112,13 +112,9 @@ impl Drop for MemoryManager { fn drop(&mut self) { let ptrs: Vec<*mut c_void> = self.allocated_ptrs.keys().map(|nn| nn.as_ptr()).collect(); for &ptr in &ptrs { - unsafe { self.d_free(ptr).unwrap() }; - } - if !self.allocated_ptrs.is_empty() { - println!( - "Error: {} allocations were automatically freed on MemoryManager drop", - self.allocated_ptrs.len() - ); + if let Err(e) = unsafe { self.d_free(ptr) } { + eprintln!("MemoryManager drop: failed to free {:p}: {:?}", ptr, e); + } } } } diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index a7367f6a..71223298 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -60,6 +60,9 @@ pub(super) struct VirtualMemoryPool { pub(super) device_id: i32, } +/// # Safety +/// `VirtualMemoryPool` is not internally synchronized. These impls are safe because +/// all access goes through `Mutex` in the parent module. 
unsafe impl Send for VirtualMemoryPool {} unsafe impl Sync for VirtualMemoryPool {} @@ -140,16 +143,14 @@ impl VirtualMemoryPool { if best_region.is_none() { // Phase 2: Defragmentation - best_region = self - .defragment_or_create_new_pages(requested, stream_id) - .expect("Failed to defragment or create new pages in virtual memory pool"); + best_region = self.defragment_or_create_new_pages(requested, stream_id)?; } if let Some(ptr) = best_region { let region = self .free_regions .remove(&ptr) - .expect("Incorrect free region address"); + .expect("BUG: free region address not found after find_best_fit"); debug_assert_eq!(region.stream_id, stream_id); // If region is larger, return remainder to free list @@ -199,17 +200,16 @@ impl VirtualMemoryPool { } // 1b. Try the oldest from other streams - candidates - .iter_mut() - .min_by_key(|(_, region)| region.id) - .map(|(addr, region)| { - region - .event - .synchronize() - .expect("Failed to synchronize CUDA event during allocation"); - region.stream_id = stream_id; - *addr - }) + if let Some((addr, region)) = candidates.iter_mut().min_by_key(|(_, region)| region.id) { + if let Err(e) = region.event.synchronize() { + tracing::error!("Event synchronize failed during find_best_fit: {:?}", e); + return None; + } + region.stream_id = stream_id; + Some(*addr) + } else { + None + } } /// Frees a pointer and returns the size of the freed memory. @@ -223,8 +223,7 @@ impl VirtualMemoryPool { let size = self .malloc_regions .remove(&ptr) - .ok_or(MemoryError::InvalidPointer) - .expect("Pointer not found in malloc_regions - invalid pointer freed"); + .ok_or(MemoryError::InvalidPointer)?; let _ = self.free_region_insert(ptr, size, stream_id); @@ -307,7 +306,7 @@ impl VirtualMemoryPool { return; } - if let Some((&prev_addr, &prev_size)) = self.unmapped_regions.range(..=addr).next_back() { + if let Some((&prev_addr, &prev_size)) = self.unmapped_regions.range(..addr).next_back() { if prev_addr + prev_size as u64 == addr { self.unmapped_regions.remove(&prev_addr); addr = prev_addr; @@ -345,10 +344,9 @@ impl VirtualMemoryPool { ); // Find a best fit unmapped region - let dst = self - .take_unmapped_region(requested) - .expect("Failed to take unmapped region"); - let mut returned_ptr = dst; + let dst = self.take_unmapped_region(requested)?; + // Sentinel value until we have a valid free region pointer from allocation + let mut allocated_ptr = CUdeviceptr::MAX; // Allocate new pages if we don't have enough free regions let mut allocated_dst = dst; @@ -358,6 +356,10 @@ impl VirtualMemoryPool { match vpmm_create_physical(self.device_id, self.page_size) { Ok(handle) => handle, Err(e) => { + tracing::error!( + "vpmm_create_physical failed: device={}, page_size={}: {:?}", + self.device_id, self.page_size, e + ); if e.is_out_of_memory() { return Err(MemoryError::OutOfMemory { requested: allocate_size, @@ -370,8 +372,13 @@ impl VirtualMemoryPool { } }; unsafe { - vpmm_map(allocated_dst, self.page_size, handle) - .expect("Failed to map physical memory page to virtual address"); + vpmm_map(allocated_dst, self.page_size, handle).map_err(|e| { + tracing::error!( + "vpmm_map failed: addr={:#x}, page_size={}, handle={}: {:?}", + allocated_dst, self.page_size, handle, e + ); + MemoryError::from(e) + })?; } self.active_pages.insert(allocated_dst, handle); allocated_dst += self.page_size as u64; @@ -384,15 +391,24 @@ impl VirtualMemoryPool { ByteSize::b(self.memory_usage() as u64) ); unsafe { - vpmm_set_access(dst, allocate_size, self.device_id).expect( - "Failed to set access 
permissions for newly allocated virtual memory region", - ); + vpmm_set_access(dst, allocate_size, self.device_id).map_err(|e| { + tracing::error!( + "vpmm_set_access failed: addr={:#x}, size={}, device={}: {:?}", + dst, allocate_size, self.device_id, e + ); + MemoryError::from(e) + })?; } - returned_ptr = self.free_region_insert(dst, allocate_size, stream_id); + allocated_ptr = self.free_region_insert(dst, allocate_size, stream_id); } let mut remaining = requested - allocate_size; if remaining == 0 { - return Ok(Some(returned_ptr)); + debug_assert_ne!( + allocated_ptr, + CUdeviceptr::MAX, + "Allocation returned no valid free region" + ); + return Ok(Some(allocated_ptr)); } // Pull free regions (oldest first) until we've gathered enough pages. @@ -400,7 +416,7 @@ impl VirtualMemoryPool { let mut oldest_free_regions: Vec<_> = self .free_regions .iter() - .filter(|(&addr, _)| allocate_size == 0 || addr != returned_ptr) + .filter(|(&addr, _)| allocate_size == 0 || addr != allocated_ptr) .map(|(&addr, region)| (region.id, addr)) .collect(); oldest_free_regions.sort_by_key(|(id, _)| *id); @@ -409,11 +425,11 @@ impl VirtualMemoryPool { break; } - let region = self.free_regions.remove(&addr).unwrap(); - region - .event - .synchronize() - .expect("Failed to synchronize event"); + let region = self.free_regions.remove(&addr).expect("BUG: free region disappeared"); + region.event.synchronize().map_err(|e| { + tracing::error!("Event synchronize failed during defrag: {:?}", e); + MemoryError::from(e) + })?; let take = remaining.min(region.size); to_defrag.push((addr, take)); @@ -426,11 +442,14 @@ impl VirtualMemoryPool { let _ = self.free_region_insert(leftover_addr, leftover_size, region.stream_id); } } - Ok(Some(std::cmp::min( - returned_ptr, - self.remap_regions(to_defrag, allocated_dst, stream_id) - .unwrap(), - ))) + let remapped_ptr = self.remap_regions(to_defrag, allocated_dst, stream_id)?; + let result = std::cmp::min(allocated_ptr, remapped_ptr); + debug_assert_ne!( + result, + CUdeviceptr::MAX, + "Both allocation and remapping returned no valid free region" + ); + Ok(Some(result)) } /// Remap a list of regions to a new base address. 
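
Note on the hunks above: `CUdeviceptr::MAX` is used as a sentinel meaning "this phase produced no free region", so `std::cmp::min(allocated_ptr, remapped_ptr)` selects whichever side actually produced one. A toy illustration of that convention, separate from the patch (`CUdeviceptr` modeled here as a plain `u64`):

```rust
// The sentinel CUdeviceptr::MAX compares greater than any valid device address,
// so taking the minimum picks the real region when only one phase produced one.
type CUdeviceptr = u64;

fn main() {
    const NO_REGION: CUdeviceptr = CUdeviceptr::MAX;
    let remapped_only = std::cmp::min(NO_REGION, 0x2000_0000);
    let allocated_only = std::cmp::min(0x1000_0000, NO_REGION);
    assert_eq!(remapped_only, 0x2000_0000); // defragmentation alone satisfied the request
    assert_eq!(allocated_only, 0x1000_0000); // newly mapped pages alone satisfied it
}
```
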
@@ -442,7 +461,8 @@ impl VirtualMemoryPool { stream_id: CudaStreamId, ) -> Result { if regions.is_empty() { - return Ok(dst); + // Nothing to remap; return sentinel so caller's min() picks the other operand + return Ok(CUdeviceptr::MAX); } let bytes_to_remap = regions.iter().map(|(_, size)| *size).sum::(); @@ -453,10 +473,16 @@ impl VirtualMemoryPool { ); let mut curr_dst = dst; - regions.into_iter().for_each(|(region_addr, region_size)| { + for (region_addr, region_size) in regions { // Unmap the region unsafe { - vpmm_unmap(region_addr, region_size).expect("Failed to unmap region"); + vpmm_unmap(region_addr, region_size).map_err(|e| { + tracing::error!( + "vpmm_unmap failed: addr={:#x}, size={}: {:?}", + region_addr, region_size, e + ); + MemoryError::from(e) + })?; } self.insert_unmapped_region(region_addr, region_size); @@ -467,22 +493,32 @@ impl VirtualMemoryPool { let handle = self .active_pages .remove(&page) - .expect("Active page not found during remapping - page handle missing"); + .expect("BUG: active page not found during remapping"); unsafe { - vpmm_map(curr_dst, self.page_size, handle) - .expect("Failed to remap physical memory page to new virtual address"); + vpmm_map(curr_dst, self.page_size, handle).map_err(|e| { + tracing::error!( + "vpmm_map (remap) failed: dst={:#x}, page_size={}, handle={}: {:?}", + curr_dst, self.page_size, handle, e + ); + MemoryError::from(e) + })?; } self.active_pages.insert(curr_dst, handle); curr_dst += self.page_size as u64; } - }); + } debug_assert_eq!(curr_dst - dst, bytes_to_remap as u64); // Set access permissions for the remapped region unsafe { - vpmm_set_access(dst, bytes_to_remap, self.device_id) - .expect("Failed to set access permissions for remapped virtual memory region"); + vpmm_set_access(dst, bytes_to_remap, self.device_id).map_err(|e| { + tracing::error!( + "vpmm_set_access (remap) failed: addr={:#x}, size={}, device={}: {:?}", + dst, bytes_to_remap, self.device_id, e + ); + MemoryError::from(e) + })?; } Ok(self.free_region_insert(dst, bytes_to_remap, stream_id)) } From 1531750dadf6cc602883a156a7f1d26e4551a67f Mon Sep 17 00:00:00 2001 From: gaxiom Date: Wed, 26 Nov 2025 22:54:25 +0000 Subject: [PATCH 12/20] doc update --- docs/vpmm_spec.md | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/docs/vpmm_spec.md b/docs/vpmm_spec.md index 0dbdd8d7..4acd0905 100644 --- a/docs/vpmm_spec.md +++ b/docs/vpmm_spec.md @@ -136,6 +136,22 @@ pub(super) struct VirtualMemoryPool { * **Initial Pages:** `VPMM_PAGES` controls how many pages are eagerly mapped. Defaults to 0 (purely on-demand). * **Mapping Unit:** Always page-sized; the pool never subdivides a page. +## Recommended Configuration + +**For best performance**, preallocate ~80% of available GPU memory to avoid runtime allocations: + +```bash +# Example: 40 GB GPU → preallocate 32 GB (80%) +export VPMM_PAGES=$(((32 << 30) / (2 << 20))) # 16384 pages at 2 MB each +``` + +**To disable VPMM** and fall back to `cudaMallocAsync`, use a page size larger than any expected allocation: + +```bash +export VPMM_PAGE_SIZE=$((32 << 30)) # 32 GB page size +export VPMM_PAGES=0 +``` + ## Allocation & Growth Allocations occur during initialization (preallocation) and on‑demand when the pool runs short of free pages. @@ -154,7 +170,7 @@ Triggered when Phase 1 fails. The new defragmentation flow is deterministic an 3. **Harvest oldest frees** — iterate `free_regions` ordered by `free_id`. For each region: * Block on its CUDA event via `CudaEvent::synchronize()`. 
* Detach only the portion we need (`take = min(remaining, region.size)`), unmap it, and push `(addr, take)` to a work list. - * Reinsert the leftover tail (if any) back into `free_regions` with its original stream/event. + * Reinsert the leftover tail (if any) back into `free_regions` with the original stream but a new event/id. Stop once the total `take` covers the remainder of the request. 4. **Remap into the hole** — unmap each harvested chunk, add its VA back to `unmapped_regions`, and remap the chunk (page by page) contiguously into the hole immediately after the newly allocated portion. The combined span becomes the new free region for the requesting stream. @@ -162,9 +178,8 @@ Triggered when Phase 1 fails. The new defragmentation flow is deterministic an ## malloc(size) — Allocation Policy **Phase 1: Zero-cost attempts** -1. **Best fit on current stream** — smallest region from the caller’s stream that fits `requested`. -2. **Completed from other streams** — any region whose event has already completed (no blocking). -3. **Oldest in-flight** — take the lowest `free_id`, call `CudaEvent::synchronize()`, and hand that region to the caller. This avoids full defrag if one region can satisfy the request. +1. **Best fit on current stream** — smallest region from the caller's stream that fits `requested`. +2. **Oldest from other streams** — take the region with the lowest `free_id`, call `CudaEvent::synchronize()`, and hand that region to the caller. This avoids full defrag if one region can satisfy the request. **Phase 2: Hole-based defragmentation** 4. **Reserve hole** — via `take_unmapped_region`. @@ -175,7 +190,7 @@ Triggered when Phase 1 fails. The new defragmentation flow is deterministic an Additional rules: * **Alignment:** All allocations are **rounded up** to page‑size multiples. A page is either entirely free or entirely used. -* **Small Buffers:** If `size < page_size`, bypass the VM pool and call `cudaMallocAsync` instead (preserves compatibility; setting `VMM_PAGE_SIZE = usize::MAX` effectively disables the pool for typical sizes). +* **Small Buffers:** If `size < page_size`, bypass the VM pool and call `cudaMallocAsync` instead (preserves compatibility; setting `VPMM_PAGE_SIZE = usize::MAX` effectively disables the pool for typical sizes). ## free(ptr) @@ -198,5 +213,5 @@ Additional rules: ## Asynchrony & Streams * VPMM supports **multi-stream workloads** using `cudaStreamPerThread`. -* A single shared `VirtualMemoryPool` serves all streams. Each free region carries the stream id plus a CUDA event. We reuse a region immediately if its event has completed; otherwise we call `event.synchronize()` right before detaching it. Events are dropped as soon as the synchronous wait completes. -* **Access permissions:** After remapping (or mapping newly allocated pages) we call `cuMemSetAccess` on the destination hole to ensure the caller’s device has read/write permission. \ No newline at end of file +* A single shared `VirtualMemoryPool` serves all streams. Each free region carries the stream id plus a CUDA event (wrapped in `Arc`). When reusing a region from another stream, we call `event.synchronize()` before detaching it. Events are dropped when the region is consumed or coalesced. +* **Access permissions:** After remapping (or mapping newly allocated pages) we call `cuMemSetAccess` on the destination hole to ensure the caller's device has read/write permission. 
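
As a concrete illustration of the Alignment and Small Buffers rules above, the following standalone sketch (not part of the spec or the patch) shows how a requested size maps onto the two allocation paths, assuming a 2 MB page size:

```rust
// Small requests bypass the pool; large requests are rounded up to whole pages.
fn main() {
    let page_size = 2usize << 20; // assumed 2 MB granularity

    let small = 1usize << 20; // 1 MB < page_size -> cudaMallocAsync path, tracked as-is
    assert!(small < page_size);

    let large = 3usize << 20; // 3 MB >= page_size -> VPMM path
    let tracked = large.next_multiple_of(page_size);
    assert_eq!(tracked, 4 << 20); // rounded up to two 2 MB pages
}
```
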
\ No newline at end of file From 1d3174bbd159dea7e279500ebbbfd3fd5b5657ee Mon Sep 17 00:00:00 2001 From: gaxiom Date: Wed, 26 Nov 2025 23:23:08 +0000 Subject: [PATCH 13/20] config --- .../cuda-common/src/memory_manager/vm_pool.rs | 165 ++++++++++++------ 1 file changed, 116 insertions(+), 49 deletions(-) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index 71223298..dc2b82cf 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -20,6 +20,74 @@ extern "C" { fn cudaMemGetInfo(free_bytes: *mut usize, total_bytes: *mut usize) -> i32; } +// ============================================================================ +// Configuration +// ============================================================================ + +const DEFAULT_VA_SIZE: usize = 8 << 40; // 8 TB + +/// Configuration for the Virtual Memory Pool. +/// +/// Use `VpmmConfig::from_env()` to load from environment variables, +/// or construct directly for testing. +#[derive(Debug, Clone)] +pub(super) struct VpmmConfig { + /// Page size override. If `None`, uses CUDA's minimum granularity for the device. + pub page_size: Option, + /// Virtual address space size per reserved chunk (default: 8 TB). + pub va_size: usize, + /// Number of pages to preallocate at startup (default: 0). + pub initial_pages: usize, +} + +impl Default for VpmmConfig { + fn default() -> Self { + Self { + page_size: None, + va_size: DEFAULT_VA_SIZE, + initial_pages: 0, + } + } +} + +impl VpmmConfig { + /// Load configuration from environment variables: + /// - `VPMM_PAGE_SIZE`: Page size in bytes (must be multiple of CUDA granularity) + /// - `VPMM_VA_SIZE`: Virtual address space size per chunk (default: 8 TB) + /// - `VPMM_PAGES`: Number of pages to preallocate (default: 0) + pub fn from_env() -> Self { + let page_size = std::env::var("VPMM_PAGE_SIZE").ok().map(|val| { + let size: usize = val.parse().expect("VPMM_PAGE_SIZE must be a valid number"); + assert!(size > 0, "VPMM_PAGE_SIZE must be > 0"); + size + }); + + let va_size = match std::env::var("VPMM_VA_SIZE") { + Ok(val) => { + let size: usize = val.parse().expect("VPMM_VA_SIZE must be a valid number"); + assert!(size > 0, "VPMM_VA_SIZE must be > 0"); + size + } + Err(_) => DEFAULT_VA_SIZE, + }; + + let initial_pages = match std::env::var("VPMM_PAGES") { + Ok(val) => val.parse().expect("VPMM_PAGES must be a valid number"), + Err(_) => 0, + }; + + Self { + page_size, + va_size, + initial_pages, + } + } +} + +// ============================================================================ +// Pool Implementation +// ============================================================================ + #[derive(Debug, Clone)] struct FreeRegion { size: usize, @@ -28,8 +96,6 @@ struct FreeRegion { id: usize, } -const DEFAULT_VA_SIZE: usize = 8usize << 40; // 8 TB - /// Virtual memory pool implementation. 
pub(super) struct VirtualMemoryPool { // Virtual address space roots @@ -67,33 +133,37 @@ unsafe impl Send for VirtualMemoryPool {} unsafe impl Sync for VirtualMemoryPool {} impl VirtualMemoryPool { - fn new() -> Self { + fn new(config: VpmmConfig) -> Self { let device_id = set_device().unwrap(); + + // Check VPMM support and resolve page_size let (root, page_size, va_size) = unsafe { match vpmm_check_support(device_id) { Ok(_) => { - let gran = vpmm_min_granularity(device_id).unwrap(); - let page_size = match std::env::var("VPMM_PAGE_SIZE") { - Ok(val) => { - let custom_size: usize = - val.parse().expect("VPMM_PAGE_SIZE must be a valid number"); + let granularity = vpmm_min_granularity(device_id).unwrap(); + + // Resolve page_size: use config override or device granularity + let page_size = match config.page_size { + Some(size) => { assert!( - custom_size > 0 && custom_size % gran == 0, + size > 0 && size % granularity == 0, "VPMM_PAGE_SIZE must be > 0 and multiple of {}", - gran + granularity ); - custom_size + size } - Err(_) => gran, - }; - let va_size = match std::env::var("VPMM_VA_SIZE") { - Ok(val) => val.parse().expect("VPMM_VA_SIZE must be a valid number"), - Err(_) => DEFAULT_VA_SIZE, + None => granularity, }; + + // Validate va_size + let va_size = config.va_size; assert!( va_size > 0 && va_size % page_size == 0, - "Virtual pool size must be a multiple of page size" + "VPMM_VA_SIZE must be > 0 and multiple of page size ({})", + page_size ); + + // Reserve initial VA chunk let va_base = vpmm_reserve(va_size, page_size).unwrap(); tracing::debug!( "VPMM: Reserved virtual address space {} with page size {}", @@ -109,7 +179,7 @@ impl VirtualMemoryPool { } }; - Self { + let mut pool = Self { roots: vec![root], active_pages: HashMap::new(), free_regions: BTreeMap::new(), @@ -123,7 +193,34 @@ impl VirtualMemoryPool { page_size, va_size, device_id, + }; + + // Preallocate pages if requested (skip if VPMM not supported) + if config.initial_pages > 0 && page_size != usize::MAX { + let alloc_size = config.initial_pages * page_size; + if let Err(e) = pool.defragment_or_create_new_pages( + alloc_size, + current_stream_id().unwrap(), + ) { + let mut free_mem = 0usize; + let mut total_mem = 0usize; + unsafe { + cudaMemGetInfo(&mut free_mem, &mut total_mem); + } + panic!( + "VPMM preallocation failed: {:?}\n\ + Config: pages={}, page_size={}\n\ + GPU Memory: free={}, total={}", + e, + config.initial_pages, + ByteSize::b(page_size as u64), + ByteSize::b(free_mem as u64), + ByteSize::b(total_mem as u64) + ); + } } + + pool } /// Allocates memory from the pool's free regions. 
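
The configuration split introduced in this patch is what the later test patch builds on: production code keeps using the environment-driven path, while tests construct the struct directly. A minimal sketch of both, assuming module-internal access to these `pub(super)` items (this is illustration only, not code from the patch):

```rust
// Inside crates/cuda-common/src/memory_manager, both construction styles are available.
fn build_pools_sketch() {
    // Env-driven, as the Default impl does after this patch.
    let _env_pool = VirtualMemoryPool::new(VpmmConfig::from_env());
    // Direct construction, convenient for tests.
    let _test_pool = VirtualMemoryPool::new(VpmmConfig {
        page_size: None,   // fall back to the device's minimum granularity
        va_size: 1 << 30,  // 1 GB of reserved virtual address space per chunk
        initial_pages: 4,  // eagerly map four pages at startup
    });
}
```
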
@@ -549,37 +646,7 @@ impl Drop for VirtualMemoryPool { impl Default for VirtualMemoryPool { fn default() -> Self { - let mut pool = Self::new(); - - // Skip allocation if vpmm not supported - if pool.page_size == usize::MAX { - return pool; - } - - // Calculate initial number of pages to allocate - let initial_pages = match std::env::var("VPMM_PAGES") { - Ok(val) => { - let pages: usize = val.parse().expect("VPMM_PAGES must be a valid number"); - pages - } - Err(_) => 0, - }; - if let Err(e) = pool.defragment_or_create_new_pages( - initial_pages * pool.page_size, - current_stream_id().unwrap(), - ) { - // Check how much memory is available - let mut free_mem = 0usize; - let mut total_mem = 0usize; - unsafe { - cudaMemGetInfo(&mut free_mem, &mut total_mem); - } - panic!( - "Error:{:?}\nPool: pages={}, page_size={}\nMemory: free_mem={}, total_mem={}", - e, initial_pages, pool.page_size, free_mem, total_mem - ); - } - pool + Self::new(VpmmConfig::from_env()) } } From 2e65d970e8577a8882e3f2fc9c149a74d93f3a8d Mon Sep 17 00:00:00 2001 From: gaxiom Date: Wed, 26 Nov 2025 23:24:30 +0000 Subject: [PATCH 14/20] format --- .../cuda-common/src/memory_manager/vm_pool.rs | 42 +++++++++++++------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index dc2b82cf..b9bf39fd 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -27,7 +27,7 @@ extern "C" { const DEFAULT_VA_SIZE: usize = 8 << 40; // 8 TB /// Configuration for the Virtual Memory Pool. -/// +/// /// Use `VpmmConfig::from_env()` to load from environment variables, /// or construct directly for testing. #[derive(Debug, Clone)] @@ -198,10 +198,9 @@ impl VirtualMemoryPool { // Preallocate pages if requested (skip if VPMM not supported) if config.initial_pages > 0 && page_size != usize::MAX { let alloc_size = config.initial_pages * page_size; - if let Err(e) = pool.defragment_or_create_new_pages( - alloc_size, - current_stream_id().unwrap(), - ) { + if let Err(e) = + pool.defragment_or_create_new_pages(alloc_size, current_stream_id().unwrap()) + { let mut free_mem = 0usize; let mut total_mem = 0usize; unsafe { @@ -455,7 +454,9 @@ impl VirtualMemoryPool { Err(e) => { tracing::error!( "vpmm_create_physical failed: device={}, page_size={}: {:?}", - self.device_id, self.page_size, e + self.device_id, + self.page_size, + e ); if e.is_out_of_memory() { return Err(MemoryError::OutOfMemory { @@ -472,7 +473,10 @@ impl VirtualMemoryPool { vpmm_map(allocated_dst, self.page_size, handle).map_err(|e| { tracing::error!( "vpmm_map failed: addr={:#x}, page_size={}, handle={}: {:?}", - allocated_dst, self.page_size, handle, e + allocated_dst, + self.page_size, + handle, + e ); MemoryError::from(e) })?; @@ -491,7 +495,10 @@ impl VirtualMemoryPool { vpmm_set_access(dst, allocate_size, self.device_id).map_err(|e| { tracing::error!( "vpmm_set_access failed: addr={:#x}, size={}, device={}: {:?}", - dst, allocate_size, self.device_id, e + dst, + allocate_size, + self.device_id, + e ); MemoryError::from(e) })?; @@ -522,7 +529,10 @@ impl VirtualMemoryPool { break; } - let region = self.free_regions.remove(&addr).expect("BUG: free region disappeared"); + let region = self + .free_regions + .remove(&addr) + .expect("BUG: free region disappeared"); region.event.synchronize().map_err(|e| { tracing::error!("Event synchronize failed during defrag: {:?}", e); MemoryError::from(e) @@ -576,7 +586,9 @@ impl 
VirtualMemoryPool { vpmm_unmap(region_addr, region_size).map_err(|e| { tracing::error!( "vpmm_unmap failed: addr={:#x}, size={}: {:?}", - region_addr, region_size, e + region_addr, + region_size, + e ); MemoryError::from(e) })?; @@ -595,7 +607,10 @@ impl VirtualMemoryPool { vpmm_map(curr_dst, self.page_size, handle).map_err(|e| { tracing::error!( "vpmm_map (remap) failed: dst={:#x}, page_size={}, handle={}: {:?}", - curr_dst, self.page_size, handle, e + curr_dst, + self.page_size, + handle, + e ); MemoryError::from(e) })?; @@ -612,7 +627,10 @@ impl VirtualMemoryPool { vpmm_set_access(dst, bytes_to_remap, self.device_id).map_err(|e| { tracing::error!( "vpmm_set_access (remap) failed: addr={:#x}, size={}, device={}: {:?}", - dst, bytes_to_remap, self.device_id, e + dst, + bytes_to_remap, + self.device_id, + e ); MemoryError::from(e) })?; From 945644899be4af755c2d7165d60b5fa0a915ad6d Mon Sep 17 00:00:00 2001 From: gaxiom Date: Thu, 27 Nov 2025 00:31:12 +0000 Subject: [PATCH 15/20] tests --- crates/cuda-common/Cargo.toml | 3 + crates/cuda-common/src/memory_manager/mod.rs | 3 + .../cuda-common/src/memory_manager/tests.rs | 505 ++++++++++++++++++ .../cuda-common/src/memory_manager/vm_pool.rs | 4 +- 4 files changed, 513 insertions(+), 2 deletions(-) create mode 100644 crates/cuda-common/src/memory_manager/tests.rs diff --git a/crates/cuda-common/Cargo.toml b/crates/cuda-common/Cargo.toml index fc97bc0e..21360d78 100644 --- a/crates/cuda-common/Cargo.toml +++ b/crates/cuda-common/Cargo.toml @@ -17,5 +17,8 @@ ctor.workspace = true [build-dependencies] openvm-cuda-builder.workspace = true +[dev-dependencies] +tokio = { workspace = true, features = ["full"] } + [features] touchemall = [] \ No newline at end of file diff --git a/crates/cuda-common/src/memory_manager/mod.rs b/crates/cuda-common/src/memory_manager/mod.rs index 0456f0bd..5937b743 100644 --- a/crates/cuda-common/src/memory_manager/mod.rs +++ b/crates/cuda-common/src/memory_manager/mod.rs @@ -16,6 +16,9 @@ mod cuda; mod vm_pool; use vm_pool::VirtualMemoryPool; +#[cfg(test)] +mod tests; + #[link(name = "cudart")] extern "C" { fn cudaMallocAsync(dev_ptr: *mut *mut c_void, size: usize, stream: cudaStream_t) -> i32; diff --git a/crates/cuda-common/src/memory_manager/tests.rs b/crates/cuda-common/src/memory_manager/tests.rs new file mode 100644 index 00000000..183792cb --- /dev/null +++ b/crates/cuda-common/src/memory_manager/tests.rs @@ -0,0 +1,505 @@ +//! 
Tests for memory_manager - focused on edge cases and dangerous scenarios + +use std::sync::Arc; + +use tokio::sync::Barrier; + +use super::vm_pool::{VirtualMemoryPool, VpmmConfig}; +use crate::{d_buffer::DeviceBuffer, stream::current_stream_id}; + +// ============================================================================ +// Coalescing: free B first, then A, then C - should coalesce into one region +// ============================================================================ +#[test] +fn test_coalescing_via_combined_alloc() { + let len = 2 << 30; // 2 GB per allocation + let buf_a = DeviceBuffer::::with_capacity(len); + let buf_b = DeviceBuffer::::with_capacity(len); + let buf_c = DeviceBuffer::::with_capacity(len); + + let addr_a = buf_a.as_raw_ptr(); + + // Free in order: B, A, C - this tests both next and prev neighbor coalescing + drop(buf_b); + drop(buf_a); + drop(buf_c); + + // Request combined size - if coalescing worked, this should reuse from A's start + let combined_len = 3 * len; + let buf_combined = DeviceBuffer::::with_capacity(combined_len); + assert_eq!( + addr_a, + buf_combined.as_raw_ptr(), + "Should reuse coalesced region starting at A" + ); +} + +// ============================================================================ +// VA exhaustion: use tiny VA size to force multiple VA reservations +// ============================================================================ +#[test] +fn test_va_exhaustion_reserves_more() { + // Create pool with very small VA (4 MB) - will exhaust quickly + let config = VpmmConfig { + page_size: None, + va_size: 4 << 20, // 4 MB VA per chunk + initial_pages: 0, + }; + let mut pool = VirtualMemoryPool::new(config); + + if pool.page_size == usize::MAX { + println!("VPMM not supported, skipping test"); + return; + } + + let page_size = pool.page_size; + let stream_id = current_stream_id().unwrap(); + + // Initial state: 1 VA root + assert_eq!(pool.roots.len(), 1); + + // Allocate enough pages to exhaust first VA chunk and trigger second reservation + // 4MB VA / 2MB page = 2 pages max in first chunk + let mut ptrs = Vec::new(); + for _ in 0..4 { + // Allocate 4 pages total → needs 2 VA chunks + match pool.malloc_internal(page_size, stream_id) { + Ok(ptr) => ptrs.push(ptr), + Err(e) => panic!("Allocation failed: {:?}", e), + } + } + + assert!( + pool.roots.len() >= 2, + "Should have reserved additional VA chunks. 
Got {} roots", + pool.roots.len() + ); + + // Cleanup + for ptr in ptrs { + pool.free_internal(ptr, stream_id).unwrap(); + } +} + +// ============================================================================ +// Defragmentation scenario from vpmm_spec.md: +// +10 > +1 > -10 > +4 > +11 (in units of PAGE_SIZE) +// +// X = PAGES - 11 determines behavior: +// Case A: X ≥ 11 (PAGES ≥ 22) - enough free pages, no defrag +// Case B: 5 ≤ X < 11 (16 ≤ PAGES < 22) - defrag for +11, no new pages +// Case C: X == 4 (PAGES = 15) - defrag + allocate 1 new page +// Case D: 0 ≤ X < 4 (11 ≤ PAGES < 15) - different layout, defrag + new pages +// ============================================================================ + +/// Helper to run the doc scenario and return final state +fn run_doc_scenario( + initial_pages: usize, +) -> ( + VirtualMemoryPool, + usize, // page_size + *mut std::ffi::c_void, // ptr_1 (kept) + *mut std::ffi::c_void, // ptr_4 + *mut std::ffi::c_void, // ptr_11 +) { + let config = VpmmConfig { + page_size: None, // Use device granularity + va_size: 1 << 30, // 1 GB VA space + initial_pages, + }; + let mut pool = VirtualMemoryPool::new(config); + + if pool.page_size == usize::MAX { + panic!("VPMM not supported"); + } + + let page_size = pool.page_size; + let stream_id = current_stream_id().unwrap(); + + // Step 1: +10 pages + let ptr_10 = pool.malloc_internal(10 * page_size, stream_id).unwrap(); + assert!(!ptr_10.is_null()); + + // Step 2: +1 page + let ptr_1 = pool.malloc_internal(page_size, stream_id).unwrap(); + assert!(!ptr_1.is_null()); + // Should be right after the 10-page allocation + assert_eq!(ptr_1 as usize, ptr_10 as usize + 10 * page_size); + + // Step 3: -10 pages + pool.free_internal(ptr_10, stream_id).unwrap(); + + // Step 4: +4 pages + let ptr_4 = pool.malloc_internal(4 * page_size, stream_id).unwrap(); + assert!(!ptr_4.is_null()); + + // Step 5: +11 pages + let ptr_11 = pool.malloc_internal(11 * page_size, stream_id).unwrap(); + assert!(!ptr_11.is_null()); + + (pool, page_size, ptr_1, ptr_4, ptr_11) +} + +#[test] +fn test_defrag_case_a_enough_free_pages() { + // Case A: X ≥ 11, so PAGES ≥ 22 + // After +10 +1 we use 11 pages, leaving X=11 free + // +4 takes from the freed 10-page region (best fit) + // +11 can fit in remaining preallocated space + let initial_pages = 22; // X = 22 - 11 = 11 + + let (pool, page_size, ptr_1, ptr_4, ptr_11) = run_doc_scenario(initial_pages); + let stream_id = current_stream_id().unwrap(); + + // Memory usage should be exactly 22 pages (no new allocation needed) + assert_eq!( + pool.memory_usage(), + 22 * page_size, + "Case A: no new pages allocated" + ); + + // Step 4 layout: [+4][-6][1][-X] - 4 takes start of freed 10 + assert_eq!(ptr_4 as usize, pool.roots[0] as usize, "4 at VA start"); + + // Step 5 layout: [4][-6][1][+11][...] 
- 11 goes after the 1 + assert!( + ptr_11 as usize > ptr_1 as usize, + "Case A: 11 should be after 1 (no defrag)" + ); + + // Cleanup + let mut pool = pool; + pool.free_internal(ptr_1, stream_id).unwrap(); + pool.free_internal(ptr_4, stream_id).unwrap(); + pool.free_internal(ptr_11, stream_id).unwrap(); +} + +#[test] +fn test_defrag_case_b_defrag_no_new_pages() { + // Case B: 5 ≤ X < 11, so 16 ≤ PAGES < 22 + // After +10 +1, we have X free pages (5 ≤ X < 11) + // +4 goes after 1 (fits in X pages) + // +11 needs defrag: remap the 10-page free region + let initial_pages = 18; // X = 18 - 11 = 7 + + let (pool, page_size, ptr_1, ptr_4, ptr_11) = run_doc_scenario(initial_pages); + let stream_id = current_stream_id().unwrap(); + + // Memory usage should still be 18 pages (defrag reuses existing) + assert_eq!( + pool.memory_usage(), + 18 * page_size, + "Case B: no new pages allocated" + ); + + // In Case B, +4 goes after 1: [-10][1][+4][-(X-4)] + assert_eq!( + ptr_4 as usize, + ptr_1 as usize + page_size, + "Case B: 4 right after 1" + ); + + // Cleanup + let mut pool = pool; + pool.free_internal(ptr_1, stream_id).unwrap(); + pool.free_internal(ptr_4, stream_id).unwrap(); + pool.free_internal(ptr_11, stream_id).unwrap(); +} + +#[test] +fn test_defrag_case_c_defrag_plus_new_page() { + // Case C: X == 4, so PAGES = 15 + // After +10 +1, we have exactly 4 free pages + // +4 takes all free pages: [-10][1][+4] (no leftover) + // +11 needs defrag (remap 10) + allocate 1 new page + let initial_pages = 15; // X = 15 - 11 = 4 + + let (pool, page_size, ptr_1, ptr_4, ptr_11) = run_doc_scenario(initial_pages); + let stream_id = current_stream_id().unwrap(); + + // Memory usage: 15 original + 1 new = 16 pages + assert_eq!( + pool.memory_usage(), + 16 * page_size, + "Case C: 1 new page allocated" + ); + + // +4 goes after 1 (uses all remaining X=4 pages) + assert_eq!( + ptr_4 as usize, + ptr_1 as usize + page_size, + "Case C: 4 right after 1" + ); + + // Cleanup + let mut pool = pool; + pool.free_internal(ptr_1, stream_id).unwrap(); + pool.free_internal(ptr_4, stream_id).unwrap(); + pool.free_internal(ptr_11, stream_id).unwrap(); +} + +#[test] +fn test_defrag_case_d_not_enough_for_4() { + // Case D: 0 ≤ X < 4, so 11 ≤ PAGES < 15 + // After +10 +1, we have X < 4 free pages + // +4 cannot fit after 1, so it takes from freed 10: [+4][-6][1][-X] + // +11 needs defrag of the 6 + allocate (11-X-6) new pages + let initial_pages = 12; // X = 12 - 11 = 1 + + let (pool, page_size, ptr_1, ptr_4, ptr_11) = run_doc_scenario(initial_pages); + let stream_id = current_stream_id().unwrap(); + + // Memory usage: need 11 more pages but only have 6+1=7 free + // So allocate 11-7=4 new pages → 12 + 4 = 16 total + assert_eq!( + pool.memory_usage(), + 16 * page_size, + "Case D: 4 new pages allocated" + ); + + // +4 at VA start (takes from freed 10 since X < 4) + assert_eq!( + ptr_4 as usize, pool.roots[0] as usize, + "Case D: 4 at VA start" + ); + + // Cleanup + let mut pool = pool; + pool.free_internal(ptr_1, stream_id).unwrap(); + pool.free_internal(ptr_4, stream_id).unwrap(); + pool.free_internal(ptr_11, stream_id).unwrap(); +} + +// ============================================================================ +// Cross-thread: Thread A frees, Thread B should be able to reuse after sync +// ============================================================================ +#[test] +fn test_cross_thread_handoff() { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .max_blocking_threads(2) + .enable_all() + 
.build() + .unwrap(); + + runtime.block_on(async { + let barrier = Arc::new(Barrier::new(2)); + // Store address as usize (which is Send + Sync) + let addr_holder = Arc::new(std::sync::Mutex::new(0usize)); + + let barrier1 = barrier.clone(); + let addr_holder1 = addr_holder.clone(); + + // Thread A: allocate and free + let handle_a = tokio::task::spawn(async move { + tokio::task::spawn_blocking(move || { + let len = (200 << 20) / size_of::(); // 200 MB + let buf = DeviceBuffer::::with_capacity(len); + *addr_holder1.lock().unwrap() = buf.as_raw_ptr() as usize; + // buf drops here, freeing memory + }) + .await + .expect("task A panicked"); + + barrier1.wait().await; + }); + + let barrier2 = barrier.clone(); + let addr_holder2 = addr_holder.clone(); + + // Thread B: wait for A, then allocate same size + let handle_b = tokio::task::spawn(async move { + barrier2.wait().await; + + tokio::task::spawn_blocking(move || { + let len = (200 << 20) / size_of::(); + let buf = DeviceBuffer::::with_capacity(len); + + // Check if we got the same address (depends on event sync timing) + let original_addr = *addr_holder2.lock().unwrap(); + let new_addr = buf.as_raw_ptr() as usize; + if new_addr == original_addr { + println!("Cross-thread: reused same address (event synced)"); + } else { + println!("Cross-thread: different address (event pending)"); + } + // buf drops here + }) + .await + .expect("task B panicked"); + }); + + handle_a.await.expect("thread A failed"); + handle_b.await.expect("thread B failed"); + }); +} + +// ============================================================================ +// Stress: many threads doing random alloc/free patterns +// ============================================================================ +#[test] +fn test_stress_multithread() { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(8) + .max_blocking_threads(8) + .enable_all() + .build() + .unwrap(); + + runtime.block_on(async { + let mut handles = Vec::new(); + + for thread_idx in 0..8 { + let handle = tokio::task::spawn_blocking(move || { + let mut buffers: Vec> = Vec::new(); + + for op in 0..20 { + // Random-ish size: 100-500 MB (VPMM path) + let len = ((thread_idx * 7 + op * 3) % 5 + 1) * (100 << 20); + let buf = DeviceBuffer::::with_capacity(len); + buffers.push(buf); + + // Free every 3rd allocation + if op % 3 == 0 && !buffers.is_empty() { + buffers.remove(0); // drops and frees + } + } + // remaining buffers drop here + }); + handles.push(handle); + } + + for handle in handles { + handle.await.expect("stress thread failed"); + } + }); + + println!("Stress test: 8 threads × 20 ops completed"); +} + +// ============================================================================ +// Mixed: interleave small (cudaMallocAsync) and large (VPMM) allocations +// ============================================================================ +#[test] +fn test_mixed_small_large_multithread() { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .max_blocking_threads(4) + .enable_all() + .build() + .unwrap(); + + runtime.block_on(async { + let mut handles = Vec::new(); + + for thread_idx in 0..4 { + let handle = tokio::task::spawn_blocking(move || { + let mut buffers: Vec> = Vec::new(); + + for op in 0..15 { + let len = if op % 3 == 0 { + // Small: 1KB - 100KB (cudaMallocAsync) + ((thread_idx + 1) * (op + 1) * 1024) % (100 << 10) + 1024 + } else { + // Large: 100MB - 400MB (VPMM) + ((thread_idx + 1) * (op + 1) % 4 + 1) * (100 << 20) + }; + + let buf = 
DeviceBuffer::::with_capacity(len); + buffers.push(buf); + + if op % 2 == 0 && !buffers.is_empty() { + buffers.remove(0); // drops and frees + } + } + // remaining buffers drop here + }); + handles.push(handle); + } + + for handle in handles { + handle.await.expect("thread failed"); + } + }); +} + +// ============================================================================ +// Preallocation: test pool with initial pages already mapped +// ============================================================================ +#[test] +fn test_preallocation() { + let config = VpmmConfig { + page_size: None, // Use device granularity + va_size: 1 << 30, // 1 GB VA + initial_pages: 4, // Preallocate 4 pages + }; + let mut pool = VirtualMemoryPool::new(config); + + if pool.page_size == usize::MAX { + println!("VPMM not supported, skipping test"); + return; + } + + let page_size = pool.page_size; + let stream_id = current_stream_id().unwrap(); + + // Should already have 4 pages allocated + assert_eq!( + pool.memory_usage(), + 4 * page_size, + "Should have 4 preallocated pages" + ); + + // Allocate 2 pages from preallocated pool - no new pages needed + let ptr = pool.malloc_internal(2 * page_size, stream_id).unwrap(); + assert_eq!( + pool.memory_usage(), + 4 * page_size, + "No new pages should be allocated" + ); + + // Allocate 3 more pages - more than remaining 2 → triggers growth + let ptr2 = pool.malloc_internal(3 * page_size, stream_id).unwrap(); + assert!( + pool.memory_usage() > 4 * page_size, + "Pool should have grown past initial 4 pages" + ); + + // Cleanup + pool.free_internal(ptr, stream_id).unwrap(); + pool.free_internal(ptr2, stream_id).unwrap(); +} + +// ============================================================================ +// Reuse after free: same stream should immediately reuse freed region +// ============================================================================ +#[test] +fn test_same_stream_immediate_reuse() { + let config = VpmmConfig { + page_size: None, // Use device granularity + va_size: 1 << 30, // 1 GB VA + initial_pages: 0, + }; + let mut pool = VirtualMemoryPool::new(config); + + if pool.page_size == usize::MAX { + println!("VPMM not supported, skipping test"); + return; + } + + let page_size = pool.page_size; + let stream_id = current_stream_id().unwrap(); + + // Allocate 2 pages and free + let ptr1 = pool.malloc_internal(2 * page_size, stream_id).unwrap(); + pool.free_internal(ptr1, stream_id).unwrap(); + + // Same stream should immediately get the same address back + let ptr2 = pool.malloc_internal(2 * page_size, stream_id).unwrap(); + assert_eq!( + ptr1, ptr2, + "Same stream should immediately reuse freed region" + ); + + pool.free_internal(ptr2, stream_id).unwrap(); +} diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index b9bf39fd..e21bf587 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -99,7 +99,7 @@ struct FreeRegion { /// Virtual memory pool implementation. 
pub(super) struct VirtualMemoryPool { // Virtual address space roots - roots: Vec, + pub(super) roots: Vec, // Map for all active pages active_pages: HashMap, @@ -133,7 +133,7 @@ unsafe impl Send for VirtualMemoryPool {} unsafe impl Sync for VirtualMemoryPool {} impl VirtualMemoryPool { - fn new(config: VpmmConfig) -> Self { + pub(super) fn new(config: VpmmConfig) -> Self { let device_id = set_device().unwrap(); // Check VPMM support and resolve page_size From dccb9b40e950fe6e6955eb0afaa0217f0d57d6a1 Mon Sep 17 00:00:00 2001 From: Jonathan Wang <31040440+jonathanpwang@users.noreply.github.com> Date: Sun, 30 Nov 2025 13:42:33 -0500 Subject: [PATCH 16/20] chore: add clarifying comment --- crates/cuda-common/src/memory_manager/vm_pool.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index e21bf587..9dbd76a0 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -505,6 +505,11 @@ impl VirtualMemoryPool { } allocated_ptr = self.free_region_insert(dst, allocate_size, stream_id); } + // At this point, allocated_ptr is either + // - CUdeviceptr::MAX if no allocations occurred + // - or some pointer `<= dst` for the start of a free region (with VA-mapping) of at least + // `requested` bytes. The case `allocated_ptr < dst` happens if `free_region_insert` + // merged the allocated region with a previous free region. let mut remaining = requested - allocate_size; if remaining == 0 { debug_assert_ne!( From c299a4971a1551898abac7be26fea0f65c15e53f Mon Sep 17 00:00:00 2001 From: Jonathan Wang <31040440+jonathanpwang@users.noreply.github.com> Date: Sun, 30 Nov 2025 14:19:48 -0500 Subject: [PATCH 17/20] fix: remaining size needs to account for allocated_ptr < dst --- .../cuda-common/src/memory_manager/vm_pool.rs | 52 +++++++++++++++---- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index 9dbd76a0..479f2cef 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -239,7 +239,7 @@ impl VirtualMemoryPool { if best_region.is_none() { // Phase 2: Defragmentation - best_region = self.defragment_or_create_new_pages(requested, stream_id)?; + best_region = self.defragment_or_create_new_pages(requested, stream_id)?.0; } if let Some(ptr) = best_region { @@ -326,12 +326,19 @@ impl VirtualMemoryPool { Ok(size) } + /// Inserts a new free region for `(ptr, size)` into the free regions map, **possibly** merging + /// with existing adjacent regions. Merges only occur if the regions are both adjacent in memory + /// and on the same stream as `stream_id`. The new free region always records a new event on the + /// stream. + /// + /// Returns the starting pointer and size of the (possibly merged) free region containing `(ptr, + /// size)`. fn free_region_insert( &mut self, mut ptr: CUdeviceptr, mut size: usize, stream_id: CudaStreamId, - ) -> CUdeviceptr { + ) -> (CUdeviceptr, usize) { // Potential merge with next neighbor if let Some((&next_ptr, next_region)) = self.free_regions.range(ptr + 1..).next() { if next_region.stream_id == stream_id && ptr + size as u64 == next_ptr { @@ -363,7 +370,7 @@ impl VirtualMemoryPool { id, }, ); - ptr + (ptr, size) } /// Return the base address of a virtual hole large enough for `requested` bytes. 
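
A toy model (separate from the pool code) of the merge rule documented on `free_region_insert` above: a freed span coalesces with a neighbor only when the two regions are byte-adjacent and were freed on the same stream.

```rust
// Mirrors the condition `next_region.stream_id == stream_id && ptr + size as u64 == next_ptr`.
fn would_merge(prev_addr: u64, prev_size: usize, prev_stream: u64,
               next_addr: u64, next_stream: u64) -> bool {
    prev_addr + prev_size as u64 == next_addr && prev_stream == next_stream
}

fn main() {
    let page = 2usize << 20; // assumed 2 MB page
    let base = 0x7000_0000u64;
    assert!(would_merge(base, 2 * page, 0, base + 2 * page as u64, 0));  // adjacent, same stream: coalesces
    assert!(!would_merge(base, 2 * page, 0, base + 2 * page as u64, 1)); // different stream: kept separate
    assert!(!would_merge(base, 2 * page, 0, base + 3 * page as u64, 0)); // gap in between: kept separate
}
```
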
@@ -422,11 +429,16 @@ impl VirtualMemoryPool { /// Defragments the pool by reusing existing holes and, if needed, reserving more VA space. /// Moves just enough pages to satisfy `requested`, keeping the remainder in place. + /// + /// The returned `pointer`, if not `None`, is guaranteed to exist as a key in the `free_regions` + /// map. The corresponding free region will have size `>= requested`. Note the size _may_ be + /// larger than requested. fn defragment_or_create_new_pages( &mut self, requested: usize, stream_id: CudaStreamId, ) -> Result, MemoryError> { + debug_assert_eq!(requested % self.page_size, 0); if requested == 0 { return Ok(None); } @@ -446,7 +458,8 @@ impl VirtualMemoryPool { // Allocate new pages if we don't have enough free regions let mut allocated_dst = dst; - let allocate_size = requested.saturating_sub(total_free_size); + let mut allocate_size = requested.saturating_sub(total_free_size); + debug_assert_eq!(allocate_size % self.page_size, 0); while allocated_dst < dst + allocate_size as u64 { let handle = unsafe { match vpmm_create_physical(self.device_id, self.page_size) { @@ -484,6 +497,7 @@ impl VirtualMemoryPool { self.active_pages.insert(allocated_dst, handle); allocated_dst += self.page_size as u64; } + debug_assert_eq!(allocated_dst, dst + allocated_size as u64); if allocate_size > 0 { tracing::debug!( "VPMM: Allocated {} bytes on stream {}. Total allocated: {}", @@ -503,14 +517,21 @@ impl VirtualMemoryPool { MemoryError::from(e) })?; } - allocated_ptr = self.free_region_insert(dst, allocate_size, stream_id); + (allocated_ptr, merged_allocate_size) = + self.free_region_insert(dst, allocate_size, stream_id); + debug_assert!(merged_allocate_size >= allocate_size); + allocate_size = merged_allocate_size; } // At this point, allocated_ptr is either // - CUdeviceptr::MAX if no allocations occurred // - or some pointer `<= dst` for the start of a free region (with VA-mapping) of at least // `requested` bytes. The case `allocated_ptr < dst` happens if `free_region_insert` - // merged the allocated region with a previous free region. - let mut remaining = requested - allocate_size; + // merged the allocated region with a previous free region. This only happens if the + // previous free region is on the same `stream_id`. In this case, we have + // [allocated_ptr..dst][dst..allocated_dst] which is all free and safe to use on the + // stream `stream_id` _without_ synchronization, because all events will be sequenced + // afterwards on the stream. + let mut remaining = requested.saturating_sub(allocate_size); if remaining == 0 { debug_assert_ne!( allocated_ptr, @@ -519,6 +540,7 @@ impl VirtualMemoryPool { ); return Ok(Some(allocated_ptr)); } + debug_assert!(allocate_size == 0 || allocated_ptr <= dst); // Pull free regions (oldest first) until we've gathered enough pages. let mut to_defrag: Vec<(CUdeviceptr, usize)> = Vec::new(); @@ -555,7 +577,9 @@ impl VirtualMemoryPool { } } let remapped_ptr = self.remap_regions(to_defrag, allocated_dst, stream_id)?; + // Take the minimum in case allocated_ptr is CUdeviceptr::MAX when allocated_size = 0 let result = std::cmp::min(allocated_ptr, remapped_ptr); + debug_assert!(allocate_size == 0 || allocated_ptr == remapped_ptr); debug_assert_ne!( result, CUdeviceptr::MAX, @@ -564,8 +588,15 @@ impl VirtualMemoryPool { Ok(Some(result)) } - /// Remap a list of regions to a new base address. - /// The regions already dropped from free regions + /// Remap a list of regions to a new base address. 
The regions are remapped consecutively + /// starting at `dst`. + /// + /// Returns the starting pointer of the new remapped free region or `CUdeviceptr::MAX` if no + /// remapping is needed. + /// + /// # Assumptions + /// The regions are already removed from the free regions map. Removal should mean that regions' + /// corresponding events have already completed and the regions can be safely unmapped. fn remap_regions( &mut self, regions: Vec<(CUdeviceptr, usize)>, @@ -640,7 +671,8 @@ impl VirtualMemoryPool { MemoryError::from(e) })?; } - Ok(self.free_region_insert(dst, bytes_to_remap, stream_id)) + let (remapped_ptr, _) = self.free_region_insert(dst, bytes_to_remap, stream_id); + Ok(remapped_ptr) } /// Returns the total physical memory currently mapped in this pool (in bytes). From 0df93e046af2810dfee29b804647c0578d16773b Mon Sep 17 00:00:00 2001 From: Jonathan Wang <31040440+jonathanpwang@users.noreply.github.com> Date: Sun, 30 Nov 2025 14:28:05 -0500 Subject: [PATCH 18/20] chore: fix typo and document assumptions --- .../cuda-common/src/memory_manager/vm_pool.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index 479f2cef..b9077847 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -239,7 +239,7 @@ impl VirtualMemoryPool { if best_region.is_none() { // Phase 2: Defragmentation - best_region = self.defragment_or_create_new_pages(requested, stream_id)?.0; + best_region = self.defragment_or_create_new_pages(requested, stream_id)?; } if let Some(ptr) = best_region { @@ -310,6 +310,10 @@ impl VirtualMemoryPool { /// Frees a pointer and returns the size of the freed memory. /// Coalesces adjacent free regions. + /// + /// # Assumptions + /// No CUDA streams will use the malloc region starting at `ptr` after the newly recorded event + /// on `stream_id` at this point in time. pub(super) fn free_internal( &mut self, ptr: *mut c_void, @@ -497,7 +501,7 @@ impl VirtualMemoryPool { self.active_pages.insert(allocated_dst, handle); allocated_dst += self.page_size as u64; } - debug_assert_eq!(allocated_dst, dst + allocated_size as u64); + debug_assert_eq!(allocated_dst, dst + allocate_size as u64); if allocate_size > 0 { tracing::debug!( "VPMM: Allocated {} bytes on stream {}. 
Total allocated: {}", @@ -517,10 +521,10 @@ impl VirtualMemoryPool { MemoryError::from(e) })?; } - (allocated_ptr, merged_allocate_size) = - self.free_region_insert(dst, allocate_size, stream_id); - debug_assert!(merged_allocate_size >= allocate_size); - allocate_size = merged_allocate_size; + let (merged_ptr, merged_size) = self.free_region_insert(dst, allocate_size, stream_id); + debug_assert!(merged_size >= allocate_size); + allocated_ptr = merged_ptr; + allocate_size = merged_size; } // At this point, allocated_ptr is either // - CUdeviceptr::MAX if no allocations occurred @@ -577,7 +581,7 @@ impl VirtualMemoryPool { } } let remapped_ptr = self.remap_regions(to_defrag, allocated_dst, stream_id)?; - // Take the minimum in case allocated_ptr is CUdeviceptr::MAX when allocated_size = 0 + // Take the minimum in case allocated_ptr is CUdeviceptr::MAX when allocate_size = 0 let result = std::cmp::min(allocated_ptr, remapped_ptr); debug_assert!(allocate_size == 0 || allocated_ptr == remapped_ptr); debug_assert_ne!( From 1518fa824d6691acd4929bd9740e111cbef78236 Mon Sep 17 00:00:00 2001 From: Jonathan Wang <31040440+jonathanpwang@users.noreply.github.com> Date: Sun, 30 Nov 2025 19:32:29 +0000 Subject: [PATCH 19/20] chore: switch to tracing error --- crates/cuda-common/src/memory_manager/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/cuda-common/src/memory_manager/mod.rs b/crates/cuda-common/src/memory_manager/mod.rs index 5937b743..d8a1f9a2 100644 --- a/crates/cuda-common/src/memory_manager/mod.rs +++ b/crates/cuda-common/src/memory_manager/mod.rs @@ -116,7 +116,7 @@ impl Drop for MemoryManager { let ptrs: Vec<*mut c_void> = self.allocated_ptrs.keys().map(|nn| nn.as_ptr()).collect(); for &ptr in &ptrs { if let Err(e) = unsafe { self.d_free(ptr) } { - eprintln!("MemoryManager drop: failed to free {:p}: {:?}", ptr, e); + tracing::error!("MemoryManager drop: failed to free {:p}: {:?}", ptr, e); } } } From d3479749d20922b98ceb9c1125740f858e888b87 Mon Sep 17 00:00:00 2001 From: Jonathan Wang <31040440+jonathanpwang@users.noreply.github.com> Date: Sun, 30 Nov 2025 16:13:06 -0500 Subject: [PATCH 20/20] chore: add another comment --- crates/cuda-common/src/memory_manager/vm_pool.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index b9077847..24ef4b01 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -378,6 +378,9 @@ impl VirtualMemoryPool { } /// Return the base address of a virtual hole large enough for `requested` bytes. + /// + /// The returned region is not inside `free_region` or `unmapped_region` map and must be + /// properly handled. fn take_unmapped_region(&mut self, requested: usize) -> Result { debug_assert!(requested != 0); debug_assert_eq!(requested % self.page_size, 0);
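
Taken together, the series leaves the caller-facing behavior unchanged: buffers at or above the page size come from the virtual pool and are returned to it on drop. A usage sketch mirroring the tests added in PATCH 15 (the external crate path `openvm_cuda_common` is an assumption here; the tests themselves use `crate::`-relative imports):

```rust
use openvm_cuda_common::d_buffer::DeviceBuffer; // crate path assumed for illustration

fn main() {
    // 200 MB is well above the page size, so this takes the VPMM path.
    let len = (200usize << 20) / std::mem::size_of::<u32>();
    let buf = DeviceBuffer::<u32>::with_capacity(len);
    let addr = buf.as_raw_ptr();
    drop(buf); // returns the region to the pool

    // On the same stream, a subsequent equal-size allocation may reuse `addr`,
    // as exercised by `test_same_stream_immediate_reuse` above.
    let buf2 = DeviceBuffer::<u32>::with_capacity(len);
    let _ = (addr, buf2.as_raw_ptr());
}
```
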