Skip to content

Commit

Permalink
Add limit option to ScanCursor.
Browse files Browse the repository at this point in the history
ScanCursor can be now limited to a maximum number of elements to be retrieved.

Closes #1575
  • Loading branch information
mp911de authored and christophstrobl committed Mar 25, 2021
1 parent 2924330 commit ad0aa4a
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 3 deletions.
Expand Up @@ -122,4 +122,12 @@ public long getPosition() {
return delegate.getPosition();
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.core.Cursor#limit(long)
*/
@Override
public Cursor<T> limit(long count) {
return new ConvertingCursor<>(delegate.limit(count), converter);
}
}
19 changes: 17 additions & 2 deletions src/main/java/org/springframework/data/redis/core/Cursor.java
Expand Up @@ -16,14 +16,19 @@
package org.springframework.data.redis.core;

import java.io.Closeable;
import java.util.Iterator;

import org.springframework.data.redis.util.BoundedIterator;

/**
* Cursor abstraction to scan over the keyspace or elements within a data structure using a variant of a {@code SCAN}
* command.
*
* @author Christoph Strobl
* @author Mark Paluch
* @param <T>
* @since 1.4
*/
public interface Cursor<T> extends Iterator<T>, Closeable {
public interface Cursor<T> extends BoundedIterator<T>, Closeable {

/**
* Get the reference cursor. <br>
Expand All @@ -50,4 +55,14 @@ public interface Cursor<T> extends Iterator<T>, Closeable {
*/
long getPosition();

/**
* Limit the maximum number of elements to be returned from this cursor. The returned cursor object can be used to
* iterate over the remaining items and to {@link #close() release} associated resources. The returned cursor is not
* attached to the state of {@code this} cursor and this object should be no longer used.
*
* @return a new {@link Cursor} with detached iteration state.
* @since 2.5
*/
@Override
Cursor<T> limit(long count);
}
56 changes: 56 additions & 0 deletions src/main/java/org/springframework/data/redis/core/ScanCursor.java
Expand Up @@ -22,6 +22,7 @@

import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/**
Expand All @@ -43,6 +44,7 @@ public abstract class ScanCursor<T> implements Cursor<T> {
private @Nullable Iterator<T> delegate;
private @Nullable final ScanOptions scanOptions;
private long position;
private final long limit;

/**
* Crates new {@link ScanCursor} with {@code id=0} and {@link ScanOptions#NONE}
Expand Down Expand Up @@ -81,6 +83,23 @@ public ScanCursor(long cursorId, ScanOptions options) {
this.cursorId = cursorId;
this.state = CursorState.READY;
this.delegate = Collections.<T> emptyList().iterator();
this.limit = -1;
}

/**
* Crates a new {@link ScanCursor}.
*
* @param source
* @param limit
* @since 2.5
*/
private ScanCursor(ScanCursor<T> source, long limit) {

this.scanOptions = source.scanOptions;
this.cursorId = source.cursorId;
this.state = source.state;
this.delegate = source.delegate;
this.limit = limit;
}

private void scan(long cursorId) {
Expand Down Expand Up @@ -178,6 +197,10 @@ public boolean hasNext() {

assertCursorIsOpen();

if (limit != -1 && getPosition() > limit - 1) {
return false;
}

while (!delegate.hasNext() && !CursorState.FINISHED.equals(state)) {
scan(cursorId);
}
Expand Down Expand Up @@ -283,10 +306,43 @@ public long getPosition() {
return position;
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.core.Cursor#limit(long)
*/
@Override
public ScanCursor<T> limit(long count) {

Assert.isTrue(count >= 0, "Count must be greater or equal to zero");

return new ScanCursorWrapper<>(this, count);
}

/**
* @author Thomas Darimont
*/
enum CursorState {
READY, OPEN, FINISHED, CLOSED;
}

/**
* Wrapper for a concrete {@link ScanCursor} forwarding {@link #doScan(long, ScanOptions)}.
*
* @param <T>
* @since 2.5
*/
private static class ScanCursorWrapper<T> extends ScanCursor<T> {

private final ScanCursor<T> delegate;

public ScanCursorWrapper(ScanCursor<T> delegate, long limit) {
super(delegate, limit);
this.delegate = delegate;
}

@Override
protected ScanIteration<T> doScan(long cursorId, ScanOptions options) {
return delegate.doScan(cursorId, options);
}
}
}
@@ -0,0 +1,36 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.util;

import java.util.Iterator;

/**
* Extension to {@link Iterator} that can be {@link #limit(long) limited} to a maximum number of items.
*
* @author Mark Paluch
* @since 2.5
*/
public interface BoundedIterator<T> extends Iterator<T> {

/**
* Limit the maximum number of elements to return. The limit is only applied to the returned instance and not applied
* to {@code this} iterator.
*
* @param count the maximum number of elements of iterator to return. Must be greater or equal to zero.
* @return a new instance of {@link BoundedIterator} with {@code count} applied.
*/
BoundedIterator<T> limit(long count);
}
Expand Up @@ -17,7 +17,6 @@

import static org.assertj.core.api.Assertions.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -32,7 +31,10 @@
import org.springframework.dao.InvalidDataAccessApiUsageException;

/**
* Unit tests for {@link ScanCursor}.
*
* @author Christoph Strobl
* @author Mark Paluch
*/
class ScanCursorUnitTests {

Expand Down Expand Up @@ -198,6 +200,43 @@ void hasNextShouldStopCorrectlyWhenWholeScanIterationDoesNotReturnResultsAndStat
assertThat(cursor.getCursorId()).isEqualTo(0L);
}

@Test // GH-1575
void limitShouldApplyLimitation() {

LinkedList<ScanIteration<String>> values = new LinkedList<>();
values.add(createIteration(1, "spring"));
values.add(createIteration(2, "data"));
values.add(createIteration(3, "redis"));
values.add(createIteration(0));
Cursor<String> cursor = initCursor(values).limit(2);

List<String> result = new ArrayList<>();
while (cursor.hasNext()) {
result.add(cursor.next());
}

assertThat(result).hasSize(2).contains("spring", "data");
}

@Test // GH-1575
void limitShouldNotLimitOriginalCursor() {

LinkedList<ScanIteration<String>> values = new LinkedList<>();
values.add(createIteration(1, "spring"));
values.add(createIteration(2, "data"));
values.add(createIteration(3, "redis"));
values.add(createIteration(0));
Cursor<String> cursor = initCursor(values);
cursor.limit(1);

List<String> result = new ArrayList<>();
while (cursor.hasNext()) {
result.add(cursor.next());
}

assertThat(result).hasSize(3);
}

private CapturingCursorDummy initCursor(Queue<ScanIteration<String>> values) {
CapturingCursorDummy cursor = new CapturingCursorDummy(values);
cursor.open();
Expand Down

0 comments on commit ad0aa4a

Please sign in to comment.