Skip to content

Commit

Permalink
refactor(python): Refactor functionality related to Series buffers (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
stinodego committed Dec 29, 2023
1 parent b8552ef commit b44adb9
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 160 deletions.
4 changes: 2 additions & 2 deletions py-polars/polars/interchange/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def bufsize(self) -> int:
if dtype[0] == DtypeKind.STRING:
return self._data.str.len_bytes().sum() # type: ignore[return-value]
elif dtype[0] == DtypeKind.BOOL:
offset, length, _pointer = self._data._s.get_ptr()
_, offset, length = self._data._get_buffer_info()
n_bits = offset + length
n_bytes, rest = divmod(n_bits, 8)
# Round up to the nearest byte
Expand All @@ -62,7 +62,7 @@ def bufsize(self) -> int:
@property
def ptr(self) -> int:
"""Pointer to start of the buffer as an integer."""
_offset, _length, pointer = self._data._s.get_ptr()
pointer, _, _ = self._data._get_buffer_info()
return pointer

def __dlpack__(self) -> NoReturn:
Expand Down
15 changes: 6 additions & 9 deletions py-polars/polars/interchange/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
Endianness,
)
from polars.interchange.utils import polars_dtype_to_dtype
from polars.utils._wrap import wrap_s

if TYPE_CHECKING:
from collections.abc import Iterator
Expand Down Expand Up @@ -54,7 +53,7 @@ def size(self) -> int:
@property
def offset(self) -> int:
"""Offset of the first element with respect to the start of the underlying buffer.""" # noqa: W505
offset, _length, _pointer = self._col._s.get_ptr()
_, offset, _ = self._col._get_buffer_info()
return offset

@property
Expand Down Expand Up @@ -158,7 +157,7 @@ def get_buffers(self) -> ColumnBuffers:
}

def _get_data_buffer(self) -> tuple[PolarsBuffer, Dtype]:
s = wrap_s(self._col._s.get_buffer(0))
s = self._col._get_buffer(0)
buffer = PolarsBuffer(s, allow_copy=self._allow_copy)

dtype = self.dtype
Expand All @@ -168,21 +167,19 @@ def _get_data_buffer(self) -> tuple[PolarsBuffer, Dtype]:
return buffer, dtype

def _get_validity_buffer(self) -> tuple[PolarsBuffer, Dtype] | None:
buffer = self._col._s.get_buffer(1)
if buffer is None:
s = self._col._get_buffer(1)
if s is None:
return None

s = wrap_s(buffer)
buffer = PolarsBuffer(s, allow_copy=self._allow_copy)
dtype = (DtypeKind.BOOL, 1, "b", Endianness.NATIVE)
return buffer, dtype

def _get_offsets_buffer(self) -> tuple[PolarsBuffer, Dtype] | None:
buffer = self._col._s.get_buffer(2)
if buffer is None:
s = self._col._get_buffer(2)
if s is None:
return None

s = wrap_s(buffer)
buffer = PolarsBuffer(s, allow_copy=self._allow_copy)
dtype = (DtypeKind.INT, 64, "l", Endianness.NATIVE)
return buffer, dtype
77 changes: 72 additions & 5 deletions py-polars/polars/series/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,16 +358,83 @@ def _from_pandas(
pandas_to_pyseries(name, values, nan_to_null=nan_to_null)
)

def _get_ptr(self) -> tuple[int, int, int]:
def _get_buffer_info(self) -> tuple[int, int, int]:
"""
Get a pointer to the start of the values buffer of a numeric Series.
Return pointer, offset, and length information about the underlying buffer.
This will raise an error if the `Series` contains multiple chunks.
Returns
-------
tuple of ints
Tuple of the form (pointer, offset, length)
Raises
------
ComputeError
If the `Series` contains multiple chunks.
"""
return self._s._get_buffer_info()

@overload
def _get_buffer(self, index: Literal[0]) -> Self:
...

@overload
def _get_buffer(self, index: Literal[1, 2]) -> Self | None:
...

def _get_buffer(self, index: Literal[0, 1, 2]) -> Self | None:
"""
Return the underlying data, validity, or offsets buffer as a Series.
The data buffer always exists.
The validity buffer may not exist if the column contains no null values.
The offsets buffer only exists for Series of data type `String` and `List`.
Parameters
----------
index
An index indicating the buffer to return:
- `0` -> data buffer
- `1` -> validity buffer
- `2` -> offsets buffer
This will return the offset, length and the pointer itself.
Returns
-------
Series or None
`Series` if the specified buffer exists, `None` otherwise.
Raises
------
ComputeError
If the `Series` contains multiple chunks.
"""
buffer = self._s._get_buffer(index)
if buffer is None:
return None
return self._from_pyseries(buffer)

@classmethod
def _from_buffer(
self, dtype: PolarsDataType, buffer_info: tuple[int, int, int], base: Any
) -> Self:
"""
Construct a Series from information about its underlying buffer.
Parameters
----------
dtype
The data type of the buffer.
buffer_info
Tuple containing buffer information in the form (pointer, offset, length).
base
The object owning the buffer.
Returns
-------
Series
"""
return self._s.get_ptr()
return self._from_pyseries(PySeries._from_buffer(dtype, buffer_info, base))

@property
def dtype(self) -> DataType:
Expand Down
135 changes: 126 additions & 9 deletions py-polars/src/series/buffers.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,41 @@
use polars::export::arrow;
use polars::export::arrow::array::Array;
use polars::export::arrow::types::NativeType;
use polars_core::export::arrow::array::PrimitiveArray;
use polars_rs::export::arrow::offset::OffsetsBuffer;
use pyo3::exceptions::{PyTypeError, PyValueError};

use super::*;

struct BufferInfo {
pointer: usize,
offset: usize,
length: usize,
}
impl IntoPy<PyObject> for BufferInfo {
fn into_py(self, py: Python<'_>) -> PyObject {
(self.pointer, self.offset, self.length).to_object(py)
}
}
impl<'a> FromPyObject<'a> for BufferInfo {
fn extract(ob: &'a PyAny) -> PyResult<Self> {
let (pointer, offset, length) = ob.extract()?;
Ok(Self {
pointer,
offset,
length,
})
}
}

#[pymethods]
impl PySeries {
/// Returns `(offset, len, ptr)`
fn get_ptr(&self) -> PyResult<(usize, usize, usize)> {
/// Return pointer, offset, and length information about the underlying buffer.
fn _get_buffer_info(&self) -> PyResult<BufferInfo> {
let s = self.series.to_physical_repr();
let arrays = s.chunks();
if arrays.len() != 1 {
let msg = "Only can take pointer, if the 'series' contains a single chunk";
let msg = "cannot get buffer info for Series consisting of multiple chunks";
raise_err!(msg, ComputeError);
}
match s.dtype() {
Expand All @@ -20,21 +45,33 @@ impl PySeries {
// this one is quite useless as you need to know the offset
// into the first byte.
let (slice, start, len) = arr.values().as_slice();
Ok((start, len, slice.as_ptr() as usize))
Ok(BufferInfo {
pointer: slice.as_ptr() as usize,
offset: start,
length: len,
})
},
dt if dt.is_numeric() => Ok(with_match_physical_numeric_polars_type!(s.dtype(), |$T| {
let ca: &ChunkedArray<$T> = s.as_ref().as_ref().as_ref();
(0, ca.len(), get_ptr(ca))
BufferInfo { pointer: get_pointer(ca), offset: 0, length: ca.len() }
})),
DataType::String => {
let ca = s.str().unwrap();
let arr = ca.downcast_iter().next().unwrap();
Ok((0, arr.len(), arr.values().as_ptr() as usize))
Ok(BufferInfo {
pointer: arr.values().as_ptr() as usize,
offset: 0,
length: arr.len(),
})
},
DataType::Binary => {
let ca = s.binary().unwrap();
let arr = ca.downcast_iter().next().unwrap();
Ok((0, arr.len(), arr.values().as_ptr() as usize))
Ok(BufferInfo {
pointer: arr.values().as_ptr() as usize,
offset: 0,
length: arr.len(),
})
},
_ => {
let msg = "Cannot take pointer of nested type, try to first select a buffer";
Expand All @@ -43,7 +80,8 @@ impl PySeries {
}
}

fn get_buffer(&self, index: usize) -> PyResult<Option<Self>> {
/// Return the underlying data, validity, or offsets buffer as a Series.
fn _get_buffer(&self, index: usize) -> PyResult<Option<Self>> {
match self.series.dtype().to_physical() {
dt if dt.is_numeric() => get_buffer_from_primitive(&self.series, index),
DataType::Boolean => get_buffer_from_primitive(&self.series, index),
Expand Down Expand Up @@ -160,7 +198,86 @@ fn get_buffer_from_primitive(s: &Series, index: usize) -> PyResult<Option<PySeri
}
}

fn get_ptr<T: PolarsNumericType>(ca: &ChunkedArray<T>) -> usize {
fn get_pointer<T: PolarsNumericType>(ca: &ChunkedArray<T>) -> usize {
let arr = ca.downcast_iter().next().unwrap();
arr.values().as_ptr() as usize
}

#[pymethods]
impl PySeries {
/// Construct a PySeries from information about its underlying buffer.
#[staticmethod]
unsafe fn _from_buffer(
py: Python,
dtype: Wrap<DataType>,
buffer_info: BufferInfo,
base: &PyAny,
) -> PyResult<Self> {
let dtype = dtype.0;
let BufferInfo {
pointer,
offset,
length,
} = buffer_info;
let base = base.to_object(py);

let arr_boxed = match dtype {
DataType::Int8 => unsafe { from_buffer_impl::<i8>(pointer, length, base) },
DataType::Int16 => unsafe { from_buffer_impl::<i16>(pointer, length, base) },
DataType::Int32 => unsafe { from_buffer_impl::<i32>(pointer, length, base) },
DataType::Int64 => unsafe { from_buffer_impl::<i64>(pointer, length, base) },
DataType::UInt8 => unsafe { from_buffer_impl::<u8>(pointer, length, base) },
DataType::UInt16 => unsafe { from_buffer_impl::<u16>(pointer, length, base) },
DataType::UInt32 => unsafe { from_buffer_impl::<u32>(pointer, length, base) },
DataType::UInt64 => unsafe { from_buffer_impl::<u64>(pointer, length, base) },
DataType::Float32 => unsafe { from_buffer_impl::<f32>(pointer, length, base) },
DataType::Float64 => unsafe { from_buffer_impl::<f64>(pointer, length, base) },
DataType::Boolean => {
unsafe { from_buffer_boolean_impl(pointer, offset, length, base) }?
},
dt => {
return Err(PyTypeError::new_err(format!(
"`from_buffer` requires a physical type as input for `dtype`, got {dt}",
)))
},
};

let s = Series::from_arrow("", arr_boxed).unwrap().into();
Ok(s)
}
}

unsafe fn from_buffer_impl<T: NativeType>(
pointer: usize,
length: usize,
base: Py<PyAny>,
) -> Box<dyn Array> {
let pointer = pointer as *const T;
let slice = unsafe { std::slice::from_raw_parts(pointer, length) };
let arr = unsafe { arrow::ffi::mmap::slice_and_owner(slice, base) };
arr.to_boxed()
}
unsafe fn from_buffer_boolean_impl(
pointer: usize,
offset: usize,
length: usize,
base: Py<PyAny>,
) -> PyResult<Box<dyn Array>> {
let length_in_bytes = get_boolean_buffer_length_in_bytes(length, offset);

let pointer = pointer as *const u8;
let slice = unsafe { std::slice::from_raw_parts(pointer, length_in_bytes) };
let arr_result = unsafe { arrow::ffi::mmap::bitmap_and_owner(slice, offset, length, base) };
let arr = arr_result.map_err(PyPolarsErr::from)?;
Ok(arr.to_boxed())
}
fn get_boolean_buffer_length_in_bytes(length: usize, offset: usize) -> usize {
let n_bits = offset + length;
let n_bytes = n_bits / 8;
let rest = n_bits % 8;
if rest == 0 {
n_bytes
} else {
n_bytes + 1
}
}
Loading

0 comments on commit b44adb9

Please sign in to comment.