Skip to content

Commit

Permalink
KAFKA-15272: Fix the logic which finds candidate log segments to uplo…
Browse files Browse the repository at this point in the history
…ad it to tiered storage (apache#14128)

In tiered storage, a segment is eligible for deletion from local disk when it gets uploaded to the remote storage.

If the topic active segment contains some messages and there are no new incoming messages, then the active segment gets rotated to passive segment after the configured log.roll.ms timeout.

The logic to find the candidate segment in RemoteLogManager does not include the recently rotated passive segment as eligible to upload it to remote storage so the passive segment won't be removed even after if it breaches by retention time/size. (ie) Topic won't be empty after it becomes stale.

Added unit test to cover the scenario which will fail without this patch.

Reviewers: Christo Lolov <lolovc@amazon.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>
  • Loading branch information
kamalcph committed Aug 2, 2023
1 parent 0ce1640 commit ffe5f9f
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 36 deletions.
101 changes: 68 additions & 33 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,12 @@
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
Expand Down Expand Up @@ -525,6 +524,32 @@ private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException
}
}

/**
* Segments which match the following criteria are eligible for copying to remote storage:
* 1) Segment is not the active segment and
* 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only
* committed/acked messages
* @param log The log from which the segments are to be copied
* @param fromOffset The offset from which the segments are to be copied
* @param lastStableOffset The last stable offset of the log
* @return candidate log segments to be copied to remote storage
*/
List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) {
List<EnrichedLogSegment> candidateLogSegments = new ArrayList<>();
List<LogSegment> segments = JavaConverters.seqAsJavaList(log.logSegments(fromOffset, Long.MAX_VALUE).toSeq());
if (!segments.isEmpty()) {
for (int idx = 1; idx < segments.size(); idx++) {
LogSegment previousSeg = segments.get(idx - 1);
LogSegment currentSeg = segments.get(idx);
if (currentSeg.baseOffset() <= lastStableOffset) {
candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
}
}
// Discard the last active segment
}
return candidateLogSegments;
}

public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException {
if (isCancelled())
return;
Expand All @@ -538,35 +563,24 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
if (lso < 0) {
logger.warn("lastStableOffset for partition {} is {}, which should not be negative.", topicIdPartition, lso);
} else if (lso > 0 && copiedOffset < lso) {
// Copy segments only till the last-stable-offset as remote storage should contain only committed/acked
// messages
long toOffset = lso;
logger.debug("Checking for segments to copy, copiedOffset: {} and toOffset: {}", copiedOffset, toOffset);
long activeSegBaseOffset = log.activeSegment().baseOffset();
// log-start-offset can be ahead of the read-offset, when:
// log-start-offset can be ahead of the copied-offset, when:
// 1) log-start-offset gets incremented via delete-records API (or)
// 2) enabling the remote log for the first time
long fromOffset = Math.max(copiedOffset + 1, log.logStartOffset());
ArrayList<LogSegment> sortedSegments = new ArrayList<>(JavaConverters.asJavaCollection(log.logSegments(fromOffset, toOffset)));
sortedSegments.sort(Comparator.comparingLong(LogSegment::baseOffset));
List<Long> sortedBaseOffsets = sortedSegments.stream().map(LogSegment::baseOffset).collect(Collectors.toList());
int activeSegIndex = Collections.binarySearch(sortedBaseOffsets, activeSegBaseOffset);

// sortedSegments becomes empty list when fromOffset and toOffset are same, and activeSegIndex becomes -1
if (activeSegIndex < 0) {
List<EnrichedLogSegment> candidateLogSegments = candidateLogSegments(log, fromOffset, lso);
logger.debug("Candidate log segments, logStartOffset: {}, copiedOffset: {}, fromOffset: {}, lso: {} " +
"and candidateLogSegments: {}", log.logStartOffset(), copiedOffset, fromOffset, lso, candidateLogSegments);
if (candidateLogSegments.isEmpty()) {
logger.debug("No segments found to be copied for partition {} with copiedOffset: {} and active segment's base-offset: {}",
topicIdPartition, copiedOffset, activeSegBaseOffset);
topicIdPartition, copiedOffset, log.activeSegment().baseOffset());
} else {
ListIterator<LogSegment> logSegmentsIter = sortedSegments.subList(0, activeSegIndex).listIterator();
while (logSegmentsIter.hasNext()) {
LogSegment segment = logSegmentsIter.next();
for (EnrichedLogSegment candidateLogSegment : candidateLogSegments) {
if (isCancelled() || !isLeader()) {
logger.info("Skipping copying log segments as the current task state is changed, cancelled: {} leader:{}",
isCancelled(), isLeader());
return;
}

copyLogSegment(log, segment, getNextSegmentBaseOffset(activeSegBaseOffset, logSegmentsIter));
copyLogSegment(log, candidateLogSegment.logSegment, candidateLogSegment.nextSegmentOffset);
}
}
} else {
Expand All @@ -583,18 +597,6 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
}
}

private long getNextSegmentBaseOffset(long activeSegBaseOffset, ListIterator<LogSegment> logSegmentsIter) {
long nextSegmentBaseOffset;
if (logSegmentsIter.hasNext()) {
nextSegmentBaseOffset = logSegmentsIter.next().baseOffset();
logSegmentsIter.previous();
} else {
nextSegmentBaseOffset = activeSegBaseOffset;
}

return nextSegmentBaseOffset;
}

private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException {
File logFile = segment.log().file();
String logFileName = logFile.getName();
Expand Down Expand Up @@ -1002,4 +1004,37 @@ public void close() {
}
}

// Visible for testing
static class EnrichedLogSegment {
private final LogSegment logSegment;
private final long nextSegmentOffset;

public EnrichedLogSegment(LogSegment logSegment,
long nextSegmentOffset) {
this.logSegment = logSegment;
this.nextSegmentOffset = nextSegmentOffset;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EnrichedLogSegment that = (EnrichedLogSegment) o;
return nextSegmentOffset == that.nextSegmentOffset && Objects.equals(logSegment, that.logSegment);
}

@Override
public int hashCode() {
return Objects.hash(logSegment, nextSegmentOffset);
}

@Override
public String toString() {
return "EnrichedLogSegment{" +
"logSegment=" + logSegment +
", nextSegmentOffset=" + nextSegmentOffset +
'}';
}
}

}
81 changes: 78 additions & 3 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,15 +269,37 @@ void testStartup() {
void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception {
long oldSegmentStartOffset = 0L;
long nextSegmentStartOffset = 150L;
long oldSegmentEndOffset = nextSegmentStartOffset - 1;
long lso = 250L;
long leo = 300L;
assertCopyExpectedLogSegmentsToRemote(oldSegmentStartOffset, nextSegmentStartOffset, lso, leo);
}

/**
* The following values will be equal when the active segment gets rotated to passive and there are no new messages:
* last-stable-offset = high-water-mark = log-end-offset = base-offset-of-active-segment.
* This test asserts that the active log segment that was rotated after log.roll.ms are copied to remote storage.
*/
@Test
void testCopyLogSegmentToRemoteForStaleTopic() throws Exception {
long oldSegmentStartOffset = 0L;
long nextSegmentStartOffset = 150L;
long lso = 150L;
long leo = 150L;
assertCopyExpectedLogSegmentsToRemote(oldSegmentStartOffset, nextSegmentStartOffset, lso, leo);
}

private void assertCopyExpectedLogSegmentsToRemote(long oldSegmentStartOffset,
long nextSegmentStartOffset,
long lastStableOffset,
long logEndOffset) throws Exception {
long oldSegmentEndOffset = nextSegmentStartOffset - 1;
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());

// leader epoch preparation
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L));

File tempFile = TestUtils.tempFile();
File mockProducerSnapshotIndex = TestUtils.tempFile();
Expand All @@ -288,6 +310,8 @@ void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception

when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
verify(oldSegment, times(0)).readNextOffset();
verify(activeSegment, times(0)).readNextOffset();

FileRecords fileRecords = mock(FileRecords.class);
when(oldSegment.log()).thenReturn(fileRecords);
Expand All @@ -302,7 +326,8 @@ void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception
ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
when(mockLog.producerStateManager()).thenReturn(mockStateManager);
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
when(mockLog.lastStableOffset()).thenReturn(250L);
when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
when(mockLog.logEndOffset()).thenReturn(logEndOffset);

LazyIndex idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000);
LazyIndex timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500);
Expand Down Expand Up @@ -823,6 +848,56 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
}
}

@Test
public void testCandidateLogSegmentsSkipsActiveSegment() {
UnifiedLog log = mock(UnifiedLog.class);
LogSegment segment1 = mock(LogSegment.class);
LogSegment segment2 = mock(LogSegment.class);
LogSegment activeSegment = mock(LogSegment.class);

when(segment1.baseOffset()).thenReturn(5L);
when(segment2.baseOffset()).thenReturn(10L);
when(activeSegment.baseOffset()).thenReturn(15L);

when(log.logSegments(5L, Long.MAX_VALUE))
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, segment2, activeSegment)));

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
List<RemoteLogManager.EnrichedLogSegment> expected =
Arrays.asList(
new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
new RemoteLogManager.EnrichedLogSegment(segment2, 15L)
);
List<RemoteLogManager.EnrichedLogSegment> actual = task.candidateLogSegments(log, 5L, 20L);
assertEquals(expected, actual);
}

@Test
public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() {
UnifiedLog log = mock(UnifiedLog.class);
LogSegment segment1 = mock(LogSegment.class);
LogSegment segment2 = mock(LogSegment.class);
LogSegment segment3 = mock(LogSegment.class);
LogSegment activeSegment = mock(LogSegment.class);

when(segment1.baseOffset()).thenReturn(5L);
when(segment2.baseOffset()).thenReturn(10L);
when(segment3.baseOffset()).thenReturn(15L);
when(activeSegment.baseOffset()).thenReturn(20L);

when(log.logSegments(5L, Long.MAX_VALUE))
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, segment2, segment3, activeSegment)));

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
List<RemoteLogManager.EnrichedLogSegment> expected =
Arrays.asList(
new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
new RemoteLogManager.EnrichedLogSegment(segment2, 15L)
);
List<RemoteLogManager.EnrichedLogSegment> actual = task.candidateLogSegments(log, 5L, 15L);
assertEquals(expected, actual);
}

private Partition mockPartition(TopicIdPartition topicIdPartition) {
TopicPartition tp = topicIdPartition.topicPartition();
Partition partition = mock(Partition.class);
Expand Down

0 comments on commit ffe5f9f

Please sign in to comment.