Skip to content

Commit

Permalink
Fix StreamBuilder bugs. This closes #35, closes #36 and closes #37
Browse files Browse the repository at this point in the history
  • Loading branch information
minborg committed Sep 14, 2015
1 parent 6ebb315 commit 0b7449b
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 192 deletions.
Expand Up @@ -16,12 +16,23 @@
*/
package com.speedment.internal.core.stream.builder;

import com.speedment.exception.SpeedmentException;
import com.speedment.internal.core.stream.autoclose.AbstractAutoClosingStream;
import com.speedment.internal.core.stream.builder.pipeline.PipelineImpl;
import com.speedment.internal.core.stream.builder.streamterminator.StreamTerminator;
import com.speedment.internal.core.stream.builder.action.Action;
import com.speedment.util.StreamComposition;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import static java.util.Objects.requireNonNull;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.DoubleSupplier;
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.BaseStream;

/**
*
Expand All @@ -31,18 +42,25 @@
*/
public abstract class AbstractStreamBuilder<T extends AbstractStreamBuilder<T, P>, P> {

protected final String UNSUPPORTED_BECAUSE_OF_CLOSE_MAY_NOT_BE_CALLED = "This method has been disabled for this Stream type because improper use will "
+ "lead to resources not being freed up. "
+ "We regret any inconvenience caused by this. "
+ "If you want to concatenate two or more stream, please use the " + StreamComposition.class.getName() + "#concatAndAutoClose() method instead.";

protected final PipelineImpl<?> pipeline;
protected final StreamTerminator streamTerminator;
private final List<Runnable> closeHandlers;
protected final Set<BaseStream<?, ?>> streamSet; // Keeps track of the chain of streams so that we can auto-close them all
private final List<Runnable> closeHandlers; // The close handlers for this particular stream
private boolean parallel;
private boolean ordered;
private boolean closed;

protected AbstractStreamBuilder(PipelineImpl<?> pipeline, StreamTerminator streamTerminator) {
protected AbstractStreamBuilder(PipelineImpl<?> pipeline, StreamTerminator streamTerminator, Set<BaseStream<?, ?>> streamSet) {
this.pipeline = requireNonNull(pipeline);
this.streamTerminator = requireNonNull(streamTerminator);
this.closeHandlers = new ArrayList<>();
this.ordered = true;
this.streamSet = streamSet;
}

protected T append(Action<?, ?> newAction) {
Expand Down Expand Up @@ -78,8 +96,18 @@ public T onClose(Runnable closeHandler) {

public void close() {
if (!closed) {
closeHandlers.forEach(Runnable::run);
closed = true;
try {
AbstractAutoClosingStream.composedRunnable(closeHandlers); // Run this stream's close handlers
} catch (Exception e) {
throw new SpeedmentException(e);
} finally {
try {
AbstractAutoClosingStream.composedClose(streamSet.toArray(new AutoCloseable[0])); // Close the other streams
} catch (Exception e) {
throw new SpeedmentException(e);
}
}
}
}

Expand All @@ -99,12 +127,56 @@ private T self() {
return thizz;
}

protected <T> T finallyClose(T t) {
protected <T> boolean finallyClose(BooleanSupplier bs) {
try {
return t;
return bs.getAsBoolean();
} finally {
close();
}
}

protected <T> long finallyClose(LongSupplier lp) {
try {
return lp.getAsLong();
} finally {
close();
}
}

protected <T> int finallyClose(IntSupplier is) {
try {
return is.getAsInt();
} finally {
close();
}
}

protected <T> double finallyClose(DoubleSupplier ds) {
try {
return ds.getAsDouble();
} finally {
close();
}
}

protected <T> void finallyClose(Runnable r) {
try {
r.run();
} finally {
close();
}
}

protected <T> T finallyClose(Supplier<T> s) {
try {
return s.get();
} finally {
close();
}
}

protected static Set<BaseStream<?, ?>> newStreamSet() {
return new HashSet<>();
}

}
Expand Up @@ -35,6 +35,7 @@
import static java.util.Objects.requireNonNull;
import java.util.OptionalDouble;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.function.BiConsumer;
import java.util.function.DoubleBinaryOperator;
Expand All @@ -46,6 +47,7 @@
import java.util.function.DoubleUnaryOperator;
import java.util.function.ObjDoubleConsumer;
import java.util.function.Supplier;
import java.util.stream.BaseStream;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
Expand All @@ -57,8 +59,13 @@
*/
public final class DoubleStreamBuilder extends AbstractStreamBuilder<DoubleStreamBuilder, DoublePipeline> implements DoubleStream {

DoubleStreamBuilder(final PipelineImpl<?> pipeline, final StreamTerminator streamTerminator, Set<BaseStream<?, ?>> streamSet) {
super(pipeline, streamTerminator, streamSet);
streamSet.add(this); // Add this new stream to the streamSet so it may be closed later
}

public DoubleStreamBuilder(final PipelineImpl<?> pipeline, final StreamTerminator streamTerminator) {
super(pipeline, streamTerminator);
this(pipeline, streamTerminator, newStreamSet());
}

@Override
Expand All @@ -76,19 +83,19 @@ public DoubleStream map(DoubleUnaryOperator mapper) {
@Override
public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
requireNonNull(mapper);
return new ReferenceStreamBuilder<U>(pipeline, streamTerminator).append(new DoubleMapToObjAction<>(mapper));
return new ReferenceStreamBuilder<U>(pipeline, streamTerminator, streamSet).append(new DoubleMapToObjAction<>(mapper));
}

@Override
public IntStream mapToInt(DoubleToIntFunction mapper) {
requireNonNull(mapper);
return new IntStreamBuilder(pipeline, streamTerminator).append(new DoubleMapToIntAction(mapper));
return new IntStreamBuilder(pipeline, streamTerminator, streamSet).append(new DoubleMapToIntAction(mapper));
}

@Override
public LongStream mapToLong(DoubleToLongFunction mapper) {
requireNonNull(mapper);
return new LongStreamBuilder(pipeline, streamTerminator).append(new DoubleMapToLongAction(mapper));
return new LongStreamBuilder(pipeline, streamTerminator, streamSet).append(new DoubleMapToLongAction(mapper));
}

@Override
Expand Down Expand Up @@ -126,7 +133,7 @@ public DoubleStream skip(long n) {

@Override
public Stream<Double> boxed() {
return new ReferenceStreamBuilder<Double>(pipeline, streamTerminator).append(new DoubleBoxedAction());
return new ReferenceStreamBuilder<Double>(pipeline, streamTerminator, streamSet).append(new DoubleBoxedAction());
}

/**
Expand Down Expand Up @@ -211,7 +218,7 @@ public double reduce(double identity, DoubleBinaryOperator op) {
@Override
public OptionalDouble reduce(DoubleBinaryOperator op) {
requireNonNull(op);
return finallyClose(streamTerminator.reduce(pipeline(), op));
return finallyClose(() -> streamTerminator.reduce(pipeline(), op));
}

/**
Expand All @@ -227,7 +234,7 @@ public <R> R collect(Supplier<R> supplier, ObjDoubleConsumer<R> accumulator, BiC
requireNonNull(supplier);
requireNonNull(accumulator);
requireNonNull(combiner);
return finallyClose(streamTerminator.collect(pipeline(), supplier, accumulator, combiner));
return finallyClose(() -> streamTerminator.collect(pipeline(), supplier, accumulator, combiner));
}

/**
Expand All @@ -240,11 +247,7 @@ public <R> R collect(Supplier<R> supplier, ObjDoubleConsumer<R> accumulator, BiC
*/
@Override
public double sum() {
try {
return streamTerminator.sum(pipeline());
} finally {
close();
}
return finallyClose(() -> streamTerminator.sum(pipeline()));
}

/**
Expand All @@ -257,7 +260,7 @@ public double sum() {
*/
@Override
public OptionalDouble min() {
return finallyClose(streamTerminator.min(pipeline()));
return finallyClose(() -> streamTerminator.min(pipeline()));
}

/**
Expand All @@ -270,7 +273,7 @@ public OptionalDouble min() {
*/
@Override
public OptionalDouble max() {
return finallyClose(streamTerminator.max(pipeline()));
return finallyClose(() -> streamTerminator.max(pipeline()));
}

/**
Expand All @@ -283,11 +286,7 @@ public OptionalDouble max() {
*/
@Override
public long count() {
try {
return streamTerminator.count(pipeline());
} finally {
close();
}
return finallyClose(() -> streamTerminator.count(pipeline()));
}

/**
Expand All @@ -300,7 +299,7 @@ public long count() {
*/
@Override
public OptionalDouble average() {
return finallyClose(streamTerminator.average(pipeline()));
return finallyClose(() -> streamTerminator.average(pipeline()));
}

/**
Expand All @@ -313,7 +312,7 @@ public OptionalDouble average() {
*/
@Override
public DoubleSummaryStatistics summaryStatistics() {
return finallyClose(streamTerminator.summaryStatistics(pipeline()));
return finallyClose(() -> streamTerminator.summaryStatistics(pipeline()));
}

/**
Expand All @@ -327,11 +326,7 @@ public DoubleSummaryStatistics summaryStatistics() {
@Override
public boolean anyMatch(DoublePredicate predicate) {
requireNonNull(predicate);
try {
return streamTerminator.anyMatch(pipeline(), predicate);
} finally {
close();
}
return finallyClose(() -> streamTerminator.anyMatch(pipeline(), predicate));
}

/**
Expand All @@ -344,11 +339,7 @@ public boolean anyMatch(DoublePredicate predicate) {
*/
@Override
public boolean allMatch(DoublePredicate predicate) {
try {
return streamTerminator.allMatch(pipeline(), predicate);
} finally {
close();
}
return finallyClose(() -> streamTerminator.allMatch(pipeline(), predicate));
}

/**
Expand All @@ -361,11 +352,7 @@ public boolean allMatch(DoublePredicate predicate) {
*/
@Override
public boolean noneMatch(DoublePredicate predicate) {
try {
return streamTerminator.noneMatch(pipeline(), predicate);
} finally {
close();
}
return finallyClose(() -> streamTerminator.noneMatch(pipeline(), predicate));
}

/**
Expand All @@ -378,7 +365,7 @@ public boolean noneMatch(DoublePredicate predicate) {
*/
@Override
public OptionalDouble findFirst() {
return finallyClose(streamTerminator.findFirst(pipeline()));
return finallyClose(() -> streamTerminator.findFirst(pipeline()));
}

/**
Expand All @@ -391,35 +378,37 @@ public OptionalDouble findFirst() {
*/
@Override
public OptionalDouble findAny() {
return finallyClose(streamTerminator.findAny(pipeline()));
return finallyClose(() -> streamTerminator.findAny(pipeline()));
}

/**
* {@inheritDoc}
*
* <p>
* N.B. This method may short-circuit operations in the Stream pipeline and
* closes the stream automatically when a terminal operation is performed.
* N.B. This method may short-circuit operations in the Stream pipeline.
* <p>
* If you call this method, you <em>must</em> ensure to call the stream's
* {@link #close() } method or else resources may not be released properly.
*
* @return an iterator of primitive doubles
*/
@Override
public PrimitiveIterator.OfDouble iterator() {
return finallyClose(streamTerminator.iterator(pipeline()));
return streamTerminator.iterator(pipeline());
}

/**
* {@inheritDoc}
*
* <p>
* N.B. This method may short-circuit operations in the Stream pipeline and
* closes the stream automatically when a terminal operation is performed.
* N.B. This method may short-circuit operations in the Stream pipeline.
* <p>
* If you call this method, you <em>must</em> ensure to call the stream's
* {@link #close() } method or else resources may not be released properly.
*
* @return an spliterator of primitive doubles
*/
@Override
public Spliterator.OfDouble spliterator() {
return finallyClose(streamTerminator.spliterator(pipeline()));
return streamTerminator.spliterator(pipeline());
}

}

0 comments on commit 0b7449b

Please sign in to comment.