Skip to content

Commit

Permalink
WFLY-10736 Increase retry intervals for startup cache operations up t…
Browse files Browse the repository at this point in the history
…o lock acquisition timeout.
  • Loading branch information
pferraro committed Aug 23, 2018
1 parent 14850a7 commit 36fb0f1
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 17 deletions.
@@ -0,0 +1,52 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2018, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.wildfly.clustering.ee.infinispan.retry;

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.Configuration;

/**
* Retrying invoker whose retry intervals are auto-generated based an Infinispan cache configuration.
* @author Paul Ferraro
*/
public class RetryingInvoker extends org.wildfly.clustering.ee.retry.RetryingInvoker {

public RetryingInvoker(Cache<?, ?> cache) {
super(calculateRetryIntervals(cache.getCacheConfiguration()));
}

private static List<Duration> calculateRetryIntervals(Configuration config) {
long timeout = config.locking().lockAcquisitionTimeout();
List<Duration> intervals = new LinkedList<>();
// Generate exponential back-off intervals
for (long interval = timeout; interval > 1; interval /= 10) {
intervals.add(0, Duration.ofMillis(interval));
}
intervals.add(0, Duration.ZERO);
return intervals;
}
}
Expand Up @@ -22,6 +22,8 @@
package org.wildfly.clustering.ee.retry;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.jboss.logging.Logger;
import org.wildfly.clustering.ee.Invoker;
Expand All @@ -37,22 +39,26 @@ public class RetryingInvoker implements Invoker {
// No logger interface for this module and no reason to create one for this class only
private static final Logger LOGGER = Logger.getLogger(RetryingInvoker.class);

private final Duration[] backOffIntervals;
private final List<Duration> retryIntevals;

public RetryingInvoker(Duration... backOffIntervals) {
this.backOffIntervals = backOffIntervals;
public RetryingInvoker(Duration... retryIntervals) {
this(Arrays.asList(retryIntervals));
}

protected RetryingInvoker(List<Duration> retryIntevals) {
this.retryIntevals = retryIntevals;
}

@Override
public <R, E extends Exception> R invoke(ExceptionSupplier<R, E> task) throws E {
for (int i = 0; i < this.backOffIntervals.length; ++i) {
int attempt = 0;
for (Duration delay : this.retryIntevals) {
if (Thread.currentThread().isInterrupted()) break;
try {
return task.get();
} catch (Exception e) {
LOGGER.debugf(e, "Attempt #%d failed", i + 1);
LOGGER.debugf(e, "Attempt #%d failed", ++attempt);
}
Duration delay = this.backOffIntervals[i];
if (delay.isZero() || delay.isNegative()) {
Thread.yield();
} else {
Expand Down
Expand Up @@ -22,7 +22,6 @@
package org.wildfly.clustering.server.provider;

import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -58,7 +57,7 @@
import org.wildfly.clustering.ee.Batch;
import org.wildfly.clustering.ee.Batcher;
import org.wildfly.clustering.ee.Invoker;
import org.wildfly.clustering.ee.retry.RetryingInvoker;
import org.wildfly.clustering.ee.infinispan.retry.RetryingInvoker;
import org.wildfly.clustering.group.GroupListener;
import org.wildfly.clustering.group.Membership;
import org.wildfly.clustering.group.Node;
Expand All @@ -85,14 +84,13 @@ private static ThreadFactory createThreadFactory(Class<?> targetClass) {
return WildFlySecurityManager.doUnchecked(action);
}

private static final Invoker INVOKER = new RetryingInvoker(Duration.ZERO, Duration.ofMillis(100), Duration.ofSeconds(1));

final Batcher<? extends Batch> batcher;
private final ConcurrentMap<T, Map.Entry<Listener, ExecutorService>> listeners = new ConcurrentHashMap<>();
private final Cache<T, Set<Address>> cache;
private final Group<Address> group;
private final Registration groupRegistration;
private final CommandDispatcher<Set<T>> dispatcher;
private final Invoker invoker;

public CacheServiceProviderRegistry(CacheServiceProviderRegistryConfiguration<T> config) {
this.group = config.getGroup();
Expand All @@ -101,6 +99,7 @@ public CacheServiceProviderRegistry(CacheServiceProviderRegistryConfiguration<T>
this.dispatcher = config.getCommandDispatcherFactory().createCommandDispatcher(config.getId(), this.listeners.keySet());
this.cache.addListener(this);
this.groupRegistration = this.group.register(this);
this.invoker = new RetryingInvoker(this.cache);
}

@Override
Expand Down Expand Up @@ -148,7 +147,7 @@ public void run() throws CacheException {
CacheServiceProviderRegistry.this.registerLocal(service);
}
};
INVOKER.invoke(registerAction);
this.invoker.invoke(registerAction);
return new SimpleServiceProviderRegistration<>(service, this, () -> {
Address localAddress = this.group.getAddress(this.group.getLocalMember());
try (Batch batch = this.batcher.createBatch()) {
Expand Down
Expand Up @@ -22,7 +22,6 @@
package org.wildfly.clustering.server.registry;

import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -60,8 +59,7 @@
import org.wildfly.clustering.Registration;
import org.wildfly.clustering.ee.Batch;
import org.wildfly.clustering.ee.Batcher;
import org.wildfly.clustering.ee.Invoker;
import org.wildfly.clustering.ee.retry.RetryingInvoker;
import org.wildfly.clustering.ee.infinispan.retry.RetryingInvoker;
import org.wildfly.clustering.group.Node;
import org.wildfly.clustering.infinispan.spi.distribution.ConsistentHashLocality;
import org.wildfly.clustering.infinispan.spi.distribution.Locality;
Expand All @@ -86,8 +84,6 @@ private static ThreadFactory createThreadFactory(Class<?> targetClass) {
return WildFlySecurityManager.doUnchecked(action);
}

private static final Invoker INVOKER = new RetryingInvoker(Duration.ZERO, Duration.ofMillis(100), Duration.ofSeconds(1));

private final ExecutorService topologyChangeExecutor = Executors.newSingleThreadExecutor(createThreadFactory(this.getClass()));
private final Map<RegistryListener<K, V>, ExecutorService> listeners = new ConcurrentHashMap<>();
private final Cache<Address, Map.Entry<K, V>> cache;
Expand All @@ -102,7 +98,7 @@ public CacheRegistry(CacheRegistryConfiguration<K, V> config, Map.Entry<K, V> en
this.group = config.getGroup();
this.closeTask = closeTask;
this.entry = new AbstractMap.SimpleImmutableEntry<>(entry);
INVOKER.invoke(this::populateRegistry);
new RetryingInvoker(this.cache).invoke(this::populateRegistry);
this.cache.addListener(this, new CacheRegistryFilter(), null);
}

Expand Down

0 comments on commit 36fb0f1

Please sign in to comment.