Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(python): Refactor functionality related to Series buffers #13291

Merged
merged 6 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions py-polars/polars/interchange/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def bufsize(self) -> int:
if dtype[0] == DtypeKind.STRING:
return self._data.str.len_bytes().sum() # type: ignore[return-value]
elif dtype[0] == DtypeKind.BOOL:
offset, length, _pointer = self._data._s.get_ptr()
_, offset, length = self._data._get_buffer_info()
n_bits = offset + length
n_bytes, rest = divmod(n_bits, 8)
# Round up to the nearest byte
Expand All @@ -62,7 +62,7 @@ def bufsize(self) -> int:
@property
def ptr(self) -> int:
"""Pointer to start of the buffer as an integer."""
_offset, _length, pointer = self._data._s.get_ptr()
pointer, _, _ = self._data._get_buffer_info()
return pointer

def __dlpack__(self) -> NoReturn:
Expand Down
15 changes: 6 additions & 9 deletions py-polars/polars/interchange/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
Endianness,
)
from polars.interchange.utils import polars_dtype_to_dtype
from polars.utils._wrap import wrap_s

if TYPE_CHECKING:
from collections.abc import Iterator
Expand Down Expand Up @@ -54,7 +53,7 @@ def size(self) -> int:
@property
def offset(self) -> int:
"""Offset of the first element with respect to the start of the underlying buffer.""" # noqa: W505
offset, _length, _pointer = self._col._s.get_ptr()
_, offset, _ = self._col._get_buffer_info()
return offset

@property
Expand Down Expand Up @@ -158,7 +157,7 @@ def get_buffers(self) -> ColumnBuffers:
}

def _get_data_buffer(self) -> tuple[PolarsBuffer, Dtype]:
s = wrap_s(self._col._s.get_buffer(0))
s = self._col._get_buffer(0)
buffer = PolarsBuffer(s, allow_copy=self._allow_copy)

dtype = self.dtype
Expand All @@ -168,21 +167,19 @@ def _get_data_buffer(self) -> tuple[PolarsBuffer, Dtype]:
return buffer, dtype

def _get_validity_buffer(self) -> tuple[PolarsBuffer, Dtype] | None:
buffer = self._col._s.get_buffer(1)
if buffer is None:
s = self._col._get_buffer(1)
if s is None:
return None

s = wrap_s(buffer)
buffer = PolarsBuffer(s, allow_copy=self._allow_copy)
dtype = (DtypeKind.BOOL, 1, "b", Endianness.NATIVE)
return buffer, dtype

def _get_offsets_buffer(self) -> tuple[PolarsBuffer, Dtype] | None:
buffer = self._col._s.get_buffer(2)
if buffer is None:
s = self._col._get_buffer(2)
if s is None:
return None

s = wrap_s(buffer)
buffer = PolarsBuffer(s, allow_copy=self._allow_copy)
dtype = (DtypeKind.INT, 64, "l", Endianness.NATIVE)
return buffer, dtype
77 changes: 72 additions & 5 deletions py-polars/polars/series/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,16 +358,83 @@ def _from_pandas(
pandas_to_pyseries(name, values, nan_to_null=nan_to_null)
)

def _get_ptr(self) -> tuple[int, int, int]:
def _get_buffer_info(self) -> tuple[int, int, int]:
"""
Get a pointer to the start of the values buffer of a numeric Series.
Return pointer, offset, and length information about the underlying buffer.

This will raise an error if the `Series` contains multiple chunks.
Returns
-------
tuple of ints
Tuple of the form (pointer, offset, length)

Raises
------
ComputeError
If the `Series` contains multiple chunks.
"""
return self._s._get_buffer_info()

@overload
def _get_buffer(self, index: Literal[0]) -> Self:
...

@overload
def _get_buffer(self, index: Literal[1, 2]) -> Self | None:
...

def _get_buffer(self, index: Literal[0, 1, 2]) -> Self | None:
"""
Return the underlying data, validity, or offsets buffer as a Series.

The data buffer always exists.
The validity buffer may not exist if the column contains no null values.
The offsets buffer only exists for Series of data type `String` and `List`.

Parameters
----------
index
An index indicating the buffer to return:

- `0` -> data buffer
- `1` -> validity buffer
- `2` -> offsets buffer

This will return the offset, length and the pointer itself.
Returns
-------
Series or None
`Series` if the specified buffer exists, `None` otherwise.

Raises
------
ComputeError
If the `Series` contains multiple chunks.
"""
buffer = self._s._get_buffer(index)
if buffer is None:
return None
return self._from_pyseries(buffer)

@classmethod
def _from_buffer(
self, dtype: PolarsDataType, buffer_info: tuple[int, int, int], base: Any
) -> Self:
"""
Construct a Series from information about its underlying buffer.

Parameters
----------
dtype
The data type of the buffer.
buffer_info
Tuple containing buffer information in the form (pointer, offset, length).
base
The object owning the buffer.

Returns
-------
Series
"""
return self._s.get_ptr()
return self._from_pyseries(PySeries._from_buffer(dtype, buffer_info, base))

@property
def dtype(self) -> DataType:
Expand Down
135 changes: 126 additions & 9 deletions py-polars/src/series/buffers.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,41 @@
use polars::export::arrow;
use polars::export::arrow::array::Array;
use polars::export::arrow::types::NativeType;
use polars_core::export::arrow::array::PrimitiveArray;
use polars_rs::export::arrow::offset::OffsetsBuffer;
use pyo3::exceptions::{PyTypeError, PyValueError};

use super::*;

/// Pointer, offset, and length information describing a single buffer.
///
/// Converted to and from Python as the tuple `(pointer, offset, length)`.
struct BufferInfo {
/// Address of the start of the buffer, stored as an integer.
pointer: usize,
/// Offset of the first element relative to `pointer`.
/// NOTE(review): for boolean buffers this is added to `length` and treated as a
/// bit count when sizing the buffer — confirm the unit for other data types.
offset: usize,
/// Number of elements in the buffer.
length: usize,
}
impl IntoPy<PyObject> for BufferInfo {
    /// Convert into the Python tuple `(pointer, offset, length)`.
    fn into_py(self, py: Python<'_>) -> PyObject {
        let Self {
            pointer,
            offset,
            length,
        } = self;
        let triple = (pointer, offset, length);
        triple.to_object(py)
    }
}
impl<'a> FromPyObject<'a> for BufferInfo {
    /// Build a `BufferInfo` from a Python object that extracts as the
    /// three-element tuple `(pointer, offset, length)`.
    fn extract(ob: &'a PyAny) -> PyResult<Self> {
        ob.extract::<(usize, usize, usize)>()
            .map(|(pointer, offset, length)| Self {
                pointer,
                offset,
                length,
            })
    }
}

#[pymethods]
impl PySeries {
/// Returns `(offset, len, ptr)`
fn get_ptr(&self) -> PyResult<(usize, usize, usize)> {
/// Return pointer, offset, and length information about the underlying buffer.
fn _get_buffer_info(&self) -> PyResult<BufferInfo> {
let s = self.series.to_physical_repr();
let arrays = s.chunks();
if arrays.len() != 1 {
let msg = "Only can take pointer, if the 'series' contains a single chunk";
let msg = "cannot get buffer info for Series consisting of multiple chunks";
raise_err!(msg, ComputeError);
}
match s.dtype() {
Expand All @@ -20,21 +45,33 @@ impl PySeries {
// this one is quite useless as you need to know the offset
// into the first byte.
let (slice, start, len) = arr.values().as_slice();
Ok((start, len, slice.as_ptr() as usize))
Ok(BufferInfo {
pointer: slice.as_ptr() as usize,
offset: start,
length: len,
})
},
dt if dt.is_numeric() => Ok(with_match_physical_numeric_polars_type!(s.dtype(), |$T| {
let ca: &ChunkedArray<$T> = s.as_ref().as_ref().as_ref();
(0, ca.len(), get_ptr(ca))
BufferInfo { pointer: get_pointer(ca), offset: 0, length: ca.len() }
})),
DataType::String => {
let ca = s.str().unwrap();
let arr = ca.downcast_iter().next().unwrap();
Ok((0, arr.len(), arr.values().as_ptr() as usize))
Ok(BufferInfo {
pointer: arr.values().as_ptr() as usize,
offset: 0,
length: arr.len(),
})
},
DataType::Binary => {
let ca = s.binary().unwrap();
let arr = ca.downcast_iter().next().unwrap();
Ok((0, arr.len(), arr.values().as_ptr() as usize))
Ok(BufferInfo {
pointer: arr.values().as_ptr() as usize,
offset: 0,
length: arr.len(),
})
},
_ => {
let msg = "Cannot take pointer of nested type, try to first select a buffer";
Expand All @@ -43,7 +80,8 @@ impl PySeries {
}
}

fn get_buffer(&self, index: usize) -> PyResult<Option<Self>> {
/// Return the underlying data, validity, or offsets buffer as a Series.
fn _get_buffer(&self, index: usize) -> PyResult<Option<Self>> {
match self.series.dtype().to_physical() {
dt if dt.is_numeric() => get_buffer_from_primitive(&self.series, index),
DataType::Boolean => get_buffer_from_primitive(&self.series, index),
Expand Down Expand Up @@ -160,7 +198,86 @@ fn get_buffer_from_primitive(s: &Series, index: usize) -> PyResult<Option<PySeri
}
}

fn get_ptr<T: PolarsNumericType>(ca: &ChunkedArray<T>) -> usize {
fn get_pointer<T: PolarsNumericType>(ca: &ChunkedArray<T>) -> usize {
let arr = ca.downcast_iter().next().unwrap();
arr.values().as_ptr() as usize
}

#[pymethods]
impl PySeries {
/// Construct a PySeries from information about its underlying buffer.
#[staticmethod]
unsafe fn _from_buffer(
py: Python,
dtype: Wrap<DataType>,
buffer_info: BufferInfo,
base: &PyAny,
) -> PyResult<Self> {
let dtype = dtype.0;
let BufferInfo {
pointer,
offset,
length,
} = buffer_info;
let base = base.to_object(py);

let arr_boxed = match dtype {
DataType::Int8 => unsafe { from_buffer_impl::<i8>(pointer, length, base) },
DataType::Int16 => unsafe { from_buffer_impl::<i16>(pointer, length, base) },
DataType::Int32 => unsafe { from_buffer_impl::<i32>(pointer, length, base) },
DataType::Int64 => unsafe { from_buffer_impl::<i64>(pointer, length, base) },
DataType::UInt8 => unsafe { from_buffer_impl::<u8>(pointer, length, base) },
DataType::UInt16 => unsafe { from_buffer_impl::<u16>(pointer, length, base) },
DataType::UInt32 => unsafe { from_buffer_impl::<u32>(pointer, length, base) },
DataType::UInt64 => unsafe { from_buffer_impl::<u64>(pointer, length, base) },
DataType::Float32 => unsafe { from_buffer_impl::<f32>(pointer, length, base) },
DataType::Float64 => unsafe { from_buffer_impl::<f64>(pointer, length, base) },
DataType::Boolean => {
unsafe { from_buffer_boolean_impl(pointer, offset, length, base) }?
},
dt => {
return Err(PyTypeError::new_err(format!(
"`from_buffer` requires a physical type as input for `dtype`, got {dt}",
)))
},
};

let s = Series::from_arrow("", arr_boxed).unwrap().into();
Ok(s)
}
}

unsafe fn from_buffer_impl<T: NativeType>(
pointer: usize,
length: usize,
base: Py<PyAny>,
) -> Box<dyn Array> {
let pointer = pointer as *const T;
let slice = unsafe { std::slice::from_raw_parts(pointer, length) };
let arr = unsafe { arrow::ffi::mmap::slice_and_owner(slice, base) };
arr.to_boxed()
}
/// Build a boolean array viewing `length` bits that start `offset` bits into the
/// bytes at `pointer`, with `base` passed to arrow as the owner of the memory.
///
/// # Safety
///
/// The bytes covering `offset + length` bits starting at `pointer` must be readable
/// and remain valid while `base` is alive.
unsafe fn from_buffer_boolean_impl(
    pointer: usize,
    offset: usize,
    length: usize,
    base: Py<PyAny>,
) -> PyResult<Box<dyn Array>> {
    // Number of whole bytes needed to hold every bit up to `offset + length`.
    let n_bytes = get_boolean_buffer_length_in_bytes(length, offset);

    // SAFETY: the caller guarantees `n_bytes` readable bytes at `pointer`.
    let bytes = unsafe { std::slice::from_raw_parts(pointer as *const u8, n_bytes) };
    // SAFETY: `base` owns the memory behind `bytes`, per the caller's contract.
    let bitmap_arr = unsafe { arrow::ffi::mmap::bitmap_and_owner(bytes, offset, length, base) }
        .map_err(PyPolarsErr::from)?;
    Ok(bitmap_arr.to_boxed())
}
/// Return the number of bytes needed to hold `offset + length` bits,
/// rounding up to the nearest whole byte.
fn get_boolean_buffer_length_in_bytes(length: usize, offset: usize) -> usize {
    let total_bits = offset + length;
    // One extra byte when the bit count does not end on a byte boundary.
    let has_partial_byte = total_bits % 8 != 0;
    total_bits / 8 + usize::from(has_partial_byte)
}
Loading