From bc8cdd9d6de9e60528083abee678ddb0070593fd Mon Sep 17 00:00:00 2001
From: ritchie
Date: Thu, 2 May 2024 09:28:45 +0200
Subject: [PATCH] refactor: Add some comments

---
 .../src/csv/read/read_impl/batched_read.rs   | 24 +++++++++++++------
 crates/polars-utils/src/sync.rs              | 12 ++++++----
 2 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/crates/polars-io/src/csv/read/read_impl/batched_read.rs b/crates/polars-io/src/csv/read/read_impl/batched_read.rs
index 9098d255c6a2..64e165844e7a 100644
--- a/crates/polars-io/src/csv/read/read_impl/batched_read.rs
+++ b/crates/polars-io/src/csv/read/read_impl/batched_read.rs
@@ -7,6 +7,7 @@ use polars_core::frame::DataFrame;
 use polars_core::schema::SchemaRef;
 use polars_core::POOL;
 use polars_error::PolarsResult;
+use polars_utils::sync::SyncPtr;
 use polars_utils::IdxSize;
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

@@ -54,6 +55,8 @@ pub(crate) fn get_offsets(
     }
 }

+/// Reads bytes from `file` to `buf` and returns pointers into `buf` that can be parsed.
+/// TODO! this can be implemented without copying by pointing in the memmapped file.
 struct ChunkReader<'a> {
     file: &'a File,
     buf: Vec<u8>,
@@ -109,18 +112,23 @@ impl<'a> ChunkReader<'a> {
         self.buf_end = 0;
     }

-    fn return_slice(&self, start: usize, end: usize) -> (usize, usize) {
+    fn return_slice(&self, start: usize, end: usize) -> (SyncPtr<u8>, usize) {
         let slice = &self.buf[start..end];
         let len = slice.len();
-        (slice.as_ptr() as usize, len)
+        (slice.as_ptr().into(), len)
     }

-    fn get_buf(&self) -> (usize, usize) {
+    fn get_buf_remaining(&self) -> (SyncPtr<u8>, usize) {
         let slice = &self.buf[self.buf_end..];
         let len = slice.len();
-        (slice.as_ptr() as usize, len)
+        (slice.as_ptr().into(), len)
     }

+    // Get next `n` offset positions. Where `n` is number of chunks.
+
+    // This returns pointers into slices into `buf`
+    // we must process the slices before the next call
+    // as that will overwrite the slices
     fn read(&mut self, n: usize) -> bool {
         self.reslice();

@@ -267,7 +275,7 @@ pub struct BatchedCsvReaderRead<'a> {
     chunk_size: usize,
     finished: bool,
     file_chunk_reader: ChunkReader<'a>,
-    file_chunks: Vec<(usize, usize)>,
+    file_chunks: Vec<(SyncPtr<u8>, usize)>,
     projection: Vec<usize>,
     starting_point_offset: Option<usize>,
     row_index: Option<RowIndex>,
@@ -292,6 +300,7 @@ pub struct BatchedCsvReaderRead<'a> {
 }
 //
 impl<'a> BatchedCsvReaderRead<'a> {
+    /// `n` number of batches.
     pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
         if n == 0 || self.finished {
             return Ok(None);
@@ -320,7 +329,8 @@ impl<'a> BatchedCsvReaderRead<'a> {
         // ensure we process the final slice as well.
         if self.file_chunk_reader.finished && self.file_chunks.len() < n {
             // get the final slice
-            self.file_chunks.push(self.file_chunk_reader.get_buf());
+            self.file_chunks
+                .push(self.file_chunk_reader.get_buf_remaining());
             self.finished = true
         }

@@ -333,7 +343,7 @@ impl<'a> BatchedCsvReaderRead<'a> {
             self.file_chunks
                 .par_iter()
                 .map(|(ptr, len)| {
-                    let chunk = unsafe { std::slice::from_raw_parts(*ptr as *const u8, *len) };
+                    let chunk = unsafe { std::slice::from_raw_parts(ptr.get(), *len) };
                     let stop_at_n_bytes = chunk.len();
                     let mut df = read_chunk(
                         chunk,
diff --git a/crates/polars-utils/src/sync.rs b/crates/polars-utils/src/sync.rs
index 3659130990b2..e4257ac17b82 100644
--- a/crates/polars-utils/src/sync.rs
+++ b/crates/polars-utils/src/sync.rs
@@ -13,11 +13,7 @@ impl<T> SyncPtr<T> {
         Self(ptr)
     }

-    /// # Safety
-    ///
-    /// This will make a pointer sync and send.
-    /// Ensure that you don't break aliasing rules.
-    pub unsafe fn from_const(ptr: *const T) -> Self {
+    pub fn from_const(ptr: *const T) -> Self {
         Self(ptr as *mut T)
     }

@@ -43,3 +39,9 @@ impl<T> SyncPtr<T> {

 unsafe impl<T> Sync for SyncPtr<T> {}
 unsafe impl<T> Send for SyncPtr<T> {}
+
+impl<T> From<*const T> for SyncPtr<T> {
+    fn from(value: *const T) -> Self {
+        Self::from_const(value)
+    }
+}
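
The pattern the patch builds on, as a minimal self-contained sketch rather than
polars code: wrapping a raw pointer in a newtype that is declared `Send` and
`Sync` lets `(pointer, length)` chunks into a reader-owned buffer be handed to
worker threads, the way `file_chunks` is fed to rayon's `par_iter` above. The
`SyncPtr` below is a simplified stand-in for `polars_utils::sync::SyncPtr`, and
`std::thread::scope` stands in for the rayon thread pool.

    // Illustrative sketch only; the real SyncPtr lives in polars-utils.
    #[derive(Copy, Clone)]
    struct SyncPtr<T>(*mut T);

    impl<T> SyncPtr<T> {
        fn from_const(ptr: *const T) -> Self {
            Self(ptr as *mut T)
        }

        fn get(self) -> *const T {
            self.0
        }
    }

    // SAFETY (a contract on the caller): the buffer behind the pointer must
    // outlive every use of it and must not be refilled while the pointers are
    // in use; the batched reader's comments state exactly this about `buf`.
    unsafe impl<T> Sync for SyncPtr<T> {}
    unsafe impl<T> Send for SyncPtr<T> {}

    impl<T> From<*const T> for SyncPtr<T> {
        fn from(value: *const T) -> Self {
            Self::from_const(value)
        }
    }

    fn main() {
        let buf: Vec<u8> = b"foo,bar\nbaz,quux\n".to_vec();

        // Split the buffer into (pointer, length) pairs, as `return_slice` does.
        let chunks: Vec<(SyncPtr<u8>, usize)> = buf
            .split_inclusive(|b| *b == b'\n')
            .map(|s| (s.as_ptr().into(), s.len()))
            .collect();

        // The pairs are Send, so they can be parsed on worker threads while the
        // buffer itself stays owned by the reading thread.
        std::thread::scope(|scope| {
            for (ptr, len) in chunks {
                scope.spawn(move || {
                    // SAFETY: `buf` lives until after the scope joins all threads
                    // and is not mutated in the meantime.
                    let chunk = unsafe { std::slice::from_raw_parts(ptr.get(), len) };
                    println!("{}", std::str::from_utf8(chunk).unwrap());
                });
            }
        });
    }

The soundness contract is the one the new comments spell out: the slices are
only valid until `buf` is refilled, so each batch of chunks must be processed
before the next `read` call.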