diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index ea065b8c8126..0b9da1012c71 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -84,13 +84,12 @@ import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.ListIterator; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; @@ -525,6 +524,32 @@ private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException } } + /** + * Segments which match the following criteria are eligible for copying to remote storage: + * 1) Segment is not the active segment and + * 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only + * committed/acked messages + * @param log The log from which the segments are to be copied + * @param fromOffset The offset from which the segments are to be copied + * @param lastStableOffset The last stable offset of the log + * @return candidate log segments to be copied to remote storage + */ + List candidateLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) { + List candidateLogSegments = new ArrayList<>(); + List segments = JavaConverters.seqAsJavaList(log.logSegments(fromOffset, Long.MAX_VALUE).toSeq()); + if (!segments.isEmpty()) { + for (int idx = 1; idx < segments.size(); idx++) { + LogSegment previousSeg = segments.get(idx - 1); + LogSegment currentSeg = segments.get(idx); + if (currentSeg.baseOffset() <= lastStableOffset) { + candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset())); + } + } + // Discard the last active segment + } + return candidateLogSegments; + } + public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException { if (isCancelled()) return; @@ -538,35 +563,24 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException if (lso < 0) { logger.warn("lastStableOffset for partition {} is {}, which should not be negative.", topicIdPartition, lso); } else if (lso > 0 && copiedOffset < lso) { - // Copy segments only till the last-stable-offset as remote storage should contain only committed/acked - // messages - long toOffset = lso; - logger.debug("Checking for segments to copy, copiedOffset: {} and toOffset: {}", copiedOffset, toOffset); - long activeSegBaseOffset = log.activeSegment().baseOffset(); - // log-start-offset can be ahead of the read-offset, when: + // log-start-offset can be ahead of the copied-offset, when: // 1) log-start-offset gets incremented via delete-records API (or) // 2) enabling the remote log for the first time long fromOffset = Math.max(copiedOffset + 1, log.logStartOffset()); - ArrayList sortedSegments = new ArrayList<>(JavaConverters.asJavaCollection(log.logSegments(fromOffset, toOffset))); - sortedSegments.sort(Comparator.comparingLong(LogSegment::baseOffset)); - List sortedBaseOffsets = sortedSegments.stream().map(LogSegment::baseOffset).collect(Collectors.toList()); - int activeSegIndex = Collections.binarySearch(sortedBaseOffsets, activeSegBaseOffset); - - // sortedSegments becomes empty list when fromOffset and toOffset are same, and activeSegIndex becomes -1 - if (activeSegIndex < 0) { + List candidateLogSegments = candidateLogSegments(log, fromOffset, lso); + logger.debug("Candidate log segments, logStartOffset: {}, copiedOffset: {}, fromOffset: {}, lso: {} " + + "and candidateLogSegments: {}", log.logStartOffset(), copiedOffset, fromOffset, lso, candidateLogSegments); + if (candidateLogSegments.isEmpty()) { logger.debug("No segments found to be copied for partition {} with copiedOffset: {} and active segment's base-offset: {}", - topicIdPartition, copiedOffset, activeSegBaseOffset); + topicIdPartition, copiedOffset, log.activeSegment().baseOffset()); } else { - ListIterator logSegmentsIter = sortedSegments.subList(0, activeSegIndex).listIterator(); - while (logSegmentsIter.hasNext()) { - LogSegment segment = logSegmentsIter.next(); + for (EnrichedLogSegment candidateLogSegment : candidateLogSegments) { if (isCancelled() || !isLeader()) { logger.info("Skipping copying log segments as the current task state is changed, cancelled: {} leader:{}", isCancelled(), isLeader()); return; } - - copyLogSegment(log, segment, getNextSegmentBaseOffset(activeSegBaseOffset, logSegmentsIter)); + copyLogSegment(log, candidateLogSegment.logSegment, candidateLogSegment.nextSegmentOffset); } } } else { @@ -583,18 +597,6 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException } } - private long getNextSegmentBaseOffset(long activeSegBaseOffset, ListIterator logSegmentsIter) { - long nextSegmentBaseOffset; - if (logSegmentsIter.hasNext()) { - nextSegmentBaseOffset = logSegmentsIter.next().baseOffset(); - logSegmentsIter.previous(); - } else { - nextSegmentBaseOffset = activeSegBaseOffset; - } - - return nextSegmentBaseOffset; - } - private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException { File logFile = segment.log().file(); String logFileName = logFile.getName(); @@ -1002,4 +1004,37 @@ public void close() { } } + // Visible for testing + static class EnrichedLogSegment { + private final LogSegment logSegment; + private final long nextSegmentOffset; + + public EnrichedLogSegment(LogSegment logSegment, + long nextSegmentOffset) { + this.logSegment = logSegment; + this.nextSegmentOffset = nextSegmentOffset; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + EnrichedLogSegment that = (EnrichedLogSegment) o; + return nextSegmentOffset == that.nextSegmentOffset && Objects.equals(logSegment, that.logSegment); + } + + @Override + public int hashCode() { + return Objects.hash(logSegment, nextSegmentOffset); + } + + @Override + public String toString() { + return "EnrichedLogSegment{" + + "logSegment=" + logSegment + + ", nextSegmentOffset=" + nextSegmentOffset + + '}'; + } + } + } diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index c51a34fe3648..36a41f967147 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -269,15 +269,37 @@ void testStartup() { void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception { long oldSegmentStartOffset = 0L; long nextSegmentStartOffset = 150L; - long oldSegmentEndOffset = nextSegmentStartOffset - 1; + long lso = 250L; + long leo = 300L; + assertCopyExpectedLogSegmentsToRemote(oldSegmentStartOffset, nextSegmentStartOffset, lso, leo); + } + + /** + * The following values will be equal when the active segment gets rotated to passive and there are no new messages: + * last-stable-offset = high-water-mark = log-end-offset = base-offset-of-active-segment. + * This test asserts that the active log segment that was rotated after log.roll.ms are copied to remote storage. + */ + @Test + void testCopyLogSegmentToRemoteForStaleTopic() throws Exception { + long oldSegmentStartOffset = 0L; + long nextSegmentStartOffset = 150L; + long lso = 150L; + long leo = 150L; + assertCopyExpectedLogSegmentsToRemote(oldSegmentStartOffset, nextSegmentStartOffset, lso, leo); + } + private void assertCopyExpectedLogSegmentsToRemote(long oldSegmentStartOffset, + long nextSegmentStartOffset, + long lastStableOffset, + long logEndOffset) throws Exception { + long oldSegmentEndOffset = nextSegmentStartOffset - 1; when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); // leader epoch preparation checkpoint.write(totalEpochEntries); LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); - when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L)); + when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L)); File tempFile = TestUtils.tempFile(); File mockProducerSnapshotIndex = TestUtils.tempFile(); @@ -288,6 +310,8 @@ void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset); when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset); + verify(oldSegment, times(0)).readNextOffset(); + verify(activeSegment, times(0)).readNextOffset(); FileRecords fileRecords = mock(FileRecords.class); when(oldSegment.log()).thenReturn(fileRecords); @@ -302,7 +326,8 @@ void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception ProducerStateManager mockStateManager = mock(ProducerStateManager.class); when(mockLog.producerStateManager()).thenReturn(mockStateManager); when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex)); - when(mockLog.lastStableOffset()).thenReturn(250L); + when(mockLog.lastStableOffset()).thenReturn(lastStableOffset); + when(mockLog.logEndOffset()).thenReturn(logEndOffset); LazyIndex idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000); LazyIndex timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500); @@ -823,6 +848,56 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() { } } + @Test + public void testCandidateLogSegmentsSkipsActiveSegment() { + UnifiedLog log = mock(UnifiedLog.class); + LogSegment segment1 = mock(LogSegment.class); + LogSegment segment2 = mock(LogSegment.class); + LogSegment activeSegment = mock(LogSegment.class); + + when(segment1.baseOffset()).thenReturn(5L); + when(segment2.baseOffset()).thenReturn(10L); + when(activeSegment.baseOffset()).thenReturn(15L); + + when(log.logSegments(5L, Long.MAX_VALUE)) + .thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, segment2, activeSegment))); + + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition); + List expected = + Arrays.asList( + new RemoteLogManager.EnrichedLogSegment(segment1, 10L), + new RemoteLogManager.EnrichedLogSegment(segment2, 15L) + ); + List actual = task.candidateLogSegments(log, 5L, 20L); + assertEquals(expected, actual); + } + + @Test + public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() { + UnifiedLog log = mock(UnifiedLog.class); + LogSegment segment1 = mock(LogSegment.class); + LogSegment segment2 = mock(LogSegment.class); + LogSegment segment3 = mock(LogSegment.class); + LogSegment activeSegment = mock(LogSegment.class); + + when(segment1.baseOffset()).thenReturn(5L); + when(segment2.baseOffset()).thenReturn(10L); + when(segment3.baseOffset()).thenReturn(15L); + when(activeSegment.baseOffset()).thenReturn(20L); + + when(log.logSegments(5L, Long.MAX_VALUE)) + .thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, segment2, segment3, activeSegment))); + + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition); + List expected = + Arrays.asList( + new RemoteLogManager.EnrichedLogSegment(segment1, 10L), + new RemoteLogManager.EnrichedLogSegment(segment2, 15L) + ); + List actual = task.candidateLogSegments(log, 5L, 15L); + assertEquals(expected, actual); + } + private Partition mockPartition(TopicIdPartition topicIdPartition) { TopicPartition tp = topicIdPartition.topicPartition(); Partition partition = mock(Partition.class);