Skip to content

Commit

Permalink
ISPN-9812 Implement streaming response publisher method
Browse files Browse the repository at this point in the history
  • Loading branch information
wburns committed Nov 14, 2019
1 parent f2af3d7 commit 17a8913
Show file tree
Hide file tree
Showing 81 changed files with 3,808 additions and 300 deletions.
Expand Up @@ -206,7 +206,7 @@ public interface Ids {
int SIMPLE_PUBLISHER_RESULT = 140;
int PUBLISHER_REDUCERS = 141;

int MARSHALLED_VALUE = 142;
int CACHE_STREAM_INTERMEDIATE_PUBLISHER = 142;

int CACHE_STREAM_INTERMEDIATE_REDUCER = 143;

Expand All @@ -216,6 +216,8 @@ public interface Ids {
int IMMUTABLE_LIST_COPY = 147;
int INTERNAL_ENUMS = 148;

int PUBLISHER_RESPONSE = 149;

int COUNTER_CONFIGURATION = 2000; //from counter
int COUNTER_STATE = 2001; //from counter
}
Expand Up @@ -172,8 +172,7 @@ public static IntSet concurrentSet(int maxExclusive) {

/**
* Returns a copy of the given set that supports concurrent operations. The returned set will contain all of the
* ints the provided set contained. The returned set only supports up to the maximum size the previous int set
* supported when this method is invoked or the largest int it held.
* ints the provided set contained. The returned mutable IntSet can store values in the range of {@code 0..maxExclusive-1}
* @param intSet set to copy from
* @param maxExclusive the maximum value - 1 that can be inserted into the set
* @return concurrent copy
Expand Down
Expand Up @@ -9,7 +9,6 @@

import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.marshall.Ids;
import org.infinispan.commons.util.InjectiveFunction;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.encoding.DataConversion;
Expand All @@ -24,7 +23,7 @@
* the requested format.
*/
@Scope(Scopes.NAMED_CACHE)
public class EncoderEntryMapper<K, V, T extends Map.Entry<K, V>> implements InjectiveFunction<T, T> {
public class EncoderEntryMapper<K, V, T extends Map.Entry<K, V>> implements EncodingFunction<T> {
@Inject
transient InternalEntryFactory entryFactory;

Expand Down
Expand Up @@ -8,7 +8,6 @@

import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.marshall.Ids;
import org.infinispan.commons.util.InjectiveFunction;
import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.annotations.Inject;
Expand All @@ -22,7 +21,7 @@
* @since 9.1
*/
@Scope(Scopes.NONE)
public class EncoderKeyMapper<K> implements InjectiveFunction<K, K> {
public class EncoderKeyMapper<K> implements EncodingFunction<K> {
private final DataConversion dataConversion;

public EncoderKeyMapper(DataConversion dataConversion) {
Expand Down
Expand Up @@ -5,7 +5,6 @@
import java.io.ObjectOutput;
import java.util.Collections;
import java.util.Set;
import java.util.function.Function;

import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.marshall.Ids;
Expand All @@ -22,7 +21,7 @@
* @since 9.1
*/
@Scope(Scopes.NONE)
public class EncoderValueMapper<V> implements Function<V, V> {
public class EncoderValueMapper<V> implements EncodingFunction<V> {

private final DataConversion dataConversion;

Expand Down
16 changes: 16 additions & 0 deletions core/src/main/java/org/infinispan/cache/impl/EncodingFunction.java
@@ -0,0 +1,16 @@
package org.infinispan.cache.impl;

import java.util.function.UnaryOperator;

import org.infinispan.commons.util.InjectiveFunction;

/**
* This is a marker interface to signal that this function may perform an encoding of the provided value. The returned
* value therefore will always be equivalent to the provided value, but may be in a slightly different form (whether
* due to unwrapping, encoding or transcoding. This may allow certain optimizations knowing that the value is
* equivalent to what it was before.
* @author wburns
* @since 10.1
*/
public interface EncodingFunction<T> extends UnaryOperator<T>, InjectiveFunction<T, T> {
}
17 changes: 14 additions & 3 deletions core/src/main/java/org/infinispan/commands/CommandsFactory.java
Expand Up @@ -87,7 +87,10 @@
import org.infinispan.notifications.cachelistener.cluster.ClusterEvent;
import org.infinispan.notifications.cachelistener.cluster.MultiClusterEventCommand;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.PublisherRequestCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.CancelPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.InitialPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.NextPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.reduction.ReductionPublisherRequestCommand;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateRequestCommand;
Expand Down Expand Up @@ -607,15 +610,23 @@ <K, V, R> TxReadOnlyManyCommand<K, V, R> buildTxReadOnlyManyCommand(Collection<?

MultiKeyFunctionalBackupWriteCommand buildMultiKeyFunctionalBackupWriteCommand();

<K, R> PublisherRequestCommand<K> buildKeyPublisherCommand(boolean parallelStream, DeliveryGuarantee deliveryGuarantee,
<K, R> ReductionPublisherRequestCommand<K> buildKeyReductionPublisherCommand(boolean parallelStream, DeliveryGuarantee deliveryGuarantee,
IntSet segments, Set<K> keys, Set<K> excludedKeys, boolean includeLoader,
Function<? super Publisher<K>, ? extends CompletionStage<R>> transformer,
Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer);

<K, V, R> PublisherRequestCommand<K> buildEntryPublisherCommand(boolean parallelStream, DeliveryGuarantee deliveryGuarantee,
<K, V, R> ReductionPublisherRequestCommand<K> buildEntryReductionPublisherCommand(boolean parallelStream, DeliveryGuarantee deliveryGuarantee,
IntSet segments, Set<K> keys, Set<K> excludedKeys, boolean includeLoader,
Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> transformer,
Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer);

<K, I, R> InitialPublisherCommand<K, I, R> buildInitialPublisherCommand(Object requestId, DeliveryGuarantee deliveryGuarantee,
int batchSize, IntSet segments, Set<K> keys, Set<K> excludedKeys, boolean includeLoader, boolean entryStream,
boolean trackKeys, Function<? super Publisher<I>, ? extends Publisher<R>> transformer);

NextPublisherCommand buildNextPublisherCommand(Object requestId);

CancelPublisherCommand buildCancelPublisherCommand(Object requestId);

<K, V> MultiClusterEventCommand<K, V> buildMultiClusterEventCommand(Map<UUID, Collection<ClusterEvent<K, V>>> events);
}
Expand Up @@ -13,6 +13,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

import javax.transaction.xa.Xid;

import org.infinispan.Cache;
Expand Down Expand Up @@ -110,7 +111,10 @@
import org.infinispan.notifications.cachelistener.cluster.ClusterEvent;
import org.infinispan.notifications.cachelistener.cluster.MultiClusterEventCommand;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.PublisherRequestCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.CancelPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.InitialPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.NextPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.reduction.ReductionPublisherRequestCommand;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateRequestCommand;
Expand Down Expand Up @@ -620,23 +624,41 @@ public MultiKeyFunctionalBackupWriteCommand buildMultiKeyFunctionalBackupWriteCo
}

@Override
public <K, R> PublisherRequestCommand<K> buildKeyPublisherCommand(boolean parallelStream,
public <K, R> ReductionPublisherRequestCommand<K> buildKeyReductionPublisherCommand(boolean parallelStream,
DeliveryGuarantee deliveryGuarantee, IntSet segments, Set<K> keys, Set<K> excludedKeys, boolean includeLoader,
Function<? super Publisher<K>, ? extends CompletionStage<R>> transformer,
Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
return new PublisherRequestCommand<>(cacheName, parallelStream, deliveryGuarantee, segments, keys, excludedKeys,
return new ReductionPublisherRequestCommand<>(cacheName, parallelStream, deliveryGuarantee, segments, keys, excludedKeys,
includeLoader, false, transformer, finalizer);
}

@Override
public <K, V, R> PublisherRequestCommand<K> buildEntryPublisherCommand(boolean parallelStream,
public <K, V, R> ReductionPublisherRequestCommand<K> buildEntryReductionPublisherCommand(boolean parallelStream,
DeliveryGuarantee deliveryGuarantee, IntSet segments, Set<K> keys, Set<K> excludedKeys, boolean includeLoader,
Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> transformer,
Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
return new PublisherRequestCommand<>(cacheName, parallelStream, deliveryGuarantee, segments, keys, excludedKeys,
return new ReductionPublisherRequestCommand<>(cacheName, parallelStream, deliveryGuarantee, segments, keys, excludedKeys,
includeLoader, true, transformer, finalizer);
}

@Override
public <K, I, R> InitialPublisherCommand<K, I, R> buildInitialPublisherCommand(Object requestId, DeliveryGuarantee deliveryGuarantee,
int batchSize, IntSet segments, Set<K> keys, Set<K> excludedKeys, boolean includeLoader, boolean entryStream,
boolean trackKeys, Function<? super Publisher<I>, ? extends Publisher<R>> transformer) {
return init(new InitialPublisherCommand<>(cacheName, requestId, deliveryGuarantee, batchSize, segments, keys,
excludedKeys, includeLoader, entryStream, trackKeys, transformer));
}

@Override
public NextPublisherCommand buildNextPublisherCommand(Object requestId) {
return new NextPublisherCommand(cacheName, requestId);
}

@Override
public CancelPublisherCommand buildCancelPublisherCommand(Object requestId) {
return new CancelPublisherCommand(cacheName, requestId);
}

@Override
public <K, V> MultiClusterEventCommand<K, V> buildMultiClusterEventCommand(Map<UUID, Collection<ClusterEvent<K, V>>> events) {
return new MultiClusterEventCommand<>(cacheName, events);
Expand Down
Expand Up @@ -68,7 +68,10 @@
import org.infinispan.manager.impl.ReplicableManagerFunctionCommand;
import org.infinispan.manager.impl.ReplicableRunnableCommand;
import org.infinispan.notifications.cachelistener.cluster.MultiClusterEventCommand;
import org.infinispan.reactive.publisher.impl.PublisherRequestCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.CancelPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.InitialPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.NextPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.reduction.ReductionPublisherRequestCommand;
import org.infinispan.statetransfer.StateRequestCommand;
import org.infinispan.statetransfer.StateResponseCommand;
import org.infinispan.stream.impl.StreamIteratorCloseCommand;
Expand Down Expand Up @@ -355,12 +358,21 @@ public CacheRpcCommand fromStream(byte id, byte type, ByteString cacheName) {
case UpdateLastAccessCommand.COMMAND_ID:
command = new UpdateLastAccessCommand(cacheName);
break;
case PublisherRequestCommand.COMMAND_ID:
command = new PublisherRequestCommand<>(cacheName);
case ReductionPublisherRequestCommand.COMMAND_ID:
command = new ReductionPublisherRequestCommand<>(cacheName);
break;
case MultiClusterEventCommand.COMMAND_ID:
command = new MultiClusterEventCommand<>(cacheName);
break;
case InitialPublisherCommand.COMMAND_ID:
command = new InitialPublisherCommand<>(cacheName);
break;
case NextPublisherCommand.COMMAND_ID:
command = new NextPublisherCommand(cacheName);
break;
case CancelPublisherCommand.COMMAND_ID:
command = new CancelPublisherCommand(cacheName);
break;
default:
throw new CacheException("Unknown command id " + id + "!");
}
Expand Down
8 changes: 7 additions & 1 deletion core/src/main/java/org/infinispan/context/Flag.java
Expand Up @@ -236,7 +236,13 @@ public enum Flag {
* normally need to use this flag. This is helpful if there are concerns that can cause just a simple size invocation
* from being consistent (eg. on-going transaction with modifications).
*/
SKIP_SIZE_OPTIMIZATION
SKIP_SIZE_OPTIMIZATION,

/**
* Flag that is used by keySet, entrySet and values methods so that they do not return transactional values. Normally
* an end user would not need to do this.
*/
IGNORE_TRANSACTION,
;

/**
Expand Down
Expand Up @@ -41,6 +41,7 @@ public class FlagBitSets {
public static final long ROLLING_UPGRADE = EnumUtil.bitSetOf(Flag.ROLLING_UPGRADE);
public static final long REMOTE_ITERATION = EnumUtil.bitSetOf(Flag.REMOTE_ITERATION);
public static final long SKIP_SIZE_OPTIMIZATION = EnumUtil.bitSetOf(Flag.SKIP_SIZE_OPTIMIZATION);
public static final long IGNORE_TRANSACTION = EnumUtil.bitSetOf(Flag.IGNORE_TRANSACTION);

/**
* Creates a copy of a Flag BitSet removing instances of FAIL_SILENTLY.
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.infinispan.persistence.manager.PreloadManager;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.reactive.publisher.impl.PublisherHandler;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.responses.ResponseGenerator;
import org.infinispan.remoting.rpc.RpcManager;
Expand Down Expand Up @@ -100,6 +101,7 @@ public class ComponentRegistry extends AbstractComponentRegistry {
private ComponentRef<OrderedUpdatesManager> orderedUpdatesManager;
private ComponentRef<PerCacheInboundInvocationHandler> inboundInvocationHandler;
private ComponentRef<PersistenceMarshaller> persistenceMarshaller;
private ComponentRef<PublisherHandler> publisherHandler;
private ComponentRef<RecoveryManager> recoveryManager;
private ComponentRef<ResponseGenerator> responseGenerator;
private ComponentRef<RpcManager> rpcManager;
Expand Down Expand Up @@ -376,6 +378,7 @@ public void cacheComponents() {
lockManager = basicComponentRegistry.getComponent(LockManager.class);
orderedUpdatesManager = basicComponentRegistry.getComponent(OrderedUpdatesManager.class);
persistenceMarshaller = basicComponentRegistry.getComponent(KnownComponentNames.PERSISTENCE_MARSHALLER, PersistenceMarshaller.class);
publisherHandler = basicComponentRegistry.getComponent(PublisherHandler.class);
recoveryManager = basicComponentRegistry.getComponent(RecoveryManager.class);
responseGenerator = basicComponentRegistry.getComponent(ResponseGenerator.class);
rpcManager = basicComponentRegistry.getComponent(RpcManager.class);
Expand Down Expand Up @@ -467,6 +470,11 @@ public ComponentRef<LocalPublisherManager> getLocalPublisherManager() {
return localPublisherManager;
}

public ComponentRef<PublisherHandler> getPublisherHandler() {
return publisherHandler;
}

@SuppressWarnings("unchecked")
public ComponentRef<LocalStreamManager> getLocalStreamManager() {
return localStreamManager;
}
Expand Down
Expand Up @@ -45,6 +45,7 @@
import org.infinispan.persistence.manager.PersistenceManagerStub;
import org.infinispan.persistence.manager.PreloadManager;
import org.infinispan.persistence.spi.MarshallableEntryFactory;
import org.infinispan.reactive.publisher.impl.PublisherHandler;
import org.infinispan.scattered.BiasManager;
import org.infinispan.scattered.ScatteredVersionManager;
import org.infinispan.scattered.impl.BiasManagerImpl;
Expand Down Expand Up @@ -88,7 +89,7 @@
XSiteStateTransferManager.class, XSiteStateConsumer.class, XSiteStateProvider.class,
FunctionalNotifier.class, CommandAckCollector.class, TriangleOrderManager.class,
OrderedUpdatesManager.class, ScatteredVersionManager.class, TransactionOriginatorChecker.class,
BiasManager.class, OffHeapEntryFactory.class, OffHeapMemoryAllocator.class})
BiasManager.class, OffHeapEntryFactory.class, OffHeapMemoryAllocator.class, PublisherHandler.class})
public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {

@Override
Expand Down Expand Up @@ -199,6 +200,8 @@ public Object construct(String componentName) {
} else if (componentName.equals(RemoteValueRetrievedListener.class.getName())) {
// L1Manager is currently only listener for remotely retrieved values
return ComponentAlias.of(L1Manager.class);
} else if (componentName.equals(PublisherHandler.class.getName())) {
return new PublisherHandler();
}

throw CONTAINER.factoryCannotConstructComponent(componentName);
Expand Down
Expand Up @@ -3,12 +3,14 @@
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.factories.annotations.DefaultFactoryFor;
import org.infinispan.factories.impl.ComponentAlias;
import org.infinispan.partitionhandling.PartitionHandling;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManagerImpl;
import org.infinispan.reactive.publisher.impl.LocalClusterPublisherManagerImpl;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl;
import org.infinispan.reactive.publisher.impl.NonSegmentedLocalPublisherManagerImpl;
import org.infinispan.reactive.publisher.impl.PartitionAwareClusterPublisherManager;

/**
* Factory that allows creation of a {@link LocalPublisherManager} or {@link ClusterPublisherManager} based on the provided
Expand All @@ -33,6 +35,9 @@ public Object construct(String componentName) {
}
CacheMode cacheMode = configuration.clustering().cacheMode();
if (cacheMode.needsStateTransfer() && componentName.equals(ClusterPublisherManager.class.getName())) {
if (configuration.clustering().partitionHandling().whenSplit() == PartitionHandling.DENY_READ_WRITES) {
return new PartitionAwareClusterPublisherManager<>();
}
return new ClusterPublisherManagerImpl<>();
}
return ComponentAlias.of(LOCAL_CLUSTER_PUBLISHER);
Expand Down

0 comments on commit 17a8913

Please sign in to comment.