Skip to content

Commit

Permalink
Skip expired events in journal when reading
Browse files Browse the repository at this point in the history
The event journal read operation would clean up expired events in two
places - before executing the operation and each time events would be
added to the result set. As the read operation is loss-tolerant, the
requested sequence was corrected only after the first cleanup. Since
there is a time window between these two cleanups, the sequence needs to
be readjusted after the second cleanup as well.
This might also mean that the result set may contain "gaps" if there
were big pauses and the items expired after they have been added to the
journal and before they were added to the result set.

Fixes: hazelcast#13746
  • Loading branch information
Matko Medenjak committed Sep 26, 2018
1 parent d570d7c commit e0385c2
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 12 deletions.
Expand Up @@ -85,18 +85,7 @@ public void beforeRun() {
final int partitionId = getPartitionId();
journal.cleanup(namespace, partitionId);

final long oldestSequence = journal.oldestSequence(namespace, partitionId);
final long newestSequence = journal.newestSequence(namespace, partitionId);

// fast forward if late and no store is configured
if (startSequence < oldestSequence && !journal.isPersistenceEnabled(namespace, partitionId)) {
startSequence = oldestSequence;
}

// jump back if too far in future
if (startSequence > newestSequence + 1) {
startSequence = newestSequence + 1;
}
startSequence = setWithinBounds(journal, partitionId, startSequence);

journal.isAvailableOrNextSequence(namespace, partitionId, startSequence);
// we'll store the wait notify key because ICache destroys the record store
Expand Down Expand Up @@ -125,6 +114,8 @@ public boolean shouldWait() {
final EventJournal<J> journal = getJournal();
final int partitionId = getPartitionId();
journal.cleanup(namespace, partitionId);
sequence = setWithinBounds(journal, partitionId, sequence);

if (minSize == 0) {
if (!journal.isNextAvailableSequence(namespace, partitionId, sequence)) {
sequence = journal.readMany(namespace, partitionId, sequence, resultSet);
Expand Down Expand Up @@ -189,4 +180,33 @@ protected void readInternal(ObjectDataInput in) throws IOException {
protected abstract ReadResultSetImpl<J, T> createResultSet();

protected abstract EventJournal<J> getJournal();

/**
* Checks if the provided {@code requestedSequence} is within bounds of the
* oldest and newest sequence in the event journal. If the
* {@code requestedSequence} is too old or too new, it will return the
* current oldest or newest journal sequence.
* This method can be used for a loss-tolerant reader when trying to avoid a
* {@link com.hazelcast.ringbuffer.StaleSequenceException}.
*
* @param journal the event journal
* @param partitionId the partition ID to read
* @param requestedSequence the requested sequence to read
* @return the bounded journal sequence
*/
private long setWithinBounds(EventJournal<J> journal, int partitionId, long requestedSequence) {
final long oldestSequence = journal.oldestSequence(namespace, partitionId);
final long newestSequence = journal.newestSequence(namespace, partitionId);

// fast forward if late and no store is configured
if (requestedSequence < oldestSequence && !journal.isPersistenceEnabled(namespace, partitionId)) {
return oldestSequence;
}

// jump back if too far in future
if (requestedSequence > newestSequence + 1) {
return newestSequence + 1;
}
return requestedSequence;
}
}
@@ -0,0 +1,171 @@
/*
* Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.journal;

import com.hazelcast.config.Config;
import com.hazelcast.config.EventJournalConfig;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.projection.Projection;
import com.hazelcast.projection.Projections;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.util.function.Predicate;
import org.junit.Test;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/**
* Base class for implementing data-structure specific event journal test where the journal is expiring.
*
* @param <EJ_TYPE> the type of the event journal event
*/
public abstract class AbstractEventJournalExpiringTest<EJ_TYPE> extends HazelcastTestSupport {
private static final Random RANDOM = new Random();

protected HazelcastInstance[] instances;

private int partitionId;
private TruePredicate<EJ_TYPE> TRUE_PREDICATE = new TruePredicate<EJ_TYPE>();
private Projection<EJ_TYPE, EJ_TYPE> IDENTITY_PROJECTION = Projections.identity();

private void init() {
instances = createInstances();
partitionId = 1;
warmUpPartitions(instances);
}

@Override
protected Config getConfig() {
int defaultPartitionCount = Integer.parseInt(GroupProperty.PARTITION_COUNT.getDefaultValue());
EventJournalConfig eventJournalConfig = new EventJournalConfig()
.setEnabled(true)
.setMapName("default")
.setCacheName("default")
.setTimeToLiveSeconds(1)
.setCapacity(500 * defaultPartitionCount);

return smallInstanceConfig()
.addEventJournalConfig(eventJournalConfig);
}

@Test
public void skipsEventsWhenExpired() throws Throwable {
init();

final EventJournalTestContext<String, Integer, EJ_TYPE> context = createContext();

String key = randomPartitionKey();
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();

readFromJournal(context, exception, 0);

for (int i = 0; i < 100000; i++) {
context.dataAdapter.put(key, i);
if (exception.get() != null) {
throw exception.get();
}
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
}
}

private void readFromJournal(final EventJournalTestContext<String, Integer, EJ_TYPE> context,
final AtomicReference<Throwable> exception,
long seq) {
readFromEventJournal(context.dataAdapter, seq, 128, partitionId, TRUE_PREDICATE,
IDENTITY_PROJECTION).andThen(new ExecutionCallback<ReadResultSet<EJ_TYPE>>() {
@Override
public void onResponse(ReadResultSet<EJ_TYPE> response) {
readFromJournal(context, exception, response.getNextSequenceToReadFrom());
// ignore response
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
}

@Override
public void onFailure(Throwable t) {
exception.set(t);
}
});
}


/**
* Returns a random key belonging to the partition with ID {@link #partitionId}.
*/
private String randomPartitionKey() {
return generateKeyForPartition(instances[0], partitionId);
}

/**
* Creates hazelcast instances used to run the tests.
*
* @return the array of hazelcast instances
*/
protected HazelcastInstance[] createInstances() {
return createHazelcastInstanceFactory(2).newInstances(getConfig());
}

/**
* Reads from the event journal a set of events.
*
* @param adapter the adapter for a specific data structure
* @param startSequence the sequence of the first item to read
* @param maxSize the maximum number of items to read
* @param partitionId the partition ID of the entries in the journal
* @param predicate the predicate which the events must pass to be included in the response.
* May be {@code null} in which case all events pass the predicate
* @param projection the projection which is applied to the events before returning.
* May be {@code null} in which case the event is returned without being
* projected
* @param <K> the data structure entry key type
* @param <V>the data structure entry value type
* @param <PROJ_TYPE> the return type of the projection. It is equal to the journal event type
* if the projection is {@code null} or it is the identity projection
* @return the future with the filtered and projected journal items
*/
private <K, V, PROJ_TYPE> ICompletableFuture<ReadResultSet<PROJ_TYPE>> readFromEventJournal(
EventJournalDataStructureAdapter<K, V, EJ_TYPE> adapter,
long startSequence,
int maxSize,
int partitionId,
Predicate<EJ_TYPE> predicate,
Projection<EJ_TYPE, PROJ_TYPE> projection) {
return adapter.readFromEventJournal(startSequence, 1, maxSize, partitionId, predicate, projection);
}

/**
* Returns a random hazelcast instance
*/
protected HazelcastInstance getRandomInstance() {
return instances[RANDOM.nextInt(instances.length)];
}

/**
* Creates the data structure specific {@link EventJournalTestContext} used
* by the event journal tests.
*
* @param <K> key type of the created {@link EventJournalTestContext}
* @param <V> value type of the created {@link EventJournalTestContext}
* @return a {@link EventJournalTestContext} used by the event journal tests
*/
protected abstract <K, V> EventJournalTestContext<K, V, EJ_TYPE> createContext();
}
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.map.impl.journal;

import com.hazelcast.config.Config;
import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MapStoreConfig;
import com.hazelcast.core.MapStore;
import com.hazelcast.journal.AbstractEventJournalExpiringTest;
import com.hazelcast.journal.EventJournalTestContext;
import com.hazelcast.map.journal.EventJournalMapEvent;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.test.annotation.SlowTest;
import com.hazelcast.util.MapUtil;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import static com.hazelcast.config.MapStoreConfig.InitialLoadMode.EAGER;

@RunWith(HazelcastParallelClassRunner.class)
@Category({SlowTest.class, ParallelTest.class})
public class MapEventJournalExpiringTest<K, V> extends AbstractEventJournalExpiringTest<EventJournalMapEvent> {

private static final String MAP_NAME = "mappy";

@Override
protected Config getConfig() {
final MapConfig nonExpiringMapConfig = new MapConfig(MAP_NAME)
.setInMemoryFormat(getInMemoryFormat());

return super.getConfig()
.addMapConfig(nonExpiringMapConfig);
}

protected InMemoryFormat getInMemoryFormat() {
return MapConfig.DEFAULT_IN_MEMORY_FORMAT;
}

@Override
protected EventJournalTestContext<K, V, EventJournalMapEvent<K, V>> createContext() {
return new EventJournalTestContext<K, V, EventJournalMapEvent<K, V>>(
new EventJournalMapDataStructureAdapter<K, V>(getRandomInstance().<K, V>getMap(MAP_NAME)),
null,
new EventJournalMapEventAdapter<K, V>()
);
}
}

0 comments on commit e0385c2

Please sign in to comment.