From 4132130af71257624d2ac52147435a644b375587 Mon Sep 17 00:00:00 2001 From: Matko Medenjak Date: Wed, 28 Feb 2018 13:31:24 +0100 Subject: [PATCH] Tolerate event journal reads of stale and future items Previously when the reader requested an event from the event journal which was already overwritten, the reader would get a StaleSequenceException. Now we tolerate reading items with stale sequences and return the oldest events instead, together with a nextSeq field indicating the sequence from where further reads can continue. Also, allowed the reader to request a future, non existent sequence. This can happen when some of the partitions have been lost, and it will try to read from an sequence which doesn't exist. In this case, the read silently returns from the event following the newest event (i.e. always blocks and waits for the next event in the journal). Also, reformatted some javadoc to adhere to a ~72 width limit and removed some unnecessary rolling upgrade checks. Fixes: https://github.com/hazelcast/hazelcast/issues/11895 --- .../client/cache/impl/ClientCacheProxy.java | 2 +- .../client/proxy/ClientMapProxy.java | 2 +- .../client/proxy/ClientRingbufferProxy.java | 2 +- .../task/cache/CacheEventJournalReadTask.java | 2 +- .../task/map/MapEventJournalReadTask.java | 2 +- .../RingbufferReadManyMessageTask.java | 2 +- .../journal/EventJournalReadOperation.java | 2 +- .../hazelcast/ringbuffer/ReadResultSet.java | 20 ++++++++++++++----- .../ringbuffer/impl/ReadResultSetImpl.java | 16 +++++++-------- .../impl/client/PortableReadResultSet.java | 14 ++++++------- .../impl/operations/ReadManyOperation.java | 2 +- .../ClientCompatibilityNullTest_1_0.java | 2 +- .../ClientCompatibilityNullTest_1_1.java | 2 +- .../ClientCompatibilityNullTest_1_2.java | 2 +- .../ClientCompatibilityNullTest_1_3.java | 2 +- .../ClientCompatibilityNullTest_1_4.java | 2 +- .../ClientCompatibilityNullTest_1_5.java | 6 +++--- .../ClientCompatibilityNullTest_1_6.java | 6 +++--- .../ClientCompatibilityTest_1_0.java | 2 +- .../ClientCompatibilityTest_1_1.java | 2 +- .../ClientCompatibilityTest_1_2.java | 2 +- .../ClientCompatibilityTest_1_3.java | 2 +- .../ClientCompatibilityTest_1_4.java | 2 +- .../ClientCompatibilityTest_1_5.java | 6 +++--- .../ClientCompatibilityTest_1_6.java | 6 +++--- .../EncodeDecodeCompatibilityNullTest.java | 6 +++--- .../EncodeDecodeCompatibilityTest.java | 6 +++--- .../AbstractEventJournalBasicTest.java | 2 +- 28 files changed, 67 insertions(+), 57 deletions(-) diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/cache/impl/ClientCacheProxy.java b/hazelcast-client/src/main/java/com/hazelcast/client/cache/impl/ClientCacheProxy.java index 42ece8e844278..a7a945e233190 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/cache/impl/ClientCacheProxy.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/cache/impl/ClientCacheProxy.java @@ -112,7 +112,7 @@ public ReadResultSet decodeClientMessage(ClientMessage message) { final CacheEventJournalReadCodec.ResponseParameters params = CacheEventJournalReadCodec.decodeResponse(message); final PortableReadResultSet resultSet = new PortableReadResultSet( params.readCount, params.items, params.itemSeqs, - params.lostCountExist ? params.lostCount : -1); + params.nextSeqExist ? params.nextSeq : -1); resultSet.setSerializationService(getSerializationService()); return resultSet; } diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientMapProxy.java b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientMapProxy.java index 6c395b160bf40..b1cada988bbf0 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientMapProxy.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientMapProxy.java @@ -254,7 +254,7 @@ public ReadResultSet decodeClientMessage(ClientMessage message) { final MapEventJournalReadCodec.ResponseParameters params = MapEventJournalReadCodec.decodeResponse(message); final PortableReadResultSet resultSet = new PortableReadResultSet( params.readCount, params.items, params.itemSeqs, - params.lostCountExist ? params.lostCount : -1); + params.nextSeqExist ? params.nextSeq : -1); resultSet.setSerializationService(getSerializationService()); return resultSet; } diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientRingbufferProxy.java b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientRingbufferProxy.java index ce8c98e8ee17b..19e193c5434cd 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientRingbufferProxy.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientRingbufferProxy.java @@ -96,7 +96,7 @@ public PortableReadResultSet decodeClientMessage(ClientMessage clientMessage) { final RingbufferReadManyCodec.ResponseParameters params = RingbufferReadManyCodec.decodeResponse(clientMessage); final PortableReadResultSet readResultSet = new PortableReadResultSet( params.readCount, params.items, params.itemSeqs, - params.lostCountExist ? params.lostCount : -1); + params.nextSeqExist ? params.nextSeq : -1); readResultSet.setSerializationService(getSerializationService()); return readResultSet; } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheEventJournalReadTask.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheEventJournalReadTask.java index 033c6c5075ece..ac39bafbf2ebd 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheEventJournalReadTask.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheEventJournalReadTask.java @@ -86,7 +86,7 @@ protected ClientMessage encodeResponse(Object response) { seqs[k] = resultSet.getSequence(k); } - return CacheEventJournalReadCodec.encodeResponse(resultSet.readCount(), items, seqs, resultSet.getLostCount()); + return CacheEventJournalReadCodec.encodeResponse(resultSet.readCount(), items, seqs, resultSet.getNextSequenceToReadFrom()); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/map/MapEventJournalReadTask.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/map/MapEventJournalReadTask.java index ca891966e8b1c..fbb936b61fac8 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/map/MapEventJournalReadTask.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/map/MapEventJournalReadTask.java @@ -85,7 +85,7 @@ protected ClientMessage encodeResponse(Object response) { seqs[k] = resultSet.getSequence(k); } - return MapEventJournalReadCodec.encodeResponse(resultSet.readCount(), items, seqs, resultSet.getLostCount()); + return MapEventJournalReadCodec.encodeResponse(resultSet.readCount(), items, seqs, resultSet.getNextSequenceToReadFrom()); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/ringbuffer/RingbufferReadManyMessageTask.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/ringbuffer/RingbufferReadManyMessageTask.java index 0a6b2454c680f..e8d42301a77ce 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/ringbuffer/RingbufferReadManyMessageTask.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/ringbuffer/RingbufferReadManyMessageTask.java @@ -69,7 +69,7 @@ protected ClientMessage encodeResponse(Object response) { seqs[k] = resultSet.getSequence(k); } - return RingbufferReadManyCodec.encodeResponse(resultSet.readCount(), items, seqs, resultSet.getLostCount()); + return RingbufferReadManyCodec.encodeResponse(resultSet.readCount(), items, seqs, resultSet.getNextSequenceToReadFrom()); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/internal/journal/EventJournalReadOperation.java b/hazelcast/src/main/java/com/hazelcast/internal/journal/EventJournalReadOperation.java index 6f9e22deb0ce7..1faa5357b01b5 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/journal/EventJournalReadOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/journal/EventJournalReadOperation.java @@ -119,7 +119,7 @@ public void beforeRun() { public boolean shouldWait() { if (resultSet == null) { resultSet = createResultSet(); - resultSet.setLostCount(lostEventCount); + resultSet.setNextSequenceToReadFrom(lostEventCount); sequence = startSequence; } diff --git a/hazelcast/src/main/java/com/hazelcast/ringbuffer/ReadResultSet.java b/hazelcast/src/main/java/com/hazelcast/ringbuffer/ReadResultSet.java index 27a3891ad82cc..7533f21c348ec 100644 --- a/hazelcast/src/main/java/com/hazelcast/ringbuffer/ReadResultSet.java +++ b/hazelcast/src/main/java/com/hazelcast/ringbuffer/ReadResultSet.java @@ -76,13 +76,23 @@ public interface ReadResultSet extends Iterable { int size(); /** - * Returns the number of lost items. Items can be skipped if the items - * with the requested sequence were already overwritten with newer - * items and fast-forwarding to the oldest item is permitted. - * If the information is not available, returns {@code -1}. + * Returns the sequence of the item following the last read item. This + * sequence can then be used to read items following the ones returned by + * this result set. + * Usually this sequence is equal to the sequence used to retrieve this + * result set incremented by the {@link #readCount()}. In cases when the + * reader tolerates lost items, this is not the case. + * For instance, if the reader requests an item with a stale sequence (one + * which has already been overwritten), the read will jump to the oldest + * sequence and read from there. + * Similarly, if the reader requests an item in the future (e.g. because + * the partition was lost and the reader was unaware of this), the read + * method will jump back to the newest available sequence. + * Because of these jumps and only in the case when the reader is loss + * tolerant, the next sequence must be retrieved using this method. * * @return the number of lost items * @since 3.10 */ - long getLostCount(); + long getNextSequenceToReadFrom(); } diff --git a/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/ReadResultSetImpl.java b/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/ReadResultSetImpl.java index 8e104ad07b401..902d876c3f55e 100644 --- a/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/ReadResultSetImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/ReadResultSetImpl.java @@ -65,7 +65,7 @@ public class ReadResultSetImpl extends AbstractList private long[] seqs; private int size; private int readCount; - private long lostCount; + private long nextSeq; public ReadResultSetImpl() { } @@ -196,12 +196,12 @@ public int getId() { } @Override - public long getLostCount() { - return lostCount; + public long getNextSequenceToReadFrom() { + return nextSeq; } - public void setLostCount(long lostCount) { - this.lostCount = lostCount; + public void setNextSequenceToReadFrom(long nextSeq) { + this.nextSeq = nextSeq; } @Override @@ -214,7 +214,7 @@ public void writeData(ObjectDataOutput out) throws IOException { out.writeLongArray(seqs); // RU_COMPAT_3_9 if (out.getVersion().isGreaterOrEqual(Versions.V3_10)) { - out.writeLong(lostCount); + out.writeLong(nextSeq); } } @@ -229,9 +229,9 @@ public void readData(ObjectDataInput in) throws IOException { seqs = in.readLongArray(); // RU_COMPAT_3_9 if (in.getVersion().isGreaterOrEqual(Versions.V3_10)) { - lostCount = in.readLong(); + nextSeq = in.readLong(); } else { - lostCount = -1; + nextSeq = -1; } } } diff --git a/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/client/PortableReadResultSet.java b/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/client/PortableReadResultSet.java index bf6c5cbb7b516..b1175a7fa0e68 100644 --- a/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/client/PortableReadResultSet.java +++ b/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/client/PortableReadResultSet.java @@ -37,7 +37,7 @@ import static java.util.Collections.unmodifiableList; public class PortableReadResultSet implements Portable, ReadResultSet { - private long lostCount; + private long nextSeq; private List items; private int readCount; private SerializationService serializationService; @@ -47,11 +47,11 @@ public PortableReadResultSet() { } @SuppressFBWarnings("EI_EXPOSE_REP2") - public PortableReadResultSet(int readCount, List items, long[] seqs, long lostCount) { + public PortableReadResultSet(int readCount, List items, long[] seqs, long nextSeq) { this.readCount = readCount; this.items = items; this.seqs = seqs; - this.lostCount = lostCount; + this.nextSeq = nextSeq; } public List getDataItems() { @@ -93,8 +93,8 @@ public int size() { } @Override - public long getLostCount() { - return lostCount; + public long getNextSequenceToReadFrom() { + return nextSeq; } @Override @@ -120,7 +120,7 @@ public void writePortable(PortableWriter writer) throws IOException { rawDataOutput.writeLongArray(seqs); // RU_COMPAT_3_9 if (rawDataOutput.getVersion().isGreaterOrEqual(Versions.V3_10)) { - rawDataOutput.writeLong(lostCount); + rawDataOutput.writeLong(nextSeq); } } @@ -139,7 +139,7 @@ public void readPortable(PortableReader reader) throws IOException { seqs = rawDataInput.readLongArray(); // RU_COMPAT_3_9 if (rawDataInput.getVersion().isGreaterOrEqual(Versions.V3_10)) { - lostCount = rawDataInput.readLong(); + nextSeq = rawDataInput.readLong(); } } } diff --git a/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/operations/ReadManyOperation.java b/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/operations/ReadManyOperation.java index 336cc1898ec3d..cc6605bc4f5ff 100644 --- a/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/operations/ReadManyOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/ringbuffer/impl/operations/ReadManyOperation.java @@ -115,7 +115,7 @@ public Object getResponse() { } final PortableReadResultSet portableReadResultSet = new PortableReadResultSet( - resultSet.readCount(), items, seqs, resultSet.getLostCount()); + resultSet.readCount(), items, seqs, resultSet.getNextSequenceToReadFrom()); return getNodeEngine().toData(portableReadResultSet); } diff --git a/hazelcast/src/test/java/com/hazelcast/client/protocol/compatibility/ClientCompatibilityNullTest_1_0.java b/hazelcast/src/test/java/com/hazelcast/client/protocol/compatibility/ClientCompatibilityNullTest_1_0.java index 35593abfddaa6..d793f1545fc73 100644 --- a/hazelcast/src/test/java/com/hazelcast/client/protocol/compatibility/ClientCompatibilityNullTest_1_0.java +++ b/hazelcast/src/test/java/com/hazelcast/client/protocol/compatibility/ClientCompatibilityNullTest_1_0.java @@ -5853,7 +5853,7 @@ public void handle( java.util.Collection assertTrue(isEqual(anInt, params.readCount)); assertTrue(isEqual(datas, params.items)); assertTrue(isEqual(null, params.itemSeqs)); - assertFalse(params.lostCountExist); + assertFalse(params.nextSeqExist); } @@ -5752,7 +5752,7 @@ public void handle( java.lang.String assertTrue(isEqual(anInt, params.readCount)); assertTrue(isEqual(datas, params.items)); assertTrue(isEqual(null, params.itemSeqs)); - assertFalse(params.lostCountExist); + assertFalse(params.nextSeqExist); } @@ -6187,7 +6187,7 @@ public void handle( java.util.Collection assertTrue(isEqual(anInt, params.readCount)); assertTrue(isEqual(datas, params.items)); assertTrue(isEqual(null, params.itemSeqs)); - assertTrue(isEqual(aLong, params.lostCount)); + assertTrue(isEqual(aLong, params.nextSeq)); } @@ -5755,7 +5755,7 @@ public void handle( java.lang.String assertTrue(isEqual(anInt, params.readCount)); assertTrue(isEqual(datas, params.items)); assertTrue(isEqual(null, params.itemSeqs)); - assertTrue(isEqual(aLong, params.lostCount)); + assertTrue(isEqual(aLong, params.nextSeq)); } @@ -6185,7 +6185,7 @@ public void handle( java.util.Collection assertTrue(isEqual(anInt, params.readCount)); assertTrue(isEqual(datas, params.items)); assertTrue(isEqual(arrLongs, params.itemSeqs)); - assertFalse(params.lostCountExist); + assertFalse(params.nextSeqExist); } @@ -6043,7 +6043,7 @@ public void handle( java.lang.String assertTrue(isEqual(anInt, params.readCount)); assertTrue(isEqual(datas, params.items)); assertTrue(isEqual(arrLongs, params.itemSeqs)); - assertFalse(params.lostCountExist); + assertFalse(params.nextSeqExist); } @@ -6499,7 +6499,7 @@ public void handle( java.util.Collection assertTrue(isEqual(anInt, params.readCount)); assertTrue(isEqual(datas, params.items)); assertTrue(isEqual(arrLongs, params.itemSeqs)); - assertTrue(isEqual(aLong, params.lostCount)); + assertTrue(isEqual(aLong, params.nextSeq)); } @@ -6047,7 +6047,7 @@ public void handle( java.lang.String assertTrue(isEqual(anInt, params.readCount)); assertTrue(isEqual(datas, params.items)); assertTrue(isEqual(arrLongs, params.itemSeqs)); - assertTrue(isEqual(aLong, params.lostCount)); + assertTrue(isEqual(aLong, params.nextSeq)); } @@ -6498,7 +6498,7 @@ public void handle( java.util.Collection assertTrue(isEqual(anInt, params.readCount)); assertTrue(isEqual(datas, params.items)); assertTrue(isEqual(null, params.itemSeqs)); - assertTrue(isEqual(aLong, params.lostCount)); + assertTrue(isEqual(aLong, params.nextSeq)); } { ClientMessage clientMessage = MultiMapPutCodec.encodeRequest( aString , aData , aData , aLong ); @@ -4314,7 +4314,7 @@ public void handle( java.lang.String assertTrue(isEqual(anInt, params.readCount)); assertTrue(isEqual(datas, params.items)); assertTrue(isEqual(null, params.itemSeqs)); - assertTrue(isEqual(aLong, params.lostCount)); + assertTrue(isEqual(aLong, params.nextSeq)); } { ClientMessage clientMessage = XATransactionClearRemoteCodec.encodeRequest( anXid ); @@ -4621,7 +4621,7 @@ public void handle( java.util.Collection assertTrue(isEqual(anInt, params.readCount)); assertTrue(isEqual(datas, params.items)); assertTrue(isEqual(arrLongs, params.itemSeqs)); - assertTrue(isEqual(aLong, params.lostCount)); + assertTrue(isEqual(aLong, params.nextSeq)); } { ClientMessage clientMessage = MultiMapPutCodec.encodeRequest( aString , aData , aData , aLong ); @@ -4314,7 +4314,7 @@ public void handle( java.lang.String assertTrue(isEqual(anInt, params.readCount)); assertTrue(isEqual(datas, params.items)); assertTrue(isEqual(arrLongs, params.itemSeqs)); - assertTrue(isEqual(aLong, params.lostCount)); + assertTrue(isEqual(aLong, params.nextSeq)); } { ClientMessage clientMessage = XATransactionClearRemoteCodec.encodeRequest( anXid ); @@ -4621,7 +4621,7 @@ public void handle( java.util.Collection