Skip to content

Commit

Permalink
[Python] wrap more safety around ufunc utilizaton and implementation …
Browse files Browse the repository at this point in the history
…on all supported types; closes #38 closes #32
  • Loading branch information
ritchie46 committed Oct 3, 2020
1 parent 9d10675 commit ce8cab0
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 87 deletions.
7 changes: 0 additions & 7 deletions py-polars/pypolars/ffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,6 @@
from numpy import ctypeslib
import ctypes
from typing import Any
from .pypolars import (
aligned_array_f32,
aligned_array_f64,
aligned_array_i32,
aligned_array_i64,
series_from_ptr_f64,
)

# https://stackoverflow.com/questions/4355524/getting-data-from-ctypes-array-into-numpy

Expand Down
7 changes: 6 additions & 1 deletion py-polars/pypolars/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,12 @@ def groupby(self, by: Union[str, List[str]]) -> GroupBy:
return GroupBy(self._df, by)

def join(
self, df: DataFrame, left_on: str, right_on: str, how="inner", parallel: bool = False
self,
df: DataFrame,
left_on: str,
right_on: str,
how="inner",
parallel: bool = False,
) -> DataFrame:
"""
SQL like joins
Expand Down
85 changes: 70 additions & 15 deletions py-polars/pypolars/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,56 @@
from .pypolars import PySeries
import numpy as np
from typing import Optional, List, Sequence, Union, Any
from .ffi import ptr_to_numpy, aligned_array_f64, series_from_ptr_f64
from .ffi import ptr_to_numpy

import ctypes
from numbers import Number


class IdentityDict(dict):
def __missing__(self, key):
return key


# TODO: add all polars supported primitives.
DTYPE_TO_FFINAME = IdentityDict()
DTYPE_TO_FFINAME["date32"] = lambda x: "i32"
DTYPE_TO_FFINAME["date64"] = lambda x: "i64"
DTYPE_TO_FFINAME["time64(ns)"] = "i64"
DTYPE_TO_FFINAME["duration(ns)"] = "i64"


def get_ffi_func(
name: str, dtype: str, obj: Optional[Series] = None, default: Optional = None
):
"""
Dynamically obtain the proper ffi function/ method.
Parameters
----------
name
function or method name where dtype is replaced by <>
for example
"call_foo_<>"
dtype
polars dtype str
obj
Optional object to find the method for. If none provided globals are used.
default
default function to use if not found.
Returns
-------
ffi function
"""
ffi_name = DTYPE_TO_FFINAME[dtype]
fname = name.replace("<>", ffi_name)
if obj:
return getattr(obj, fname, default)
else:
return globals().get(fname, default)


def wrap_s(s: PySeries) -> Series:
return Series.from_pyseries(s)

Expand Down Expand Up @@ -47,8 +92,12 @@ def __init__(
if not isinstance(values, np.ndarray) and not nullable:
values = np.array(values)

# series path
if isinstance(values, Series):
self.from_pyseries(values)

# numpy path
if isinstance(values, np.ndarray):
elif isinstance(values, np.ndarray):
dtype = values.dtype
if dtype == np.int64:
self._s = PySeries.new_i64(name, values)
Expand Down Expand Up @@ -76,7 +125,6 @@ def __init__(
self._s = PySeries.new_u64(name, values)
else:
raise ValueError(f"dtype: {dtype} not known")

# list path
else:
dtype = find_first_non_none(values)
Expand Down Expand Up @@ -183,12 +231,11 @@ def __sub__(self, other) -> Series:
return wrap_s(f(other))

def __truediv__(self, other) -> Series:
if isinstance(other, Series):
return Series.from_pyseries(self._s.div(other._s))
f = getattr(self._s, f"div_{self.dtype}", None)
if f is None:
return NotImplemented
return wrap_s(f(other))
if not self.is_float():
out_dtype = "f64"
else:
out_dtype = DTYPE_TO_FFINAME[self.dtype]
return np.true_divide(self, other, dtype=out_dtype)

def __mul__(self, other) -> Series:
if isinstance(other, Series):
Expand Down Expand Up @@ -418,6 +465,12 @@ def is_numeric(self) -> bool:
return False
return True

def is_float(self) -> bool:
dtype = self.dtype
if dtype[0] == "f":
return True
return False

def view(self) -> np.ndarray:
dtype = self.dtype
if dtype == "u8":
Expand Down Expand Up @@ -468,13 +521,15 @@ def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
args.append(arg.view())
else:
return NotImplemented
(out, ptr) = aligned_array_f64(self.len())
kwargs["out"] = out
ufunc(*args, **kwargs)
# get method for current dtype
f = getattr(self._s, f"unsafe_from_ptr_{self.dtype}")
return wrap_s(f(ptr, self.len()))

if "dtype" in kwargs:
dtype = kwargs.pop("dtype")
else:
dtype = self.dtype

f = get_ffi_func("apply_ufunc_<>", dtype, self._s)
series = f(lambda out: ufunc(*args, out=out, **kwargs), self.len())
return wrap_s(series)
else:
return NotImplemented

Expand Down
42 changes: 1 addition & 41 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,14 @@
use numpy::PyArray1;
use crate::{dataframe::PyDataFrame, series::PySeries};
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;

pub mod dataframe;
pub mod error;
pub mod npy;
pub mod series;

use crate::{dataframe::PyDataFrame, series::PySeries};
use polars::chunked_array::builder::AlignedVec;
use polars::prelude::*;

macro_rules! create_aligned_buffer {
($name:ident, $type:ty) => {
#[pyfunction]
pub fn $name(size: usize) -> (Py<PyArray1<$type>>, usize) {
let (buf, ptr) = npy::aligned_array::<$type>(size);
(buf, ptr as usize)
}
};
}

create_aligned_buffer!(aligned_array_f32, f32);
create_aligned_buffer!(aligned_array_f64, f64);
create_aligned_buffer!(aligned_array_i8, i8);
create_aligned_buffer!(aligned_array_i16, i16);
create_aligned_buffer!(aligned_array_i32, i32);
create_aligned_buffer!(aligned_array_i64, i64);
create_aligned_buffer!(aligned_array_u8, u8);
create_aligned_buffer!(aligned_array_u16, u16);
create_aligned_buffer!(aligned_array_u32, u32);
create_aligned_buffer!(aligned_array_u64, u64);

#[pyfunction]
pub fn series_from_ptr_f64(name: &str, ptr: usize, len: usize) -> PySeries {
let v: Vec<f64> = unsafe { npy::vec_from_ptr(ptr, len) };
let av = AlignedVec::new(v).unwrap();
let ca = ChunkedArray::new_from_aligned_vec(name, av);
PySeries::new(Series::Float64(ca))
}

#[pymodule]
fn pypolars(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PySeries>().unwrap();
m.add_class::<PyDataFrame>().unwrap();
m.add_wrapped(wrap_pyfunction!(aligned_array_f32)).unwrap();
m.add_wrapped(wrap_pyfunction!(aligned_array_f64)).unwrap();
m.add_wrapped(wrap_pyfunction!(aligned_array_i32)).unwrap();
m.add_wrapped(wrap_pyfunction!(aligned_array_i64)).unwrap();
m.add_wrapped(wrap_pyfunction!(series_from_ptr_f64))
.unwrap();
Ok(())
}
59 changes: 56 additions & 3 deletions py-polars/src/series.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::error::PyPolarsEr;
use crate::npy;
use crate::npy::aligned_array;
use numpy::PyArray1;
use polars::chunked_array::builder::get_bitmap;
use polars::prelude::*;
use pyo3::types::PyList;
use pyo3::types::{PyList, PyTuple};
use pyo3::{exceptions::RuntimeError, prelude::*, Python};

#[pyclass]
Expand Down Expand Up @@ -358,6 +359,58 @@ impl PySeries {
}
}

macro_rules! impl_ufuncs {
($name:ident, $type:ty, $unsafe_from_ptr_method:ident) => {
#[pymethods]
impl PySeries {
// applies a ufunc by accepting a lambda out: ufunc(*args, out=out)
// the out array is allocated in this method, send to Python and once the ufunc is applied
// ownership is taken by Rust again to prevent memory leak.
// if the ufunc fails, we first must take ownership back.
pub fn $name(&self, lambda: &PyAny, size: usize) -> PyResult<PySeries> {
// numpy array object, and a *mut ptr
let (out_array, ptr) = aligned_array::<$type>(size);
let gil = Python::acquire_gil();
let py = gil.python();

// TODO: check if we can initialize such that we have a ref count of 1.
// why is it already 2 here?
assert_eq!(out_array.get_refcnt(py), 2);
// inserting it in a tuple increase the reference count by 1.
let args = PyTuple::new(py, &[out_array]);

// whatever the result, we must take the leaked memory ownership back
let s = match lambda.call1(args) {
Ok(out) => {
// if this assert fails, the lambda has taken a reference to the object, so we must panic
assert_eq!(out.get_refcnt(), 3);
self.$unsafe_from_ptr_method(ptr as usize, size)
}
Err(e) => {
// first take ownership from the leaked memory
// so the destructor gets called when we go out of scope
self.$unsafe_from_ptr_method(ptr as usize, size);
// return error information
return Err(e);
}
};

Ok(s)
}
}
};
}
impl_ufuncs!(apply_ufunc_f32, f32, unsafe_from_ptr_f32);
impl_ufuncs!(apply_ufunc_f64, f64, unsafe_from_ptr_f64);
impl_ufuncs!(apply_ufunc_u8, u8, unsafe_from_ptr_u8);
impl_ufuncs!(apply_ufunc_u16, u16, unsafe_from_ptr_u16);
impl_ufuncs!(apply_ufunc_u32, u32, unsafe_from_ptr_u32);
impl_ufuncs!(apply_ufunc_u64, u64, unsafe_from_ptr_u64);
impl_ufuncs!(apply_ufunc_i8, i8, unsafe_from_ptr_i8);
impl_ufuncs!(apply_ufunc_i16, i16, unsafe_from_ptr_i16);
impl_ufuncs!(apply_ufunc_i32, i32, unsafe_from_ptr_i32);
impl_ufuncs!(apply_ufunc_i64, i64, unsafe_from_ptr_i64);

macro_rules! impl_set_with_mask {
($name:ident, $native:ty, $cast:ident, $variant:ident) => {
fn $name(series: &Series, filter: &PySeries, value: Option<$native>) -> Result<Series> {
Expand Down Expand Up @@ -446,11 +499,11 @@ impl_get!(get_i16, Int16, i16);
impl_get!(get_i32, Int32, i32);
impl_get!(get_i64, Int64, i64);

// Not public methods.
macro_rules! impl_unsafe_from_ptr {
($name:ident, $series_variant:ident) => {
#[pymethods]
impl PySeries {
pub fn $name(&self, ptr: usize, len: usize) -> Self {
fn $name(&self, ptr: usize, len: usize) -> Self {
let v = unsafe { npy::vec_from_ptr(ptr, len) };
let av = AlignedVec::new(v).unwrap();
let (null_count, null_bitmap) = get_bitmap(self.series.chunks()[0].as_ref());
Expand Down
22 changes: 2 additions & 20 deletions py-polars/tests/test_series.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from pypolars import Series
from pypolars.ffi import aligned_array_f32
import numpy as np


Expand Down Expand Up @@ -38,12 +37,12 @@ def test_arithmetic():
b = a

assert ((a * b) == [1, 4]).sum() == 2
assert ((a / b) == [1, 1]).sum() == 2
assert ((a / b) == [1.0, 1.0]).sum() == 2
assert ((a + b) == [2, 4]).sum() == 2
assert ((a - b) == [0, 0]).sum() == 2
assert ((a + 1) == [2, 3]).sum() == 2
assert ((a - 1) == [0, 1]).sum() == 2
assert ((a / 1) == [1, 2]).sum() == 2
assert ((a / 1) == [1.0, 2.0]).sum() == 2
assert ((a * 2) == [2, 4]).sum() == 2
assert ((1 + a) == [2, 3]).sum() == 2
assert ((1 - a) == [0, -1]).sum() == 2
Expand Down Expand Up @@ -136,23 +135,6 @@ def test_view():
assert np.all(a.view() == np.array([1, 2, 3]))


def test_numpy_interface():
# this isn't used anymore.
a, ptr = aligned_array_f32(10)
assert a.dtype == np.float32
assert a.shape == (10,)
pointer, read_only_flag = a.__array_interface__["data"]
# set read only flag to False
a.__array_interface__["data"] = (pointer, False)
# the __array_interface is used to create a new array (pointing to the same memory)
b = np.array(a)
# now the memory is writeable
b[0] = 1

# TODO: sent pointer to Rust and take ownership of array.
# https://stackoverflow.com/questions/37988849/safer-way-to-expose-a-c-allocated-memory-buffer-using-numpy-ctypes


def test_ufunc():
a = Series("a", [1.0, 2.0, 3.0, 4.0])
b = np.multiply(a, 4)
Expand Down

0 comments on commit ce8cab0

Please sign in to comment.