Skip to content

Commit

Permalink
ISPN-5553 Refactor WaitMode to FutureMode
Browse files Browse the repository at this point in the history
* WaitMode refactored to FutureMode so that it better represents that
  it's only relevant for how the completable futures work.
* WaitMode for Traversables is gone since it does not make much sense
  given that its collect methods do pretty much the same.
* Added Traversable.collect(Collector) since it offers ease of use when
  it comes to doing most common collect operations, and with the
  addition of CacheCollectors.serializableCollector, it's much easier
  than ever before to marshaller Collector instances.
  • Loading branch information
galderz committed Aug 21, 2015
1 parent fc3c007 commit 8cde312
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 148 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package org.infinispan.commons.api.functional;

import org.infinispan.commons.util.CloseableIterator;

import java.util.concurrent.CompletableFuture;

/**
* An easily extensible parameter that allows functional map operations to be
* tweaked. Apart from {@link org.infinispan.commons.api.functional.Param.WaitMode}, examples
* tweaked. Apart from {@link org.infinispan.commons.api.functional.Param.FutureMode}, examples
* would include local-only parameter, skip-cache-store parameter and others.
*
* What makes {@link Param} different from {@link MetaParam} is that {@link Param}
Expand All @@ -16,17 +14,13 @@
* Since {@link Param} instances control how the internals work, only
* {@link Param} implementations by Infinispan will be supported.
*
* DESIGN RATIONALES:
* <ul>
* <il>This interface is equivalent to Infinispan's Flag, but it's more powerful
* because it allows to pass a flag along with a value. Infinispan's Flag
* are enum based which means no values can be passed along with value.
* </il>
* <il>Since each param is an independent entity, it's easy to create public
* versus private parameter distinction. When parameters are stored in enums,
* it's more difficult to make such distinction.
* </il>
* </ul>
* @apiNote This interface is equivalent to Infinispan's Flag, but it's more
* powerful because it allows to pass a flag along with a value. Infinispan's
* Flag are enum based which means no values can be passed along with value.
*
* @apiNote Since each param is an independent entity, it's easy to create
* public versus private parameter distinction. When parameters are stored in
* enums, it's more difficult to make such distinction.
*
* @param <P> type of parameter
*/
Expand All @@ -35,12 +29,9 @@ public interface Param<P> {
/**
* A parameter's identifier. Each parameter must have a different id.
*
* DESIGN RATIONALES:
* <ul>
* <il>Why does a Param need an id? The most efficient way to store
* multiple parameters is to keep them in an array. An integer-based id
* means it can act as index of the array.</il>
* </ul>
* @implNote Why does a Param need an id? The most efficient way to store
* multiple parameters is to keep them in an array. An integer-based id
* means it can act as index of the array.
*/
int id();

Expand All @@ -50,47 +41,51 @@ public interface Param<P> {
P get();

/**
* Wait mode controls whether the functional operation is blocking or
* not blocking.
* When a method defines {@link CompletableFuture} as a return type, it
* implies the method called will be called asynchronously and that the
* {@link CompletableFuture} returned will be completed once the method's
* work is complete.
*
* By default, functional map operations are non-blocking,
* so for {@link CompletableFuture} returns, these be asynchronously
* completed, and for {@link Traversable} or {@link CloseableIterator}
* returns, they will be computed without waiting for all the results
* to be available.
* So, calling a method that returns {@link CompletableFuture} normally
* implies that the method will allocate a thread to do that job. However,
* there are situations when the user calls a method that returns
* {@link CompletableFuture} and immediately calls {@link CompletableFuture#get()}
* or similar methods to get the result. Calling such methods result in the
* caller thread blocking in which case, having such method spawn another
* thread is a waste of resources. So, for such situations, the caller can
* pass in the {@link #COMPLETED} param so that the internal logic avoids
* creating a separate thread, since the caller thread will block to get
* the result immediately.
*
* If blocking, functional map operations will block
* until the operations are completed. So, when an operation returns a
* {@link CompletableFuture}, it'll already be completed. For operations
* that {@link Traversable} or {@link CloseableIterator}, these will
* already be pre-computed and the navigation will happen over the
* already computed values.
* By default, all methods returning {@link CompletableFuture} are
* asynchronous, hence using the {@link #ASYNC} future mode.
*/
enum WaitMode implements Param<WaitMode> {
BLOCKING {
enum FutureMode implements Param<FutureMode> {
ASYNC {
@Override
public WaitMode get() {
return BLOCKING;
public FutureMode get() {
return ASYNC;
}
}, NON_BLOCKING {
},
COMPLETED {
@Override
public WaitMode get() {
return NON_BLOCKING;
public FutureMode get() {
return COMPLETED;
}
};

public static final int ID = ParamIds.WAIT_MODE_ID;
public static final int ID = ParamIds.FUTURE_MODE_ID;

@Override
public int id() {
return ID;
}

/**
* Provides default wait mode.
* Provides default future mode.
*/
public static WaitMode defaultValue() {
return NON_BLOCKING;
public static FutureMode defaultValue() {
return ASYNC;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/
public final class ParamIds {

public static final int WAIT_MODE_ID = 0;
public static final int FUTURE_MODE_ID = 0;

private ParamIds() {
// Cannot be instantiated, it's just a holder class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collector;

/**
* Unsorted traversable stream. Design is inspired on {@link java.util.stream.Stream}.
Expand Down Expand Up @@ -145,9 +146,28 @@ public interface Traversable<T> extends AutoCloseable {
* given consumer. The combiner can be used to combine accumulated results
* executed in paralell or coming from different nodes in a distributed
* environment.
*
* In distributed environments where some keys are remote, the
* {@link Supplier} and {@link BiConsumer} instances passed in are sent to
* other nodes and hence they need to be marshallable. If the collect
* operation can be defined using the helper methods in
* {@link java.util.stream.Collectors}, it is recommended that those are
* used, which can easily be made marshalled using the
* org.infinispan.stream.CacheCollectors#serializableCollector method.
*/
<R> R collect(Supplier<R> s, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);

/**
* Transforms the traversable into a result container using a
* {@code Collector}.
*
* In distributed environments where some keys are remote, the
* {@link Collector} instance passed in is sent other nodes and hence it
* needs to be marshallable. This can easily be made achieved using the
* org.infinispan.stream.CacheCollectors#serializableCollector method.
*/
<R, A> R collect(Collector<? super T, A, R> collector);

/**
* Returns an optional containing the minimum element of this traversable
* based on the comparator passed in. If the traversable is empty,
Expand Down
23 changes: 22 additions & 1 deletion core/src/main/java/org/infinispan/functional/impl/Params.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package org.infinispan.functional.impl;

import org.infinispan.commons.api.functional.Param;
import org.infinispan.commons.api.functional.Param.FutureMode;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

/**
* Internal class that encapsulates collection of parameters used to tweak
Expand All @@ -25,7 +29,7 @@
final class Params {

private static final Param<?>[] DEFAULTS = new Param<?>[]{
Param.WaitMode.defaultValue(),
FutureMode.defaultValue(),
};

final Param<?>[] params;
Expand Down Expand Up @@ -86,4 +90,21 @@ public static Params from(Param<?>... ps) {
return new Params(paramsAll);
}

static <T> CompletableFuture<T> withFuture(Param<FutureMode> futureParam,
ExecutorService asyncExec, Supplier<T> s) {
switch (futureParam.get()) {
case COMPLETED:
// If completed, complete the future directly with the result.
// No separate thread or executor is instantiated.
return CompletableFuture.completedFuture(s.get());
case ASYNC:
// If async, execute the supply function asynchronously,
// and return a future that's completed when the supply
// function returns.
return CompletableFuture.supplyAsync(s, asyncExec);
default:
throw new IllegalStateException();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.infinispan.commons.api.functional.EntryView.ReadEntryView;
import org.infinispan.commons.api.functional.FunctionalMap.ReadOnlyMap;
import org.infinispan.commons.api.functional.Param;
import org.infinispan.commons.api.functional.Param.WaitMode;
import org.infinispan.commons.api.functional.Param.FutureMode;
import org.infinispan.commons.api.functional.Traversable;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.container.entries.CacheEntry;
Expand All @@ -22,8 +22,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.infinispan.functional.impl.WaitModes.withWaitFuture;
import static org.infinispan.functional.impl.WaitModes.withWaitTraversable;
import static org.infinispan.functional.impl.Params.withFuture;

public final class ReadOnlyMapImpl<K, V> extends AbstractFunctionalMap<K, V> implements ReadOnlyMap<K, V> {
private static final Log log = LogFactory.getLog(ReadOnlyMapImpl.class);
Expand All @@ -45,38 +44,35 @@ private static <K, V> ReadOnlyMap<K, V> create(Params params, FunctionalMapImpl<
@Override
public <R> CompletableFuture<R> eval(K key, Function<ReadEntryView<K, V>, R> f) {
log.tracef("Invoked eval(k=%s, %s)", key, params);
Param<WaitMode> waitMode = params.get(WaitMode.ID);
Param<FutureMode> futureMode = params.get(FutureMode.ID);
ReadOnlyKeyCommand cmd = fmap.cmdFactory().buildReadOnlyKeyCommand(key, f);
InvocationContext ctx = fmap.invCtxFactory().createInvocationContext(false, 1);
return withWaitFuture(waitMode, fmap.asyncExec(), () -> (R) fmap.chain().invoke(ctx, cmd));
return withFuture(futureMode, fmap.asyncExec(), () -> (R) fmap.chain().invoke(ctx, cmd));
}

@Override
public <R> Traversable<R> evalMany(Set<? extends K> keys, Function<ReadEntryView<K, V>, R> f) {
log.tracef("Invoked evalMany(m=%s, %s)", keys, params);
Param<WaitMode> waitMode = params.get(WaitMode.ID);
ReadOnlyManyCommand<K, V, R> cmd = fmap.cmdFactory().buildReadOnlyManyCommand(keys, f);
InvocationContext ctx = fmap.invCtxFactory().createInvocationContext(false, keys.size());
return withWaitTraversable(waitMode, () -> (Stream<R>) fmap.chain().invoke(ctx, cmd));
return Traversables.of((Stream<R>) fmap.chain().invoke(ctx, cmd));
}

@Override
public Traversable<K> keys() {
log.tracef("Invoked keys(%s)", params);
Param<WaitMode> waitMode = params.get(WaitMode.ID);
return withWaitTraversable(waitMode, () -> fmap.cache.keySet().stream());
return Traversables.of(fmap.cache.keySet().stream());
}

@Override
public Traversable<ReadEntryView<K, V>> entries() {
log.tracef("Invoked entries(%s)", params);
Param<WaitMode> waitMode = params.get(WaitMode.ID);
CloseableIterator<CacheEntry<K, V>> it = fmap.cache
.filterEntries(AcceptAllKeyValueFilter.getInstance()).iterator();
// TODO: Don't really need a Stream here...
Stream<CacheEntry<K, V>> stream = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(it, Spliterator.IMMUTABLE), false);
return withWaitTraversable(waitMode, () -> stream.map(EntryViews::readOnly));
return Traversables.of(stream.map(EntryViews::readOnly));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import org.infinispan.commons.api.functional.FunctionalMap.ReadWriteMap;
import org.infinispan.commons.api.functional.Listeners.ReadWriteListeners;
import org.infinispan.commons.api.functional.Param;
import org.infinispan.commons.api.functional.Param.WaitMode;
import org.infinispan.commons.api.functional.Param.FutureMode;
import org.infinispan.commons.api.functional.Traversable;
import org.infinispan.commons.util.CloseableIteratorSet;
import org.infinispan.context.InvocationContext;
Expand All @@ -22,8 +22,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;

import static org.infinispan.functional.impl.WaitModes.withWaitFuture;
import static org.infinispan.functional.impl.WaitModes.withWaitTraversable;
import static org.infinispan.functional.impl.Params.withFuture;

public final class ReadWriteMapImpl<K, V> extends AbstractFunctionalMap<K, V> implements ReadWriteMap<K, V> {
private static final Log log = LogFactory.getLog(ReadWriteMapImpl.class);
Expand All @@ -45,49 +44,49 @@ private static <K, V> ReadWriteMap<K, V> create(Params params, FunctionalMapImpl
@Override
public <R> CompletableFuture<R> eval(K key, Function<ReadWriteEntryView<K, V>, R> f) {
log.tracef("Invoked eval(k=%s, %s)", key, params);
Param<WaitMode> waitMode = params.get(WaitMode.ID);
Param<FutureMode> futureMode = params.get(FutureMode.ID);
ReadWriteKeyCommand cmd = fmap.cmdFactory().buildReadWriteKeyCommand(key, f);
InvocationContext ctx = fmap.invCtxFactory().createInvocationContext(true, 1);
ctx.setLockOwner(cmd.getLockOwner());
return withWaitFuture(waitMode, fmap.asyncExec(), () -> (R) fmap.chain().invoke(ctx, cmd));
return withFuture(futureMode, fmap.asyncExec(), () -> (R) fmap.chain().invoke(ctx, cmd));
}

@Override
public <R> CompletableFuture<R> eval(K key, V value, BiFunction<V, ReadWriteEntryView<K, V>, R> f) {
log.tracef("Invoked eval(k=%s, v=%s, %s)", key, value, params);
Param<WaitMode> waitMode = params.get(WaitMode.ID);
Param<FutureMode> futureMode = params.get(FutureMode.ID);
ReadWriteKeyValueCommand cmd = fmap.cmdFactory().buildReadWriteKeyValueCommand(key, value, f);
InvocationContext ctx = fmap.invCtxFactory().createInvocationContext(true, 1);
ctx.setLockOwner(cmd.getLockOwner());
return withWaitFuture(waitMode, fmap.asyncExec(), () -> (R) fmap.chain().invoke(ctx, cmd));
return withFuture(futureMode, fmap.asyncExec(), () -> (R) fmap.chain().invoke(ctx, cmd));
}

@Override
public <R> Traversable<R> evalMany(Map<? extends K, ? extends V> entries, BiFunction<V, ReadWriteEntryView<K, V>, R> f) {
log.tracef("Invoked evalMany(entries=%s, %s)", entries, params);
Param<WaitMode> waitMode = params.get(WaitMode.ID);
Param<FutureMode> futureMode = params.get(FutureMode.ID);
ReadWriteManyEntriesCommand cmd = fmap.cmdFactory().buildReadWriteManyEntriesCommand(entries, f);
InvocationContext ctx = fmap.invCtxFactory().createInvocationContext(true, entries.size());
return withWaitTraversable(waitMode, () -> ((List<R>) fmap.chain().invoke(ctx, cmd)).stream());
return Traversables.of(((List<R>) fmap.chain().invoke(ctx, cmd)).stream());
}

@Override
public <R> Traversable<R> evalMany(Set<? extends K> keys, Function<ReadWriteEntryView<K, V>, R> f) {
log.tracef("Invoked evalMany(keys=%s, %s)", keys, params);
Param<WaitMode> waitMode = params.get(WaitMode.ID);
Param<FutureMode> futureMode = params.get(FutureMode.ID);
ReadWriteManyCommand cmd = fmap.cmdFactory().buildReadWriteManyCommand(keys, f);
InvocationContext ctx = fmap.invCtxFactory().createInvocationContext(true, keys.size());
return withWaitTraversable(waitMode, () -> ((List<R>) fmap.chain().invoke(ctx, cmd)).stream());
return Traversables.of(((List<R>) fmap.chain().invoke(ctx, cmd)).stream());
}

@Override
public <R> Traversable<R> evalAll(Function<ReadWriteEntryView<K, V>, R> f) {
log.tracef("Invoked evalAll(%s)", params);
Param<WaitMode> waitMode = params.get(WaitMode.ID);
Param<FutureMode> futureMode = params.get(FutureMode.ID);
CloseableIteratorSet<K> keys = fmap.cache.keySet();
ReadWriteManyCommand cmd = fmap.cmdFactory().buildReadWriteManyCommand(keys, f);
InvocationContext ctx = fmap.invCtxFactory().createInvocationContext(true, keys.size());
return withWaitTraversable(waitMode, () -> ((List<R>) fmap.chain().invoke(ctx, cmd)).stream());
return Traversables.of(((List<R>) fmap.chain().invoke(ctx, cmd)).stream());
}

@Override
Expand Down
Loading

0 comments on commit 8cde312

Please sign in to comment.