Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion xfp/tests/xiter/test_xiter_methods.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from dataclasses import dataclass
import itertools

import pytest
from xfp import XRBranch, Xeither, Xiter, tupled
from xfp import tupled, XRBranch, Xeither, Xiter, Xlist


def compare[X](actual: Xiter[X], expected: Xiter[X]) -> bool:
Expand All @@ -11,6 +12,15 @@ def compare[X](actual: Xiter[X], expected: Xiter[X]) -> bool:
return True


def test_xiter__init__not_iterable():
with pytest.raises(TypeError):
Xiter(123)


def test_xiter__repr__defined():
assert repr(Xiter([1, 2, 3])).startswith("<list_iterator")


def test_xiter_copy():
r1 = Xiter([1, 2, 3])
r2 = r1.copy()
Expand Down Expand Up @@ -126,6 +136,42 @@ def test_xiter_map():
assert compare(actual, expected)


def test_xiter_slice_does_not_copy():
input = Xiter(range(0, 10))
_ = input.slice(4)
assert compare(input, Xiter(range(0, 10)))


def test_xiter_slice_stop_arg():
input = Xiter(range(0, 10))
assert compare(input.slice(4), Xiter(range(4)))


def test_xiter_slice_stop_none_arg():
input = Xiter(range(0, 10))
assert compare(input.slice(None), Xiter(range(0, 10)))


def test_xiter_slice_start_stop_args():
input = Xiter(range(0, 10))
assert compare(input.slice(1, 7), Xiter(range(1, 7)))


def test_xiter_slice_start_stop_step_args():
input = Xiter(range(0, 100))
assert compare(input.slice(1, 60, 3), Xiter(range(1, 60, 3)))


def test_xiter_slice_start_stop_step_none_args():
input = Xiter(range(0, 100))
assert compare(input.slice(None, None, None), Xiter(range(0, 100)))


def test_xiter_slice_too_many_args():
with pytest.raises(TypeError):
Xiter([1]).slice(1, 2, 3, 4)


def test_xiter_tail():
input = Xiter([1, 2, 3])
assert compare(input.tail(), Xiter([2, 3]))
Expand All @@ -151,6 +197,28 @@ def test_xiter_tail_fx_fail():
assert isinstance(actual.value, IndexError) and actual.branch == XRBranch.LEFT


def test_xiter_takewhile():
input = Xiter(itertools.count(0, 1))
actual = input.takewhile(lambda x: x < 10)
expected = Xiter(range(10))
assert compare(actual, expected)
assert next(input) == 0


def test_xiter_takeuntil():
input = Xiter(itertools.count(0, 1))
actual = input.takeuntil(lambda x: x >= 10)
expected = Xiter(range(10))
assert compare(actual, expected)
assert next(input) == 0


def test_xiter_to_xlist():
input = Xiter(range(5))
assert input.to_Xlist() == Xlist([0, 1, 2, 3, 4])
assert next(input) == 0


def test_xiter_zip():
in1 = Xiter([1, 2, 3])
in2 = Xiter([4, 5])
Expand Down
105 changes: 101 additions & 4 deletions xfp/xiter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from copy import deepcopy
from itertools import tee
import itertools
from typing import Callable, Iterable, Iterator, Any, cast
from typing import Callable, Iterable, Iterator, Any, cast, overload
from collections.abc import Iterable as ABCIterable

from xfp import Xresult, Xlist, Xtry
Expand Down Expand Up @@ -60,6 +60,28 @@ def __getitem__(self, i: int) -> X:
"""
return self.get(i)

def takewhile(self, predicate: Callable[[X], bool]) -> "Xiter[X]":
"""Return a new iterator that stops yielding elements when predicate = False.

Do not consume the original iterator.

Useful to limit an infinite Xiter with a predicate.

### Usage

```python
from xfp import Xiter
import itertools

until_xiter = (
Xiter(itertools.count(start=0,step=2)) # -> Xiter([0,2,4,6,8,...])
.takewhile(lambda x: x<6) # -> Xiter([0,2,4])
)
```
"""

return Xiter(itertools.takewhile(predicate, self.copy().__iter))

def copy(self):
"""Return a new Xiter, tee-ed from self.

Expand Down Expand Up @@ -214,7 +236,7 @@ def filter(self, predicate: Callable[[X], bool]) -> "Xiter[X]":
def foreach(self, statement: Callable[[X], Any]) -> None:
"""Do the 'statement' procedure once for each element of the iterator.

Do not consume the original iterator
Do not consume the original iterator.

### Usage

Expand Down Expand Up @@ -278,10 +300,85 @@ def flat_map(self, f: Callable[[X], Iterable[E]]) -> "Xiter[E]":
"""
return self.map(f).flatten()

def take(self, n: int) -> "Xiter[E]":
"""Return a new iterator limited to the first 'n' elements.
Return a copy if the original iterator has less than 'n' elements.
Return an empty Xiter if n is negative.

Do not consume the original iterator.

### Usage

```python
from xfp import Xiter
import itertools

infinite_xiter = Xiter(itertools.repeat(42)) # -> Xiter([42,42,42,...])
until_xiter = infinite_xiter.take(3) # -> Xiter([42,42])
```
"""

return Xiter(self.slice(n))

def takeuntil(self, predicate: Callable[[X], bool]) -> "Xiter[X]":
"""Return a new iterator that stops yielding elements when predicate = True.

Do not consume the original iterator.

Useful to limit an infinite Xiter with a predicate.

### Usage

```python
from xfp import Xiter
import itertools

infinite_xiter = Xiter(itertools.count(start=0,step=2)) # -> Xiter([0,2,4,6,8,...])
until_xiter = infinite_xiter.takeuntil(lambda x: x >=6) # -> Xiter([0,2,4])
```
"""

return self.takewhile(lambda x: not predicate(x))

@overload
def slice(self, stop: int | None, /): ...

@overload
def slice(self, start: int | None, stop: int | None, step: int | None = 1, /): ...

def slice(self, *args):
"""Return an new Xiter with selected elements from the Xiter.
Works like sequence slicing but does not support negative values
for start, stop, or step.

Do not consume the original iterator.

If start is zero or None, iteration starts at zero.
Otherwise, elements from the iterable are skipped until start is reached.

If stop is None, iteration continues until the input is exhausted,
if at all. Otherwise, it stops at the specified position.

If step is None, the step defaults to one.
Elements are returned consecutively unless step is set higher than
one which results in items being skipped.
"""
__iter_copy = self.copy()

if len(args) not in (1, 2, 3):
raise TypeError(
"slice expected from 1 to 3 positional arguments: 'stop' | 'start' 'stop' ['step']"
)

return Xiter(itertools.islice(__iter_copy, *args))

def zip(self, other: Iterable[E]) -> "Xiter[tuple[X, E]]":
"""Zip this iterator with another iterable."""
return Xiter(zip(self.copy(), other))

def to_Xlist(self) -> "Xlist[X]":
"""Return an Xlist being the evaluated version of self."""
return Xlist(self)
"""Return an Xlist being the evaluated version of self.

Do not consume the original iterator.
"""
return Xlist(self.copy())