Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-10322 Create unified interface for initializing commands #6834

Merged
merged 4 commits into from Jun 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,6 +20,7 @@
import org.infinispan.commons.time.TimeService;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.util.logging.LogFactory;
Expand Down Expand Up @@ -108,12 +109,13 @@ public void createCache(String cacheName, String baseCacheName) {
if (configuration.clustering().cacheMode().isClustered()) {
AdvancedCache<?, ?> clusteredCache = cacheManager.getCache(baseCacheName).getAdvancedCache();
RpcManager rpc = clusteredCache.getRpcManager();
CommandsFactory factory = clusteredCache.getComponentRegistry().getComponent(CommandsFactory.class);
ComponentRegistry componentRegistry = clusteredCache.getComponentRegistry();
CommandsFactory factory = componentRegistry.getComponent(CommandsFactory.class);

CreateCacheCommand ccc = factory.buildCreateCacheCommand(cacheName, baseCacheName);
try {
rpc.invokeRemotely(null, ccc, rpc.getSyncRpcOptions());
ccc.init(cacheManager);
ccc.init(componentRegistry, false);
ccc.invoke();
} catch (Throwable e) {
throw log.cannotCreateClusteredCaches(e, cacheName);
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/java/org/infinispan/commands/CancelCommand.java
Expand Up @@ -8,6 +8,7 @@

import org.infinispan.commands.remote.BaseRpcCommand;
import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
Expand All @@ -19,7 +20,7 @@
* @author Vladimir Blagojevic
* @since 5.2
*/
public class CancelCommand extends BaseRpcCommand {
public class CancelCommand extends BaseRpcCommand implements InitializableCommand {

private static final Log log = LogFactory.getLog(CancelCommand.class);
public static final byte COMMAND_ID = 34;
Expand All @@ -40,8 +41,9 @@ public CancelCommand(ByteString ownerCacheName, UUID commandToCancel) {
this.commandToCancel = commandToCancel;
}

public void init(CancellationService service) {
this.service = service;
@Override
public void init(ComponentRegistry componentRegistry, boolean isRemote) {
this.service = componentRegistry.getCancellationService().running();
}

@Override
Expand Down
27 changes: 25 additions & 2 deletions core/src/main/java/org/infinispan/commands/CommandsFactory.java
Expand Up @@ -17,12 +17,15 @@
import javax.transaction.xa.Xid;

import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.functional.Mutation;
import org.infinispan.commands.functional.ReadOnlyKeyCommand;
import org.infinispan.commands.functional.ReadOnlyManyCommand;
import org.infinispan.commands.functional.ReadWriteKeyCommand;
import org.infinispan.commands.functional.ReadWriteKeyValueCommand;
import org.infinispan.commands.functional.ReadWriteManyCommand;
import org.infinispan.commands.functional.ReadWriteManyEntriesCommand;
import org.infinispan.commands.functional.TxReadOnlyKeyCommand;
import org.infinispan.commands.functional.TxReadOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyKeyCommand;
import org.infinispan.commands.functional.WriteOnlyKeyValueCommand;
import org.infinispan.commands.functional.WriteOnlyManyCommand;
Expand Down Expand Up @@ -73,6 +76,7 @@
import org.infinispan.commons.util.IntSet;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.functional.EntryView.ReadEntryView;
Expand Down Expand Up @@ -104,8 +108,18 @@

/**
* A factory to build commands, initializing and injecting dependencies accordingly. Commands built for a specific,
* named cache instance cannot be reused on a different cache instance since most commands contain the cache name it
* was built for along with references to other named-cache scoped components.
* named cache instance cannot be reused on a different cache instance since most commands contain the cache name it was
* built for along with references to other named-cache scoped components.
* <p>
* Commands returned by the various build*Command methods should be initialised sufficiently for local execution via the
* interceptor chain, with no calls to {@link #initializeReplicableCommand(ReplicableCommand, boolean)} required.
* However, for remote execution, it's assumed that a command will be initialized via {@link
* #initializeReplicableCommand(ReplicableCommand, boolean)} before being invoked.
* <p>
* Note, {@link InitializableCommand} implementations should not rely on access to the {@link
* org.infinispan.factories.ComponentRegistry} in their constructors for local execution initialization as this leads to
* duplicated code. Instead implementations of this interface should call {@link InitializableCommand#init(ComponentRegistry,
* boolean)} before returning the created command.
*
* @author Manik Surtani
* @author Mircea.Markus@jboss.com
Expand Down Expand Up @@ -562,6 +576,15 @@ <K, V, R> ReadWriteManyCommand<K, V, R> buildReadWriteManyCommand(Collection<?>

<K, V, T, R> ReadWriteManyEntriesCommand<K, V, T, R> buildReadWriteManyEntriesCommand(Map<?, ?> entries, BiFunction<T, ReadWriteEntryView<K, V>, R> f, Params params, DataConversion keyDataConversion, DataConversion valueDataConversion);

<K, V, R> TxReadOnlyKeyCommand<K, V, R> buildTxReadOnlyKeyCommand(Object key, Function<ReadEntryView<K, V>, R> f,
List<Mutation<K, V, ?>> mutations, int segment,
Params params, DataConversion keyDataConversion,
DataConversion valueDataConversion);

<K, V, R> TxReadOnlyManyCommand<K, V, R> buildTxReadOnlyManyCommand(Collection<?> keys, List<List<Mutation<K,V,?>>> mutations,
Params params, DataConversion keyDataConversion,
DataConversion valueDataConversion);

BackupAckCommand buildBackupAckCommand(long id, int topologyId);

BackupMultiKeyAckCommand buildBackupMultiKeyAckCommand(long id, int segment, int topologyId);
Expand Down