Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-9812 Streaming Cluster Publisher #7523

Merged
merged 1 commit into from Nov 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -206,7 +206,7 @@ public interface Ids {
int SIMPLE_PUBLISHER_RESULT = 140;
int PUBLISHER_REDUCERS = 141;

int MARSHALLED_VALUE = 142;
int CACHE_STREAM_INTERMEDIATE_PUBLISHER = 142;
wburns marked this conversation as resolved.
Show resolved Hide resolved

int CACHE_STREAM_INTERMEDIATE_REDUCER = 143;

Expand All @@ -216,6 +216,8 @@ public interface Ids {
int IMMUTABLE_LIST_COPY = 147;
int INTERNAL_ENUMS = 148;

int PUBLISHER_RESPONSE = 149;

int COUNTER_CONFIGURATION = 2000; //from counter
int COUNTER_STATE = 2001; //from counter
}
Expand Up @@ -172,8 +172,7 @@ public static IntSet concurrentSet(int maxExclusive) {

/**
* Returns a copy of the given set that supports concurrent operations. The returned set will contain all of the
* ints the provided set contained. The returned set only supports up to the maximum size the previous int set
* supported when this method is invoked or the largest int it held.
* ints the provided set contained. The returned mutable IntSet can store values in the range of {@code 0..maxExclusive-1}
* @param intSet set to copy from
* @param maxExclusive the maximum value - 1 that can be inserted into the set
* @return concurrent copy
Expand Down
Expand Up @@ -9,7 +9,6 @@

import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.marshall.Ids;
import org.infinispan.commons.util.InjectiveFunction;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.encoding.DataConversion;
Expand All @@ -24,7 +23,7 @@
* the requested format.
*/
@Scope(Scopes.NAMED_CACHE)
public class EncoderEntryMapper<K, V, T extends Map.Entry<K, V>> implements InjectiveFunction<T, T> {
public class EncoderEntryMapper<K, V, T extends Map.Entry<K, V>> implements EncodingFunction<T> {
@Inject
transient InternalEntryFactory entryFactory;

Expand Down
Expand Up @@ -8,7 +8,6 @@

import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.marshall.Ids;
import org.infinispan.commons.util.InjectiveFunction;
import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.annotations.Inject;
Expand All @@ -22,7 +21,7 @@
* @since 9.1
*/
@Scope(Scopes.NONE)
public class EncoderKeyMapper<K> implements InjectiveFunction<K, K> {
public class EncoderKeyMapper<K> implements EncodingFunction<K> {
private final DataConversion dataConversion;

public EncoderKeyMapper(DataConversion dataConversion) {
Expand Down
Expand Up @@ -5,7 +5,6 @@
import java.io.ObjectOutput;
import java.util.Collections;
import java.util.Set;
import java.util.function.Function;

import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.marshall.Ids;
Expand All @@ -22,7 +21,7 @@
* @since 9.1
*/
@Scope(Scopes.NONE)
public class EncoderValueMapper<V> implements Function<V, V> {
public class EncoderValueMapper<V> implements EncodingFunction<V> {

private final DataConversion dataConversion;

Expand Down
16 changes: 16 additions & 0 deletions core/src/main/java/org/infinispan/cache/impl/EncodingFunction.java
@@ -0,0 +1,16 @@
package org.infinispan.cache.impl;

import java.util.function.UnaryOperator;

import org.infinispan.commons.util.InjectiveFunction;

/**
* This is a marker interface to signal that this function may perform an encoding of the provided value. The returned
* value therefore will always be equivalent to the provided value, but may be in a slightly different form (whether
* due to unwrapping, encoding or transcoding. This may allow certain optimizations knowing that the value is
* equivalent to what it was before.
* @author wburns
* @since 10.1
*/
public interface EncodingFunction<T> extends UnaryOperator<T>, InjectiveFunction<T, T> {
}
17 changes: 14 additions & 3 deletions core/src/main/java/org/infinispan/commands/CommandsFactory.java
Expand Up @@ -87,7 +87,10 @@
import org.infinispan.notifications.cachelistener.cluster.ClusterEvent;
import org.infinispan.notifications.cachelistener.cluster.MultiClusterEventCommand;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.PublisherRequestCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.CancelPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.InitialPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.NextPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.reduction.ReductionPublisherRequestCommand;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateRequestCommand;
Expand Down Expand Up @@ -607,15 +610,23 @@ <K, V, R> TxReadOnlyManyCommand<K, V, R> buildTxReadOnlyManyCommand(Collection<?

MultiKeyFunctionalBackupWriteCommand buildMultiKeyFunctionalBackupWriteCommand();

<K, R> PublisherRequestCommand<K> buildKeyPublisherCommand(boolean parallelStream, DeliveryGuarantee deliveryGuarantee,
<K, R> ReductionPublisherRequestCommand<K> buildKeyReductionPublisherCommand(boolean parallelStream, DeliveryGuarantee deliveryGuarantee,
IntSet segments, Set<K> keys, Set<K> excludedKeys, boolean includeLoader,
Function<? super Publisher<K>, ? extends CompletionStage<R>> transformer,
Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer);

<K, V, R> PublisherRequestCommand<K> buildEntryPublisherCommand(boolean parallelStream, DeliveryGuarantee deliveryGuarantee,
<K, V, R> ReductionPublisherRequestCommand<K> buildEntryReductionPublisherCommand(boolean parallelStream, DeliveryGuarantee deliveryGuarantee,
IntSet segments, Set<K> keys, Set<K> excludedKeys, boolean includeLoader,
Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> transformer,
Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer);

<K, I, R> InitialPublisherCommand<K, I, R> buildInitialPublisherCommand(Object requestId, DeliveryGuarantee deliveryGuarantee,
int batchSize, IntSet segments, Set<K> keys, Set<K> excludedKeys, boolean includeLoader, boolean entryStream,
boolean trackKeys, Function<? super Publisher<I>, ? extends Publisher<R>> transformer);

NextPublisherCommand buildNextPublisherCommand(Object requestId);

CancelPublisherCommand buildCancelPublisherCommand(Object requestId);

<K, V> MultiClusterEventCommand<K, V> buildMultiClusterEventCommand(Map<UUID, Collection<ClusterEvent<K, V>>> events);
}
Expand Up @@ -13,6 +13,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

import javax.transaction.xa.Xid;

import org.infinispan.Cache;
Expand Down Expand Up @@ -110,7 +111,10 @@
import org.infinispan.notifications.cachelistener.cluster.ClusterEvent;
import org.infinispan.notifications.cachelistener.cluster.MultiClusterEventCommand;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.PublisherRequestCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.CancelPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.InitialPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.NextPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.reduction.ReductionPublisherRequestCommand;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateRequestCommand;
Expand Down Expand Up @@ -620,23 +624,41 @@ public MultiKeyFunctionalBackupWriteCommand buildMultiKeyFunctionalBackupWriteCo
}

@Override
public <K, R> PublisherRequestCommand<K> buildKeyPublisherCommand(boolean parallelStream,
public <K, R> ReductionPublisherRequestCommand<K> buildKeyReductionPublisherCommand(boolean parallelStream,
DeliveryGuarantee deliveryGuarantee, IntSet segments, Set<K> keys, Set<K> excludedKeys, boolean includeLoader,
Function<? super Publisher<K>, ? extends CompletionStage<R>> transformer,
Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
return new PublisherRequestCommand<>(cacheName, parallelStream, deliveryGuarantee, segments, keys, excludedKeys,
return new ReductionPublisherRequestCommand<>(cacheName, parallelStream, deliveryGuarantee, segments, keys, excludedKeys,
includeLoader, false, transformer, finalizer);
}

@Override
public <K, V, R> PublisherRequestCommand<K> buildEntryPublisherCommand(boolean parallelStream,
public <K, V, R> ReductionPublisherRequestCommand<K> buildEntryReductionPublisherCommand(boolean parallelStream,
DeliveryGuarantee deliveryGuarantee, IntSet segments, Set<K> keys, Set<K> excludedKeys, boolean includeLoader,
Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> transformer,
Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
return new PublisherRequestCommand<>(cacheName, parallelStream, deliveryGuarantee, segments, keys, excludedKeys,
return new ReductionPublisherRequestCommand<>(cacheName, parallelStream, deliveryGuarantee, segments, keys, excludedKeys,
includeLoader, true, transformer, finalizer);
}

@Override
public <K, I, R> InitialPublisherCommand<K, I, R> buildInitialPublisherCommand(Object requestId, DeliveryGuarantee deliveryGuarantee,
int batchSize, IntSet segments, Set<K> keys, Set<K> excludedKeys, boolean includeLoader, boolean entryStream,
boolean trackKeys, Function<? super Publisher<I>, ? extends Publisher<R>> transformer) {
return init(new InitialPublisherCommand<>(cacheName, requestId, deliveryGuarantee, batchSize, segments, keys,
excludedKeys, includeLoader, entryStream, trackKeys, transformer));
}

@Override
public NextPublisherCommand buildNextPublisherCommand(Object requestId) {
return new NextPublisherCommand(cacheName, requestId);
}

@Override
public CancelPublisherCommand buildCancelPublisherCommand(Object requestId) {
return new CancelPublisherCommand(cacheName, requestId);
}

@Override
public <K, V> MultiClusterEventCommand<K, V> buildMultiClusterEventCommand(Map<UUID, Collection<ClusterEvent<K, V>>> events) {
return new MultiClusterEventCommand<>(cacheName, events);
Expand Down
Expand Up @@ -68,7 +68,10 @@
import org.infinispan.manager.impl.ReplicableManagerFunctionCommand;
import org.infinispan.manager.impl.ReplicableRunnableCommand;
import org.infinispan.notifications.cachelistener.cluster.MultiClusterEventCommand;
import org.infinispan.reactive.publisher.impl.PublisherRequestCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.CancelPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.InitialPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.NextPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.reduction.ReductionPublisherRequestCommand;
import org.infinispan.statetransfer.StateRequestCommand;
import org.infinispan.statetransfer.StateResponseCommand;
import org.infinispan.stream.impl.StreamIteratorCloseCommand;
Expand Down Expand Up @@ -355,12 +358,21 @@ public CacheRpcCommand fromStream(byte id, byte type, ByteString cacheName) {
case UpdateLastAccessCommand.COMMAND_ID:
command = new UpdateLastAccessCommand(cacheName);
break;
case PublisherRequestCommand.COMMAND_ID:
command = new PublisherRequestCommand<>(cacheName);
case ReductionPublisherRequestCommand.COMMAND_ID:
command = new ReductionPublisherRequestCommand<>(cacheName);
break;
case MultiClusterEventCommand.COMMAND_ID:
command = new MultiClusterEventCommand<>(cacheName);
break;
case InitialPublisherCommand.COMMAND_ID:
command = new InitialPublisherCommand<>(cacheName);
break;
case NextPublisherCommand.COMMAND_ID:
command = new NextPublisherCommand(cacheName);
break;
case CancelPublisherCommand.COMMAND_ID:
command = new CancelPublisherCommand(cacheName);
break;
default:
throw new CacheException("Unknown command id " + id + "!");
}
Expand Down
8 changes: 7 additions & 1 deletion core/src/main/java/org/infinispan/context/Flag.java
Expand Up @@ -236,7 +236,13 @@ public enum Flag {
* normally need to use this flag. This is helpful if there are concerns that can cause just a simple size invocation
* from being consistent (eg. on-going transaction with modifications).
*/
SKIP_SIZE_OPTIMIZATION
SKIP_SIZE_OPTIMIZATION,

/**
* Flag that is used by keySet, entrySet and values methods so that they do not return transactional values. Normally
* an end user would not need to do this.
*/
IGNORE_TRANSACTION,
;

/**
Expand Down
Expand Up @@ -41,6 +41,7 @@ public class FlagBitSets {
public static final long ROLLING_UPGRADE = EnumUtil.bitSetOf(Flag.ROLLING_UPGRADE);
public static final long REMOTE_ITERATION = EnumUtil.bitSetOf(Flag.REMOTE_ITERATION);
public static final long SKIP_SIZE_OPTIMIZATION = EnumUtil.bitSetOf(Flag.SKIP_SIZE_OPTIMIZATION);
public static final long IGNORE_TRANSACTION = EnumUtil.bitSetOf(Flag.IGNORE_TRANSACTION);

/**
* Creates a copy of a Flag BitSet removing instances of FAIL_SILENTLY.
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.infinispan.persistence.manager.PreloadManager;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.reactive.publisher.impl.PublisherHandler;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.responses.ResponseGenerator;
import org.infinispan.remoting.rpc.RpcManager;
Expand Down Expand Up @@ -100,6 +101,7 @@ public class ComponentRegistry extends AbstractComponentRegistry {
private ComponentRef<OrderedUpdatesManager> orderedUpdatesManager;
private ComponentRef<PerCacheInboundInvocationHandler> inboundInvocationHandler;
private ComponentRef<PersistenceMarshaller> persistenceMarshaller;
private ComponentRef<PublisherHandler> publisherHandler;
private ComponentRef<RecoveryManager> recoveryManager;
private ComponentRef<ResponseGenerator> responseGenerator;
private ComponentRef<RpcManager> rpcManager;
Expand Down Expand Up @@ -376,6 +378,7 @@ public void cacheComponents() {
lockManager = basicComponentRegistry.getComponent(LockManager.class);
orderedUpdatesManager = basicComponentRegistry.getComponent(OrderedUpdatesManager.class);
persistenceMarshaller = basicComponentRegistry.getComponent(KnownComponentNames.PERSISTENCE_MARSHALLER, PersistenceMarshaller.class);
publisherHandler = basicComponentRegistry.getComponent(PublisherHandler.class);
recoveryManager = basicComponentRegistry.getComponent(RecoveryManager.class);
responseGenerator = basicComponentRegistry.getComponent(ResponseGenerator.class);
rpcManager = basicComponentRegistry.getComponent(RpcManager.class);
Expand Down Expand Up @@ -467,6 +470,11 @@ public ComponentRef<LocalPublisherManager> getLocalPublisherManager() {
return localPublisherManager;
}

public ComponentRef<PublisherHandler> getPublisherHandler() {
return publisherHandler;
}

@SuppressWarnings("unchecked")
public ComponentRef<LocalStreamManager> getLocalStreamManager() {
return localStreamManager;
}
Expand Down
Expand Up @@ -45,6 +45,7 @@
import org.infinispan.persistence.manager.PersistenceManagerStub;
import org.infinispan.persistence.manager.PreloadManager;
import org.infinispan.persistence.spi.MarshallableEntryFactory;
import org.infinispan.reactive.publisher.impl.PublisherHandler;
import org.infinispan.scattered.BiasManager;
import org.infinispan.scattered.ScatteredVersionManager;
import org.infinispan.scattered.impl.BiasManagerImpl;
Expand Down Expand Up @@ -88,7 +89,7 @@
XSiteStateTransferManager.class, XSiteStateConsumer.class, XSiteStateProvider.class,
FunctionalNotifier.class, CommandAckCollector.class, TriangleOrderManager.class,
OrderedUpdatesManager.class, ScatteredVersionManager.class, TransactionOriginatorChecker.class,
BiasManager.class, OffHeapEntryFactory.class, OffHeapMemoryAllocator.class})
BiasManager.class, OffHeapEntryFactory.class, OffHeapMemoryAllocator.class, PublisherHandler.class})
public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {

@Override
Expand Down Expand Up @@ -199,6 +200,8 @@ public Object construct(String componentName) {
} else if (componentName.equals(RemoteValueRetrievedListener.class.getName())) {
// L1Manager is currently only listener for remotely retrieved values
return ComponentAlias.of(L1Manager.class);
} else if (componentName.equals(PublisherHandler.class.getName())) {
return new PublisherHandler();
}

throw CONTAINER.factoryCannotConstructComponent(componentName);
Expand Down
Expand Up @@ -3,12 +3,14 @@
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.factories.annotations.DefaultFactoryFor;
import org.infinispan.factories.impl.ComponentAlias;
import org.infinispan.partitionhandling.PartitionHandling;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManagerImpl;
import org.infinispan.reactive.publisher.impl.LocalClusterPublisherManagerImpl;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl;
import org.infinispan.reactive.publisher.impl.NonSegmentedLocalPublisherManagerImpl;
import org.infinispan.reactive.publisher.impl.PartitionAwareClusterPublisherManager;

/**
* Factory that allows creation of a {@link LocalPublisherManager} or {@link ClusterPublisherManager} based on the provided
Expand All @@ -33,6 +35,9 @@ public Object construct(String componentName) {
}
CacheMode cacheMode = configuration.clustering().cacheMode();
if (cacheMode.needsStateTransfer() && componentName.equals(ClusterPublisherManager.class.getName())) {
if (configuration.clustering().partitionHandling().whenSplit() == PartitionHandling.DENY_READ_WRITES) {
return new PartitionAwareClusterPublisherManager<>();
}
return new ClusterPublisherManagerImpl<>();
}
return ComponentAlias.of(LOCAL_CLUSTER_PUBLISHER);
Expand Down