diff --git a/crates/cuda-common/Cargo.toml b/crates/cuda-common/Cargo.toml index fc97bc0e..21360d78 100644 --- a/crates/cuda-common/Cargo.toml +++ b/crates/cuda-common/Cargo.toml @@ -17,5 +17,8 @@ ctor.workspace = true [build-dependencies] openvm-cuda-builder.workspace = true +[dev-dependencies] +tokio = { workspace = true, features = ["full"] } + [features] touchemall = [] \ No newline at end of file diff --git a/crates/cuda-common/src/error.rs b/crates/cuda-common/src/error.rs index bb5dfa87..ff02c4fa 100644 --- a/crates/cuda-common/src/error.rs +++ b/crates/cuda-common/src/error.rs @@ -95,6 +95,9 @@ pub enum MemoryError { #[error("Invalid pointer: pointer not found in allocation table")] InvalidPointer, + + #[error("Failed to reserve virtual address space (bytes: {size}, page size: {page_size})")] + ReserveFailed { size: usize, page_size: usize }, } #[derive(Error, Debug)] diff --git a/crates/cuda-common/src/memory_manager/cuda.rs b/crates/cuda-common/src/memory_manager/cuda.rs index 82dd8a26..8d67efe6 100644 --- a/crates/cuda-common/src/memory_manager/cuda.rs +++ b/crates/cuda-common/src/memory_manager/cuda.rs @@ -21,13 +21,8 @@ extern "C" { fn _vpmm_release(h: CUmemGenericAllocationHandle) -> i32; } -pub(super) unsafe fn vpmm_check_support(device_ordinal: i32) -> Result { - let status = _vpmm_check_support(device_ordinal); - if status == 0 { - Ok(true) - } else { - Err(CudaError::new(status)) - } +pub(super) unsafe fn vpmm_check_support(device_ordinal: i32) -> Result<(), CudaError> { + CudaError::from_result(_vpmm_check_support(device_ordinal)) } pub(super) unsafe fn vpmm_min_granularity(device_ordinal: i32) -> Result { diff --git a/crates/cuda-common/src/memory_manager/mod.rs b/crates/cuda-common/src/memory_manager/mod.rs index a51927cf..d8a1f9a2 100644 --- a/crates/cuda-common/src/memory_manager/mod.rs +++ b/crates/cuda-common/src/memory_manager/mod.rs @@ -16,6 +16,9 @@ mod cuda; mod vm_pool; use vm_pool::VirtualMemoryPool; +#[cfg(test)] +mod tests; + #[link(name = "cudart")] extern "C" { fn cudaMallocAsync(dev_ptr: *mut *mut c_void, size: usize, stream: cudaStream_t) -> i32; @@ -37,6 +40,9 @@ pub struct MemoryManager { max_used_size: usize, } +/// # Safety +/// `MemoryManager` is not internally synchronized. These impls are safe because +/// the singleton instance is wrapped in `Mutex` via `MEMORY_MANAGER`. unsafe impl Send for MemoryManager {} unsafe impl Sync for MemoryManager {} @@ -59,21 +65,28 @@ impl MemoryManager { let mut tracked_size = size; let ptr = if size < self.pool.page_size { let mut ptr: *mut c_void = std::ptr::null_mut(); - check(unsafe { cudaMallocAsync(&mut ptr, size, cudaStreamPerThread) })?; - self.allocated_ptrs - .insert(NonNull::new(ptr).expect("cudaMalloc returned null"), size); - Ok(ptr) + check(unsafe { cudaMallocAsync(&mut ptr, size, cudaStreamPerThread) }).map_err( + |e| { + tracing::error!("cudaMallocAsync failed: size={}: {:?}", size, e); + MemoryError::from(e) + }, + )?; + self.allocated_ptrs.insert( + NonNull::new(ptr).expect("BUG: cudaMallocAsync returned null"), + size, + ); + ptr } else { tracked_size = size.next_multiple_of(self.pool.page_size); - self.pool - .malloc_internal(tracked_size, current_stream_id()?) + let stream_id = current_stream_id()?; + self.pool.malloc_internal(tracked_size, stream_id)? 
}; self.current_size += tracked_size; if self.current_size > self.max_used_size { self.max_used_size = self.current_size; } - ptr + Ok(ptr) } /// # Safety @@ -84,9 +97,14 @@ impl MemoryManager { if let Some(size) = self.allocated_ptrs.remove(&nn) { self.current_size -= size; - check(unsafe { cudaFreeAsync(ptr, cudaStreamPerThread) })?; + check(unsafe { cudaFreeAsync(ptr, cudaStreamPerThread) }).map_err(|e| { + tracing::error!("cudaFreeAsync failed: ptr={:p}: {:?}", ptr, e); + MemoryError::from(e) + })?; } else { - self.current_size -= self.pool.free_internal(ptr, current_stream_id()?)?; + let stream_id = current_stream_id()?; + let freed_size = self.pool.free_internal(ptr, stream_id)?; + self.current_size -= freed_size; } Ok(()) @@ -97,13 +115,9 @@ impl Drop for MemoryManager { fn drop(&mut self) { let ptrs: Vec<*mut c_void> = self.allocated_ptrs.keys().map(|nn| nn.as_ptr()).collect(); for &ptr in &ptrs { - unsafe { self.d_free(ptr).unwrap() }; - } - if !self.allocated_ptrs.is_empty() { - println!( - "Error: {} allocations were automatically freed on MemoryManager drop", - self.allocated_ptrs.len() - ); + if let Err(e) = unsafe { self.d_free(ptr) } { + tracing::error!("MemoryManager drop: failed to free {:p}: {:?}", ptr, e); + } } } } diff --git a/crates/cuda-common/src/memory_manager/tests.rs b/crates/cuda-common/src/memory_manager/tests.rs new file mode 100644 index 00000000..183792cb --- /dev/null +++ b/crates/cuda-common/src/memory_manager/tests.rs @@ -0,0 +1,505 @@ +//! Tests for memory_manager - focused on edge cases and dangerous scenarios + +use std::sync::Arc; + +use tokio::sync::Barrier; + +use super::vm_pool::{VirtualMemoryPool, VpmmConfig}; +use crate::{d_buffer::DeviceBuffer, stream::current_stream_id}; + +// ============================================================================ +// Coalescing: free B first, then A, then C - should coalesce into one region +// ============================================================================ +#[test] +fn test_coalescing_via_combined_alloc() { + let len = 2 << 30; // 2 GB per allocation + let buf_a = DeviceBuffer::::with_capacity(len); + let buf_b = DeviceBuffer::::with_capacity(len); + let buf_c = DeviceBuffer::::with_capacity(len); + + let addr_a = buf_a.as_raw_ptr(); + + // Free in order: B, A, C - this tests both next and prev neighbor coalescing + drop(buf_b); + drop(buf_a); + drop(buf_c); + + // Request combined size - if coalescing worked, this should reuse from A's start + let combined_len = 3 * len; + let buf_combined = DeviceBuffer::::with_capacity(combined_len); + assert_eq!( + addr_a, + buf_combined.as_raw_ptr(), + "Should reuse coalesced region starting at A" + ); +} + +// ============================================================================ +// VA exhaustion: use tiny VA size to force multiple VA reservations +// ============================================================================ +#[test] +fn test_va_exhaustion_reserves_more() { + // Create pool with very small VA (4 MB) - will exhaust quickly + let config = VpmmConfig { + page_size: None, + va_size: 4 << 20, // 4 MB VA per chunk + initial_pages: 0, + }; + let mut pool = VirtualMemoryPool::new(config); + + if pool.page_size == usize::MAX { + println!("VPMM not supported, skipping test"); + return; + } + + let page_size = pool.page_size; + let stream_id = current_stream_id().unwrap(); + + // Initial state: 1 VA root + assert_eq!(pool.roots.len(), 1); + + // Allocate enough pages to exhaust first VA chunk and trigger second reservation + // 4MB VA / 
2MB page = 2 pages max in first chunk + let mut ptrs = Vec::new(); + for _ in 0..4 { + // Allocate 4 pages total → needs 2 VA chunks + match pool.malloc_internal(page_size, stream_id) { + Ok(ptr) => ptrs.push(ptr), + Err(e) => panic!("Allocation failed: {:?}", e), + } + } + + assert!( + pool.roots.len() >= 2, + "Should have reserved additional VA chunks. Got {} roots", + pool.roots.len() + ); + + // Cleanup + for ptr in ptrs { + pool.free_internal(ptr, stream_id).unwrap(); + } +} + +// ============================================================================ +// Defragmentation scenario from vpmm_spec.md: +// +10 > +1 > -10 > +4 > +11 (in units of PAGE_SIZE) +// +// X = PAGES - 11 determines behavior: +// Case A: X ≥ 11 (PAGES ≥ 22) - enough free pages, no defrag +// Case B: 5 ≤ X < 11 (16 ≤ PAGES < 22) - defrag for +11, no new pages +// Case C: X == 4 (PAGES = 15) - defrag + allocate 1 new page +// Case D: 0 ≤ X < 4 (11 ≤ PAGES < 15) - different layout, defrag + new pages +// ============================================================================ + +/// Helper to run the doc scenario and return final state +fn run_doc_scenario( + initial_pages: usize, +) -> ( + VirtualMemoryPool, + usize, // page_size + *mut std::ffi::c_void, // ptr_1 (kept) + *mut std::ffi::c_void, // ptr_4 + *mut std::ffi::c_void, // ptr_11 +) { + let config = VpmmConfig { + page_size: None, // Use device granularity + va_size: 1 << 30, // 1 GB VA space + initial_pages, + }; + let mut pool = VirtualMemoryPool::new(config); + + if pool.page_size == usize::MAX { + panic!("VPMM not supported"); + } + + let page_size = pool.page_size; + let stream_id = current_stream_id().unwrap(); + + // Step 1: +10 pages + let ptr_10 = pool.malloc_internal(10 * page_size, stream_id).unwrap(); + assert!(!ptr_10.is_null()); + + // Step 2: +1 page + let ptr_1 = pool.malloc_internal(page_size, stream_id).unwrap(); + assert!(!ptr_1.is_null()); + // Should be right after the 10-page allocation + assert_eq!(ptr_1 as usize, ptr_10 as usize + 10 * page_size); + + // Step 3: -10 pages + pool.free_internal(ptr_10, stream_id).unwrap(); + + // Step 4: +4 pages + let ptr_4 = pool.malloc_internal(4 * page_size, stream_id).unwrap(); + assert!(!ptr_4.is_null()); + + // Step 5: +11 pages + let ptr_11 = pool.malloc_internal(11 * page_size, stream_id).unwrap(); + assert!(!ptr_11.is_null()); + + (pool, page_size, ptr_1, ptr_4, ptr_11) +} + +#[test] +fn test_defrag_case_a_enough_free_pages() { + // Case A: X ≥ 11, so PAGES ≥ 22 + // After +10 +1 we use 11 pages, leaving X=11 free + // +4 takes from the freed 10-page region (best fit) + // +11 can fit in remaining preallocated space + let initial_pages = 22; // X = 22 - 11 = 11 + + let (pool, page_size, ptr_1, ptr_4, ptr_11) = run_doc_scenario(initial_pages); + let stream_id = current_stream_id().unwrap(); + + // Memory usage should be exactly 22 pages (no new allocation needed) + assert_eq!( + pool.memory_usage(), + 22 * page_size, + "Case A: no new pages allocated" + ); + + // Step 4 layout: [+4][-6][1][-X] - 4 takes start of freed 10 + assert_eq!(ptr_4 as usize, pool.roots[0] as usize, "4 at VA start"); + + // Step 5 layout: [4][-6][1][+11][...] 
- 11 goes after the 1 + assert!( + ptr_11 as usize > ptr_1 as usize, + "Case A: 11 should be after 1 (no defrag)" + ); + + // Cleanup + let mut pool = pool; + pool.free_internal(ptr_1, stream_id).unwrap(); + pool.free_internal(ptr_4, stream_id).unwrap(); + pool.free_internal(ptr_11, stream_id).unwrap(); +} + +#[test] +fn test_defrag_case_b_defrag_no_new_pages() { + // Case B: 5 ≤ X < 11, so 16 ≤ PAGES < 22 + // After +10 +1, we have X free pages (5 ≤ X < 11) + // +4 goes after 1 (fits in X pages) + // +11 needs defrag: remap the 10-page free region + let initial_pages = 18; // X = 18 - 11 = 7 + + let (pool, page_size, ptr_1, ptr_4, ptr_11) = run_doc_scenario(initial_pages); + let stream_id = current_stream_id().unwrap(); + + // Memory usage should still be 18 pages (defrag reuses existing) + assert_eq!( + pool.memory_usage(), + 18 * page_size, + "Case B: no new pages allocated" + ); + + // In Case B, +4 goes after 1: [-10][1][+4][-(X-4)] + assert_eq!( + ptr_4 as usize, + ptr_1 as usize + page_size, + "Case B: 4 right after 1" + ); + + // Cleanup + let mut pool = pool; + pool.free_internal(ptr_1, stream_id).unwrap(); + pool.free_internal(ptr_4, stream_id).unwrap(); + pool.free_internal(ptr_11, stream_id).unwrap(); +} + +#[test] +fn test_defrag_case_c_defrag_plus_new_page() { + // Case C: X == 4, so PAGES = 15 + // After +10 +1, we have exactly 4 free pages + // +4 takes all free pages: [-10][1][+4] (no leftover) + // +11 needs defrag (remap 10) + allocate 1 new page + let initial_pages = 15; // X = 15 - 11 = 4 + + let (pool, page_size, ptr_1, ptr_4, ptr_11) = run_doc_scenario(initial_pages); + let stream_id = current_stream_id().unwrap(); + + // Memory usage: 15 original + 1 new = 16 pages + assert_eq!( + pool.memory_usage(), + 16 * page_size, + "Case C: 1 new page allocated" + ); + + // +4 goes after 1 (uses all remaining X=4 pages) + assert_eq!( + ptr_4 as usize, + ptr_1 as usize + page_size, + "Case C: 4 right after 1" + ); + + // Cleanup + let mut pool = pool; + pool.free_internal(ptr_1, stream_id).unwrap(); + pool.free_internal(ptr_4, stream_id).unwrap(); + pool.free_internal(ptr_11, stream_id).unwrap(); +} + +#[test] +fn test_defrag_case_d_not_enough_for_4() { + // Case D: 0 ≤ X < 4, so 11 ≤ PAGES < 15 + // After +10 +1, we have X < 4 free pages + // +4 cannot fit after 1, so it takes from freed 10: [+4][-6][1][-X] + // +11 needs defrag of the 6 + allocate (11-X-6) new pages + let initial_pages = 12; // X = 12 - 11 = 1 + + let (pool, page_size, ptr_1, ptr_4, ptr_11) = run_doc_scenario(initial_pages); + let stream_id = current_stream_id().unwrap(); + + // Memory usage: need 11 more pages but only have 6+1=7 free + // So allocate 11-7=4 new pages → 12 + 4 = 16 total + assert_eq!( + pool.memory_usage(), + 16 * page_size, + "Case D: 4 new pages allocated" + ); + + // +4 at VA start (takes from freed 10 since X < 4) + assert_eq!( + ptr_4 as usize, pool.roots[0] as usize, + "Case D: 4 at VA start" + ); + + // Cleanup + let mut pool = pool; + pool.free_internal(ptr_1, stream_id).unwrap(); + pool.free_internal(ptr_4, stream_id).unwrap(); + pool.free_internal(ptr_11, stream_id).unwrap(); +} + +// ============================================================================ +// Cross-thread: Thread A frees, Thread B should be able to reuse after sync +// ============================================================================ +#[test] +fn test_cross_thread_handoff() { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .max_blocking_threads(2) + .enable_all() + 
.build() + .unwrap(); + + runtime.block_on(async { + let barrier = Arc::new(Barrier::new(2)); + // Store address as usize (which is Send + Sync) + let addr_holder = Arc::new(std::sync::Mutex::new(0usize)); + + let barrier1 = barrier.clone(); + let addr_holder1 = addr_holder.clone(); + + // Thread A: allocate and free + let handle_a = tokio::task::spawn(async move { + tokio::task::spawn_blocking(move || { + let len = (200 << 20) / size_of::(); // 200 MB + let buf = DeviceBuffer::::with_capacity(len); + *addr_holder1.lock().unwrap() = buf.as_raw_ptr() as usize; + // buf drops here, freeing memory + }) + .await + .expect("task A panicked"); + + barrier1.wait().await; + }); + + let barrier2 = barrier.clone(); + let addr_holder2 = addr_holder.clone(); + + // Thread B: wait for A, then allocate same size + let handle_b = tokio::task::spawn(async move { + barrier2.wait().await; + + tokio::task::spawn_blocking(move || { + let len = (200 << 20) / size_of::(); + let buf = DeviceBuffer::::with_capacity(len); + + // Check if we got the same address (depends on event sync timing) + let original_addr = *addr_holder2.lock().unwrap(); + let new_addr = buf.as_raw_ptr() as usize; + if new_addr == original_addr { + println!("Cross-thread: reused same address (event synced)"); + } else { + println!("Cross-thread: different address (event pending)"); + } + // buf drops here + }) + .await + .expect("task B panicked"); + }); + + handle_a.await.expect("thread A failed"); + handle_b.await.expect("thread B failed"); + }); +} + +// ============================================================================ +// Stress: many threads doing random alloc/free patterns +// ============================================================================ +#[test] +fn test_stress_multithread() { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(8) + .max_blocking_threads(8) + .enable_all() + .build() + .unwrap(); + + runtime.block_on(async { + let mut handles = Vec::new(); + + for thread_idx in 0..8 { + let handle = tokio::task::spawn_blocking(move || { + let mut buffers: Vec> = Vec::new(); + + for op in 0..20 { + // Random-ish size: 100-500 MB (VPMM path) + let len = ((thread_idx * 7 + op * 3) % 5 + 1) * (100 << 20); + let buf = DeviceBuffer::::with_capacity(len); + buffers.push(buf); + + // Free every 3rd allocation + if op % 3 == 0 && !buffers.is_empty() { + buffers.remove(0); // drops and frees + } + } + // remaining buffers drop here + }); + handles.push(handle); + } + + for handle in handles { + handle.await.expect("stress thread failed"); + } + }); + + println!("Stress test: 8 threads × 20 ops completed"); +} + +// ============================================================================ +// Mixed: interleave small (cudaMallocAsync) and large (VPMM) allocations +// ============================================================================ +#[test] +fn test_mixed_small_large_multithread() { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .max_blocking_threads(4) + .enable_all() + .build() + .unwrap(); + + runtime.block_on(async { + let mut handles = Vec::new(); + + for thread_idx in 0..4 { + let handle = tokio::task::spawn_blocking(move || { + let mut buffers: Vec> = Vec::new(); + + for op in 0..15 { + let len = if op % 3 == 0 { + // Small: 1KB - 100KB (cudaMallocAsync) + ((thread_idx + 1) * (op + 1) * 1024) % (100 << 10) + 1024 + } else { + // Large: 100MB - 400MB (VPMM) + ((thread_idx + 1) * (op + 1) % 4 + 1) * (100 << 20) + }; + + let buf = 
DeviceBuffer::::with_capacity(len); + buffers.push(buf); + + if op % 2 == 0 && !buffers.is_empty() { + buffers.remove(0); // drops and frees + } + } + // remaining buffers drop here + }); + handles.push(handle); + } + + for handle in handles { + handle.await.expect("thread failed"); + } + }); +} + +// ============================================================================ +// Preallocation: test pool with initial pages already mapped +// ============================================================================ +#[test] +fn test_preallocation() { + let config = VpmmConfig { + page_size: None, // Use device granularity + va_size: 1 << 30, // 1 GB VA + initial_pages: 4, // Preallocate 4 pages + }; + let mut pool = VirtualMemoryPool::new(config); + + if pool.page_size == usize::MAX { + println!("VPMM not supported, skipping test"); + return; + } + + let page_size = pool.page_size; + let stream_id = current_stream_id().unwrap(); + + // Should already have 4 pages allocated + assert_eq!( + pool.memory_usage(), + 4 * page_size, + "Should have 4 preallocated pages" + ); + + // Allocate 2 pages from preallocated pool - no new pages needed + let ptr = pool.malloc_internal(2 * page_size, stream_id).unwrap(); + assert_eq!( + pool.memory_usage(), + 4 * page_size, + "No new pages should be allocated" + ); + + // Allocate 3 more pages - more than remaining 2 → triggers growth + let ptr2 = pool.malloc_internal(3 * page_size, stream_id).unwrap(); + assert!( + pool.memory_usage() > 4 * page_size, + "Pool should have grown past initial 4 pages" + ); + + // Cleanup + pool.free_internal(ptr, stream_id).unwrap(); + pool.free_internal(ptr2, stream_id).unwrap(); +} + +// ============================================================================ +// Reuse after free: same stream should immediately reuse freed region +// ============================================================================ +#[test] +fn test_same_stream_immediate_reuse() { + let config = VpmmConfig { + page_size: None, // Use device granularity + va_size: 1 << 30, // 1 GB VA + initial_pages: 0, + }; + let mut pool = VirtualMemoryPool::new(config); + + if pool.page_size == usize::MAX { + println!("VPMM not supported, skipping test"); + return; + } + + let page_size = pool.page_size; + let stream_id = current_stream_id().unwrap(); + + // Allocate 2 pages and free + let ptr1 = pool.malloc_internal(2 * page_size, stream_id).unwrap(); + pool.free_internal(ptr1, stream_id).unwrap(); + + // Same stream should immediately get the same address back + let ptr2 = pool.malloc_internal(2 * page_size, stream_id).unwrap(); + assert_eq!( + ptr1, ptr2, + "Same stream should immediately reuse freed region" + ); + + pool.free_internal(ptr2, stream_id).unwrap(); +} diff --git a/crates/cuda-common/src/memory_manager/vm_pool.rs b/crates/cuda-common/src/memory_manager/vm_pool.rs index 2ea60574..24ef4b01 100644 --- a/crates/cuda-common/src/memory_manager/vm_pool.rs +++ b/crates/cuda-common/src/memory_manager/vm_pool.rs @@ -1,10 +1,9 @@ #![allow(non_camel_case_types, non_upper_case_globals, non_snake_case)] use std::{ - cmp::Reverse, collections::{BTreeMap, HashMap}, ffi::c_void, - sync::{Arc, Mutex}, + sync::Arc, }; use bytesize::ByteSize; @@ -13,22 +12,82 @@ use super::cuda::*; use crate::{ common::set_device, error::MemoryError, - stream::{ - cudaStreamPerThread, cudaStream_t, current_stream_id, default_stream_wait, CudaEvent, - CudaStreamId, - }, + stream::{current_stream_id, current_stream_sync, CudaEvent, CudaStreamId}, }; #[link(name = "cudart")] 
extern "C" { fn cudaMemGetInfo(free_bytes: *mut usize, total_bytes: *mut usize) -> i32; - fn cudaLaunchHostFunc( - stream: cudaStream_t, - fn_: Option, - user_data: *mut c_void, - ) -> i32; } +// ============================================================================ +// Configuration +// ============================================================================ + +const DEFAULT_VA_SIZE: usize = 8 << 40; // 8 TB + +/// Configuration for the Virtual Memory Pool. +/// +/// Use `VpmmConfig::from_env()` to load from environment variables, +/// or construct directly for testing. +#[derive(Debug, Clone)] +pub(super) struct VpmmConfig { + /// Page size override. If `None`, uses CUDA's minimum granularity for the device. + pub page_size: Option, + /// Virtual address space size per reserved chunk (default: 8 TB). + pub va_size: usize, + /// Number of pages to preallocate at startup (default: 0). + pub initial_pages: usize, +} + +impl Default for VpmmConfig { + fn default() -> Self { + Self { + page_size: None, + va_size: DEFAULT_VA_SIZE, + initial_pages: 0, + } + } +} + +impl VpmmConfig { + /// Load configuration from environment variables: + /// - `VPMM_PAGE_SIZE`: Page size in bytes (must be multiple of CUDA granularity) + /// - `VPMM_VA_SIZE`: Virtual address space size per chunk (default: 8 TB) + /// - `VPMM_PAGES`: Number of pages to preallocate (default: 0) + pub fn from_env() -> Self { + let page_size = std::env::var("VPMM_PAGE_SIZE").ok().map(|val| { + let size: usize = val.parse().expect("VPMM_PAGE_SIZE must be a valid number"); + assert!(size > 0, "VPMM_PAGE_SIZE must be > 0"); + size + }); + + let va_size = match std::env::var("VPMM_VA_SIZE") { + Ok(val) => { + let size: usize = val.parse().expect("VPMM_VA_SIZE must be a valid number"); + assert!(size > 0, "VPMM_VA_SIZE must be > 0"); + size + } + Err(_) => DEFAULT_VA_SIZE, + }; + + let initial_pages = match std::env::var("VPMM_PAGES") { + Ok(val) => val.parse().expect("VPMM_PAGES must be a valid number"), + Err(_) => 0, + }; + + Self { + page_size, + va_size, + initial_pages, + } + } +} + +// ============================================================================ +// Pool Implementation +// ============================================================================ + #[derive(Debug, Clone)] struct FreeRegion { size: usize, @@ -37,15 +96,10 @@ struct FreeRegion { id: usize, } -const VIRTUAL_POOL_SIZE: usize = 1usize << 45; // 32 TB - /// Virtual memory pool implementation. 
pub(super) struct VirtualMemoryPool {
-    // Virtual address space root
-    root: CUdeviceptr,
-
-    // Current end of active address space
-    curr_end: CUdeviceptr,
+    // Virtual address space roots
+    pub(super) roots: Vec<CUdeviceptr>,

    // Map for all active pages
    active_pages: HashMap<CUdeviceptr, CUmemGenericAllocationHandle>,
@@ -53,67 +107,119 @@ pub(super) struct VirtualMemoryPool {
    // Free regions in virtual space (sorted by address)
    free_regions: BTreeMap<CUdeviceptr, FreeRegion>,

-    // Number of free calls (used to assign ids to free regions)
-    free_num: usize,
+    // Active allocations: (ptr, size)
+    malloc_regions: HashMap<CUdeviceptr, usize>,

-    // Pending events to destroy after "wait for event" is finished
-    // key: free_num as VMPool free state; value: non-completed events from other streams
-    pending_events: Arc<Mutex<HashMap<usize, Vec<Arc<CudaEvent>>>>>,
+    // Unmapped regions: (ptr, size)
+    unmapped_regions: BTreeMap<CUdeviceptr, usize>,

-    // Active allocations: (ptr, size)
-    used_regions: HashMap<CUdeviceptr, usize>,
+    // Number of free calls (used to assign ids to free regions)
+    free_num: usize,

    // Granularity size: (page % 2MB must be 0)
    pub(super) page_size: usize,

+    // Reserved virtual address span in bytes
+    va_size: usize,
+
    // Device ordinal
    pub(super) device_id: i32,
}

+/// # Safety
+/// `VirtualMemoryPool` is not internally synchronized. These impls are safe because
+/// all access goes through `Mutex` in the parent module.
unsafe impl Send for VirtualMemoryPool {}
unsafe impl Sync for VirtualMemoryPool {}

impl VirtualMemoryPool {
-    fn new() -> Self {
+    pub(super) fn new(config: VpmmConfig) -> Self {
        let device_id = set_device().unwrap();
-        let (root, page_size) = unsafe {
+
+        // Check VPMM support and resolve page_size
+        let (root, page_size, va_size) = unsafe {
            match vpmm_check_support(device_id) {
                Ok(_) => {
-                    let gran = vpmm_min_granularity(device_id).unwrap();
-                    let page_size = match std::env::var("VPMM_PAGE_SIZE") {
-                        Ok(val) => {
-                            let custom_size: usize =
-                                val.parse().expect("VPMM_PAGE_SIZE must be a valid number");
+                    let granularity = vpmm_min_granularity(device_id).unwrap();
+
+                    // Resolve page_size: use config override or device granularity
+                    let page_size = match config.page_size {
+                        Some(size) => {
                            assert!(
-                                custom_size > 0 && custom_size % gran == 0,
+                                size > 0 && size % granularity == 0,
                                "VPMM_PAGE_SIZE must be > 0 and multiple of {}",
-                                gran
+                                granularity
                            );
-                            custom_size
+                            size
                        }
-                        Err(_) => gran,
+                        None => granularity,
                    };
-                    let va_base = vpmm_reserve(VIRTUAL_POOL_SIZE, page_size).unwrap();
-                    (va_base, page_size)
+
+                    // Validate va_size
+                    let va_size = config.va_size;
+                    assert!(
+                        va_size > 0 && va_size % page_size == 0,
+                        "VPMM_VA_SIZE must be > 0 and multiple of page size ({})",
+                        page_size
+                    );
+
+                    // Reserve initial VA chunk
+                    let va_base = vpmm_reserve(va_size, page_size).unwrap();
+                    tracing::debug!(
+                        "VPMM: Reserved virtual address space {} with page size {}",
+                        ByteSize::b(va_size as u64),
+                        ByteSize::b(page_size as u64)
+                    );
+                    (va_base, page_size, va_size)
                }
                Err(_) => {
                    tracing::warn!("VPMM not supported, falling back to cudaMallocAsync");
-                    (0, usize::MAX)
+                    (0, usize::MAX, 0)
                }
            }
        };

-        Self {
-            root,
-            curr_end: root,
+        let mut pool = Self {
+            roots: vec![root],
            active_pages: HashMap::new(),
            free_regions: BTreeMap::new(),
+            malloc_regions: HashMap::new(),
+            unmapped_regions: if va_size > 0 {
+                BTreeMap::from_iter([(root, va_size)])
+            } else {
+                BTreeMap::new()
+            },
            free_num: 0,
-            pending_events: Arc::new(Mutex::new(HashMap::new())),
-            used_regions: HashMap::new(),
            page_size,
+            va_size,
            device_id,
+        };
+
+        // Preallocate pages if requested (skip if VPMM not supported)
+        if config.initial_pages > 0 && page_size !=
usize::MAX { + let alloc_size = config.initial_pages * page_size; + if let Err(e) = + pool.defragment_or_create_new_pages(alloc_size, current_stream_id().unwrap()) + { + let mut free_mem = 0usize; + let mut total_mem = 0usize; + unsafe { + cudaMemGetInfo(&mut free_mem, &mut total_mem); + } + panic!( + "VPMM preallocation failed: {:?}\n\ + Config: pages={}, page_size={}\n\ + GPU Memory: free={}, total={}", + e, + config.initial_pages, + ByteSize::b(page_size as u64), + ByteSize::b(free_mem as u64), + ByteSize::b(total_mem as u64) + ); + } } + + pool } /// Allocates memory from the pool's free regions. @@ -137,7 +243,10 @@ impl VirtualMemoryPool { } if let Some(ptr) = best_region { - let region = self.free_regions.remove(&ptr).unwrap(); + let region = self + .free_regions + .remove(&ptr) + .expect("BUG: free region address not found after find_best_fit"); debug_assert_eq!(region.stream_id, stream_id); // If region is larger, return remainder to free list @@ -153,7 +262,7 @@ impl VirtualMemoryPool { ); } - self.used_regions.insert(ptr, requested); + self.malloc_regions.insert(ptr, requested); return Ok(ptr as *mut c_void); } @@ -186,42 +295,54 @@ impl VirtualMemoryPool { return Some(*addr); } - // 1b. Try completed from other streams (smallest completed fit) - candidates - .iter_mut() - .filter(|(_, region)| region.event.completed()) - .min_by_key(|(_, region)| region.size) - .map(|(addr, region)| { - region.stream_id = stream_id; - *addr - }) + // 1b. Try the oldest from other streams + if let Some((addr, region)) = candidates.iter_mut().min_by_key(|(_, region)| region.id) { + if let Err(e) = region.event.synchronize() { + tracing::error!("Event synchronize failed during find_best_fit: {:?}", e); + return None; + } + region.stream_id = stream_id; + Some(*addr) + } else { + None + } } /// Frees a pointer and returns the size of the freed memory. /// Coalesces adjacent free regions. + /// + /// # Assumptions + /// No CUDA streams will use the malloc region starting at `ptr` after the newly recorded event + /// on `stream_id` at this point in time. pub(super) fn free_internal( &mut self, ptr: *mut c_void, stream_id: CudaStreamId, ) -> Result { - debug_assert!(self.curr_end != self.root, "VM pool is empty"); let ptr = ptr as CUdeviceptr; let size = self - .used_regions + .malloc_regions .remove(&ptr) .ok_or(MemoryError::InvalidPointer)?; - self.free_region_insert(ptr, size, stream_id); + let _ = self.free_region_insert(ptr, size, stream_id); Ok(size) } + /// Inserts a new free region for `(ptr, size)` into the free regions map, **possibly** merging + /// with existing adjacent regions. Merges only occur if the regions are both adjacent in memory + /// and on the same stream as `stream_id`. The new free region always records a new event on the + /// stream. + /// + /// Returns the starting pointer and size of the (possibly merged) free region containing `(ptr, + /// size)`. fn free_region_insert( &mut self, mut ptr: CUdeviceptr, mut size: usize, stream_id: CudaStreamId, - ) { + ) -> (CUdeviceptr, usize) { // Potential merge with next neighbor if let Some((&next_ptr, next_region)) = self.free_regions.range(ptr + 1..).next() { if next_region.stream_id == stream_id && ptr + size as u64 == next_ptr { @@ -253,157 +374,114 @@ impl VirtualMemoryPool { id, }, ); + (ptr, size) } - /// Defragments the pool by consolidating free regions at the end of virtual address space. - /// Remaps pages to create one large contiguous free region. 
- fn defragment_or_create_new_pages( - &mut self, - requested: usize, - stream_id: CudaStreamId, - ) -> Result, MemoryError> { - if requested == 0 { - return Ok(None); - } - - tracing::debug!( - "VPMM: Defragging or creating new pages: requested={}, stream_id={}", - ByteSize::b(requested as u64), - stream_id - ); - - let mut to_defrag = Vec::new(); + /// Return the base address of a virtual hole large enough for `requested` bytes. + /// + /// The returned region is not inside `free_region` or `unmapped_region` map and must be + /// properly handled. + fn take_unmapped_region(&mut self, requested: usize) -> Result { + debug_assert!(requested != 0); + debug_assert_eq!(requested % self.page_size, 0); - // 2.a. Defrag current stream (newest first) - let mut current_stream_to_defrag: Vec<(CUdeviceptr, usize, usize)> = self - .free_regions + if let Some((&addr, &size)) = self + .unmapped_regions .iter() - .filter(|(_, r)| r.stream_id == stream_id) - .map(|(addr, r)| (*addr, r.size, r.id)) - .collect(); - - // If last free region is at the virtual end, use it as base for defragmentation - let (mut defrag_start, mut accumulated_size) = if current_stream_to_defrag - .last() - .is_some_and(|(addr, size, _)| *addr + *size as u64 == self.curr_end) + .filter(|(_, region_size)| **region_size >= requested) + .min_by_key(|(_, region_size)| *region_size) { - let (addr, size, _) = current_stream_to_defrag.pop().unwrap(); - (addr, size) - } else { - (self.curr_end, 0) + self.unmapped_regions.remove(&addr); + if size > requested { + self.unmapped_regions + .insert(addr + requested as u64, size - requested); + } + return Ok(addr); + } + + let addr = unsafe { + vpmm_reserve(self.va_size, self.page_size).map_err(|_| MemoryError::ReserveFailed { + size: self.va_size, + page_size: self.page_size, + })? }; + self.roots.push(addr); + self.insert_unmapped_region(addr + requested as u64, self.va_size - requested); + Ok(addr) + } - tracing::debug!( - "VPMM: Current stream {} trying to defragment from {:?}", - stream_id, - current_stream_to_defrag - .iter() - .map(|(_, size, _)| ByteSize::b(*size as u64)) - .collect::>() - ); + /// Insert a hole back into the unmapped set, coalescing with neighbors. + fn insert_unmapped_region(&mut self, mut addr: CUdeviceptr, mut size: usize) { + if size == 0 { + return; + } - let current_stream_to_defrag_size = current_stream_to_defrag - .iter() - .map(|(_, size, _)| size) - .sum::(); - if current_stream_to_defrag_size + accumulated_size < requested { - to_defrag.extend(current_stream_to_defrag.iter().map(|(ptr, _, _)| *ptr)); - accumulated_size += current_stream_to_defrag_size; - } else { - current_stream_to_defrag.sort_by_key(|(_, _, free_id)| Reverse(*free_id)); - - for (ptr, size, _) in current_stream_to_defrag { - to_defrag.push(ptr); - accumulated_size += size; - if accumulated_size >= requested { - self.remap_regions(to_defrag, stream_id)?; - return Ok(Some(defrag_start)); - } + if let Some((&prev_addr, &prev_size)) = self.unmapped_regions.range(..addr).next_back() { + if prev_addr + prev_size as u64 == addr { + self.unmapped_regions.remove(&prev_addr); + addr = prev_addr; + size += prev_size; } } - // 2.b. 
Defrag other streams (completed, oldest first) - let mut other_streams_to_defrag: Vec<(CUdeviceptr, usize, Arc, usize)> = self - .free_regions - .iter() - .filter(|(_, r)| r.stream_id != stream_id) - .map(|(addr, r)| (*addr, r.size, r.event.clone(), r.id)) - .collect(); + if let Some((&next_addr, &next_size)) = self.unmapped_regions.range(addr + 1..).next() { + if addr + size as u64 == next_addr { + self.unmapped_regions.remove(&next_addr); + size += next_size; + } + } - // If last free region is at the virtual end, take it and use as base for defragmentation - if accumulated_size == 0 - && other_streams_to_defrag - .last() - .is_some_and(|(addr, size, event, _)| { - event.completed() && *addr + *size as u64 == self.curr_end - }) - { - (defrag_start, accumulated_size, _, _) = other_streams_to_defrag.pop().unwrap(); - self.free_regions.get_mut(&defrag_start).unwrap().stream_id = stream_id; + self.unmapped_regions.insert(addr, size); + } + + /// Defragments the pool by reusing existing holes and, if needed, reserving more VA space. + /// Moves just enough pages to satisfy `requested`, keeping the remainder in place. + /// + /// The returned `pointer`, if not `None`, is guaranteed to exist as a key in the `free_regions` + /// map. The corresponding free region will have size `>= requested`. Note the size _may_ be + /// larger than requested. + fn defragment_or_create_new_pages( + &mut self, + requested: usize, + stream_id: CudaStreamId, + ) -> Result, MemoryError> { + debug_assert_eq!(requested % self.page_size, 0); + if requested == 0 { + return Ok(None); } + let total_free_size = self.free_regions.values().map(|r| r.size).sum::(); tracing::debug!( - "VPMM: Defragmented {} bytes from stream {}, other streams ready to borrow {:?}", - ByteSize::b(accumulated_size as u64), - stream_id, - other_streams_to_defrag - .iter() - .map(|(_, size, event, id)| (ByteSize::b(*size as u64), event.status(), id)) - .collect::>() + "VPMM: Defragging or creating new pages: requested={}, free={} stream_id={}", + ByteSize::b(requested as u64), + ByteSize::b(total_free_size as u64), + stream_id ); - other_streams_to_defrag.sort_by_key(|(_, _, event, free_id)| (event.status(), *free_id)); - - let mut events_to_wait = Vec::new(); - for (ptr, size, event, _) in other_streams_to_defrag { - if !event.completed() { - default_stream_wait(&event)?; - events_to_wait.push(event); - } - to_defrag.push(ptr); - accumulated_size += size; - if accumulated_size >= requested { - break; - } - } - if !events_to_wait.is_empty() { - // Destroy events only when "wait for event" is finished - tracing::debug!( - "VPMM: Async call {} events to destroy on stream {}", - events_to_wait.len(), - stream_id - ); - self.pending_events - .lock() - .unwrap() - .insert(self.free_num, events_to_wait); - unsafe { - cudaLaunchHostFunc( - cudaStreamPerThread, - Some(pending_events_destroy_callback), - Box::into_raw(Box::new((self.free_num, self.pending_events.clone()))) - as *mut c_void, - ) - }; - } - - self.remap_regions(to_defrag, stream_id)?; - if accumulated_size >= requested { - return Ok(Some(defrag_start)); - } + // Find a best fit unmapped region + let dst = self.take_unmapped_region(requested)?; + // Sentinel value until we have a valid free region pointer from allocation + let mut allocated_ptr = CUdeviceptr::MAX; - // 2.c. 
Create new pages - let allocate_start = self.curr_end; - let mut allocated_size = 0; - while accumulated_size < requested { + // Allocate new pages if we don't have enough free regions + let mut allocated_dst = dst; + let mut allocate_size = requested.saturating_sub(total_free_size); + debug_assert_eq!(allocate_size % self.page_size, 0); + while allocated_dst < dst + allocate_size as u64 { let handle = unsafe { match vpmm_create_physical(self.device_id, self.page_size) { Ok(handle) => handle, Err(e) => { + tracing::error!( + "vpmm_create_physical failed: device={}, page_size={}: {:?}", + self.device_id, + self.page_size, + e + ); if e.is_out_of_memory() { return Err(MemoryError::OutOfMemory { - requested, - available: accumulated_size, + requested: allocate_size, + available: (allocated_dst - dst) as usize, }); } else { return Err(MemoryError::from(e)); @@ -411,56 +489,197 @@ impl VirtualMemoryPool { } } }; - unsafe { vpmm_map(self.curr_end, self.page_size, handle)? }; - self.active_pages.insert(self.curr_end, handle); - self.curr_end += self.page_size as u64; - allocated_size += self.page_size; - accumulated_size += self.page_size; + unsafe { + vpmm_map(allocated_dst, self.page_size, handle).map_err(|e| { + tracing::error!( + "vpmm_map failed: addr={:#x}, page_size={}, handle={}: {:?}", + allocated_dst, + self.page_size, + handle, + e + ); + MemoryError::from(e) + })?; + } + self.active_pages.insert(allocated_dst, handle); + allocated_dst += self.page_size as u64; } - tracing::info!( - "GPU mem: VPMM allocated {} bytes on stream {}. Total allocated: {}", - ByteSize::b(allocated_size as u64), - stream_id, - ByteSize::b(self.memory_usage() as u64) - ); - unsafe { vpmm_set_access(allocate_start, allocated_size, self.device_id)? }; - self.free_region_insert(allocate_start, allocated_size, stream_id); + debug_assert_eq!(allocated_dst, dst + allocate_size as u64); + if allocate_size > 0 { + tracing::debug!( + "VPMM: Allocated {} bytes on stream {}. Total allocated: {}", + ByteSize::b(allocate_size as u64), + stream_id, + ByteSize::b(self.memory_usage() as u64) + ); + unsafe { + vpmm_set_access(dst, allocate_size, self.device_id).map_err(|e| { + tracing::error!( + "vpmm_set_access failed: addr={:#x}, size={}, device={}: {:?}", + dst, + allocate_size, + self.device_id, + e + ); + MemoryError::from(e) + })?; + } + let (merged_ptr, merged_size) = self.free_region_insert(dst, allocate_size, stream_id); + debug_assert!(merged_size >= allocate_size); + allocated_ptr = merged_ptr; + allocate_size = merged_size; + } + // At this point, allocated_ptr is either + // - CUdeviceptr::MAX if no allocations occurred + // - or some pointer `<= dst` for the start of a free region (with VA-mapping) of at least + // `requested` bytes. The case `allocated_ptr < dst` happens if `free_region_insert` + // merged the allocated region with a previous free region. This only happens if the + // previous free region is on the same `stream_id`. In this case, we have + // [allocated_ptr..dst][dst..allocated_dst] which is all free and safe to use on the + // stream `stream_id` _without_ synchronization, because all events will be sequenced + // afterwards on the stream. 
+ let mut remaining = requested.saturating_sub(allocate_size); + if remaining == 0 { + debug_assert_ne!( + allocated_ptr, + CUdeviceptr::MAX, + "Allocation returned no valid free region" + ); + return Ok(Some(allocated_ptr)); + } + debug_assert!(allocate_size == 0 || allocated_ptr <= dst); + + // Pull free regions (oldest first) until we've gathered enough pages. + let mut to_defrag: Vec<(CUdeviceptr, usize)> = Vec::new(); + let mut oldest_free_regions: Vec<_> = self + .free_regions + .iter() + .filter(|(&addr, _)| allocate_size == 0 || addr != allocated_ptr) + .map(|(&addr, region)| (region.id, addr)) + .collect(); + oldest_free_regions.sort_by_key(|(id, _)| *id); + for (_, addr) in oldest_free_regions { + if remaining == 0 { + break; + } - Ok(Some(defrag_start)) + let region = self + .free_regions + .remove(&addr) + .expect("BUG: free region disappeared"); + region.event.synchronize().map_err(|e| { + tracing::error!("Event synchronize failed during defrag: {:?}", e); + MemoryError::from(e) + })?; + + let take = remaining.min(region.size); + to_defrag.push((addr, take)); + remaining -= take; + + if region.size > take { + // Return the unused tail to the free list so it stays available. + let leftover_addr = addr + take as u64; + let leftover_size = region.size - take; + let _ = self.free_region_insert(leftover_addr, leftover_size, region.stream_id); + } + } + let remapped_ptr = self.remap_regions(to_defrag, allocated_dst, stream_id)?; + // Take the minimum in case allocated_ptr is CUdeviceptr::MAX when allocate_size = 0 + let result = std::cmp::min(allocated_ptr, remapped_ptr); + debug_assert!(allocate_size == 0 || allocated_ptr == remapped_ptr); + debug_assert_ne!( + result, + CUdeviceptr::MAX, + "Both allocation and remapping returned no valid free region" + ); + Ok(Some(result)) } + /// Remap a list of regions to a new base address. The regions are remapped consecutively + /// starting at `dst`. + /// + /// Returns the starting pointer of the new remapped free region or `CUdeviceptr::MAX` if no + /// remapping is needed. + /// + /// # Assumptions + /// The regions are already removed from the free regions map. Removal should mean that regions' + /// corresponding events have already completed and the regions can be safely unmapped. fn remap_regions( &mut self, - regions: Vec, + regions: Vec<(CUdeviceptr, usize)>, + dst: CUdeviceptr, stream_id: CudaStreamId, - ) -> Result<(), MemoryError> { + ) -> Result { if regions.is_empty() { - return Ok(()); + // Nothing to remap; return sentinel so caller's min() picks the other operand + return Ok(CUdeviceptr::MAX); } - let new_region_start = self.curr_end; - for region_addr in regions { - let region = self - .free_regions - .remove(®ion_addr) - .ok_or(MemoryError::InvalidPointer)?; - let num_pages = region.size / self.page_size; + let bytes_to_remap = regions.iter().map(|(_, size)| *size).sum::(); + tracing::debug!( + "VPMM: Remapping {} regions. 
Total size = {}", + regions.len(), + ByteSize::b(bytes_to_remap as u64) + ); + + let mut curr_dst = dst; + for (region_addr, region_size) in regions { + // Unmap the region + unsafe { + vpmm_unmap(region_addr, region_size).map_err(|e| { + tracing::error!( + "vpmm_unmap failed: addr={:#x}, size={}: {:?}", + region_addr, + region_size, + e + ); + MemoryError::from(e) + })?; + } + self.insert_unmapped_region(region_addr, region_size); + + // Remap the region + let num_pages = region_size / self.page_size; for i in 0..num_pages { let page = region_addr + (i * self.page_size) as u64; let handle = self .active_pages .remove(&page) - .ok_or(MemoryError::InvalidPointer)?; - unsafe { vpmm_map(self.curr_end, self.page_size, handle)? }; - self.active_pages.insert(self.curr_end, handle); - self.curr_end += self.page_size as u64; + .expect("BUG: active page not found during remapping"); + unsafe { + vpmm_map(curr_dst, self.page_size, handle).map_err(|e| { + tracing::error!( + "vpmm_map (remap) failed: dst={:#x}, page_size={}, handle={}: {:?}", + curr_dst, + self.page_size, + handle, + e + ); + MemoryError::from(e) + })?; + } + self.active_pages.insert(curr_dst, handle); + curr_dst += self.page_size as u64; } } - let new_region_size = (self.curr_end - new_region_start) as usize; - unsafe { vpmm_set_access(new_region_start, new_region_size, self.device_id)? }; - self.free_region_insert(new_region_start, new_region_size, stream_id); - Ok(()) + debug_assert_eq!(curr_dst - dst, bytes_to_remap as u64); + + // Set access permissions for the remapped region + unsafe { + vpmm_set_access(dst, bytes_to_remap, self.device_id).map_err(|e| { + tracing::error!( + "vpmm_set_access (remap) failed: addr={:#x}, size={}, device={}: {:?}", + dst, + bytes_to_remap, + self.device_id, + e + ); + MemoryError::from(e) + })?; + } + let (remapped_ptr, _) = self.free_region_insert(dst, bytes_to_remap, stream_id); + Ok(remapped_ptr) } /// Returns the total physical memory currently mapped in this pool (in bytes). @@ -471,59 +690,70 @@ impl VirtualMemoryPool { impl Drop for VirtualMemoryPool { fn drop(&mut self) { - if self.root != self.curr_end { + current_stream_sync().unwrap(); + for (ptr, handle) in self.active_pages.drain() { unsafe { - vpmm_unmap(self.root, (self.curr_end - self.root) as usize).unwrap(); - for (_, handle) in self.active_pages.drain() { - vpmm_release(handle).unwrap(); - } + vpmm_unmap(ptr, self.page_size).unwrap(); + vpmm_release(handle).unwrap(); } } - unsafe { - vpmm_release_va(self.root, VIRTUAL_POOL_SIZE).unwrap(); + + for root in self.roots.drain(..) 
{ + unsafe { + vpmm_release_va(root, self.va_size).unwrap(); + } } } } impl Default for VirtualMemoryPool { fn default() -> Self { - let mut pool = Self::new(); + Self::new(VpmmConfig::from_env()) + } +} - // Skip allocation if vpmm not supported - if pool.page_size == usize::MAX { - return pool; +#[allow(unused)] +impl std::fmt::Debug for VirtualMemoryPool { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!( + f, + "VMPool (VA_SIZE={}, PAGE_SIZE={})", + ByteSize::b(self.va_size as u64), + ByteSize::b(self.page_size as u64) + )?; + + let reserved = self.roots.len() * self.va_size; + let allocated = self.memory_usage(); + let free_bytes: usize = self.free_regions.values().map(|r| r.size).sum(); + let malloc_bytes: usize = self.malloc_regions.values().sum(); + let unmapped_bytes: usize = self.unmapped_regions.values().sum(); + + writeln!( + f, + "Total: reserved={}, allocated={}, free={}, malloc={}, unmapped={}", + ByteSize::b(reserved as u64), + ByteSize::b(allocated as u64), + ByteSize::b(free_bytes as u64), + ByteSize::b(malloc_bytes as u64), + ByteSize::b(unmapped_bytes as u64) + )?; + + let mut regions: Vec<(CUdeviceptr, usize, String)> = Vec::new(); + for (addr, region) in &self.free_regions { + regions.push((*addr, region.size, format!("free (s {})", region.stream_id))); + } + for (addr, size) in &self.malloc_regions { + regions.push((*addr, *size, "malloc".to_string())); + } + for (addr, size) in &self.unmapped_regions { + regions.push((*addr, *size, "unmapped".to_string())); } + regions.sort_by_key(|(addr, _, _)| *addr); - // Calculate initial number of pages to allocate - let initial_pages = match std::env::var("VPMM_PAGES") { - Ok(val) => { - let pages: usize = val.parse().expect("VPMM_PAGES must be a valid number"); - pages - } - Err(_) => 0, - }; - if let Err(e) = pool.defragment_or_create_new_pages( - initial_pages * pool.page_size, - current_stream_id().unwrap(), - ) { - // Check how much memory is available - let mut free_mem = 0usize; - let mut total_mem = 0usize; - unsafe { - cudaMemGetInfo(&mut free_mem, &mut total_mem); - } - panic!( - "Error:{:?}\nPool: pages={}, page_size={}\nMemory: free_mem={}, total_mem={}", - e, initial_pages, pool.page_size, free_mem, total_mem - ); + write!(f, "Regions: ")?; + for (_, size, label) in regions.iter() { + write!(f, "[{} {}]", label, ByteSize::b(*size as u64))?; } - pool + Ok(()) } } - -unsafe extern "C" fn pending_events_destroy_callback(data: *mut c_void) { - let boxed = - Box::from_raw(data as *mut (usize, Arc>>>>)); - let (key, pending) = *boxed; - pending.lock().unwrap().remove(&key); -} diff --git a/crates/cuda-common/src/stream.rs b/crates/cuda-common/src/stream.rs index bd542ff5..6eba0eaa 100644 --- a/crates/cuda-common/src/stream.rs +++ b/crates/cuda-common/src/stream.rs @@ -150,6 +150,10 @@ impl CudaEvent { check(unsafe { cudaEventRecord(self.event, cudaStreamPerThread) }) } + pub fn synchronize(&self) -> Result<(), CudaError> { + check(unsafe { cudaEventSynchronize(self.event) }) + } + /// # Safety /// The caller must ensure that `stream` is a valid stream. pub unsafe fn record_and_wait(&self, stream: cudaStream_t) -> Result<(), CudaError> { diff --git a/docs/vpmm_spec.md b/docs/vpmm_spec.md index 95fc0f72..4acd0905 100644 --- a/docs/vpmm_spec.md +++ b/docs/vpmm_spec.md @@ -23,25 +23,23 @@ The **Virtual Memory Pool Memory Manager (VPMM)** replaces this with a design ba ## Concepts & Notation -* **Virtual address (VA) space** — Per‑process range of GPU‑visible addresses. 
Reserved VA has no storage until mapped.
-* **Active space** — The prefixed portion of the reserved VA that currently has mappings; grows as needed.
-* **Page** — Fixed‑size chunk of physical GPU memory, the basic mapping unit. A page may be mapped at multiple VAs.
-* **Region** — Consecutive VA range with a single state:
-
-  * `[+X]` allocated region returned by the **current** `malloc` call
-  * `[X]` allocated region from a **previous** call
-  * `[-X]` region marked as **free**
-  * `[*X]` region marked as **dead** (free but already remapped)
-  * `[#X]` region marked as **new** (free but created by current allocation)
+* **Virtual address (VA) space** — Per‑process GPU-visible address range. Reserving VA does **not** allocate physical memory.
+* **Page** — Fixed-size physical GPU chunk (≥ CUDA VMM granularity). All mappings and allocations are rounded up to page size.
+* **Region** — Consecutive VA range tracked with symbolic states:
+  * `[+X]` – current allocation returned by `malloc`
+  * `[X]` – previously allocated user region
+  * `[-X]` – mapped free region with an associated stream/event
+  * `[*X]` – **unmapped** region (hole created after remapping)
+  * `[#X]` – brand new region created while satisfying the current allocation

 ## High‑Level Design

-1. **Reserve** a sufficiently large VA range once at initialization.
-2. **Pre‑allocate** a configurable number of physical pages.
-3. **Map** pages contiguously at the **end of active space** for new allocations.
-4. **Find‑or‑defragment**: if no free region fits, **remap** pages from earlier freed regions to the end (no data copy, just page table updates).
-5. **Cross-stream synchronization**: Use CUDA events to track when freed memory becomes safe to reuse across streams.
-6. **Grow**: if still insufficient, **allocate more pages** and map them at the end.
+1. **Reserve** a large VA chunk once (size configurable via `VPMM_VA_SIZE`). Additional chunks are reserved on demand.
+2. **Track** three disjoint maps: `malloc_regions`, `free_regions` (with CUDA events/stream ids), and `unmapped_regions` (holes).
+3. **Allocate** by finding the best-fit free region on the current stream; otherwise take the oldest freed region from another stream and wait on its event.
+4. **Defragment** only when necessary: grab a hole, detach just enough pages from the oldest free regions (split + reinsert leftovers), unmap their old VA, and remap into the hole.
+5. **Grow** by allocating the shortfall in physical pages and mapping them into the same hole when free regions still aren’t enough.
+6. **Observe** all activity via maps (no implicit “active end”). Debug output lists every region in address order.

Small allocations (< page size) bypass the pool and use `cudaMallocAsync` for simplicity and backward compatibility.
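The split between the two paths is made in `MemoryManager::d_malloc` (see the `memory_manager/mod.rs` hunk above). Below is a condensed, illustrative sketch of that dispatch with the size/peak bookkeeping and error logging left out; `dispatch_alloc` is a hypothetical name, while `check`, `cudaMallocAsync`, `cudaStreamPerThread`, `current_stream_id`, and `malloc_internal` are the items that appear in the diff.

```rust
// Conceptual sketch only; in the crate this logic lives inside MemoryManager::d_malloc.
unsafe fn dispatch_alloc(
    mm: &mut MemoryManager,
    size: usize,
) -> Result<*mut std::ffi::c_void, MemoryError> {
    if size < mm.pool.page_size {
        // Small buffer: ordinary stream-ordered allocation on the per-thread stream.
        let mut ptr = std::ptr::null_mut();
        check(cudaMallocAsync(&mut ptr, size, cudaStreamPerThread))?;
        Ok(ptr)
    } else {
        // Large buffer: round up to whole pages and serve it from the VPMM pool.
        let rounded = size.next_multiple_of(mm.pool.page_size);
        mm.pool.malloc_internal(rounded, current_stream_id()?)
    }
}
```

Because everything at or above one page goes through the pool, the VPMM machinery described below never sees sub-page requests.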
@@ -106,47 +104,53 @@ Similar to Case C, except `+4` in step 4 cannot fit the third region, so layout
 // cuda-common/src/memory_manager/vm_pool.rs
 struct FreeRegion {
     size: usize,
-    event: CudaEvent,        // Event marking when this region was freed
+    event: Arc<CudaEvent>,   // Event marking when this region was freed
     stream_id: CudaStreamId, // Stream that freed this region
     id: usize,               // Creation order for temporal tracking
 }

 pub(super) struct VirtualMemoryPool {
-    device_id: i32,
-    root: CUdeviceptr,     // Virtual address space root (shared across streams)
-    curr_end: CUdeviceptr, // Current end of active address space
-    pub(super) page_size: usize, // Mapping granularity (multiple of 2MB)
-
-    // Map for all active pages (last mapping only; keys and values are unique)
+    roots: Vec<CUdeviceptr>, // Every reserved VA chunk
    active_pages: HashMap<CUdeviceptr, CUmemGenericAllocationHandle>,
-
-    // Free regions in virtual space (sorted by address; non-adjacent)
    free_regions: BTreeMap<CUdeviceptr, FreeRegion>,
-
-    // Number of free calls (used to assign ids to free regions)
+    malloc_regions: HashMap<CUdeviceptr, usize>,
+    unmapped_regions: BTreeMap<CUdeviceptr, usize>,
    free_num: usize,
-
-    // Pending events to destroy after "wait for event" is finished
-    pending_events: Arc<Mutex<HashMap<usize, Vec<Arc<CudaEvent>>>>>,
-
-    // Active allocations
-    used_regions: HashMap<CUdeviceptr, usize>,
+    pub(super) page_size: usize,
+    va_size: usize,
+    device_id: i32,
 }
 ```

 **Invariants**

-* `root` is returned by VA reservation and satisfies: `root ≤ key < curr_end` for all keys in the collections.
-* `active_pages` tracks the **current** mapping address for each page.
-* `free_regions` cover only the **active** VA range and are always coalesced (neighbors merged on free).
-* Each `FreeRegion` tracks which stream freed it and records a CUDA event for cross-stream synchronization.
+* Every `roots` entry corresponds to a reserved VA chunk of size `va_size`. We only map/unmap within these chunks.
+* `active_pages` tracks the current virtual address for every mapped page; keys move when we remap.
+* `free_regions`, `malloc_regions`, and `unmapped_regions` partition the reserved VA space with no overlap; `free_regions` are coalesced by stream/event when possible.
+* Each `FreeRegion` retains the CUDA event recorded at free time plus the originating `CudaStreamId`; we block on this event before stealing the region from another stream.

 ## Initialization

-* **VA Reservation:** Reserve a large VA range at startup. **Current value:** 32 TB (constant).
-* **Page Size:** Configurable via `VPMM_PAGE_SIZE`. Must be a multiple of CUDA’s minimum allocation granularity (typically 2 MB). Larger pages reduce mapping overhead; all allocations are rounded up to page size.
-* **Initial Pages:** Count is configurable via `VPMM_PAGES`. Defaults to 0 (allocate on-demand). More pages improve performance but increase baseline memory footprint.
-* **Mapping Unit:** All mappings are performed **by page**.
+* **VA Reservation:** Reserve a `VPMM_VA_SIZE` chunk (default 8 TB) at startup. When every hole is consumed, reserve another chunk and append it to `roots`.
+* **Page Size:** Configurable via `VPMM_PAGE_SIZE` (≥ CUDA’s VMM granularity, typically 2 MB). All requests are rounded up to this size.
+* **Initial Pages:** `VPMM_PAGES` controls how many pages are eagerly mapped. Defaults to 0 (purely on-demand); see the construction sketch below.
+* **Mapping Unit:** Always page-sized; the pool never subdivides a page.
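The same knobs can also be set programmatically: the tests added in this PR construct a `VpmmConfig` directly instead of reading the environment. A minimal smoke test in that style, mirroring `test_preallocation` in `memory_manager/tests.rs` (the test name and the exact sizes here are illustrative):

```rust
// Inside crates/cuda-common/src/memory_manager/tests.rs (the types are crate-private).
use super::vm_pool::{VirtualMemoryPool, VpmmConfig};
use crate::stream::current_stream_id;

#[test]
fn preallocated_pool_smoke() {
    let config = VpmmConfig {
        page_size: None,  // use the device's minimum granularity
        va_size: 1 << 30, // 1 GB of reserved VA per chunk
        initial_pages: 4, // eagerly map 4 pages at construction
    };
    let mut pool = VirtualMemoryPool::new(config);
    if pool.page_size == usize::MAX {
        return; // VPMM not supported on this device; nothing to exercise
    }
    let page_size = pool.page_size;
    let stream_id = current_stream_id().unwrap();

    // Allocate two of the preallocated pages, then return them to the pool.
    let ptr = pool.malloc_internal(2 * page_size, stream_id).unwrap();
    pool.free_internal(ptr, stream_id).unwrap();
}
```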
+
+## Recommended Configuration
+
+**For best performance**, preallocate ~80% of available GPU memory to avoid runtime allocations:
+
+```bash
+# Example: 40 GB GPU → preallocate 32 GB (80%)
+export VPMM_PAGES=$(((32 << 30) / (2 << 20))) # 16384 pages at 2 MB each
+```
+
+**To disable VPMM** and fall back to `cudaMallocAsync`, use a page size larger than any expected allocation:
+
+```bash
+export VPMM_PAGE_SIZE=$((32 << 30)) # 32 GB page size
+export VPMM_PAGES=0
+```

 ## Allocation & Growth

@@ -159,51 +163,55 @@ Allocations occur during initialization (preallocation) and on‑demand when the

 ## Defragmentation

-Triggered when no free region can satisfy a request. Defragmentation proceeds in phases to minimize cross-stream synchronization:
+Triggered when Phase 1 fails. The new defragmentation flow is deterministic and bounded:

-* **Phase 2a - Current stream (latest first):** Remap free regions from the requesting stream, prioritizing latest freed regions (newest events) to minimize wasted work.
-* **Phase 2b - Wait on other streams (oldest first):** Wait for the oldest events from other streams first (most likely to have completed) and remap those regions. Uses `cudaStreamWaitEvent` to introduce cross-stream dependencies, with `cudaLaunchHostFunc` to manage event lifetimes asynchronously.
-* **Phase 2c - Allocate new pages:** If still insufficient after waiting on all available regions, allocate new physical pages.
+1. **Take a hole** — `take_unmapped_region` returns the smallest unmapped interval ≥ `requested`. If none exists, we reserve another VA chunk and carve the hole out of it.
+2. **Allocate shortfall** — compute the page shortfall (`requested - free_bytes`). Allocate that many physical pages and map them into the beginning of the hole.
+3. **Harvest oldest frees** — iterate `free_regions` ordered by `free_id`. For each region:
+   * Block on its CUDA event via `CudaEvent::synchronize()`.
+   * Detach only the portion we need (`take = min(remaining, region.size)`) and push `(addr, take)` to a work list.
+   * Reinsert the leftover tail (if any) back into `free_regions` with the original stream but a new event/id.
+   Stop once the total `take` covers the remainder of the request.
+4. **Remap into the hole** — unmap each harvested chunk, add its VA back to `unmapped_regions`, and remap the chunk (page by page) contiguously into the hole immediately after the newly allocated portion. The combined span becomes the new free region for the requesting stream.

-* **Remapping:** Select free regions and **remap their pages** to the end of active space. The original region becomes a **dead zone** and is not reused.
-* **Event tracking:** Each free region records a CUDA event. Before reusing memory from another stream, VPMM either checks event completion or waits.
-* **Event lifetime management:** Events are wrapped in `Arc` for shared ownership. When waiting on events from other streams, VPMM clones the Arc and stores it in `pending_events` (keyed by `free_num`). After issuing `cudaStreamWaitEvent`, it schedules a `cudaLaunchHostFunc` callback that removes the Arc clones once the stream has processed the wait. This ensures events are not destroyed while GPU streams are waiting on them, avoiding segmentation faults.
-* **Access permissions:** After remapping, `cuMemSetAccess` is called to grant access to the new VA range.

 ## malloc(size) — Allocation Policy

 **Phase 1: Zero-cost attempts**

-1. **Find on current stream:** Search `free_regions` for a region from the current stream large enough to satisfy the request (**BestFit**: smallest that fits).
-2. **Find completed from other streams:** Search for a completed region (event already done) from other streams.
+1. **Best fit on current stream** — smallest region from the caller's stream that fits `requested`.
+2. **Oldest from other streams** — take the region with the lowest `free_id`, call `CudaEvent::synchronize()`, and hand that region to the caller. This avoids full defrag if one region can satisfy the request.

-**Phase 2: Defragmentation (if Phase 1 fails)**
-3. **Defragment current stream:** Remap current stream's free regions (newest first) until enough space.
-4. **Wait on other streams:** Issue `cudaStreamWaitEvent` on events from other streams (oldest first, most likely completed). Use `cudaLaunchHostFunc` to schedule async cleanup of event references.
-5. **Grow:** Allocate new pages and map them after `curr_end`.
+**Phase 2: Hole-based defragmentation**
+3. **Reserve hole** — via `take_unmapped_region`.
+4. **Allocate shortfall** — map the missing number of pages into the hole.
+5. **Harvest oldest frees** — synchronously detach just enough pages from free regions (splitting leftovers).
+6. **Remap into hole** — unmap the detached slices, add their old VA back to `unmapped_regions`, and remap them contiguously into the hole. The combined span (new pages + remapped slices) becomes the new free region for the caller.

 Additional rules:

 * **Alignment:** All allocations are **rounded up** to page‑size multiples. A page is either entirely free or entirely used.
-* **Small Buffers:** If `size < page_size`, bypass the VM pool and call `cudaMallocAsync` instead (preserves compatibility; setting `VMM_PAGE_SIZE = usize::MAX` effectively disables the pool for typical sizes).
+* **Small Buffers:** If `size < page_size`, bypass the VM pool and call `cudaMallocAsync` instead (preserves compatibility; setting `VPMM_PAGE_SIZE = usize::MAX` effectively disables the pool for typical sizes).

 ## free(ptr)

-* Look up `ptr` in `used_regions` to obtain the region size.
-* Record a CUDA event on the current stream to mark when this memory becomes safe to reuse.
-* Mark the region free in `free_regions` with the event and stream ID, and **coalesce** with adjacent free neighbors from the **same stream** (keeping the newest event).
+* Look up `ptr` in `malloc_regions` to obtain the aligned size.
+* Record a CUDA event on the calling stream (always `cudaStreamPerThread`) and store the stream id/event pair in `free_regions`.
+* Attempt to coalesce with adjacent regions from the same stream that have matching completion guarantees.
+* Remove the entry from `malloc_regions`.

 ## Status & Observability

-(All tracking is implemented in the outer **MemoryManager**, not inside `VirtualMemoryPool`, since small buffers bypass the pool.)
+(All tracking is implemented in the outer **MemoryManager**, but the pool exposes enough state for debug dumps.)
-* **Total GPU memory used by pages:** `pool.active_pages.len() * pool.page_size`
-* **Active VA extent:** `pool.curr_end - pool.base`
-* **Currently allocated (live) bytes:** `sum(pool.used_regions.values())`
-* **Currently freed (reusable) bytes:** `sum(pool.free_regions.values().map(|r| r.size))`
+* **Total GPU memory mapped:** `pool.active_pages.len() * pool.page_size`
+* **Reserved VA:** `pool.roots.len() * pool.va_size`
+* **Currently allocated (live) bytes:** `sum(pool.malloc_regions.values())`
+* **Currently reusable bytes:** `sum(pool.free_regions.values().map(|r| r.size))`
+* **Holes:** `sum(pool.unmapped_regions.values())`
+* `Debug` output prints the metrics above plus every region in ascending VA order.

 ## Asynchrony & Streams

 * VPMM supports **multi-stream workloads** using `cudaStreamPerThread`.
-* A single shared `VirtualMemoryPool` serves all streams, using CUDA events to track cross-stream dependencies.
-* Memory freed by one stream can be reused by another stream with minimal synchronization overhead (opportunistic reuse of completed regions, forced async waits only if needed).
-* **Access permissions:** Set via `cuMemSetAccess` after remapping to ensure memory is accessible on the current device.
\ No newline at end of file
+* A single shared `VirtualMemoryPool` serves all streams. Each free region carries the stream id plus a CUDA event (wrapped in `Arc`). When reusing a region from another stream, we call `event.synchronize()` before detaching it. Events are dropped when the region is consumed or coalesced.
+* **Access permissions:** After remapping (or mapping newly allocated pages) we call `cuMemSetAccess` on the destination hole to ensure the caller's device has read/write permission.
\ No newline at end of file
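For reference, the Phase 1 lookup described above can be summarized by the following simplified sketch. It assumes the `FreeRegion` fields and type aliases listed in the Data Structures section, and it is a reordered approximation of the crate's `find_best_fit`, not the exact implementation (which builds a single candidate list and logs synchronization failures instead of swallowing them).

```rust
use std::collections::BTreeMap;

// Sketch of the Phase 1 policy: prefer the smallest fitting region already owned
// by the caller's stream; otherwise take the oldest fitting region from another
// stream, block on its free event, and transfer ownership to the caller.
fn phase1_pick(
    free_regions: &mut BTreeMap<CUdeviceptr, FreeRegion>,
    requested: usize,
    stream_id: CudaStreamId,
) -> Option<CUdeviceptr> {
    // 1a. Best fit on the current stream: no synchronization needed.
    if let Some(addr) = free_regions
        .iter()
        .filter(|(_, r)| r.size >= requested && r.stream_id == stream_id)
        .min_by_key(|(_, r)| r.size)
        .map(|(addr, _)| *addr)
    {
        return Some(addr);
    }

    // 1b. Oldest fitting region freed elsewhere (lowest id = earliest free call).
    let addr = free_regions
        .iter()
        .filter(|(_, r)| r.size >= requested)
        .min_by_key(|(_, r)| r.id)
        .map(|(addr, _)| *addr)?;

    // Wait until the freeing stream is done with this memory, then hand it over.
    let region = free_regions.get_mut(&addr)?;
    region.event.synchronize().ok()?;
    region.stream_id = stream_id;
    Some(addr)
}
```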