refactor: Add some comments #16008

Merged · 1 commit · May 2, 2024
24 changes: 17 additions & 7 deletions crates/polars-io/src/csv/read/read_impl/batched_read.rs
@@ -7,6 +7,7 @@ use polars_core::frame::DataFrame;
use polars_core::schema::SchemaRef;
use polars_core::POOL;
use polars_error::PolarsResult;
use polars_utils::sync::SyncPtr;
use polars_utils::IdxSize;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

@@ -54,6 +55,8 @@ pub(crate) fn get_offsets(
}
}

/// Reads bytes from `file` to `buf` and returns pointers into `buf` that can be parsed.
/// TODO! this can be implemented without copying by pointing in the memmapped file.
Member Author:
Currently this reads from the file into `buf` and then returns offsets as `(SyncPtr<u8>, usize)`, which is semantically `(*const u8, usize)`: the starting offset pointer and the length of the chunk that can be parsed.

These pointers currently point into `buf`. However, we could mmap the file and point directly into the memory-mapped slice. This would save an explicit read and reslice (copying the remaining bytes to the start of `buf`).
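The pattern described above can be shown with a minimal self-contained sketch. `SyncPtr` and `return_slice` here are simplified stand-ins for `polars_utils::sync::SyncPtr` and `ChunkReader::return_slice`, not the actual implementations:

```rust
// Simplified stand-in for polars_utils::sync::SyncPtr: a raw-pointer
// wrapper that is allowed to cross thread boundaries.
struct SyncPtr<T>(*mut T);

unsafe impl<T> Sync for SyncPtr<T> {}
unsafe impl<T> Send for SyncPtr<T> {}

impl<T> SyncPtr<T> {
    fn from_const(ptr: *const T) -> Self {
        Self(ptr as *mut T)
    }
    fn get(&self) -> *const T {
        self.0
    }
}

// Analogous to ChunkReader::return_slice: hand out a (pointer, length)
// pair into `buf` instead of a borrowed slice.
fn return_slice(buf: &[u8], start: usize, end: usize) -> (SyncPtr<u8>, usize) {
    let slice = &buf[start..end];
    (SyncPtr::from_const(slice.as_ptr()), slice.len())
}

fn main() {
    let buf = b"abcdef".to_vec();
    let (ptr, len) = return_slice(&buf, 1, 4);
    // Safety: `buf` is still alive and unmodified, so the pointer is valid.
    let chunk = unsafe { std::slice::from_raw_parts(ptr.get(), len) };
    assert_eq!(chunk, b"bcd");
}
```

The caller is responsible for not outliving or mutating `buf` while the pointer is held, which is exactly the invariant the batched reader maintains.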

Collaborator:
I was looking through and noticed that we also have an mmap-based reader at:

pub struct BatchedCsvReaderMmap<'a> {

It looks like we switch between the two readers depending on `low_memory`:

let batched_reader = if options.low_memory {
let batched_reader = unsafe { Box::new((*reader).batched_borrowed_read()?) };
let batched_reader = Box::leak(batched_reader) as *mut BatchedCsvReaderRead;
Either::Right(batched_reader)
} else {
let batched_reader = unsafe { Box::new((*reader).batched_borrowed_mmap()?) };
let batched_reader = Box::leak(batched_reader) as *mut BatchedCsvReaderMmap;
Either::Left(batched_reader)
};

I'm thinking we can maybe just always use the mmap reader? I don't think mmap should be memory intensive, and we could also adjust how far we seek to accommodate low-memory conditions.
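For reference, the dispatch being discussed can be sketched as follows. The reader types are hypothetical empty stand-ins, and a plain enum replaces `either::Either` so the sketch is self-contained:

```rust
// Hypothetical stand-ins for the two batched readers.
struct BatchedCsvReaderRead;
struct BatchedCsvReaderMmap;

// Plain enum in place of either::Either from the snippet above.
enum Reader {
    Read(BatchedCsvReaderRead),
    Mmap(BatchedCsvReaderMmap),
}

// Mirrors the low_memory branch: buffered reads when memory is tight,
// the mmap-based reader otherwise.
fn select_reader(low_memory: bool) -> Reader {
    if low_memory {
        Reader::Read(BatchedCsvReaderRead)
    } else {
        Reader::Mmap(BatchedCsvReaderMmap)
    }
}

fn main() {
    assert!(matches!(select_reader(true), Reader::Read(_)));
    assert!(matches!(select_reader(false), Reader::Mmap(_)));
}
```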

struct ChunkReader<'a> {
file: &'a File,
buf: Vec<u8>,
@@ -109,18 +112,23 @@ impl<'a> ChunkReader<'a> {
self.buf_end = 0;
}

fn return_slice(&self, start: usize, end: usize) -> (usize, usize) {
fn return_slice(&self, start: usize, end: usize) -> (SyncPtr<u8>, usize) {
Member Author:
Return `SyncPtr`, as it is clearer that we return pointers that way.

let slice = &self.buf[start..end];
let len = slice.len();
(slice.as_ptr() as usize, len)
(slice.as_ptr().into(), len)
}

fn get_buf(&self) -> (usize, usize) {
fn get_buf_remaining(&self) -> (SyncPtr<u8>, usize) {
let slice = &self.buf[self.buf_end..];
let len = slice.len();
(slice.as_ptr() as usize, len)
(slice.as_ptr().into(), len)
}

// Get the next `n` offset positions, where `n` is the number of chunks.

// This returns pointers to slices into `buf`;
// we must process the slices before the next call,
// as that call will overwrite them.
fn read(&mut self, n: usize) -> bool {
self.reslice();

@@ -267,7 +275,7 @@ pub struct BatchedCsvReaderRead<'a> {
chunk_size: usize,
finished: bool,
file_chunk_reader: ChunkReader<'a>,
file_chunks: Vec<(usize, usize)>,
file_chunks: Vec<(SyncPtr<u8>, usize)>,
projection: Vec<usize>,
starting_point_offset: Option<usize>,
row_index: Option<RowIndex>,
@@ -292,6 +300,7 @@ pub struct BatchedCsvReaderRead<'a> {
}
//
impl<'a> BatchedCsvReaderRead<'a> {
/// `n` is the number of batches.
pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
if n == 0 || self.finished {
return Ok(None);
@@ -320,7 +329,8 @@ impl<'a> BatchedCsvReaderRead<'a> {
// ensure we process the final slice as well.
if self.file_chunk_reader.finished && self.file_chunks.len() < n {
// get the final slice
self.file_chunks.push(self.file_chunk_reader.get_buf());
self.file_chunks
.push(self.file_chunk_reader.get_buf_remaining());
self.finished = true
}

@@ -333,7 +343,7 @@ impl<'a> BatchedCsvReaderRead<'a> {
self.file_chunks
.par_iter()
.map(|(ptr, len)| {
let chunk = unsafe { std::slice::from_raw_parts(*ptr as *const u8, *len) };
let chunk = unsafe { std::slice::from_raw_parts(ptr.get(), *len) };
let stop_at_n_bytes = chunk.len();
let mut df = read_chunk(
chunk,
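The `(SyncPtr<u8>, usize)` pairs are needed in the `par_iter` above because raw pointers are not `Send`/`Sync` and so could not cross into worker threads on their own. A hedged sketch of the same pattern, using std scoped threads instead of rayon and a simplified `SyncPtr`:

```rust
use std::thread;

// Simplified stand-in for polars_utils::sync::SyncPtr.
struct SyncPtr<T>(*const T);
unsafe impl<T> Send for SyncPtr<T> {}
unsafe impl<T> Sync for SyncPtr<T> {}

// Reconstruct each chunk from its (pointer, length) pair on a worker
// thread and reduce it; mirrors the par_iter().map(...) above.
fn sum_chunks(buf: &[u8], ranges: &[(usize, usize)]) -> Vec<u32> {
    let chunks: Vec<(SyncPtr<u8>, usize)> = ranges
        .iter()
        .map(|&(start, end)| (SyncPtr(buf[start..end].as_ptr()), end - start))
        .collect();
    thread::scope(|scope| {
        let handles: Vec<_> = chunks
            .iter()
            .map(|(ptr, len)| {
                scope.spawn(move || {
                    // Safety: `buf` outlives the scope and is not mutated.
                    let chunk = unsafe { std::slice::from_raw_parts(ptr.0, *len) };
                    chunk.iter().map(|&b| b as u32).sum::<u32>()
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let sums = sum_chunks(b"abcd", &[(0, 2), (2, 4)]);
    assert_eq!(sums, vec![195, 199]); // 97 + 98, 99 + 100
}
```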
12 changes: 7 additions & 5 deletions crates/polars-utils/src/sync.rs
@@ -13,11 +13,7 @@ impl<T> SyncPtr<T> {
Self(ptr)
}

/// # Safety
///
/// This will make a pointer sync and send.
/// Ensure that you don't break aliasing rules.
pub unsafe fn from_const(ptr: *const T) -> Self {
Member Author:
Creating this isn't unsafe.

pub fn from_const(ptr: *const T) -> Self {
Self(ptr as *mut T)
}

@@ -43,3 +39,9 @@

unsafe impl<T> Sync for SyncPtr<T> {}
unsafe impl<T> Send for SyncPtr<T> {}

impl<T> From<*const T> for SyncPtr<T> {
fn from(value: *const T) -> Self {
Self::from_const(value)
}
}
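A small usage sketch of the conversion this impl enables, with a simplified `SyncPtr`; this is what makes the `(slice.as_ptr().into(), len)` call sites in the diff above compile:

```rust
// Simplified SyncPtr with the same From<*const T> conversion as above.
struct SyncPtr<T>(*mut T);

impl<T> SyncPtr<T> {
    fn from_const(ptr: *const T) -> Self {
        Self(ptr as *mut T)
    }
    fn get(&self) -> *const T {
        self.0
    }
}

impl<T> From<*const T> for SyncPtr<T> {
    fn from(value: *const T) -> Self {
        Self::from_const(value)
    }
}

fn main() {
    let v = vec![1u8, 2, 3];
    // .into() now works wherever a SyncPtr is expected.
    let ptr: SyncPtr<u8> = v.as_ptr().into();
    // Safety: `v` is still alive, so the pointer is valid to read.
    assert_eq!(unsafe { *ptr.get() }, 1);
}
```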