Skip to content

Commit

Permalink
LazyFrame slice support - efficient paths only (#3970)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-beedie committed Jul 11, 2022
1 parent 35bb7bf commit 9c54ee3
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 37 deletions.
122 changes: 105 additions & 17 deletions py-polars/polars/internals/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,6 @@ def date_range(

FrameOrSeries = Union["pli.DataFrame", "pli.Series"]

# TODO:
# class LazyPolarsSlice:


class PolarsSlice:
"""
Expand All @@ -287,6 +284,7 @@ class PolarsSlice:
start: int
stride: int
slice_length: int
is_unbounded: bool
obj: FrameOrSeries

def __init__(self, obj: FrameOrSeries):
Expand Down Expand Up @@ -318,17 +316,9 @@ def _slice_negative(self, obj: "pli.LazyFrame") -> "pli.LazyFrame":
"""
Logic for slices with negative stride.
"""
# apply slice before reversing (more efficient)
stride = abs(self.stride)
lazyslice = obj.slice(self.stop + 1, self.slice_length)

# potential early-exit if single row
if self.slice_length == 1:
return lazyslice
else:
# reverse frame, applying 'take_every' if stride > 1
lazyslice = lazyslice.reverse()
return lazyslice.take_every(stride) if (stride > 1) else lazyslice
lazyslice = obj.slice(self.stop + 1, self.slice_length).reverse()
return lazyslice.take_every(stride) if (stride > 1) else lazyslice

def _slice_setup(self, s: slice) -> None:
"""
Expand All @@ -344,8 +334,6 @@ def _slice_setup(self, s: slice) -> None:
else:
self.is_unbounded = (stop == -1) and (start >= obj_len - 1)

self._positive_indices = start >= 0 and stop >= 0

# determine slice length
if self.obj.is_empty():
self.slice_length = 0
Expand Down Expand Up @@ -377,14 +365,114 @@ def apply(self, s: slice) -> FrameOrSeries:
elif self.is_unbounded and self.stride in (-1, 1):
return self.obj.reverse() if (self.stride < 0) else self.obj.clone()

elif self._positive_indices and self.stride == 1:
elif self.start >= 0 and self.stop >= 0 and self.stride == 1:
return self.obj.slice(self.start, self.slice_length)

elif self.stride < 0 and self.slice_length == 1:
return self.obj.slice(self.stop + 1, 1)
else:
# multi-operation call; make lazy
# multi-operation calls; make lazy
lazyobj = self._lazify(self.obj)
sliced = (
self._slice_positive(lazyobj)
if self.stride > 0
else self._slice_negative(lazyobj)
)
return self._as_original(sliced, self.obj)


class LazyPolarsSlice:
    """
    Apply python slice object to Polars LazyFrame. Only slices with efficient
    computation paths mapping directly to existing lazy methods are supported.
    """

    obj: "pli.LazyFrame"

    def __init__(self, obj: "pli.LazyFrame"):
        self.obj = obj

    def apply(self, s: slice) -> "pli.LazyFrame":
        """
        Apply a slice operation. Note that LazyFrame is designed primarily for efficient
        computation and does not know its own length so, unlike DataFrame, certain slice
        patterns (such as those requiring negative stop/step) may not be supported.

        Parameters
        ----------
        s
            The python slice to translate into lazy frame operations.

        Returns
        -------
        A new lazy frame built from ``cleared``, ``clone``, ``reverse``, ``head``,
        ``tail``, ``slice`` and/or ``take_every`` calls on the wrapped frame.

        Raises
        ------
        ValueError
            If the step is zero, or if the slice cannot be mapped onto the lazy
            methods above without knowing the frame length.
        """
        # python sequences reject a zero step; do the same rather than
        # silently treating it as a step of one.
        if s.step == 0:
            raise ValueError("slice step cannot be zero")

        # NOTE: 'start' folds None into 0; wherever an omitted start means
        # something different from an explicit 0 (negative strides), the
        # raw 's.start' is checked as well.
        start = s.start or 0
        step = s.step or 1
        stop = s.stop

        # fail on operations that require length to do efficiently
        if stop and stop < 0:
            raise ValueError("Negative stop is not supported for lazy slices")
        if step < 0 and stop is not None and start != stop:
            raise ValueError(
                "Negative stride is not supported in conjunction with start+stop"
            )

        # ---------------------------------------
        # empty slice patterns.
        # ---------------------------------------
        # [:0]
        # [i:<=i]
        # [i:>=i:-k]
        if stop is not None:
            if step > 0 and start >= stop:
                return self.obj.cleared()
            # an omitted start with a negative stride means "from the end",
            # which is not an empty slice (and None cannot be compared).
            if step < 0 and s.start is not None and stop >= s.start >= 0:
                return self.obj.cleared()

        # ---------------------------------------
        # straight-through mappings for "reverse"
        # and/or "take_every"
        # ---------------------------------------
        # [:]    => clone()
        # [::k]  => take_every(k),
        # [::-1] => reverse(),
        # [::-k] => reverse().take_every(abs(k))
        # (an explicit start of 0 only equals "no start" for positive strides)
        if start == 0 and stop is None and (step > 0 or s.start is None):
            if step == 1:
                return self.obj.clone()
            if step > 1:
                return self.obj.take_every(step)
            if step == -1:
                return self.obj.reverse()
            return self.obj.reverse().take_every(abs(step))

        # [i::-k] => head(i+1).reverse().take_every(abs(k)), including i == 0
        if s.start is not None and start >= 0 and step < 0 and stop is None:
            obj = self.obj.head(start + 1).reverse()
            return obj if (step == -1) else obj.take_every(abs(step))

        # every remaining supported pattern requires a positive stride
        if step > 0:
            # ---------------------------------------
            # straight-through mappings for "head"
            # ---------------------------------------
            # [:j]   => head(j)
            # [:j:k] => head(j).take_every(k)
            if start == 0 and (stop or 0) >= 1:
                obj = self.obj.head(stop)
                return obj if (step == 1) else obj.take_every(step)

            # ---------------------------------------
            # straight-through mappings for "tail"
            # ---------------------------------------
            # [-i:]   => tail(abs(i))
            # [-i::k] => tail(abs(i)).take_every(k)
            if start < 0 and stop is None:
                obj = self.obj.tail(abs(start))
                return obj if (step == 1) else obj.take_every(step)

            # ---------------------------------------
            # straight-through mappings for "slice"
            # ---------------------------------------
            # [i:]    => slice(i)
            # [i:j]   => slice(i,j-i)
            # [i:j:k] => slice(i,j-i).take_every(k)
            if start > 0 and (stop is None or stop >= 0):
                slice_length = None if (stop is None) else (stop - start)
                obj = self.obj.slice(start, slice_length)
                return obj if (step == 1) else obj.take_every(step)

        raise ValueError(
            f"The given slice {s} is not supported by lazy computation; consider a "
            f"more efficient approach, or construct explicitly with other methods"
        )
15 changes: 11 additions & 4 deletions py-polars/polars/internals/lazy_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from polars import internals as pli
from polars.cfg import Config
from polars.datatypes import DataType, py_type_to_dtype
from polars.internals.functions import LazyPolarsSlice
from polars.utils import (
_in_notebook,
_prepare_row_count_args,
Expand Down Expand Up @@ -352,10 +353,12 @@ def _scan_python_function(
)
return self

def __getitem__(self, item: int | range | slice) -> None:
raise TypeError(
"'LazyFrame' object is not subscriptable. Use 'select()' or 'filter()' instead."
)
def __getitem__(self: LDF, item: int | range | slice) -> LazyFrame:
    """
    Support python slice syntax on a lazy frame; any other key is rejected.

    The slice is delegated to ``LazyPolarsSlice(self).apply(item)``; a
    ``TypeError`` is raised when ``item`` is not a ``slice``.
    """
    if isinstance(item, slice):
        return LazyPolarsSlice(self).apply(item)

    raise TypeError(
        "'LazyFrame' object is not subscriptable (aside from slicing). Use 'select()' or 'filter()' instead."
    )

def pipe(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
"""
Expand Down Expand Up @@ -1763,6 +1766,10 @@ def slice(self: LDF, offset: int, length: int | None = None) -> LDF:
└─────┴─────┴─────┘
"""
if length and length < 0:
raise ValueError(
f"Negative slice lengths ({length}) are invalid for LazyFrame"
)
return self._from_pyldf(self._ldf.slice(offset, length))

def limit(self: LDF, n: int = 5) -> LDF:
Expand Down
26 changes: 12 additions & 14 deletions py-polars/polars/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@


def assert_frame_equal(
left: DataFrame,
right: DataFrame,
left: DataFrame | LazyFrame,
right: DataFrame | LazyFrame,
check_dtype: bool = True,
check_exact: bool = False,
check_column_names: bool = True,
Expand All @@ -85,7 +85,6 @@ def assert_frame_equal(
"""
Raise detailed AssertionError if `left` does not equal `right`.
Parameters
----------
left
Expand All @@ -112,25 +111,24 @@ def assert_frame_equal(
>>> pl.testing.assert_frame_equal(df1, df2) # doctest: +SKIP
"""

obj = "DataFrame"
if isinstance(left, LazyFrame) and isinstance(right, LazyFrame):
left, right = left.collect(), right.collect()
obj = "LazyFrame"
else:
obj = "DataFrame"

if not (isinstance(left, DataFrame) and isinstance(right, DataFrame)):
raise_assert_detail(obj, "Type mismatch", type(left), type(right))

if left.shape[0] != right.shape[0]:
elif left.shape[0] != right.shape[0]:
raise_assert_detail(obj, "Length mismatch", left.shape, right.shape)

# this assumes we want it in the same order
union_cols = list(set(left.columns).union(set(right.columns)))
for c in union_cols:
if c not in right.columns:
raise AssertionError(
f"column {c} in left dataframe, but not in right dataframe"
)
raise AssertionError(f"column {c} in left frame, but not in right")
if c not in left.columns:
raise AssertionError(
f"column {c} in right dataframe, but not in left dataframe"
)
raise AssertionError(f"column {c} in right frame, but not in left")

if check_column_names:
if left.columns != right.columns:
Expand All @@ -139,8 +137,8 @@ def assert_frame_equal(
# this does not assume a particular order
for c in left.columns:
_assert_series_inner(
left[c],
right[c],
left[c], # type: ignore
right[c], # type: ignore
check_dtype,
check_exact,
nans_compare_equal,
Expand Down
25 changes: 25 additions & 0 deletions py-polars/tests/test_lazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,31 @@ def test_take_every() -> None:
assert_frame_equal(expected_df, df.take_every(2).collect())


def test_slice() -> None:
    """
    Check ``LazyFrame.slice`` and python slice syntax on a small lazy frame.
    """
    ldf = pl.DataFrame({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]}).lazy()
    expected = pl.DataFrame({"a": [3, 4], "b": ["c", "d"]}).lazy()

    # explicit offset/length calls that should all yield the last two rows
    offset_length_args = (
        [2, 10],  # slice > len(df)
        [2, 4],  # slice == len(df)
        [2],  # optional len
    )
    for slice_params in offset_length_args:
        assert_frame_equal(ldf.slice(*slice_params), expected)

    # python slices that should match plain list slicing of the rows
    py_slices = (
        slice(1, 2),
        slice(0, 3, 2),
        slice(-3, None),
        slice(None, 2, 2),
        slice(3, None, -1),
        slice(1, None, -2),
    )
    for py_slice in py_slices:
        frame_rows = ldf[py_slice].collect().rows()
        list_rows = ldf.collect().rows()[py_slice]
        assert frame_rows == list_rows

    # whole-frame negative strides map onto reverse (+ take_every)
    assert_frame_equal(ldf[::-1], ldf.reverse())
    assert_frame_equal(ldf[::-2], ldf.reverse().take_every(2))


def test_agg() -> None:
df = pl.DataFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]})
ldf = df.lazy().min()
Expand Down
2 changes: 1 addition & 1 deletion py-polars/tests_parametric/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def test_frame_slice(df: pl.DataFrame) -> None:
#
# given the average number of rows in the frames, and the value of
# max_examples, this will result in close to 5000 test permutations,
# running in around ~3 secs (depending on hardware/etc).
# running in around ~1.5 secs (depending on hardware/etc).
py_data = df.rows()

for start, stop, step, _ in py_data:
Expand Down
53 changes: 52 additions & 1 deletion py-polars/tests_parametric/test_lazyframe.py
Original file line number Diff line number Diff line change
@@ -1 +1,52 @@
# TODO:
from hypothesis import given, settings
from hypothesis.strategies import integers

import polars as pl
from polars.testing import column, dataframes


@given(
    ldf=dataframes(
        max_size=10,
        lazy=True,
        cols=[
            column(
                "start",
                dtype=pl.Int8,
                null_probability=0.15,
                strategy=integers(min_value=-4, max_value=6),
            ),
            column(
                "stop",
                dtype=pl.Int8,
                null_probability=0.15,
                strategy=integers(min_value=-2, max_value=8),
            ),
            column(
                "step",
                dtype=pl.Int8,
                null_probability=0.15,
                strategy=integers(min_value=-3, max_value=3).filter(lambda x: x != 0),
            ),
            column("misc", dtype=pl.Int32),
        ],
    )
)
@settings(max_examples=500)
def test_lazyframe_slice(ldf: pl.LazyFrame) -> None:
    """
    Compare lazy frame slicing against python list slicing of the same rows.

    Each generated row supplies the (start, stop, step) of a slice that is
    applied to both the frame and its collected rows.
    """
    rows = ldf.collect().rows()

    for start, stop, step, _ in rows:
        py_slice = slice(start, stop, step)
        expected_rows = rows[py_slice]
        try:
            actual_rows = ldf[py_slice].collect().rows()
        except ValueError as err:
            # test params will trigger some known
            # unsupported cases; filter them here.
            if "not supported" in str(err):
                continue
            raise

        assert (
            expected_rows == actual_rows
        ), f"slice [{start}:{stop}:{step}] failed on lazy df w/len={len(rows)}"

0 comments on commit 9c54ee3

Please sign in to comment.