KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced (apache#14156)

Reviewers: Chris Egerton <chrise@aiven.io>
gharris1727 authored and jeqo committed Aug 15, 2023
1 parent 62c0617 commit 239a470
Showing 2 changed files with 116 additions and 40 deletions.
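
Before the file diffs, a brief aside on why sync retention matters (a hedged, self-contained sketch; the names and lookup logic below are illustrative assumptions, not MirrorMaker 2's actual translation API): a consumer offset can only be translated as precisely as the closest stored sync at or below it, so the more distinct, well-spaced syncs the store keeps, the smaller the worst-case translation error.

import java.util.Arrays;

public class TranslationSketch {
    // Illustrative only: choose the latest stored upstream offset that is <= the consumer's offset.
    // The fewer distinct syncs retained, the larger the gap (translation error) can become.
    static long nearestSyncAtOrBefore(long[] storedUpstreamOffsets, long consumerUpstreamOffset) {
        return Arrays.stream(storedUpstreamOffsets)
                .filter(o -> o <= consumerUpstreamOffset)
                .max()
                .orElseThrow();
    }

    public static void main(String[] args) {
        long[] denseStore = {0, 100, 200, 300, 400, 500};
        long[] sparseStore = {0, 400, 500}; // many intermediate syncs expired
        long consumerOffset = 350;
        System.out.println(nearestSyncAtOrBefore(denseStore, consumerOffset));  // 300 -> error of 50
        System.out.println(nearestSyncAtOrBefore(sparseStore, consumerOffset)); // 0   -> error of 350
    }
}
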
OffsetSyncStore.java
@@ -189,7 +189,7 @@ private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync)
// Make a copy of the array before mutating it, so that readers do not see inconsistent data
// TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
updateSyncArray(mutableSyncs, offsetSync);
updateSyncArray(mutableSyncs, syncs, offsetSync);
if (log.isTraceEnabled()) {
log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
}
@@ -227,7 +227,7 @@ private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
}
}

private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
private void updateSyncArray(OffsetSync[] syncs, OffsetSync[] original, OffsetSync offsetSync) {
long upstreamOffset = offsetSync.upstreamOffset();
// While reading to the end of the topic, ensure that our earliest sync is later than
// any earlier sync that could have been used for translation, to preserve monotonicity
@@ -237,29 +237,49 @@ private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
return;
}
OffsetSync replacement = offsetSync;
// The most-recently-discarded offset sync
// We track this since it may still be eligible for use in the syncs array at a later index
OffsetSync oldValue = syncs[0];
// Index into the original syncs array to the replacement we'll consider next.
int replacementIndex = 0;
// Invariant A is always violated once a new sync appears.
// Repair Invariant A: the latest sync must always be updated
syncs[0] = replacement;
for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
int previous = current - 1;

// We can potentially use oldValue instead of replacement, allowing us to keep more distinct values stored
// If oldValue is not recent, it should be expired from the store
boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
// Ensure that this value is sufficiently separated from the previous value
// We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
// Ensure that this value is sufficiently separated from the next value
// We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
int next = current + 1;
boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current);
// If this condition is false, oldValue will be expired from the store and lost forever.
if (isRecent && separatedFromPrevious && separatedFromNext) {
replacement = oldValue;
}
// Try to choose a value from the old array as the replacement
// This allows us to keep more distinct values stored overall, improving translation.
boolean skipOldValue;
do {
OffsetSync oldValue = original[replacementIndex];
// If oldValue is not recent enough, then it is not valid to use at the current index.
// It may still be valid when used in a later index where values are allowed to be older.
boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
// Ensure that this value is sufficiently separated from the previous value
// We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
// If this value is too close to the previous value, it will never be valid in a later position.
boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
// Ensure that this value is sufficiently separated from the next value
// We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
int next = current + 1;
boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current);
// If the next value in the old array is a duplicate of the current one, then they are equivalent
// This value will not need to be considered again
int nextReplacement = replacementIndex + 1;
boolean duplicate = nextReplacement < SYNCS_PER_PARTITION && oldValue == original[nextReplacement];

// Promoting the oldValue to the replacement only happens if it satisfies all invariants.
boolean promoteOldValueToReplacement = isRecent && separatedFromPrevious && separatedFromNext;
if (promoteOldValueToReplacement) {
replacement = oldValue;
}
// The index should be skipped without promoting if we know that it will not be used at a later index
// based only on the observed part of the array so far.
skipOldValue = duplicate || !separatedFromPrevious;
if (promoteOldValueToReplacement || skipOldValue) {
replacementIndex++;
}
// We may need to skip past multiple indices, so keep looping until we're done skipping forward.
// The index must not get ahead of the current index, as we only promote from low index to high index.
} while (replacementIndex < current && skipOldValue);

// The replacement variable always contains a value which satisfies the invariants for this index.
// This replacement may or may not be used, since the invariants could already be satisfied,
@@ -273,8 +293,7 @@ private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
break;
} else {
// Invariant B violated for syncs[current]: sync is now too old and must be updated
// Repair Invariant B: swap in replacement, and save the old value for the next iteration
oldValue = syncs[current];
// Repair Invariant B: swap in replacement
syncs[current] = replacement;

assert invariantB(syncs[previous], syncs[current], previous, current);
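
For readers new to this class, the diff above relies on the store's exponential layout: index 0 always holds the latest sync (Invariant A), the last index holds the earliest readable sync (Invariant D), and each intermediate slot may lag the head by an amount that grows roughly exponentially with its index (Invariants B and C bound this precisely). The following is a minimal, self-contained sketch of that layout; the bound used here is an illustrative assumption, not the exact invariantB/invariantC formulas, and it naively copies the previous slot's value downward, which expires more syncs than the replacementIndex scan introduced above.

import java.util.Arrays;

public class SyncLayoutSketch {
    static final int SLOTS = 64;

    // Toy store of upstream offsets only; the real store keeps (upstream, downstream) pairs.
    static long[] insert(long[] syncs, long newOffset) {
        long[] updated = Arrays.copyOf(syncs, SLOTS);
        updated[0] = newOffset; // Invariant A: the head is always the latest sync
        for (int i = 1; i < SLOTS; i++) {
            // Illustrative bound: slot i may lag the head by at most ~2^i offsets.
            // A slot that falls too far behind is overwritten (its old sync "expires").
            if (newOffset - updated[i] > (1L << Math.min(i, 62))) {
                updated[i] = updated[i - 1];
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        long[] syncs = new long[SLOTS]; // every slot starts at the earliest sync, offset 0
        for (long offset = 100; offset <= 2000; offset += 100) {
            syncs = insert(syncs, offset);
        }
        // The distinct stored offsets trail the head at roughly exponentially growing distances.
        System.out.println(Arrays.toString(Arrays.stream(syncs).distinct().toArray()));
    }
}
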
OffsetSyncStoreTest.java
@@ -22,6 +22,9 @@
import org.junit.jupiter.api.Test;

import java.util.OptionalLong;
import java.util.PrimitiveIterator;
import java.util.Random;
import java.util.stream.LongStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -153,31 +156,85 @@ public void testPastOffsetTranslation() {
}

@Test
public void testKeepMostDistinctSyncs() {
public void testConsistentlySpacedSyncs() {
// Under normal operation, the incoming syncs will be regularly spaced and the store should keep a set of syncs
// which provide the best translation accuracy (expires as few syncs as possible)
// Each new sync should be added to the cache and expire at most one other sync from the cache
long iterations = 10000;
long iterations = 100;
long maxStep = Long.MAX_VALUE / iterations;
// Test a variety of steps (corresponding to the offset.lag.max configuration)
for (long step = 1; step < maxStep; step = (step * 2) + 1) {
for (long firstOffset = 0; firstOffset < 30; firstOffset++) {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
int lastCount = 1;
store.start();
for (long offset = firstOffset; offset <= iterations; offset += step) {
store.sync(tp, offset, offset);
// Invariant A: the latest sync is present
assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
// Invariant D: the earliest sync is present
assertEquals(firstOffset, store.syncFor(tp, 63).upstreamOffset());
int count = countDistinctStoredSyncs(store, tp);
int diff = count - lastCount;
assertTrue(diff >= 0,
"Store expired too many syncs: " + diff + " after receiving offset " + offset);
lastCount = count;
}
}
long finalStep = step;
// Generate a stream of consistently spaced syncs
// Each new sync should be added to the cache and expire at most one other sync from the cache
assertSyncSpacingHasBoundedExpirations(firstOffset, LongStream.generate(() -> finalStep).limit(iterations), 1);
}
}
}

@Test
public void testRandomlySpacedSyncs() {
Random random = new Random(0L); // arbitrary but deterministic seed
int iterationBits = 10;
long iterations = 1 << iterationBits;
for (int n = 1; n < Long.SIZE - iterationBits; n++) {
// A stream with at most n bits of difference between the largest and smallest steps
// will expire n + 2 syncs at once in the worst case, because the sync store is laid out exponentially.
long maximumDifference = 1L << n;
int maximumExpirations = n + 2;
assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, 0L, maximumDifference), maximumExpirations);
// This holds true even if there is a larger minimum step size, such as caused by offsetLagMax
long offsetLagMax = 1L << 16;
assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, offsetLagMax, offsetLagMax + maximumDifference), maximumExpirations);
}
}

@Test
public void testDroppedSyncsSpacing() {
Random random = new Random(0L); // arbitrary but deterministic seed
long iterations = 10000;
long offsetLagMax = 100;
// Half of the gaps will be offsetLagMax, and half will be double that, as if one intervening sync was dropped.
LongStream stream = random.doubles()
.mapToLong(d -> (d < 0.5 ? 2 : 1) * offsetLagMax)
.limit(iterations);
// This will cause up to 2 syncs to be discarded, because a sequence of two adjacent syncs followed by a
// dropped sync will set up the following situation
// before [d....d,c,b,a....]
// after [e......e,d,a....]
// and syncs b and c are discarded to make room for e and the demoted sync d.
assertSyncSpacingHasBoundedExpirations(0, stream, 2);
}

/**
* Simulate an OffsetSyncStore receiving a sequence of offset syncs as defined by their start offset and gaps.
* After processing each simulated sync, assert that the store has not expired more unique syncs than the bound.
* @param firstOffset First offset to give to the sync store after starting
* @param steps A finite stream of gaps between syncs with some known distribution
* @param maximumExpirations The maximum number of distinct syncs allowed to be expired after a single update.
*/
private void assertSyncSpacingHasBoundedExpirations(long firstOffset, LongStream steps, int maximumExpirations) {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
store.start();
store.sync(tp, firstOffset, firstOffset);
PrimitiveIterator.OfLong iterator = steps.iterator();
long offset = firstOffset;
int lastCount = 1;
while (iterator.hasNext()) {
offset += iterator.nextLong();
assertTrue(offset >= 0, "Test is invalid, offset overflowed");
store.sync(tp, offset, offset);
// Invariant A: the latest sync is present
assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
// Invariant D: the earliest sync is present
assertEquals(firstOffset, store.syncFor(tp, 63).upstreamOffset());
int count = countDistinctStoredSyncs(store, tp);
// We are adding one sync, so if the count didn't change, then exactly one sync expired.
int expiredSyncs = lastCount - count + 1;
assertTrue(expiredSyncs <= maximumExpirations,
"Store expired too many syncs: " + expiredSyncs + " > " + maximumExpirations
+ " after receiving offset " + offset);
lastCount = count;
}
}
}
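To make the "variably spaced" scenario in the commit title concrete, here is a small stand-alone sketch (not part of the patch) that reproduces the gap distribution used by testDroppedSyncsSpacing above: roughly half of the gaps equal offset.lag.max and the other half are twice that, as if every other sync had been dropped.

import java.util.Random;

public class DroppedSyncGapsSketch {
    public static void main(String[] args) {
        long offsetLagMax = 100;
        Random random = new Random(0L); // deterministic seed, mirroring the test
        // About half of the gaps are offsetLagMax and half are 2 * offsetLagMax.
        long[] gaps = random.doubles(20)
                .mapToLong(d -> (d < 0.5 ? 2 : 1) * offsetLagMax)
                .toArray();
        long offset = 0;
        for (long gap : gaps) {
            offset += gap;
            System.out.println("sync at upstream offset " + offset);
        }
    }
}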
