Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions vortex-array/src/arrays/varbinview/build_views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,46 +24,68 @@ pub fn offsets_to_lengths<P: NativePType>(offsets: &[P]) -> Buffer<P> {
/// Maximum number of buffer bytes that can be referenced by a single `BinaryView`
pub const MAX_BUFFER_LEN: usize = i32::MAX as usize;

/// Split a large buffer of input `bytes` holding string data
/// Build `BinaryView`s from a contiguous byte buffer and per-element lengths.
///
/// When total data exceeds `max_buffer_len` (2 GiB), buffers are split to ensure
/// offsets fit in `u32`.
pub fn build_views<P: NativePType + AsPrimitive<usize>>(
start_buf_index: u32,
max_buffer_len: usize,
mut bytes: ByteBufferMut,
lens: &[P],
) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
let mut views = BufferMut::<BinaryView>::with_capacity(lens.len());
let views_dst = views.spare_capacity_mut().as_mut_ptr().cast::<BinaryView>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol, how much perf are we talkin about here


let mut buffers = Vec::new();
let mut buf_index = start_buf_index;

let mut offset = 0;
for &len in lens {
let mut bytes_ptr = bytes.as_slice().as_ptr();

for (i, &len) in lens.iter().enumerate() {
let len = len.as_();
assert!(len <= max_buffer_len, "values cannot exceed max_buffer_len");

if (offset + len) > max_buffer_len {
// Roll the buffer every 2GiB, to avoid overflowing VarBinView offset field
let rest = bytes.split_off(offset);

buffers.push(bytes.freeze());
buf_index += 1;
offset = 0;

bytes = rest;
bytes_ptr = bytes.as_slice().as_ptr();
}
let view = BinaryView::make_view(&bytes[offset..][..len], buf_index, offset.as_());
// SAFETY: we reserved the right capacity beforehand
unsafe { views.push_unchecked(view) };

// SAFETY: we reserved capacity for lens.len() views and i < lens.len().
// The split check above keeps offsets within max_buffer_len; the lengths
// describe bytes sequentially, so offset + len fits in the current byte buffer.
unsafe { write_view(views_dst, i, bytes_ptr, len, buf_index, offset) };
offset += len;
}

if !bytes.is_empty() {
buffers.push(bytes.freeze());
}

// SAFETY: the loop writes exactly lens.len() views into the reserved capacity.
unsafe { views.set_len(lens.len()) };

(buffers, views.freeze())
}

/// Build the view for `len` bytes located `offset` bytes past `bytes_ptr` and store it in
/// slot `view_index` of `views_dst`.
///
/// The view is created with `buf_index` and `offset` (converted with `as_()`) as its
/// buffer index and offset arguments.
///
/// # Safety
///
/// - `bytes_ptr` must be valid for reads of `offset + len` bytes, and that memory must not
///   be mutated while this function runs.
/// - `views_dst` must point to an allocation with room for at least `view_index + 1`
///   `BinaryView`s and be valid for writes at that slot. The slot may be uninitialized;
///   it is overwritten without reading or dropping any previous value.
#[inline(always)]
unsafe fn write_view(
    views_dst: *mut BinaryView,
    view_index: usize,
    bytes_ptr: *const u8,
    len: usize,
    buf_index: u32,
    offset: usize,
) {
    // SAFETY: the caller guarantees `bytes_ptr` is valid for `offset + len` bytes, so the
    // start of the value stays inside the same allocation.
    let start = unsafe { bytes_ptr.add(offset) };
    // SAFETY: `start..start + len` is readable and not mutated, per the caller's contract.
    let value = unsafe { std::slice::from_raw_parts(start, len) };

    let view = BinaryView::make_view(value, buf_index, offset.as_());

    // SAFETY: the caller guarantees slot `view_index` lies within the destination
    // allocation and is valid for writes.
    let slot = unsafe { views_dst.add(view_index) };
    // SAFETY: `slot` is valid for writes; `write` does not read or drop the old contents.
    unsafe { slot.write(view) };
}

#[cfg(test)]
mod tests {
use vortex_buffer::ByteBuffer;
Expand Down
Loading