Skip to content

Commit

Permalink
bug fix: cached value in getNext() with limit() fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Komarov (k0m@) committed Jul 3, 2021
1 parent 1e2b80e commit 592cadb
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 14 deletions.
40 changes: 26 additions & 14 deletions src/main/java/pw/komarov/streamer/Streamer.java
Expand Up @@ -6,14 +6,14 @@

@SuppressWarnings({"WeakerAccess","unused","UnusedReturnValue"})
public final class Streamer<T> implements Stream<T>, Iterable<T> {
private Iterator<T> externalIterator; //source of data

/*
Constructing
*/

private final StreamerIterator streamerIterator;

private Streamer(Iterator<T> externalIterator) {
this.externalIterator = externalIterator;
this.streamerIterator = new StreamerIterator(externalIterator);
}

public static <T> Streamer<T> empty() {
Expand Down Expand Up @@ -129,7 +129,7 @@ public void close() {
}

private void internalClose() {
externalIterator = null;
streamerIterator.setExternalIterator(null);

state = State.CLOSED;
}
Expand All @@ -149,9 +149,15 @@ private void throwIfNotWaiting() {
Internal streamer iterator
*/

private final StreamerIterator streamerIterator = new StreamerIterator();

private class StreamerIterator implements Iterator<T> {
private Iterator<T> externalIterator; //source of data

public StreamerIterator(Iterator<T> externalIterator) {
this.externalIterator = externalIterator;
}

private boolean noNext;

private Boolean hasNext;
private T next;

Expand Down Expand Up @@ -214,7 +220,7 @@ private void calculateSorted() {
data.sort(sortedOperation.comparator);

//now, we can replace the iterator
externalIterator = data.iterator();
setExternalIterator(data.iterator());
}
}

Expand All @@ -230,19 +236,20 @@ private void calcNextAndHasNext() { //метод расчитывающий вн
@SuppressWarnings({"unchecked"})
private Optional<T> getNext(List<IntermediateOperation> operations) {
T next = null;
boolean terminated = false;

boolean hasNext = externalIterator.hasNext();
while (hasNext && !terminated) {
boolean hasNext = !noNext && externalIterator.hasNext();
while (hasNext) {
next = externalIterator.next();

boolean filtered = false;
for (IntermediateOperation operation : operations)
if (operation instanceof FilteringOperation) {
if (!filtered) {
filtered = ((FilteringOperation) operation).test(next);
if (filtered && operation instanceof LimitOperation)
terminated = true;
if (filtered && operation instanceof LimitOperation) {
filtered = false;
noNext = true;
}
}
} else if (operation instanceof MapOperation)
next = (T) ((MapOperation)operation).function.apply(next);
Expand All @@ -255,7 +262,7 @@ private Optional<T> getNext(List<IntermediateOperation> operations) {
hasNext = externalIterator.hasNext();
}

if (hasNext && !terminated) {
if (hasNext) {
for (Consumer<? super T> peekSequence : peekSequences)
peekSequence.accept(next);

Expand All @@ -265,6 +272,11 @@ private Optional<T> getNext(List<IntermediateOperation> operations) {
//noinspection OptionalAssignedToNull
return null;
}

public void setExternalIterator(Iterator<T> externalIterator) {
this.externalIterator = externalIterator;
noNext = false;
}
}

/*
Expand All @@ -289,7 +301,7 @@ private static class LimitOperation<E> implements FilteringOperation<E> {

@Override
public boolean test(E t) {
return maxSize < ++filteredByLimit;
return maxSize < ++filteredByLimit + 1;
}
}

Expand Down
33 changes: 33 additions & 0 deletions src/test/java/pw/komarov/streamer/CachedAfterLimitBugFixTests.java
@@ -0,0 +1,33 @@
package pw.komarov.streamer;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;

class CachedAfterLimitBugFixTests {
private final StringBuilder sb = new StringBuilder();
private final AtomicInteger ai = new AtomicInteger();

@BeforeEach
void beforeEach() {
sb.delete(0, sb.length());
ai.set(0);
}

@Test
void test1() {
Streamer.generate(ai::getAndIncrement).limit(10).forEach((v) -> sb.append(v).append(" "));
sb.append(ai.get());
assertEquals("0 1 2 3 4 5 6 7 8 9 10", sb.toString());
}

@Test
void test2() {
Streamer.generate(ai::getAndIncrement).limit(10).distinct().filter(i -> i != 5).limit(8).limit(16).forEach((v) -> sb.append(v).append(" "));
sb.append(ai.get());
assertEquals("0 1 2 3 4 6 7 8 9", sb.toString());
}
}

0 comments on commit 592cadb

Please sign in to comment.