From e0385c237ca525d4d3e0fc6ff0bbc2979b58515f Mon Sep 17 00:00:00 2001
From: Matko Medenjak
Date: Wed, 12 Sep 2018 14:47:04 +0200
Subject: [PATCH] Skip expired events in journal when reading

The event journal read operation cleans up expired events in two
places - before the operation executes and each time events are added
to the result set. As the read operation is loss-tolerant, the
requested sequence was previously corrected only after the first
cleanup. Since there is a time window between these two cleanups, the
sequence needs to be readjusted after the second cleanup as well.

This also means that the result set may contain "gaps" if there were
long pauses and items expired after they had been added to the journal
but before they were added to the result set.

Fixes: https://github.com/hazelcast/hazelcast/issues/13746
---
 .../journal/EventJournalReadOperation.java   |  44 +++--
 .../AbstractEventJournalExpiringTest.java    | 171 ++++++++++++++++++
 .../journal/MapEventJournalExpiringTest.java |  68 +++++++
 3 files changed, 271 insertions(+), 12 deletions(-)
 create mode 100644 hazelcast/src/test/java/com/hazelcast/journal/AbstractEventJournalExpiringTest.java
 create mode 100644 hazelcast/src/test/java/com/hazelcast/map/impl/journal/MapEventJournalExpiringTest.java

diff --git a/hazelcast/src/main/java/com/hazelcast/internal/journal/EventJournalReadOperation.java b/hazelcast/src/main/java/com/hazelcast/internal/journal/EventJournalReadOperation.java
index f409569870ba..3961e4f7a703 100644
--- a/hazelcast/src/main/java/com/hazelcast/internal/journal/EventJournalReadOperation.java
+++ b/hazelcast/src/main/java/com/hazelcast/internal/journal/EventJournalReadOperation.java
@@ -85,18 +85,7 @@ public void beforeRun() {
         final int partitionId = getPartitionId();
         journal.cleanup(namespace, partitionId);
 
-        final long oldestSequence = journal.oldestSequence(namespace, partitionId);
-        final long newestSequence = journal.newestSequence(namespace, partitionId);
-
-        // fast forward if late and no store is configured
-        if (startSequence < oldestSequence && !journal.isPersistenceEnabled(namespace, partitionId)) {
-            startSequence = oldestSequence;
-        }
-
-        // jump back if too far in future
-        if (startSequence > newestSequence + 1) {
-            startSequence = newestSequence + 1;
-        }
+        startSequence = setWithinBounds(journal, partitionId, startSequence);
         journal.isAvailableOrNextSequence(namespace, partitionId, startSequence);
 
         // we'll store the wait notify key because ICache destroys the record store
@@ -125,6 +114,8 @@ public boolean shouldWait() {
         final EventJournal journal = getJournal();
         final int partitionId = getPartitionId();
         journal.cleanup(namespace, partitionId);
+        sequence = setWithinBounds(journal, partitionId, sequence);
+
         if (minSize == 0) {
             if (!journal.isNextAvailableSequence(namespace, partitionId, sequence)) {
                 sequence = journal.readMany(namespace, partitionId, sequence, resultSet);
@@ -189,4 +180,33 @@ protected void readInternal(ObjectDataInput in) throws IOException {
     protected abstract ReadResultSetImpl createResultSet();
 
     protected abstract EventJournal getJournal();
+
+    /**
+     * Checks if the provided {@code requestedSequence} is within bounds of the
+     * oldest and newest sequence in the event journal. If the
+     * {@code requestedSequence} is too old or too new, it will return the
+     * current oldest or newest journal sequence.
+     * This method can be used by a loss-tolerant reader when trying to avoid a
+     * {@link com.hazelcast.ringbuffer.StaleSequenceException}.
+     *
+     * @param journal           the event journal
+     * @param partitionId       the partition ID to read
+     * @param requestedSequence the requested sequence to read
+     * @return the bounded journal sequence
+     */
+    private long setWithinBounds(EventJournal journal, int partitionId, long requestedSequence) {
+        final long oldestSequence = journal.oldestSequence(namespace, partitionId);
+        final long newestSequence = journal.newestSequence(namespace, partitionId);
+
+        // fast forward if late and no store is configured
+        if (requestedSequence < oldestSequence && !journal.isPersistenceEnabled(namespace, partitionId)) {
+            return oldestSequence;
+        }
+
+        // jump back if too far in future
+        if (requestedSequence > newestSequence + 1) {
+            return newestSequence + 1;
+        }
+        return requestedSequence;
+    }
 }
diff --git a/hazelcast/src/test/java/com/hazelcast/journal/AbstractEventJournalExpiringTest.java b/hazelcast/src/test/java/com/hazelcast/journal/AbstractEventJournalExpiringTest.java
new file mode 100644
index 000000000000..87be7aa5a8a8
--- /dev/null
+++ b/hazelcast/src/test/java/com/hazelcast/journal/AbstractEventJournalExpiringTest.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hazelcast.journal;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.EventJournalConfig;
+import com.hazelcast.core.ExecutionCallback;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.ICompletableFuture;
+import com.hazelcast.projection.Projection;
+import com.hazelcast.projection.Projections;
+import com.hazelcast.ringbuffer.ReadResultSet;
+import com.hazelcast.spi.properties.GroupProperty;
+import com.hazelcast.test.HazelcastTestSupport;
+import com.hazelcast.util.function.Predicate;
+import org.junit.Test;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * Base class for implementing data-structure specific event journal tests where the journal events expire.
+ *
+ * @param <EJ_TYPE> the type of the event journal event
+ */
+public abstract class AbstractEventJournalExpiringTest<EJ_TYPE> extends HazelcastTestSupport {
+    private static final Random RANDOM = new Random();
+
+    protected HazelcastInstance[] instances;
+
+    private int partitionId;
+    private TruePredicate<EJ_TYPE> TRUE_PREDICATE = new TruePredicate<EJ_TYPE>();
+    private Projection<EJ_TYPE, EJ_TYPE> IDENTITY_PROJECTION = Projections.identity();
+
+    private void init() {
+        instances = createInstances();
+        partitionId = 1;
+        warmUpPartitions(instances);
+    }
+
+    @Override
+    protected Config getConfig() {
+        int defaultPartitionCount = Integer.parseInt(GroupProperty.PARTITION_COUNT.getDefaultValue());
+
+        EventJournalConfig eventJournalConfig = new EventJournalConfig()
+                .setEnabled(true)
+                .setMapName("default")
+                .setCacheName("default")
+                .setTimeToLiveSeconds(1)
+                .setCapacity(500 * defaultPartitionCount);
+
+        return smallInstanceConfig()
+                .addEventJournalConfig(eventJournalConfig);
+    }
+
+    @Test
+    public void skipsEventsWhenExpired() throws Throwable {
+        init();
+
+        final EventJournalTestContext<String, Integer, EJ_TYPE> context = createContext();
+
+        String key = randomPartitionKey();
+        final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
+
+        readFromJournal(context, exception, 0);
+
+        for (int i = 0; i < 100000; i++) {
+            context.dataAdapter.put(key, i);
+            if (exception.get() != null) {
+                throw exception.get();
+            }
+            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
+        }
+    }
+
+    private void readFromJournal(final EventJournalTestContext<String, Integer, EJ_TYPE> context,
+                                 final AtomicReference<Throwable> exception,
+                                 long seq) {
+        readFromEventJournal(context.dataAdapter, seq, 128, partitionId, TRUE_PREDICATE,
+                IDENTITY_PROJECTION).andThen(new ExecutionCallback<ReadResultSet<EJ_TYPE>>() {
+            @Override
+            public void onResponse(ReadResultSet<EJ_TYPE> response) {
+                readFromJournal(context, exception, response.getNextSequenceToReadFrom());
+                // ignore response
+                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                exception.set(t);
+            }
+        });
+    }
+
+    /**
+     * Returns a random key belonging to the partition with ID {@link #partitionId}.
+     */
+    private String randomPartitionKey() {
+        return generateKeyForPartition(instances[0], partitionId);
+    }
+
+    /**
+     * Creates Hazelcast instances used to run the tests.
+     *
+     * @return the array of Hazelcast instances
+     */
+    protected HazelcastInstance[] createInstances() {
+        return createHazelcastInstanceFactory(2).newInstances(getConfig());
+    }
+
+    /**
+     * Reads a set of events from the event journal.
+     *
+     * @param adapter       the adapter for a specific data structure
+     * @param startSequence the sequence of the first item to read
+     * @param maxSize       the maximum number of items to read
+     * @param partitionId   the partition ID of the entries in the journal
+     * @param predicate     the predicate which the events must pass to be included in the response.
+     *                      May be {@code null} in which case all events pass the predicate
+     * @param projection    the projection which is applied to the events before returning.
+     *                      May be {@code null} in which case the event is returned without being
+     *                      projected
+     * @param <K>           the data structure entry key type
+     * @param <V>           the data structure entry value type
+     * @param <T>           the return type of the projection.
+     *                      It is equal to the journal event type if the projection is
+     *                      {@code null} or it is the identity projection
+     * @return the future with the filtered and projected journal items
+     */
+    private <K, V, T> ICompletableFuture<ReadResultSet<T>> readFromEventJournal(
+            EventJournalDataStructureAdapter<K, V, EJ_TYPE> adapter,
+            long startSequence,
+            int maxSize,
+            int partitionId,
+            Predicate<? super EJ_TYPE> predicate,
+            Projection<? super EJ_TYPE, ? extends T> projection) {
+        return adapter.readFromEventJournal(startSequence, 1, maxSize, partitionId, predicate, projection);
+    }
+
+    /**
+     * Returns a random Hazelcast instance.
+     */
+    protected HazelcastInstance getRandomInstance() {
+        return instances[RANDOM.nextInt(instances.length)];
+    }
+
+    /**
+     * Creates the data structure specific {@link EventJournalTestContext} used
+     * by the event journal tests.
+     *
+     * @param <K> key type of the created {@link EventJournalTestContext}
+     * @param <V> value type of the created {@link EventJournalTestContext}
+     * @return a {@link EventJournalTestContext} used by the event journal tests
+     */
+    protected abstract EventJournalTestContext<String, Integer, EJ_TYPE> createContext();
+}
diff --git a/hazelcast/src/test/java/com/hazelcast/map/impl/journal/MapEventJournalExpiringTest.java b/hazelcast/src/test/java/com/hazelcast/map/impl/journal/MapEventJournalExpiringTest.java
new file mode 100644
index 000000000000..ee64676ac519
--- /dev/null
+++ b/hazelcast/src/test/java/com/hazelcast/map/impl/journal/MapEventJournalExpiringTest.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.hazelcast.map.impl.journal; + +import com.hazelcast.config.Config; +import com.hazelcast.config.InMemoryFormat; +import com.hazelcast.config.MapConfig; +import com.hazelcast.config.MapStoreConfig; +import com.hazelcast.core.MapStore; +import com.hazelcast.journal.AbstractEventJournalExpiringTest; +import com.hazelcast.journal.EventJournalTestContext; +import com.hazelcast.map.journal.EventJournalMapEvent; +import com.hazelcast.test.HazelcastParallelClassRunner; +import com.hazelcast.test.annotation.ParallelTest; +import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.test.annotation.SlowTest; +import com.hazelcast.util.MapUtil; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import static com.hazelcast.config.MapStoreConfig.InitialLoadMode.EAGER; + +@RunWith(HazelcastParallelClassRunner.class) +@Category({SlowTest.class, ParallelTest.class}) +public class MapEventJournalExpiringTest extends AbstractEventJournalExpiringTest { + + private static final String MAP_NAME = "mappy"; + + @Override + protected Config getConfig() { + final MapConfig nonExpiringMapConfig = new MapConfig(MAP_NAME) + .setInMemoryFormat(getInMemoryFormat()); + + return super.getConfig() + .addMapConfig(nonExpiringMapConfig); + } + + protected InMemoryFormat getInMemoryFormat() { + return MapConfig.DEFAULT_IN_MEMORY_FORMAT; + } + + @Override + protected EventJournalTestContext> createContext() { + return new EventJournalTestContext>( + new EventJournalMapDataStructureAdapter(getRandomInstance().getMap(MAP_NAME)), + null, + new EventJournalMapEventAdapter() + ); + } +}