Skip to content

Commit

Permalink
Tolerate event journal reads of stale and future items
Browse files Browse the repository at this point in the history
Previously when the reader requested an event from the event journal
which was already overwritten, the reader would get a
StaleSequenceException. Now we tolerate reading items with stale
sequences and return the oldest events instead, together with a
nextSeq field indicating the sequence from where further reads can
continue.
Also, allowed the reader to request a future, non existent sequence.
This can happen when some of the partitions have been lost, and it will
try to read from an sequence which doesn't exist. In this case, the read
silently returns from the event following the newest event (i.e. always
blocks and waits for the next event in the journal).
Also, reformatted some javadoc to adhere to a ~72 width limit and
removed some unnecessary rolling upgrade checks.

Fixes: hazelcast#11895
  • Loading branch information
Matko Medenjak committed Mar 8, 2018
1 parent 8f373db commit 4132130
Show file tree
Hide file tree
Showing 28 changed files with 67 additions and 57 deletions.
Expand Up @@ -112,7 +112,7 @@ public ReadResultSet<?> decodeClientMessage(ClientMessage message) {
final CacheEventJournalReadCodec.ResponseParameters params = CacheEventJournalReadCodec.decodeResponse(message);
final PortableReadResultSet<?> resultSet = new PortableReadResultSet<Object>(
params.readCount, params.items, params.itemSeqs,
params.lostCountExist ? params.lostCount : -1);
params.nextSeqExist ? params.nextSeq : -1);
resultSet.setSerializationService(getSerializationService());
return resultSet;
}
Expand Down
Expand Up @@ -254,7 +254,7 @@ public ReadResultSet<?> decodeClientMessage(ClientMessage message) {
final MapEventJournalReadCodec.ResponseParameters params = MapEventJournalReadCodec.decodeResponse(message);
final PortableReadResultSet<?> resultSet = new PortableReadResultSet<Object>(
params.readCount, params.items, params.itemSeqs,
params.lostCountExist ? params.lostCount : -1);
params.nextSeqExist ? params.nextSeq : -1);
resultSet.setSerializationService(getSerializationService());
return resultSet;
}
Expand Down
Expand Up @@ -96,7 +96,7 @@ public PortableReadResultSet decodeClientMessage(ClientMessage clientMessage) {
final RingbufferReadManyCodec.ResponseParameters params = RingbufferReadManyCodec.decodeResponse(clientMessage);
final PortableReadResultSet readResultSet = new PortableReadResultSet(
params.readCount, params.items, params.itemSeqs,
params.lostCountExist ? params.lostCount : -1);
params.nextSeqExist ? params.nextSeq : -1);
readResultSet.setSerializationService(getSerializationService());
return readResultSet;
}
Expand Down
Expand Up @@ -86,7 +86,7 @@ protected ClientMessage encodeResponse(Object response) {
seqs[k] = resultSet.getSequence(k);
}

return CacheEventJournalReadCodec.encodeResponse(resultSet.readCount(), items, seqs, resultSet.getLostCount());
return CacheEventJournalReadCodec.encodeResponse(resultSet.readCount(), items, seqs, resultSet.getNextSequenceToReadFrom());
}

@Override
Expand Down
Expand Up @@ -85,7 +85,7 @@ protected ClientMessage encodeResponse(Object response) {
seqs[k] = resultSet.getSequence(k);
}

return MapEventJournalReadCodec.encodeResponse(resultSet.readCount(), items, seqs, resultSet.getLostCount());
return MapEventJournalReadCodec.encodeResponse(resultSet.readCount(), items, seqs, resultSet.getNextSequenceToReadFrom());
}

@Override
Expand Down
Expand Up @@ -69,7 +69,7 @@ protected ClientMessage encodeResponse(Object response) {
seqs[k] = resultSet.getSequence(k);
}

return RingbufferReadManyCodec.encodeResponse(resultSet.readCount(), items, seqs, resultSet.getLostCount());
return RingbufferReadManyCodec.encodeResponse(resultSet.readCount(), items, seqs, resultSet.getNextSequenceToReadFrom());
}

@Override
Expand Down
Expand Up @@ -119,7 +119,7 @@ public void beforeRun() {
public boolean shouldWait() {
if (resultSet == null) {
resultSet = createResultSet();
resultSet.setLostCount(lostEventCount);
resultSet.setNextSequenceToReadFrom(lostEventCount);
sequence = startSequence;
}

Expand Down
Expand Up @@ -76,13 +76,23 @@ public interface ReadResultSet<E> extends Iterable<E> {
int size();

/**
* Returns the number of lost items. Items can be skipped if the items
* with the requested sequence were already overwritten with newer
* items and fast-forwarding to the oldest item is permitted.
* If the information is not available, returns {@code -1}.
* Returns the sequence of the item following the last read item. This
* sequence can then be used to read items following the ones returned by
* this result set.
* Usually this sequence is equal to the sequence used to retrieve this
* result set incremented by the {@link #readCount()}. In cases when the
* reader tolerates lost items, this is not the case.
* For instance, if the reader requests an item with a stale sequence (one
* which has already been overwritten), the read will jump to the oldest
* sequence and read from there.
* Similarly, if the reader requests an item in the future (e.g. because
* the partition was lost and the reader was unaware of this), the read
* method will jump back to the newest available sequence.
* Because of these jumps and only in the case when the reader is loss
* tolerant, the next sequence must be retrieved using this method.
*
* @return the number of lost items
* @since 3.10
*/
long getLostCount();
long getNextSequenceToReadFrom();
}
Expand Up @@ -65,7 +65,7 @@ public class ReadResultSetImpl<O, E> extends AbstractList<E>
private long[] seqs;
private int size;
private int readCount;
private long lostCount;
private long nextSeq;

public ReadResultSetImpl() {
}
Expand Down Expand Up @@ -196,12 +196,12 @@ public int getId() {
}

@Override
public long getLostCount() {
return lostCount;
public long getNextSequenceToReadFrom() {
return nextSeq;
}

public void setLostCount(long lostCount) {
this.lostCount = lostCount;
public void setNextSequenceToReadFrom(long nextSeq) {
this.nextSeq = nextSeq;
}

@Override
Expand All @@ -214,7 +214,7 @@ public void writeData(ObjectDataOutput out) throws IOException {
out.writeLongArray(seqs);
// RU_COMPAT_3_9
if (out.getVersion().isGreaterOrEqual(Versions.V3_10)) {
out.writeLong(lostCount);
out.writeLong(nextSeq);
}
}

Expand All @@ -229,9 +229,9 @@ public void readData(ObjectDataInput in) throws IOException {
seqs = in.readLongArray();
// RU_COMPAT_3_9
if (in.getVersion().isGreaterOrEqual(Versions.V3_10)) {
lostCount = in.readLong();
nextSeq = in.readLong();
} else {
lostCount = -1;
nextSeq = -1;
}
}
}
Expand Up @@ -37,7 +37,7 @@
import static java.util.Collections.unmodifiableList;

public class PortableReadResultSet<E> implements Portable, ReadResultSet<E> {
private long lostCount;
private long nextSeq;
private List<Data> items;
private int readCount;
private SerializationService serializationService;
Expand All @@ -47,11 +47,11 @@ public PortableReadResultSet() {
}

@SuppressFBWarnings("EI_EXPOSE_REP2")
public PortableReadResultSet(int readCount, List<Data> items, long[] seqs, long lostCount) {
public PortableReadResultSet(int readCount, List<Data> items, long[] seqs, long nextSeq) {
this.readCount = readCount;
this.items = items;
this.seqs = seqs;
this.lostCount = lostCount;
this.nextSeq = nextSeq;
}

public List<Data> getDataItems() {
Expand Down Expand Up @@ -93,8 +93,8 @@ public int size() {
}

@Override
public long getLostCount() {
return lostCount;
public long getNextSequenceToReadFrom() {
return nextSeq;
}

@Override
Expand All @@ -120,7 +120,7 @@ public void writePortable(PortableWriter writer) throws IOException {
rawDataOutput.writeLongArray(seqs);
// RU_COMPAT_3_9
if (rawDataOutput.getVersion().isGreaterOrEqual(Versions.V3_10)) {
rawDataOutput.writeLong(lostCount);
rawDataOutput.writeLong(nextSeq);
}
}

Expand All @@ -139,7 +139,7 @@ public void readPortable(PortableReader reader) throws IOException {
seqs = rawDataInput.readLongArray();
// RU_COMPAT_3_9
if (rawDataInput.getVersion().isGreaterOrEqual(Versions.V3_10)) {
lostCount = rawDataInput.readLong();
nextSeq = rawDataInput.readLong();
}
}
}
Expand Up @@ -115,7 +115,7 @@ public Object getResponse() {
}

final PortableReadResultSet portableReadResultSet = new PortableReadResultSet(
resultSet.readCount(), items, seqs, resultSet.getLostCount());
resultSet.readCount(), items, seqs, resultSet.getNextSequenceToReadFrom());
return getNodeEngine().toData(portableReadResultSet);
}

Expand Down
Expand Up @@ -5853,7 +5853,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertFalse(params.itemSeqsExist);
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down
Expand Up @@ -5898,7 +5898,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertFalse(params.itemSeqsExist);
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down
Expand Up @@ -5828,7 +5828,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertFalse(params.itemSeqsExist);
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down
Expand Up @@ -5818,7 +5818,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertFalse(params.itemSeqsExist);
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down
Expand Up @@ -6047,7 +6047,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertFalse(params.itemSeqsExist);
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down
Expand Up @@ -1732,7 +1732,7 @@ public void handle( java.util.Collection<com.hazelcast.nio.serialization.Data>
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertTrue(isEqual(null, params.itemSeqs));
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down Expand Up @@ -5752,7 +5752,7 @@ public void handle( java.lang.String
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertTrue(isEqual(null, params.itemSeqs));
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down Expand Up @@ -6187,7 +6187,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertTrue(isEqual(null, params.itemSeqs));
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down
Expand Up @@ -1727,7 +1727,7 @@ public void handle( java.util.Collection<com.hazelcast.nio.serialization.Data>
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertTrue(isEqual(null, params.itemSeqs));
assertTrue(isEqual(aLong, params.lostCount));
assertTrue(isEqual(aLong, params.nextSeq));
}


Expand Down Expand Up @@ -5755,7 +5755,7 @@ public void handle( java.lang.String
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertTrue(isEqual(null, params.itemSeqs));
assertTrue(isEqual(aLong, params.lostCount));
assertTrue(isEqual(aLong, params.nextSeq));
}


Expand Down Expand Up @@ -6185,7 +6185,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertTrue(isEqual(null, params.itemSeqs));
assertTrue(isEqual(aLong, params.lostCount));
assertTrue(isEqual(aLong, params.nextSeq));
}


Expand Down
Expand Up @@ -6143,7 +6143,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertFalse(params.itemSeqsExist);
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down
Expand Up @@ -6191,7 +6191,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertFalse(params.itemSeqsExist);
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down
Expand Up @@ -6121,7 +6121,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertFalse(params.itemSeqsExist);
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down
Expand Up @@ -6111,7 +6111,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertFalse(params.itemSeqsExist);
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down
Expand Up @@ -6351,7 +6351,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertFalse(params.itemSeqsExist);
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down
Expand Up @@ -1816,7 +1816,7 @@ public void handle( java.util.Collection<com.hazelcast.nio.serialization.Data>
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertTrue(isEqual(arrLongs, params.itemSeqs));
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down Expand Up @@ -6043,7 +6043,7 @@ public void handle( java.lang.String
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertTrue(isEqual(arrLongs, params.itemSeqs));
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down Expand Up @@ -6499,7 +6499,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertTrue(isEqual(arrLongs, params.itemSeqs));
assertFalse(params.lostCountExist);
assertFalse(params.nextSeqExist);
}


Expand Down
Expand Up @@ -1811,7 +1811,7 @@ public void handle( java.util.Collection<com.hazelcast.nio.serialization.Data>
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertTrue(isEqual(arrLongs, params.itemSeqs));
assertTrue(isEqual(aLong, params.lostCount));
assertTrue(isEqual(aLong, params.nextSeq));
}


Expand Down Expand Up @@ -6047,7 +6047,7 @@ public void handle( java.lang.String
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertTrue(isEqual(arrLongs, params.itemSeqs));
assertTrue(isEqual(aLong, params.lostCount));
assertTrue(isEqual(aLong, params.nextSeq));
}


Expand Down Expand Up @@ -6498,7 +6498,7 @@ public void handle( java.util.Collection<com.hazelcast.map.impl.querycache.even
assertTrue(isEqual(anInt, params.readCount));
assertTrue(isEqual(datas, params.items));
assertTrue(isEqual(arrLongs, params.itemSeqs));
assertTrue(isEqual(aLong, params.lostCount));
assertTrue(isEqual(aLong, params.nextSeq));
}


Expand Down

0 comments on commit 4132130

Please sign in to comment.