Skip to content

Commit

Permalink
ISPN-6677 Deal with unavailable persistence dependencies during startup
Browse files Browse the repository at this point in the history
The PersistenceManager now attempts to start each of the configured
stores several times before failure. The number of attempts and the
time between attempts is configuratble.

Furthermore, the PersistenceManager now periodically probes the
availability of the configured stores via the CacheWriter and
CacheLoader isAvailable methods.
  • Loading branch information
ryanemerson authored and wburns committed Jun 1, 2018
1 parent 885e639 commit 63fad6f
Show file tree
Hide file tree
Showing 39 changed files with 596 additions and 74 deletions.
Expand Up @@ -13,17 +13,26 @@
*/
public class PersistenceConfiguration implements Matchable<PersistenceConfiguration> {
public static final AttributeDefinition<Boolean> PASSIVATION = AttributeDefinition.builder("passivation", false).immutable().build();
public static final AttributeDefinition<Integer> AVAILABILITY_INTERVAL = AttributeDefinition.builder("availabilityInterval", 1000).immutable().build();
public static final AttributeDefinition<Integer> CONNECTION_ATTEMPTS = AttributeDefinition.builder("connectionAttempts", 10).immutable().build();
public static final AttributeDefinition<Integer> CONNECTION_INTERVAL = AttributeDefinition.builder("connectionInterval", 50).immutable().build();
static AttributeSet attributeDefinitionSet() {
return new AttributeSet(PersistenceConfiguration.class, PASSIVATION);
return new AttributeSet(PersistenceConfiguration.class, PASSIVATION, AVAILABILITY_INTERVAL, CONNECTION_ATTEMPTS, CONNECTION_INTERVAL);
}

private final Attribute<Boolean> passivation;
private final Attribute<Integer> availabilityInterval;
private final Attribute<Integer> connectionAttempts;
private final Attribute<Integer> connectionInterval;
private final AttributeSet attributes;
private final List<StoreConfiguration> stores;

PersistenceConfiguration(AttributeSet attributes, List<StoreConfiguration> stores) {
this.attributes = attributes.checkProtection();
passivation = attributes.attribute(PASSIVATION);
this.passivation = attributes.attribute(PASSIVATION);
this.availabilityInterval = attributes.attribute(AVAILABILITY_INTERVAL);
this.connectionAttempts = attributes.attribute(CONNECTION_ATTEMPTS);
this.connectionInterval = attributes.attribute(CONNECTION_INTERVAL);
this.stores = stores;
}

Expand All @@ -40,6 +49,18 @@ public boolean passivation() {
return passivation.get();
}

public int availabilityInterval() {
return availabilityInterval.get();
}

public int connectionAttempts() {
return connectionAttempts.get();
}

public int connectionInterval() {
return connectionInterval.get();
}

public List<StoreConfiguration> stores() {
return stores;
}
Expand Down
@@ -1,5 +1,8 @@
package org.infinispan.configuration.cache;

import static org.infinispan.configuration.cache.PersistenceConfiguration.AVAILABILITY_INTERVAL;
import static org.infinispan.configuration.cache.PersistenceConfiguration.CONNECTION_ATTEMPTS;
import static org.infinispan.configuration.cache.PersistenceConfiguration.CONNECTION_INTERVAL;
import static org.infinispan.configuration.cache.PersistenceConfiguration.PASSIVATION;

import java.lang.reflect.Constructor;
Expand Down Expand Up @@ -35,6 +38,35 @@ public PersistenceConfigurationBuilder passivation(boolean b) {
return this;
}

/**
* @param interval The time (in milliseconds) between each availability check that determines whether the PersistenceManager
* is available, i.e. how regularly are stores/loaders polled via their `org.infinispan.persistence.spi.CacheWriter#isAvailable`
* or `org.infinispan.persistence.spi.CacheLoader#isAvailable` implementation. If a single store/loader is unavailable,
* then an exception is thrown during cache operations.
*/
public PersistenceConfigurationBuilder availabilityInterval(int interval) {
attributes.attribute(AVAILABILITY_INTERVAL).set(interval);
return this;
}

/**
* @param attempts The maximum number of unsuccessful attempts to start each of the configured CacheWriter/CacheLoader
* before an exception is thrown and the cache fails to start.
*/
public PersistenceConfigurationBuilder connectionAttempts(int attempts) {
attributes.attribute(CONNECTION_ATTEMPTS).set(attempts);
return this;
}

/**
* @param interval The time (in milliseconds) to wait between subsequent connection attempts on startup. A negative or zero
* interval results in no wait period being observed.
*/
public PersistenceConfigurationBuilder connectionInterval(int interval) {
attributes.attribute(CONNECTION_INTERVAL).set(interval);
return this;
}

/**
* If true, data is only written to the cache store when it is evicted from memory, a phenomenon
* known as 'passivation'. Next time the data is requested, it will be 'activated' which means
Expand Down
Expand Up @@ -25,6 +25,7 @@ public enum Attribute {
AUDIT_LOGGER("audit-logger"),
AUTO_COMMIT("auto-commit"),
AUTO_CONFIG("auto-config"),
AVAILABILITY_INTERVAL("availability-interval"),
AWAIT_INITIAL_TRANSFER("await-initial-transfer"),
BACKUP_FAILURE_POLICY("failure-policy"),
BEFORE("before"),
Expand All @@ -35,6 +36,8 @@ public enum Attribute {
COMPLETED_TX_TIMEOUT("complete-timeout"),
CONCURRENCY_LEVEL("concurrency-level"),
CONFIGURATION("configuration"),
CONNECTION_ATTEMPTS("connection-attempts"),
CONNECTION_INTERVAL("connection-interval"),
CONSISTENT_HASH_FACTORY("consistent-hash-factory"),
CORE_THREADS("core-threads"),
DATA_CONTAINER("data-container"),
Expand Down
Expand Up @@ -2316,6 +2316,15 @@ private void parsePersistence(final XMLExtendedStreamReader reader, final Config
case PASSIVATION:
builder.persistence().passivation(Boolean.parseBoolean(value));
break;
case AVAILABILITY_INTERVAL:
builder.persistence().availabilityInterval(Integer.parseInt(value));
break;
case CONNECTION_ATTEMPTS:
builder.persistence().connectionAttempts(Integer.parseInt(value));
break;
case CONNECTION_INTERVAL:
builder.persistence().connectionInterval(Integer.parseInt(value));
break;
default:
throw ParseUtils.unexpectedAttribute(reader, i);
}
Expand Down
Expand Up @@ -576,6 +576,9 @@ private void writePersistence(XMLExtendedStreamWriter writer, Configuration conf
if (attributes.isModified() || persistence.stores().size() > 0) {
writer.writeStartElement(Element.PERSISTENCE);
attributes.write(writer, PersistenceConfiguration.PASSIVATION, Attribute.PASSIVATION);
attributes.write(writer, PersistenceConfiguration.AVAILABILITY_INTERVAL, Attribute.AVAILABILITY_INTERVAL);
attributes.write(writer, PersistenceConfiguration.CONNECTION_ATTEMPTS, Attribute.CONNECTION_ATTEMPTS);
attributes.write(writer, PersistenceConfiguration.CONNECTION_INTERVAL, Attribute.CONNECTION_INTERVAL);
for (StoreConfiguration store : persistence.stores()) {
writeStore(writer, store);
}
Expand Down
Expand Up @@ -72,7 +72,7 @@ public <T> T construct(Class<T> componentType, String componentName) {
persistenceExecutor = createExecutorService(
globalConfiguration.persistenceThreadPool(),
PERSISTENCE_EXECUTOR,
ExecutorServiceType.DEFAULT);
ExecutorServiceType.SCHEDULED);
}
}
return (T) persistenceExecutor;
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/org/infinispan/notifications/Listener.java
Expand Up @@ -188,6 +188,8 @@
* @see org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidated
* @see org.infinispan.notifications.cachelistener.annotation.DataRehashed
* @see org.infinispan.notifications.cachelistener.annotation.TopologyChanged
* @see org.infinispan.notifications.cachelistener.annotation.PartitionStatusChanged
* @see org.infinispan.notifications.cachelistener.annotation.PersistenceAvailabilityChanged
* @since 4.0
*/
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Expand Up @@ -112,4 +112,5 @@ void notifyCacheEntryPassivated(K key, V value, boolean pre,

void notifyPartitionStatusChanged(AvailabilityMode mode, boolean pre);

void notifyPersistenceAvailabilityChanged(boolean available);
}
Expand Up @@ -12,6 +12,7 @@
import static org.infinispan.notifications.cachelistener.event.Event.Type.CACHE_ENTRY_VISITED;
import static org.infinispan.notifications.cachelistener.event.Event.Type.DATA_REHASHED;
import static org.infinispan.notifications.cachelistener.event.Event.Type.PARTITION_STATUS_CHANGED;
import static org.infinispan.notifications.cachelistener.event.Event.Type.PERSISTENCE_AVAILABILITY_CHANGED;
import static org.infinispan.notifications.cachelistener.event.Event.Type.TOPOLOGY_CHANGED;
import static org.infinispan.notifications.cachelistener.event.Event.Type.TRANSACTION_COMPLETED;
import static org.infinispan.notifications.cachelistener.event.Event.Type.TRANSACTION_REGISTERED;
Expand Down Expand Up @@ -83,6 +84,7 @@
import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited;
import org.infinispan.notifications.cachelistener.annotation.DataRehashed;
import org.infinispan.notifications.cachelistener.annotation.PartitionStatusChanged;
import org.infinispan.notifications.cachelistener.annotation.PersistenceAvailabilityChanged;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.annotation.TransactionCompleted;
import org.infinispan.notifications.cachelistener.annotation.TransactionRegistered;
Expand All @@ -105,6 +107,7 @@
import org.infinispan.notifications.cachelistener.event.DataRehashedEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.notifications.cachelistener.event.PartitionStatusChangedEvent;
import org.infinispan.notifications.cachelistener.event.PersistenceAvailabilityChangedEvent;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.notifications.cachelistener.event.TransactionCompletedEvent;
import org.infinispan.notifications.cachelistener.event.TransactionRegisteredEvent;
Expand Down Expand Up @@ -165,6 +168,7 @@ public final class CacheNotifierImpl<K, V> extends AbstractListenerImpl<Event<K,
allowedListeners.put(DataRehashed.class, DataRehashedEvent.class);
allowedListeners.put(TopologyChanged.class, TopologyChangedEvent.class);
allowedListeners.put(PartitionStatusChanged.class, PartitionStatusChangedEvent.class);
allowedListeners.put(PersistenceAvailabilityChanged.class, PersistenceAvailabilityChangedEvent.class);

clusterAllowedListeners.put(CacheEntryCreated.class, CacheEntryCreatedEvent.class);
clusterAllowedListeners.put(CacheEntryModified.class, CacheEntryModifiedEvent.class);
Expand All @@ -187,6 +191,7 @@ public final class CacheNotifierImpl<K, V> extends AbstractListenerImpl<Event<K,
final List<CacheEntryListenerInvocation<K, V>> dataRehashedListeners = new CopyOnWriteArrayList<>();
final List<CacheEntryListenerInvocation<K, V>> topologyChangedListeners = new CopyOnWriteArrayList<>();
final List<CacheEntryListenerInvocation<K, V>> partitionChangedListeners = new CopyOnWriteArrayList<>();
final List<CacheEntryListenerInvocation<K, V>> persistenceChangedListeners = new CopyOnWriteArrayList<>();

@Inject private Cache<K, V> cache;
@Inject private ClusteringDependentLogic clusteringDependentLogic;
Expand Down Expand Up @@ -233,6 +238,7 @@ public CacheNotifierImpl() {
listenersMap.put(DataRehashed.class, dataRehashedListeners);
listenersMap.put(TopologyChanged.class, topologyChangedListeners);
listenersMap.put(PartitionStatusChanged.class, partitionChangedListeners);
listenersMap.put(PersistenceAvailabilityChanged.class, persistenceChangedListeners);
}

@Override
Expand Down Expand Up @@ -670,6 +676,15 @@ public void notifyPartitionStatusChanged(AvailabilityMode mode, boolean pre) {
}
}

@Override
public void notifyPersistenceAvailabilityChanged(boolean available) {
if (!persistenceChangedListeners.isEmpty()) {
EventImpl<K, V> e = EventImpl.createEvent(cache, PERSISTENCE_AVAILABILITY_CHANGED);
e.setAvailable(available);
for (CacheEntryListenerInvocation<K, V> listener : persistenceChangedListeners) listener.invoke(e);
}
}

@Override
public void notifyClusterListeners(Collection<? extends CacheEntryEvent<K, V>> events, UUID uuid) {
// We don't need to unwrap key or value as the node where the event originated did this already
Expand Down
@@ -0,0 +1,29 @@
package org.infinispan.notifications.cachelistener.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* This annotation should be used on methods that need to be notified when the availability of the PersistenceManager
* changes. When Cache stores are configured, but the connection to at least one store is lost, the PersistenceManager becomes
* unavailable. This results in a {@link org.infinispan.persistence.spi.StoreUnavailableException} being thrown on all read/write
* operations which require the PersistenceManager until all stores once again become available.
* <p/>
* Methods annotated with this annotation should be public and take in a single parameter, a {@link
* org.infinispan.notifications.cachelistener.event.CacheEntryActivatedEvent} otherwise an {@link
* org.infinispan.notifications.IncorrectListenerException} will be thrown when registering your cache listener.
* Locking: notification is performed WITH locks on the given key.
* <p/>
* Any exceptions thrown by the listener will abort the call. Any other listeners not yet called will not be called,
* and any transactions in progress will be rolled back.
*
* @author Ryan Emerson
* @see org.infinispan.notifications.Listener
* @since 9.3
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface PersistenceAvailabilityChanged {
}
Expand Up @@ -13,7 +13,7 @@ enum Type {
CACHE_ENTRY_ACTIVATED, CACHE_ENTRY_PASSIVATED, CACHE_ENTRY_VISITED,
CACHE_ENTRY_LOADED, CACHE_ENTRY_EVICTED, CACHE_ENTRY_CREATED, CACHE_ENTRY_REMOVED, CACHE_ENTRY_MODIFIED,
TRANSACTION_COMPLETED, TRANSACTION_REGISTERED, CACHE_ENTRY_INVALIDATED, CACHE_ENTRY_EXPIRED, DATA_REHASHED,
TOPOLOGY_CHANGED, PARTITION_STATUS_CHANGED;
TOPOLOGY_CHANGED, PARTITION_STATUS_CHANGED, PERSISTENCE_AVAILABILITY_CHANGED;

private static final Type[] CACHED_VALUES = values();

Expand Down
@@ -0,0 +1,8 @@
package org.infinispan.notifications.cachelistener.event;

public interface PersistenceAvailabilityChangedEvent<K, V> extends Event<K, V> {
/**
* @return true if the {@link org.infinispan.persistence.manager.PersistenceManager} is available.
*/
boolean isAvailable();
}
Expand Up @@ -20,6 +20,7 @@
import org.infinispan.notifications.cachelistener.event.CacheEntryVisitedEvent;
import org.infinispan.notifications.cachelistener.event.DataRehashedEvent;
import org.infinispan.notifications.cachelistener.event.PartitionStatusChangedEvent;
import org.infinispan.notifications.cachelistener.event.PersistenceAvailabilityChangedEvent;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.notifications.cachelistener.event.TransactionCompletedEvent;
import org.infinispan.notifications.cachelistener.event.TransactionRegisteredEvent;
Expand All @@ -38,7 +39,8 @@
@NotThreadSafe
public class EventImpl<K, V> implements CacheEntryActivatedEvent, CacheEntryCreatedEvent, CacheEntriesEvictedEvent, CacheEntryLoadedEvent, CacheEntryModifiedEvent,
CacheEntryPassivatedEvent, CacheEntryRemovedEvent, CacheEntryVisitedEvent, TransactionCompletedEvent, TransactionRegisteredEvent,
CacheEntryInvalidatedEvent, DataRehashedEvent, TopologyChangedEvent, CacheEntryExpiredEvent, PartitionStatusChangedEvent, Cloneable {
CacheEntryInvalidatedEvent, DataRehashedEvent, TopologyChangedEvent, CacheEntryExpiredEvent, PartitionStatusChangedEvent,
PersistenceAvailabilityChangedEvent, Cloneable {
private boolean pre = false; // by default events are after the fact
private transient Cache<K, V> cache;
private K key;
Expand All @@ -57,6 +59,7 @@ public class EventImpl<K, V> implements CacheEntryActivatedEvent, CacheEntryCrea
private boolean commandRetried;
private boolean isCurrentState;
private AvailabilityMode mode;
private boolean available;

public EventImpl() {
}
Expand Down Expand Up @@ -221,6 +224,14 @@ public void setOldValue(V oldValue) {
this.oldValue = oldValue;
}

public boolean isAvailable() {
return available;
}

public void setAvailable(boolean available) {
this.available = available;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -246,7 +257,7 @@ public boolean equals(Object o) {
if (isCurrentState != event.isCurrentState) return false;
if (oldValue != null ? !oldValue.equals(event.oldValue) : event.oldValue != null) return false;

return true;
return available == event.available;
}

@Override
Expand All @@ -267,6 +278,7 @@ public int hashCode() {
result = 31 * result + newTopologyId;
result = 31 * result + (created ? 1 : 0) + (isCurrentState ? 2 : 0);
result = 31 * result + (oldValue != null ? oldValue.hashCode() : 0);
result = 31 * result + (available ? 1 : 0);
return result;
}

Expand Down Expand Up @@ -297,6 +309,7 @@ public String toString() {
", entries=" + entries +
", created=" + created +
", isCurrentState=" + isCurrentState +
", available=" + available +
'}';
}

Expand Down
Expand Up @@ -216,4 +216,10 @@ void prepareAllTxStores(Transaction transaction, BatchModification batchModifica
* @param flags Flags used during command invocation
*/
void deleteBatchFromAllNonTxStores(Iterable<Object> keys, AccessMode accessMode, long flags);


/**
* @return true if all configured stores are available and ready for read/write operations.
*/
boolean isAvailable();
}

0 comments on commit 63fad6f

Please sign in to comment.