-
-
Notifications
You must be signed in to change notification settings - Fork 521
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
120 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from django.core.paginator import EmptyPage, InvalidPage | ||
|
||
|
||
class CursorPaginator: | ||
def __init__(self, queryset, order_by, per_page): | ||
self.queryset = queryset | ||
self.order_by = order_by | ||
self.per_page = int(per_page) | ||
|
||
def get_page(self, start=0): | ||
if start < 0: | ||
raise InvalidPage() | ||
|
||
object_list = self._get_slice(start) | ||
if start and not object_list: | ||
raise EmptyPage() | ||
|
||
next_cursor = None | ||
if len(object_list) > self.per_page: | ||
next_slice_first_item = object_list.pop(-1) | ||
next_cursor = getattr(next_slice_first_item, self.order_by) | ||
|
||
return Page(start, object_list, next_cursor) | ||
|
||
def _get_slice(self, start): | ||
page_len = self.per_page + 1 | ||
if start: | ||
filter_name = "%s__gte" % self.order_by | ||
return self.queryset.filter(**{filter_name: start})[:page_len] | ||
return self.queryset[:page_len] | ||
|
||
|
||
class Page: | ||
def __init__(self, start, object_list, next_): | ||
self.start = start or None | ||
self.object_list = object_list | ||
self.next = next_ | ||
|
||
def __len__(self): | ||
return len(self.object_list) | ||
|
||
def has_next(self): | ||
return bool(self.next) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
import pytest | ||
|
||
from ..cursorpaginator import CursorPaginator, EmptyPage, InvalidPage | ||
|
||
|
||
@pytest.fixture | ||
def mock_objects(mocker): | ||
return [mocker.Mock(post=i) for i in range(1, 12)] | ||
|
||
|
||
@pytest.fixture | ||
def mock_queryset(mocker, mock_objects): | ||
return mocker.Mock(filter=mocker.Mock(return_value=mock_objects)) | ||
|
||
|
||
def test_paginator_returns_first_page(mock_objects): | ||
paginator = CursorPaginator(mock_objects, "post", 6) | ||
assert paginator.get_page() | ||
|
||
|
||
def test_first_page_has_no_start(mock_objects): | ||
paginator = CursorPaginator(mock_objects, "post", 6) | ||
assert paginator.get_page().start is None | ||
|
||
|
||
def test_first_page_has_correct_length(mock_objects): | ||
paginator = CursorPaginator(mock_objects, "post", 6) | ||
assert len(paginator.get_page().object_list) == 6 | ||
|
||
|
||
def test_first_page_has_correct_items(mock_objects): | ||
paginator = CursorPaginator(mock_objects, "post", 6) | ||
assert paginator.get_page().object_list == mock_objects[:6] | ||
|
||
|
||
def test_page_has_next_attr_pointing_to_first_item_of_next_page(mock_objects): | ||
paginator = CursorPaginator(mock_objects, "post", 6) | ||
assert paginator.get_page().next == 7 | ||
|
||
|
||
def test_page_can_be_tested_to_see_if_next_page_exists(mock_objects): | ||
paginator = CursorPaginator(mock_objects, "post", 6) | ||
assert paginator.get_page().has_next() | ||
|
||
|
||
def test_paginator_returns_empty_first_page_without_errors(): | ||
paginator = CursorPaginator([], "post", 6) | ||
assert paginator.get_page().object_list == [] | ||
|
||
|
||
def test_paginator_returns_page_starting_at_requested_address(mock_queryset): | ||
paginator = CursorPaginator(mock_queryset, "post", 6) | ||
assert paginator.get_page(7) | ||
|
||
|
||
def test_requesting_next_page_filters_queryset_using_filter_name(mock_queryset): | ||
paginator = CursorPaginator(mock_queryset, "post", 6) | ||
paginator.get_page(7) | ||
mock_queryset.filter.assert_called_once_with(post__gte=7) | ||
|
||
|
||
def test_requesting_next_page_limits_queryset_to_specified_length(mock_queryset): | ||
paginator = CursorPaginator(mock_queryset, "post", 6) | ||
assert len(paginator.get_page(7).object_list) == 6 | ||
|
||
|
||
def test_paginator_raises_empty_page_error_if_nth_page_is_empty(mocker): | ||
queryset = mocker.Mock(filter=lambda **_: []) | ||
paginator = CursorPaginator(queryset, "post", 6) | ||
with pytest.raises(EmptyPage): | ||
paginator.get_page(20) | ||
|
||
|
||
def test_paginator_raises_invalid_page_error_if_starting_position_is_negative(): | ||
paginator = CursorPaginator(None, None, 0) | ||
with pytest.raises(InvalidPage): | ||
paginator.get_page(-1) |