Skip to content

Commit

Permalink
Merge pull request hazelcast#11363 from mmedenjak/journal-compatibility
Browse files Browse the repository at this point in the history
Disable event journal when cluster version is less than 3.9
  • Loading branch information
mdogan committed Sep 27, 2017
2 parents 180dd83 + 7a1d333 commit 93016e6
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 6 deletions.
Expand Up @@ -16,13 +16,14 @@

package com.hazelcast.cache.impl.journal;

import com.hazelcast.cache.CacheNotExistsException;
import com.hazelcast.cache.CacheEventType;
import com.hazelcast.cache.CacheNotExistsException;
import com.hazelcast.cache.impl.CacheService;
import com.hazelcast.config.CacheConfig;
import com.hazelcast.config.EventJournalConfig;
import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.internal.cluster.Versions;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.serialization.Data;
Expand All @@ -42,6 +43,7 @@
import static com.hazelcast.cache.CacheEventType.EXPIRED;
import static com.hazelcast.cache.CacheEventType.REMOVED;
import static com.hazelcast.cache.CacheEventType.UPDATED;
import static java.lang.String.format;


/**
Expand Down Expand Up @@ -147,6 +149,12 @@ public boolean hasEventJournal(ObjectNamespace namespace) {

@Override
public EventJournalConfig getEventJournalConfig(ObjectNamespace namespace) {
// when the cluster version is less than 3.9 we act as if the journal is disabled
// this is because some members might not know how to save journal events
if (nodeEngine.getClusterService().getClusterVersion().isLessThan(Versions.V3_9)) {
return null;
}

final String name = namespace.getObjectName();
final CacheConfig cacheConfig = getCacheService().getCacheConfig(name);
if (cacheConfig == null) {
Expand Down Expand Up @@ -213,16 +221,17 @@ private RingbufferContainer<InternalEventJournalCacheEvent> getRingbufferOrFail(

final EventJournalConfig config = getEventJournalConfig(namespace);
if (config == null) {
throw new IllegalStateException("There is no event journal configured for cache with name: "
+ namespace.getObjectName());
throw new IllegalStateException(format(
"There is no event journal configured for cache %s or the journal is disabled",
namespace.getObjectName()));
}
return getOrCreateRingbufferContainer(namespace, partitionId, config);
}

/**
* Gets or creates a ringbuffer for an event journal or returns {@link null} if no
* event journal is configured. The cache record store should have been already created
* at the point when this method is invoked.
* event journal is configured, it is disabled or not available. The cache record
* store should have been already created at the point when this method is invoked.
* This method can be used to get the ringbuffer when we already know that the cache record
* store has been created.
* <p>
Expand All @@ -233,6 +242,7 @@ private RingbufferContainer<InternalEventJournalCacheEvent> getRingbufferOrFail(
* @param partitionId the cache partition ID
* @return the cache partition event journal or {@code null} if no journal is configured for this cache
* @throws CacheNotExistsException if the cache hasn't been already created
* @see #getEventJournalConfig(ObjectNamespace)
*/
private RingbufferContainer<InternalEventJournalCacheEvent> getRingbufferOrNull(ObjectNamespace namespace, int partitionId) {
final RingbufferService ringbufferService = getRingbufferService();
Expand Down
23 changes: 23 additions & 0 deletions hazelcast/src/main/java/com/hazelcast/journal/EventJournal.java
Expand Up @@ -18,6 +18,7 @@

import com.hazelcast.config.EventJournalConfig;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.ringbuffer.StaleSequenceException;
import com.hazelcast.ringbuffer.impl.ReadResultSetImpl;
import com.hazelcast.spi.ObjectNamespace;
Expand Down Expand Up @@ -151,7 +152,29 @@ public interface EventJournal<E> {
*/
boolean hasEventJournal(ObjectNamespace namespace);

/**
* Returns the event journal configuration or {@code null} if there is none or the journal is disabled
* for the given {@code namespace}.
* <p>
* <b>NOTE</b>
* If the {@link ClusterService#getClusterVersion()} is less
* than {@link com.hazelcast.internal.cluster.Versions#V3_9},
* this method will return {@code null}, regardless of whether
* the journal is actually enabled by the configuration. This
* is because some members might not know how to save journal
* events and respond to subscribe/read operations.
*
* @param namespace the object namespace of the specific distributed object
* @return the journal configuration or {@code null} if the journal is not enabled or available
*/
EventJournalConfig getEventJournalConfig(ObjectNamespace namespace);

/**
* Creates a new {@link RingbufferConfig} for a ringbuffer that will keep
* event journal events for a single partition.
*
* @param config the event journal config
* @return the ringbuffer config for a single partition of the event journal
*/
RingbufferConfig toRingbufferConfig(EventJournalConfig config);
}
Expand Up @@ -20,6 +20,7 @@
import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.core.EntryEventType;
import com.hazelcast.internal.cluster.Versions;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.serialization.Data;
Expand Down Expand Up @@ -130,7 +131,11 @@ public boolean hasEventJournal(ObjectNamespace namespace) {

@Override
public EventJournalConfig getEventJournalConfig(ObjectNamespace namespace) {
return nodeEngine.getConfig().findMapEventJournalConfig(namespace.getObjectName());
// when the cluster version is less than 3.9 we act as if the journal is disabled
// this is because some members might not know how to save journal events
return nodeEngine.getClusterService().getClusterVersion().isLessThan(Versions.V3_9)
? null
: nodeEngine.getConfig().findMapEventJournalConfig(namespace.getObjectName());
}

@Override
Expand Down

0 comments on commit 93016e6

Please sign in to comment.