From f27066c98e80ccf83fffa64d07c06e97c77e724c Mon Sep 17 00:00:00 2001 From: "Dongliang Wu (from Dev Box)" Date: Wed, 29 Apr 2026 14:12:26 -0700 Subject: [PATCH 1/2] Correct VertexProvider / VertexProviderFactory doc comments Drop references to ANNWrapper and JetVertexProvider (neither exist), fix typo GraphProvider -> VertexProvider, fix wrong type parameter (Data, not GraphMetadata) and wrong create_vertex_provider signature description, and convert four `//` comments that should have been `///` doc comments. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/search/provider/disk_provider.rs | 1 + .../src/search/traits/vertex_provider.rs | 16 ++++++++-------- .../src/search/traits/vertex_provider_factory.rs | 16 ++++++++-------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/diskann-disk/src/search/provider/disk_provider.rs b/diskann-disk/src/search/provider/disk_provider.rs index b7b30e94a..635b5cdc1 100644 --- a/diskann-disk/src/search/provider/disk_provider.rs +++ b/diskann-disk/src/search/provider/disk_provider.rs @@ -653,6 +653,7 @@ where query, }) } + fn ensure_loaded(&mut self, ids: &[u32]) -> Result<(), ANNError> { if ids.is_empty() { return Ok(()); diff --git a/diskann-disk/src/search/traits/vertex_provider.rs b/diskann-disk/src/search/traits/vertex_provider.rs index b96f65351..9ebd31a7b 100644 --- a/diskann-disk/src/search/traits/vertex_provider.rs +++ b/diskann-disk/src/search/traits/vertex_provider.rs @@ -8,7 +8,7 @@ use diskann::ANNResult; /// `VertexProvider` is a trait that abstracts the access to Vertex data. /// -/// This trait provides an interface to interact with different types of vertex providers structures such as `DiskVertexProvider` and `JetVertexProvider`. +/// This trait provides an interface to interact with vertex provider structures such as `DiskVertexProvider`. /// /// # Types /// @@ -47,7 +47,7 @@ pub trait VertexProvider: Send + Sync { vertex_id: &Data::VectorIdType, ) -> ANNResult<&[Data::VectorIdType]>; - // Gets the associated data for a given vertex id. + /// Gets the associated data for a given vertex id. /// /// The `get_associated_data` function attempts to retrieve the associated data for the /// specified `vertex_id`. This function returns an `ANNResult` that wraps a reference to the associated data. @@ -80,11 +80,11 @@ pub trait VertexProvider: Send + Sync { /// If it fails, returns an `ANNError`. fn load_vertices(&mut self, vertex_ids: &[Data::VectorIdType]) -> ANNResult<()>; - /// This function to process the loaded node + /// Processes a vertex previously loaded by `load_vertices`, materializing its full-precision vector and adjacency list from the raw on-disk record so they can be served by the `get_*` accessors. /// # Parameters /// - /// * `vertex_id`: A Data::VectorIdType value representing the id of the vertex for which to process. - /// * `idx`: A usize value representing the index of the vertex in the loaded node list. + /// * `vertex_id`: A Data::VectorIdType value representing the id of the vertex to process. + /// * `idx`: A usize value representing the index of the vertex in the slice passed to the preceding `load_vertices` call. /// /// # Returns /// * `ANNResult<()>`: If the operation is successful, returns Ok. @@ -92,12 +92,12 @@ pub trait VertexProvider: Send + Sync { /// If it fails, returns an `ANNError`. fn process_loaded_node(&mut self, vertex_id: &Data::VectorIdType, idx: usize) -> ANNResult<()>; - // Returns the number of IO operations performed by the vertex provider. + /// Returns the number of IO operations performed by the vertex provider. fn io_operations(&self) -> u32; - // Returns the number of vertices loaded by the vertex provider. + /// Returns the number of vertices loaded by the vertex provider. fn vertices_loaded_count(&self) -> u32; - // Clears the members of the vertex provider. + /// Clears the members of the vertex provider. fn clear(&mut self); } diff --git a/diskann-disk/src/search/traits/vertex_provider_factory.rs b/diskann-disk/src/search/traits/vertex_provider_factory.rs index 43da64da3..9ab660dbb 100644 --- a/diskann-disk/src/search/traits/vertex_provider_factory.rs +++ b/diskann-disk/src/search/traits/vertex_provider_factory.rs @@ -9,20 +9,20 @@ use diskann::ANNResult; use super::VertexProvider; use crate::data_model::GraphHeader; -/// The `VertexProviderFactory` trait provides an interface to create a GraphProvider`. This trait forms an important part -/// of the interaction between the `ANNWrapper` and the creation of `DiskIndexSearcher`. The `ANNWrapper` passes a `VertexProviderFactory` when -/// it initializes a `DiskIndexSearcher` When serving each search request, the `DiskIndexSearcher` opens a `VertexProvider` -/// using the provided `VertexProviderFactory`. There will be two flavors of VertexProviderFactory, one that reads vertex data from data another that reads vertex data from a stream. +/// The `VertexProviderFactory` trait provides an interface to create a `VertexProvider`. This trait forms an important part +/// of the interaction between the `DiskIndexSearcher` and a `VertexProvider`. A `VertexProviderFactory` is passed to `DiskIndexSearcher` when +/// it is constructed. When serving each search request, the `DiskIndexSearcher` opens a `VertexProvider` +/// using the provided `VertexProviderFactory`. /// /// This trait has an associated VertexProvider type that signifies the specific type of VertexProvider which this `VertexProviderFactory` will create. /// /// # Parameters -/// * `GraphMetadata`: This contains the metadata of the disk index graph, like the number of points, dimension, max_node_length, etc. +/// * `Data`: A `GraphDataType` that defines the vector element type, id type, and associated payload type for the graph. /// /// # Functions -/// * `create_vertex_provider`: This function takes a `Metadata` object as an argument and returns a `VertexProvider` object. It also accepts a max batch read sizes which is -/// used to control the maximum number of nodes it can get in one batch. -/// * `get_header`: This function returns the metadata of the graph. +/// * `create_vertex_provider`: This function takes a `GraphHeader` reference and a max batch size and returns a `VertexProvider` object. +/// The max batch size controls the maximum number of nodes that can be loaded in a single batch. +/// * `get_header`: This function returns the header of the graph. pub trait VertexProviderFactory: Send + Sync { type VertexProviderType: VertexProvider; From 37fdf008c349420a5e1005fbf3fe17bc4bd9a7b1 Mon Sep 17 00:00:00 2001 From: "Dongliang Wu (from Dev Box)" Date: Wed, 29 Apr 2026 14:19:34 -0700 Subject: [PATCH 2/2] Encode AlignedFileReader memory alignment in the type system Decouple buffer-placement memory alignment from the disk-side stride. DiskSectorGraph previously allocated `sectors_data` aligned to `block_size` (typically 4096), conflating the on-disk stride with the reader's hardware/OS memory-placement requirement. Introduce a sealed `Alignment` trait with `const VALUE: PowerOfTwo` and two unit markers (`A1`, `A512`), and parameterize `AlignedRead` by an `A: Alignment` witness (defaulting to `A1`). `AlignedRead::new` checks buffer pointer, byte-length, and disk-offset alignment against `A::VALUE` at construction and returns `ANNResult`, so a typed `AlignedRead` is itself a witness that the request is well-formed -- the file reader's `read` impl no longer needs to re-validate. Replace `AlignedFileReader::DISK_IO_ALIGNMENT` (and the hardcoded 512 in `AlignedRead::new`) with `AlignedFileReader::Alignment`. Linux and Windows direct-I/O readers set `type Alignment = A512;`; the buffered `StorageProviderAlignedFileReader` uses `A1`. `DiskSectorGraph::sectors_data` is now allocated using `::VALUE` -- the reader's contract -- rather than `block_size`. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../benchmarks/aligned_file_reader_bench.rs | 2 +- .../aligned_file_reader_bench_iai.rs | 2 +- .../src/search/provider/disk_sector_graph.rs | 18 +-- .../utils/aligned_file_reader/aligned_read.rs | 152 +++++++++++------- .../linux_aligned_file_reader.rs | 20 ++- .../src/utils/aligned_file_reader/mod.rs | 2 +- .../storage_provider_aligned_file_reader.rs | 6 +- .../traits/aligned_file_reader.rs | 16 +- .../windows_aligned_file_reader.rs | 17 +- 9 files changed, 148 insertions(+), 87 deletions(-) diff --git a/diskann-disk/benches/benchmarks/aligned_file_reader_bench.rs b/diskann-disk/benches/benchmarks/aligned_file_reader_bench.rs index e8099d40d..0e166bc4f 100644 --- a/diskann-disk/benches/benchmarks/aligned_file_reader_bench.rs +++ b/diskann-disk/benches/benchmarks/aligned_file_reader_bench.rs @@ -40,7 +40,7 @@ pub fn benchmark_aligned_file_reader(c: &mut Criterion) { let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect(); // Read the same data from disk over and over again. We guarantee that it is not all zeros. - let mut aligned_reads: Vec> = mem_slices + let mut aligned_reads: Vec> = mem_slices .iter_mut() .map(|slice| AlignedRead::new(0, slice).unwrap()) .collect(); diff --git a/diskann-disk/benches/benchmarks_iai/aligned_file_reader_bench_iai.rs b/diskann-disk/benches/benchmarks_iai/aligned_file_reader_bench_iai.rs index 8c9c1f880..215597684 100644 --- a/diskann-disk/benches/benchmarks_iai/aligned_file_reader_bench_iai.rs +++ b/diskann-disk/benches/benchmarks_iai/aligned_file_reader_bench_iai.rs @@ -31,7 +31,7 @@ pub fn benchmark_aligned_file_reader_iai() { let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect(); // Read the same data from disk over and over again. We guarantee that it is not all zeros. - let mut aligned_reads: Vec> = mem_slices + let mut aligned_reads: Vec> = mem_slices .iter_mut() .map(|slice| AlignedRead::new(0, slice).unwrap()) .collect(); diff --git a/diskann-disk/src/search/provider/disk_sector_graph.rs b/diskann-disk/src/search/provider/disk_sector_graph.rs index def525712..117b492e5 100644 --- a/diskann-disk/src/search/provider/disk_sector_graph.rs +++ b/diskann-disk/src/search/provider/disk_sector_graph.rs @@ -8,14 +8,11 @@ use std::ops::Deref; use diskann::{ANNError, ANNResult}; -use diskann_quantization::{ - alloc::{AlignedAllocator, Poly}, - num::PowerOfTwo, -}; +use diskann_quantization::alloc::{AlignedAllocator, Poly}; use crate::{ data_model::GraphHeader, - utils::aligned_file_reader::{traits::AlignedFileReader, AlignedRead}, + utils::aligned_file_reader::{traits::AlignedFileReader, AlignedRead, Alignment}, }; const DEFAULT_DISK_SECTOR_LEN: usize = 4096; @@ -79,9 +76,7 @@ impl DiskSectorGraph { sectors_data: Poly::broadcast( 0u8, max_n_batch_sector_read * num_sectors_per_node * block_size, - AlignedAllocator::new( - PowerOfTwo::new(block_size).map_err(ANNError::log_index_error)?, - ), + AlignedAllocator::new(AlignedReaderType::Alignment::VALUE), ) .map_err(ANNError::log_index_error)?, cur_sector_idx: 0, @@ -100,9 +95,7 @@ impl DiskSectorGraph { self.sectors_data = Poly::broadcast( 0u8, max_n_batch_sector_read * self.num_sectors_per_node * self.block_size, - AlignedAllocator::new( - PowerOfTwo::new(self.block_size).map_err(ANNError::log_index_error)?, - ), + AlignedAllocator::new(AlignedReaderType::Alignment::VALUE), ) .map_err(ANNError::log_index_error)?; } @@ -143,7 +136,8 @@ impl DiskSectorGraph { ); let mut sector_slices: Vec<&mut [u8]> = self.sectors_data[range].chunks_mut(len_per_node).collect(); - let mut read_requests = Vec::with_capacity(sector_slices.len()); + let mut read_requests: Vec> = + Vec::with_capacity(sector_slices.len()); for (local_sector_idx, slice) in sector_slices.iter_mut().enumerate() { let sector_id = sectors_to_fetch[local_sector_idx]; read_requests.push(AlignedRead::new(sector_id * self.block_size as u64, slice)?); diff --git a/diskann-disk/src/utils/aligned_file_reader/aligned_read.rs b/diskann-disk/src/utils/aligned_file_reader/aligned_read.rs index 782a8d696..8261d2c26 100644 --- a/diskann-disk/src/utils/aligned_file_reader/aligned_read.rs +++ b/diskann-disk/src/utils/aligned_file_reader/aligned_read.rs @@ -3,44 +3,78 @@ * Licensed under the MIT license. */ +use std::marker::PhantomData; + use diskann::{ANNError, ANNResult}; +use diskann_quantization::num::PowerOfTwo; + +/// Type-level memory-alignment witness for [`AlignedRead`]. Each implementor is +/// a unit type carrying a single `PowerOfTwo` value. +/// +/// Custom readers can define their own marker (e.g. `A4096`) by adding a unit +/// type and an `impl Alignment` with the desired `VALUE`. +pub trait Alignment { + /// The alignment, in bytes. + const VALUE: PowerOfTwo; +} -pub const DISK_IO_ALIGNMENT: usize = 512; +macro_rules! alignment_marker { + ($name:ident, $value:expr) => { + #[doc = concat!("Alignment witness for ", stringify!($value), " bytes.")] + #[derive(Debug, Clone, Copy)] + pub struct $name; + impl Alignment for $name { + const VALUE: PowerOfTwo = $value; + } + }; +} -/// Aligned read struct for disk IO. -pub struct AlignedRead<'a, T> { - /// where to read from - /// offset needs to be aligned with DISK_IO_ALIGNMENT +alignment_marker!(A1, PowerOfTwo::V1); +alignment_marker!(A512, PowerOfTwo::V512); + +/// Disk-IO read request, parameterized by its required memory alignment `A`. +/// +/// Three constraints govern a read: +/// 1. Disk offset alignment. +/// 2. Buffer length alignment. +/// 3. Buffer pointer alignment in memory. +/// +/// All three are checked against `A::VALUE` at construction time by +/// [`AlignedRead::new`]. A typed `AlignedRead` is therefore a witness +/// that the request satisfies its declared alignment, and the file reader's +/// `read` method can rely on it without re-checking. +#[derive(Debug)] +pub struct AlignedRead<'a, T, A: Alignment = A1> { offset: u64, - - /// where to read into - /// aligned_buf and its len need to be aligned with DISK_IO_ALIGNMENT aligned_buf: &'a mut [T], + _alignment: PhantomData, } -impl<'a, T> AlignedRead<'a, T> { +impl<'a, T, A: Alignment> AlignedRead<'a, T, A> { + /// Build an `AlignedRead` after validating that `offset`, the buffer + /// length (in bytes), and the buffer pointer all satisfy `A::VALUE`. pub fn new(offset: u64, aligned_buf: &'a mut [T]) -> ANNResult { - Self::assert_is_aligned(offset as usize)?; - Self::assert_is_aligned(std::mem::size_of_val(aligned_buf))?; - + Self::assert_is_aligned(aligned_buf.as_ptr() as usize, "buffer pointer")?; + Self::assert_is_aligned(std::mem::size_of_val(aligned_buf), "buffer length")?; + Self::assert_is_aligned(offset as usize, "offset")?; Ok(Self { offset, aligned_buf, + _alignment: PhantomData, }) } - fn assert_is_aligned(val: usize) -> ANNResult<()> { - match val % DISK_IO_ALIGNMENT { - 0 => Ok(()), - _ => Err(ANNError::log_disk_io_request_alignment_error(format!( - "The offset or length of AlignedRead request is not {} bytes aligned", - DISK_IO_ALIGNMENT - ))), + fn assert_is_aligned(val: usize, kind: &str) -> ANNResult<()> { + let align = A::VALUE.raw(); + if val.is_multiple_of(align) { + Ok(()) + } else { + Err(ANNError::log_disk_io_request_alignment_error(format!( + "{kind} {val} not aligned to {align}", + ))) } } - /// where to read from - /// offset needs to be aligned with DISK_IO_ALIGNMENT pub fn offset(&self) -> u64 { self.offset } @@ -49,8 +83,6 @@ impl<'a, T> AlignedRead<'a, T> { self.aligned_buf } - /// where to read into - /// aligned_buf and its len need to be aligned with DISK_IO_ALIGNMENT pub fn aligned_buf_mut(&mut self) -> &mut [T] { self.aligned_buf } @@ -59,54 +91,66 @@ impl<'a, T> AlignedRead<'a, T> { #[cfg(test)] mod tests { use super::*; + use diskann::ANNErrorKind; + use diskann_quantization::alloc::{AlignedAllocator, Poly}; + + fn aligned_512(len: usize) -> Poly<[u8], AlignedAllocator> { + Poly::broadcast(0u8, len, AlignedAllocator::A512).unwrap() + } #[test] - fn test_aligned_read_valid() { + fn aligned_read_carries_offset_and_buffer() { let mut buffer = vec![0u8; 512]; - let aligned_read = AlignedRead::new(0, &mut buffer); - - assert!(aligned_read.is_ok()); - let aligned_read = aligned_read.unwrap(); - assert_eq!(aligned_read.offset(), 0); - assert_eq!(aligned_read.aligned_buf().len(), 512); + let read = AlignedRead::::new(512, &mut buffer).unwrap(); + assert_eq!(read.offset(), 512); + assert_eq!(read.aligned_buf().len(), 512); } #[test] - fn test_aligned_read_valid_offset() { - let mut buffer = vec![0u8; 1024]; - let aligned_read = AlignedRead::new(512, &mut buffer); - - assert!(aligned_read.is_ok()); - let aligned_read = aligned_read.unwrap(); - assert_eq!(aligned_read.offset(), 512); + fn aligned_read_buffer_access() { + let mut buffer = vec![42u8; 512]; + let mut read = AlignedRead::::new(0, &mut buffer).unwrap(); + assert_eq!(read.aligned_buf()[0], 42); + read.aligned_buf_mut()[0] = 100; + assert_eq!(read.aligned_buf()[0], 100); } #[test] - fn test_aligned_read_invalid_offset() { - let mut buffer = vec![0u8; 512]; - let aligned_read = AlignedRead::new(100, &mut buffer); - - assert!(aligned_read.is_err()); + fn a512_accepts_fully_aligned_request() { + let mut buf = aligned_512(512); + AlignedRead::::new(0, &mut buf).expect("aligned request should pass"); } #[test] - fn test_aligned_read_invalid_buffer_size() { + fn a1_default_accepts_anything() { let mut buffer = vec![0u8; 100]; - let aligned_read = AlignedRead::new(0, &mut buffer); - - assert!(aligned_read.is_err()); + AlignedRead::::new(1, &mut buffer).expect("A1 alignment should accept any request"); } #[test] - fn test_aligned_read_buffer_access() { - let mut buffer = vec![42u8; 512]; - let mut aligned_read = AlignedRead::new(0, &mut buffer).unwrap(); + fn rejects_unaligned_buffer_pointer() { + let mut buf = aligned_512(1024); + let slice = &mut buf[1..513]; // ptr offset by 1; length 512 ✓; offset 0 ✓ + let err = AlignedRead::::new(0, slice) + .expect_err("misaligned buffer pointer should be rejected"); + assert_eq!(err.kind(), ANNErrorKind::DiskIOAlignmentError); + } - // Test immutable access - assert_eq!(aligned_read.aligned_buf()[0], 42); + #[test] + fn rejects_unaligned_buffer_length() { + let mut buf = aligned_512(1024); + let slice = &mut buf[..100]; // ptr ✓; length 100 ✗; offset 0 ✓ + let err = AlignedRead::::new(0, slice) + .expect_err("buffer length 100 (not a multiple of 512) should be rejected"); + assert_eq!(err.kind(), ANNErrorKind::DiskIOAlignmentError); + } - // Test mutable access - aligned_read.aligned_buf_mut()[0] = 100; - assert_eq!(aligned_read.aligned_buf()[0], 100); + #[test] + fn rejects_unaligned_offset() { + let mut buf = aligned_512(1024); + let slice = &mut buf[..512]; // ptr ✓; length 512 ✓; offset 1 ✗ + let err = AlignedRead::::new(1, slice) + .expect_err("offset 1 (not a multiple of 512) should be rejected"); + assert_eq!(err.kind(), ANNErrorKind::DiskIOAlignmentError); } } diff --git a/diskann-disk/src/utils/aligned_file_reader/linux_aligned_file_reader.rs b/diskann-disk/src/utils/aligned_file_reader/linux_aligned_file_reader.rs index b9c649b7c..9ebfb85dc 100644 --- a/diskann-disk/src/utils/aligned_file_reader/linux_aligned_file_reader.rs +++ b/diskann-disk/src/utils/aligned_file_reader/linux_aligned_file_reader.rs @@ -12,7 +12,7 @@ use diskann_platform::ssd_io_context::IOContext; use io_uring::IoUring; use libc; -use crate::utils::aligned_file_reader::{traits::AlignedFileReader, AlignedRead}; +use crate::utils::aligned_file_reader::{traits::AlignedFileReader, AlignedRead, A512}; pub const MAX_IO_CONCURRENCY: usize = 128; @@ -54,7 +54,7 @@ impl LinuxAlignedFileReader { } fn submit_aligned_read( - aligned_read: &mut AlignedRead, + aligned_read: &mut AlignedRead, ring: &mut IoUring, identifier: u64, ) -> Result<(), ANNError> { @@ -86,8 +86,12 @@ impl LinuxAlignedFileReader { } impl AlignedFileReader for LinuxAlignedFileReader { + /// O_DIRECT requires the buffer pointer to be aligned to the device sector + /// size in memory (512 bytes on typical Linux block devices). + type Alignment = A512; + // Read the data from the file by sending concurrent io requests in batches. - fn read(&mut self, read_requests: &mut [AlignedRead]) -> ANNResult<()> { + fn read(&mut self, read_requests: &mut [AlignedRead]) -> ANNResult<()> { let n_requests = read_requests.len(); let n_batches = n_requests.div_ceil(MAX_IO_CONCURRENCY); @@ -176,7 +180,7 @@ mod tests { // create and add AlignedReads to the vector let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect(); - let mut aligned_reads: Vec> = mem_slices + let mut aligned_reads: Vec> = mem_slices .iter_mut() .enumerate() .map(|(i, slice)| { @@ -221,7 +225,7 @@ mod tests { let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect(); // Read the same data from disk over and over again. We guarantee that it is not all zeros. - let mut aligned_reads: Vec> = mem_slices + let mut aligned_reads: Vec> = mem_slices .iter_mut() .map(|slice| AlignedRead::new(0, slice).unwrap()) .collect(); @@ -240,7 +244,7 @@ mod tests { } /// Return True if the AlignedRead value is empty or False if the AlignedRead value is not empty. - fn aligned_read_buffer_is_empty(read: &AlignedRead<'_, u8>) -> bool { + fn aligned_read_buffer_is_empty(read: &AlignedRead<'_, u8, A512>) -> bool { let max_value = read.aligned_buf().iter().fold(0, |acc, &x| max(acc, x)); // If max_value is zero then this aligned read was not completed. Data was not @@ -260,7 +264,7 @@ mod tests { // Each slice will be used as the buffer for a read request of a sector. let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect(); - let mut aligned_reads: Vec> = mem_slices + let mut aligned_reads: Vec> = mem_slices .iter_mut() .enumerate() .map(|(sector_id, slice)| { @@ -355,7 +359,7 @@ mod tests { fn test_read_no_requests() { let mut reader = LinuxAlignedFileReader::new(TEST_INDEX_PATH).unwrap(); - let mut read_requests = Vec::>::new(); + let mut read_requests = Vec::>::new(); let result = reader.read(&mut read_requests); assert!(result.is_ok()); } diff --git a/diskann-disk/src/utils/aligned_file_reader/mod.rs b/diskann-disk/src/utils/aligned_file_reader/mod.rs index 07e23dde7..dfaa5e44e 100644 --- a/diskann-disk/src/utils/aligned_file_reader/mod.rs +++ b/diskann-disk/src/utils/aligned_file_reader/mod.rs @@ -6,7 +6,7 @@ pub mod traits; mod aligned_read; -pub use aligned_read::AlignedRead; +pub use aligned_read::{AlignedRead, Alignment, A1, A512}; cfg_if::cfg_if! { if #[cfg(all(not(miri), target_os = "linux"))] { diff --git a/diskann-disk/src/utils/aligned_file_reader/storage_provider_aligned_file_reader.rs b/diskann-disk/src/utils/aligned_file_reader/storage_provider_aligned_file_reader.rs index 8bc6a7e54..631934a3f 100644 --- a/diskann-disk/src/utils/aligned_file_reader/storage_provider_aligned_file_reader.rs +++ b/diskann-disk/src/utils/aligned_file_reader/storage_provider_aligned_file_reader.rs @@ -10,7 +10,7 @@ use diskann_providers::storage::StorageReadProvider; use tracing::info; use super::traits::AlignedFileReader; -use crate::utils::aligned_file_reader::AlignedRead; +use crate::utils::aligned_file_reader::{AlignedRead, A1}; pub struct StorageProviderAlignedFileReader { data: Vec, @@ -34,7 +34,9 @@ impl StorageProviderAlignedFileReader { } impl AlignedFileReader for StorageProviderAlignedFileReader { - fn read(&mut self, read_requests: &mut [AlignedRead]) -> ANNResult<()> { + type Alignment = A1; + + fn read(&mut self, read_requests: &mut [AlignedRead]) -> ANNResult<()> { for read in read_requests { let offset = read.offset(); let len = read.aligned_buf().len(); diff --git a/diskann-disk/src/utils/aligned_file_reader/traits/aligned_file_reader.rs b/diskann-disk/src/utils/aligned_file_reader/traits/aligned_file_reader.rs index ea635ad7a..f8715e509 100644 --- a/diskann-disk/src/utils/aligned_file_reader/traits/aligned_file_reader.rs +++ b/diskann-disk/src/utils/aligned_file_reader/traits/aligned_file_reader.rs @@ -5,9 +5,21 @@ use diskann::ANNResult; -use crate::utils::aligned_file_reader::AlignedRead; +use crate::utils::aligned_file_reader::{aligned_read::Alignment, AlignedRead}; pub trait AlignedFileReader: Send + Sync { + /// Alignment requirement applied to every `AlignedRead` + /// passed to [`Self::read`]. All three constraints — buffer pointer, + /// buffer length, and disk offset — are checked against this value at + /// `AlignedRead::new` time. + /// + /// Direct-I/O readers (O_DIRECT, `FILE_FLAG_NO_BUFFERING`) set this to the + /// device sector size (typically 512 bytes); buffered readers use `A1`. + /// The caller is responsible for generating offsets and buffers that + /// satisfy this alignment — `DiskSectorGraph`, for example, requires its + /// `block_size` to be a multiple of `Self::Alignment::VALUE`. + type Alignment: Alignment; + /// Read the data from the file by sending concurrent io requests in batches. - fn read(&mut self, read_requests: &mut [AlignedRead]) -> ANNResult<()>; + fn read(&mut self, read_requests: &mut [AlignedRead]) -> ANNResult<()>; } diff --git a/diskann-disk/src/utils/aligned_file_reader/windows_aligned_file_reader.rs b/diskann-disk/src/utils/aligned_file_reader/windows_aligned_file_reader.rs index 468d8a1ff..543a06d48 100644 --- a/diskann-disk/src/utils/aligned_file_reader/windows_aligned_file_reader.rs +++ b/diskann-disk/src/utils/aligned_file_reader/windows_aligned_file_reader.rs @@ -11,7 +11,7 @@ use diskann_platform::{ }; use super::traits::AlignedFileReader; -use crate::utils::aligned_file_reader::AlignedRead; +use crate::utils::aligned_file_reader::{AlignedRead, A512}; pub const MAX_IO_CONCURRENCY: usize = 128; pub const IO_COMPLETION_TIMEOUT: DWORD = u32::MAX; // Infinite timeout. @@ -53,8 +53,13 @@ impl WindowsAlignedFileReader { } impl AlignedFileReader for WindowsAlignedFileReader { + /// Overlapped/`FILE_FLAG_NO_BUFFERING` I/O requires the buffer pointer to + /// be aligned to the device sector size in memory (512 bytes on typical + /// Windows volumes). + type Alignment = A512; + // Read the data from the file by sending concurrent io requests in batches. - fn read(&mut self, read_requests: &mut [AlignedRead]) -> ANNResult<()> { + fn read(&mut self, read_requests: &mut [AlignedRead]) -> ANNResult<()> { let n_requests = read_requests.len(); let n_batches = n_requests.div_ceil(MAX_IO_CONCURRENCY); let ctx = &self.io_context; @@ -122,7 +127,7 @@ mod tests { use serde::{Deserialize, Serialize}; use super::*; - use crate::utils::aligned_file_reader::AlignedRead; + use crate::utils::aligned_file_reader::{AlignedRead, A512}; use diskann_quantization::alloc::{AlignedAllocator, Poly}; fn test_index_path() -> String { @@ -175,7 +180,7 @@ mod tests { // create and add AlignedReads to the vector let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect(); - let mut aligned_reads: Vec> = mem_slices + let mut aligned_reads: Vec> = mem_slices .iter_mut() .enumerate() .map(|(i, slice)| { @@ -214,7 +219,7 @@ mod tests { // Each slice will be used as the buffer for a read request of a sector. let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect(); - let mut aligned_reads: Vec> = mem_slices + let mut aligned_reads: Vec> = mem_slices .iter_mut() .enumerate() .map(|(sector_id, slice)| { @@ -309,7 +314,7 @@ mod tests { fn test_read_no_requests() { let mut reader = WindowsAlignedFileReader::new(&test_index_path()).unwrap(); - let mut read_requests = Vec::>::new(); + let mut read_requests = Vec::>::new(); let result = reader.read(&mut read_requests); assert!(result.is_ok()); }