Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added buffer interoperability with arrow-rs (#1429) (#1437)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Mar 22, 2023
1 parent f16900a commit 1073211
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 5 deletions.
9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "Unofficial implementation of Apache Arrow spec in safe Rust"
homepage = "https://github.com/jorgecarleitao/arrow2"
repository = "https://github.com/jorgecarleitao/arrow2"
authors = ["Jorge C. Leitao <jorgecarleitao@gmail.com>", "Apache Arrow <dev@arrow.apache.org>"]
keywords = [ "arrow", "analytics" ]
keywords = ["arrow", "analytics"]
edition = "2021"
exclude = ["testing/"]

Expand Down Expand Up @@ -100,6 +100,9 @@ odbc-api = { version = "0.36", optional = true }
# Faster hashing
ahash = "0.8"

# Support conversion to/from arrow-rs
arrow-buffer = { version = "35.0.0", optional = true }

[target.wasm32-unknown-unknown.dependencies]
getrandom = { version = "0.2", features = ["js"] }

Expand Down Expand Up @@ -131,6 +134,7 @@ rustdoc-args = ["--cfg", "docsrs"]
[features]
default = []
full = [
"arrow",
"io_odbc",
"io_csv",
"io_csv_async",
Expand All @@ -154,6 +158,7 @@ full = [
# parses timezones used in timestamp conversions
"chrono-tz",
]
arrow = ["arrow-buffer"]
io_odbc = ["odbc-api"]
io_csv = ["io_csv_read", "io_csv_write"]
io_csv_async = ["io_csv_read_async"]
Expand Down Expand Up @@ -195,7 +200,7 @@ io_avro_compression = [
]
io_avro_async = ["avro-schema/async"]

io_orc = [ "orc-format" ]
io_orc = ["orc-format"]

# serde+serde_json: its dependencies + error handling
# serde_derive: there is some derive around
Expand Down
27 changes: 27 additions & 0 deletions src/bitmap/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,22 @@ impl Bitmap {
) -> std::result::Result<Self, E> {
Ok(MutableBitmap::try_from_trusted_len_iter_unchecked(iterator)?.into())
}

/// Create a new [`Bitmap`] from an arrow [`NullBuffer`]
///
/// [`NullBuffer`]: arrow_buffer::buffer::NullBuffer
#[cfg(feature = "arrow")]
pub fn from_null_buffer(value: arrow_buffer::buffer::NullBuffer) -> Self {
let offset = value.offset();
let length = value.len();
let unset_bits = value.null_count();
Self {
offset,
length,
unset_bits,
bytes: Arc::new(crate::buffer::to_bytes(value.buffer().clone())),
}
}
}

impl<'a> IntoIterator for &'a Bitmap {
Expand All @@ -394,3 +410,14 @@ impl IntoIterator for Bitmap {
IntoIter::new(self)
}
}

#[cfg(feature = "arrow")]
impl From<Bitmap> for arrow_buffer::buffer::NullBuffer {
fn from(value: Bitmap) -> Self {
let null_count = value.unset_bits;
let buffer = crate::buffer::to_buffer(value.bytes);
let buffer = arrow_buffer::buffer::BooleanBuffer::new(buffer, value.offset, value.length);
// Safety: null count is accurate
unsafe { arrow_buffer::buffer::NullBuffer::new_unchecked(buffer, null_count) }
}
}
14 changes: 14 additions & 0 deletions src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,17 @@ impl<T: Copy> IntoIterator for Buffer<T> {
IntoIter::new(self)
}
}

#[cfg(feature = "arrow")]
impl<T: crate::types::NativeType> From<arrow_buffer::Buffer> for Buffer<T> {
fn from(value: arrow_buffer::Buffer) -> Self {
Self::from_bytes(crate::buffer::to_bytes(value))
}
}

#[cfg(feature = "arrow")]
impl<T: crate::types::NativeType> From<Buffer<T>> for arrow_buffer::Buffer {
fn from(value: Buffer<T>) -> Self {
crate::buffer::to_buffer(value.data)
}
}
37 changes: 36 additions & 1 deletion src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,42 @@ mod iterator;

use crate::ffi::InternalArrowArray;

pub(crate) type Bytes<T> = foreign_vec::ForeignVec<InternalArrowArray, T>;
pub(crate) enum BytesAllocator {
InternalArrowArray(InternalArrowArray),

#[cfg(feature = "arrow")]
Arrow(arrow_buffer::Buffer),
}

pub(crate) type Bytes<T> = foreign_vec::ForeignVec<BytesAllocator, T>;

#[cfg(feature = "arrow")]
pub(crate) fn to_buffer<T: crate::types::NativeType>(
value: std::sync::Arc<Bytes<T>>,
) -> arrow_buffer::Buffer {
// This should never panic as ForeignVec pointer must be non-null
let ptr = std::ptr::NonNull::new(value.as_ptr() as _).unwrap();
let len = value.len() * std::mem::size_of::<T>();
// Safety: allocation is guaranteed to be valid for `len` bytes
unsafe { arrow_buffer::Buffer::from_custom_allocation(ptr, len, value) }
}

#[cfg(feature = "arrow")]
pub(crate) fn to_bytes<T: crate::types::NativeType>(value: arrow_buffer::Buffer) -> Bytes<T> {
let ptr = value.as_ptr();
let align = ptr.align_offset(std::mem::align_of::<T>());
assert_eq!(align, 0, "not aligned");
let len = value.len() / std::mem::size_of::<T>();

// Valid as `NativeType: Pod` and checked alignment above
let ptr = value.as_ptr() as *const T;

let owner = crate::buffer::BytesAllocator::Arrow(value);

// Safety: slice is valid for len elements of T
unsafe { Bytes::from_foreign(ptr, len, owner) }
}

pub(super) use iterator::IntoIter;

pub use immutable::Buffer;
5 changes: 3 additions & 2 deletions src/ffi/array.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Contains functionality to load an ArrayData from the C Data Interface
use std::sync::Arc;

use crate::buffer::BytesAllocator;
use crate::{
array::*,
bitmap::{utils::bytes_for, Bitmap},
Expand Down Expand Up @@ -237,7 +238,7 @@ unsafe fn create_buffer<T: NativeType>(

let len = buffer_len(array, data_type, index)?;
let offset = buffer_offset(array, data_type, index);
let bytes = Bytes::from_foreign(ptr, len, owner);
let bytes = Bytes::from_foreign(ptr, len, BytesAllocator::InternalArrowArray(owner));

Ok(Buffer::from_bytes(bytes).sliced(offset, len - offset))
}
Expand All @@ -258,7 +259,7 @@ unsafe fn create_bitmap(
let len: usize = array.length.try_into().expect("length to fit in `usize`");
let offset: usize = array.offset.try_into().expect("Offset to fit in `usize`");
let bytes_len = bytes_for(offset + len);
let bytes = Bytes::from_foreign(ptr, bytes_len, owner);
let bytes = Bytes::from_foreign(ptr, bytes_len, BytesAllocator::InternalArrowArray(owner));

Ok(Bitmap::from_bytes(bytes, offset + len).sliced(offset, len))
}
Expand Down
2 changes: 2 additions & 0 deletions src/types/native.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::convert::TryFrom;
use std::ops::Neg;
use std::panic::RefUnwindSafe;

use bytemuck::{Pod, Zeroable};

Expand All @@ -14,6 +15,7 @@ pub trait NativeType:
+ Send
+ Sync
+ Sized
+ RefUnwindSafe
+ std::fmt::Debug
+ std::fmt::Display
+ PartialEq
Expand Down
26 changes: 26 additions & 0 deletions tests/it/bitmap/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,29 @@ fn debug() {

assert_eq!(format!("{b:?}"), "[0b111110__, 0b_______1]");
}

#[test]
#[cfg(feature = "arrow")]
fn from_arrow() {
use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
let buffer = arrow_buffer::Buffer::from_iter(vec![true, true, true, false, false, false, true]);
let bools = BooleanBuffer::new(buffer, 0, 7);
let nulls = NullBuffer::new(bools);
assert_eq!(nulls.null_count(), 3);

let bitmap = Bitmap::from_null_buffer(nulls.clone());
assert_eq!(nulls.null_count(), bitmap.unset_bits());
assert_eq!(nulls.len(), bitmap.len());
let back = NullBuffer::from(bitmap);
assert_eq!(nulls, back);

let nulls = nulls.slice(1, 3);
assert_eq!(nulls.null_count(), 1);
assert_eq!(nulls.len(), 3);

let bitmap = Bitmap::from_null_buffer(nulls.clone());
assert_eq!(nulls.null_count(), bitmap.unset_bits());
assert_eq!(nulls.len(), bitmap.len());
let back = NullBuffer::from(bitmap);
assert_eq!(nulls, back);
}
64 changes: 64 additions & 0 deletions tests/it/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,67 @@ fn from_vec() {
assert_eq!(buffer.len(), 3);
assert_eq!(buffer.as_slice(), &[0, 1, 2]);
}

#[test]
#[cfg(feature = "arrow")]
fn from_arrow() {
let buffer = arrow_buffer::Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]);
let b = Buffer::<i32>::from(buffer.clone());
assert_eq!(b.len(), 3);
assert_eq!(b.as_slice(), &[1, 2, 3]);
let back = arrow_buffer::Buffer::from(b);
assert_eq!(back, buffer);

let buffer = buffer.slice(4);
let b = Buffer::<i32>::from(buffer.clone());
assert_eq!(b.len(), 2);
assert_eq!(b.as_slice(), &[2, 3]);
let back = arrow_buffer::Buffer::from(b);
assert_eq!(back, buffer);

let buffer = arrow_buffer::Buffer::from_vec(vec![1_i64, 2_i64]);
let b = Buffer::<i32>::from(buffer.clone());
assert_eq!(b.len(), 4);
assert_eq!(b.as_slice(), &[1, 0, 2, 0]);
let back = arrow_buffer::Buffer::from(b);
assert_eq!(back, buffer);

let buffer = buffer.slice(4);
let b = Buffer::<i32>::from(buffer.clone());
assert_eq!(b.len(), 3);
assert_eq!(b.as_slice(), &[0, 2, 0]);
let back = arrow_buffer::Buffer::from(b);
assert_eq!(back, buffer);
}

#[test]
#[cfg(feature = "arrow")]
fn from_arrow_vec() {
// Zero-copy vec conversion in arrow-rs
let buffer = arrow_buffer::Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]);
let back: Vec<i32> = buffer.into_vec().unwrap();

// Zero-copy vec conversion in arrow2
let buffer = Buffer::<i32>::from(back);
let back: Vec<i32> = buffer.into_mut().unwrap_right();

let buffer = arrow_buffer::Buffer::from_vec(back);
let buffer = Buffer::<i32>::from(buffer);

// But not possible after conversion between buffer representations
let _ = buffer.into_mut().unwrap_left();

let buffer = Buffer::<i32>::from(vec![1_i32]);
let buffer = arrow_buffer::Buffer::from(buffer);

// But not possible after conversion between buffer representations
let _ = buffer.into_vec::<i32>().unwrap_err();
}

#[test]
#[cfg(feature = "arrow")]
#[should_panic(expected = "not aligned")]
fn from_arrow_misaligned() {
let buffer = arrow_buffer::Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]).slice(1);
let _ = Buffer::<i32>::from(buffer);
}

0 comments on commit 1073211

Please sign in to comment.