Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

ISPN-2021 Expose ReplicationQueueImpl's config as new config type

  • Loading branch information...
commit 1e42f26ccc31013f8c249327754455e98ace9419 1 parent e9e9b6b
@galderz galderz authored maniksurtani committed
View
35 core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java
@@ -22,10 +22,13 @@
*/
package org.infinispan.remoting;
+import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.MultipleRpcCommand;
-import org.infinispan.config.Configuration;
+import org.infinispan.configuration.cache.AsyncConfiguration;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.LegacyConfigurationAdaptor;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
@@ -73,6 +76,7 @@
private CommandsFactory commandsFactory;
private volatile ScheduledFuture<?> scheduledFuture;
private boolean trace;
+ private String cacheName;
/**
* @return true if this replication queue is enabled, false otherwise.
@@ -84,11 +88,19 @@ public boolean isEnabled() {
@Inject
public void injectDependencies(@ComponentName(KnownComponentNames.ASYNC_REPLICATION_QUEUE_EXECUTOR) ScheduledExecutorService executor,
- RpcManager rpcManager, Configuration configuration, CommandsFactory commandsFactory) {
+ RpcManager rpcManager, org.infinispan.config.Configuration configuration,
+ CommandsFactory commandsFactory, Cache cache) {
+ injectDependencies(executor, rpcManager, configuration, commandsFactory, cache.getName());
+ }
+
+ public void injectDependencies(ScheduledExecutorService executor,
+ RpcManager rpcManager, org.infinispan.config.Configuration configuration,
+ CommandsFactory commandsFactory, String cacheName) {
this.rpcManager = rpcManager;
- this.configuration = configuration;
+ this.configuration = LegacyConfigurationAdaptor.adapt(configuration);
this.commandsFactory = commandsFactory;
this.scheduledExecutor = executor;
+ this.cacheName = cacheName;
}
/**
@@ -97,19 +109,20 @@ public void injectDependencies(@ComponentName(KnownComponentNames.ASYNC_REPLICAT
@Override
@Start
public void start() {
- long interval = configuration.getReplQueueInterval();
+ AsyncConfiguration async = configuration.clustering().async();
+ long interval = async.replQueueInterval();
trace = log.isTraceEnabled();
if (trace)
log.tracef("Starting replication queue, with interval %d and maxElements %s", interval, maxElements);
- this.maxElements = configuration.getReplQueueMaxElements();
+ this.maxElements = async.replQueueMaxElements();
// check again
- enabled = configuration.isUseReplQueue();
+ enabled = async.useReplQueue();
if (enabled && interval > 0) {
scheduledFuture = scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
- LogFactory.pushNDC(configuration.getName(), trace);
+ LogFactory.pushNDC(cacheName, trace);
try {
flush();
} finally {
@@ -159,7 +172,9 @@ public synchronized int flush() {
log.tracef("Flushing %s elements", toReplicateSize);
MultipleRpcCommand multipleRpcCommand = commandsFactory.buildReplicateCommand(toReplicate);
// send to all live caches in the cluster
- rpcManager.invokeRemotely(null, multipleRpcCommand, ResponseMode.getAsyncResponseMode(configuration), configuration.getSyncReplTimeout());
+ rpcManager.invokeRemotely(null, multipleRpcCommand,
+ ResponseMode.getAsyncResponseMode(configuration),
+ configuration.clustering().sync().replTimeout());
} catch (Throwable t) {
log.failedReplicatingQueue(toReplicate.size(), t);
}
@@ -174,6 +189,10 @@ public synchronized int flush() {
return toReplicate;
}
+ protected Configuration getConfiguration() {
+ return configuration;
+ }
+
@Override
public int getElementsCount() {
return elements.size();
View
9 core/src/main/java/org/infinispan/remoting/rpc/ResponseMode.java
@@ -49,8 +49,15 @@ public boolean isAsynchronous() {
return this == ASYNCHRONOUS || this == ASYNCHRONOUS_WITH_SYNC_MARSHALLING;
}
+ @Deprecated
public static ResponseMode getAsyncResponseMode(Configuration c) {
- return c.isUseAsyncMarshalling() ? ResponseMode.ASYNCHRONOUS : ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING;
+ return c.isUseAsyncMarshalling() ? ResponseMode.ASYNCHRONOUS
+ : ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING;
+ }
+
+ public static ResponseMode getAsyncResponseMode(org.infinispan.configuration.cache.Configuration c) {
+ return c.clustering().async().asyncMarshalling() ? ResponseMode.ASYNCHRONOUS
+ : ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING;
}
}
View
6 core/src/test/java/org/infinispan/replication/ReplQueueRestartTest.java
@@ -57,13 +57,11 @@ public void testReplQueueImpl() {
Configuration c = new Configuration();
c.setUseReplQueue(true);
-
- rqi.injectDependencies(ses, rpc, c, commandsFactory);
+ rqi.injectDependencies(ses, rpc, c, commandsFactory, "");
rqi.start();
rqi.stop();
-
-
}
+
}
Please sign in to comment.
Something went wrong with that request. Please try again.