Skip to content

Commit

Permalink
KAFKA-15511: Handle CorruptIndexException in RemoteIndexCache (apache…
Browse files Browse the repository at this point in the history
…#14459)

A bug in the RemoteIndexCache leads to a situation where the cache does not replace the corrupted index with a new index instance fetched from remote storage. This commit fixes the bug by adding correct handling for `CorruptIndexException`.

Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Alexandre Dupriez <duprie@amazon.com>
  • Loading branch information
iit2009060 authored and k-wall committed Oct 10, 2023
1 parent 27828f3 commit 777dc9b
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.slf4j.{Logger, LoggerFactory}

import java.io.{File, FileInputStream, IOException}
import java.io.{File, FileInputStream, IOException, PrintWriter}
import java.nio.file.Files
import java.util
import java.util.Collections
Expand Down Expand Up @@ -524,6 +524,23 @@ class RemoteIndexCacheTest {
}
}

// Scenario: a corrupt offset index file already exists in the cache directory on disk,
// but there is no in-memory cache entry for the segment. Looking the entry up must not
// fail; the index is expected to be fetched from remote storage again.
@Test
def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
  // Create the corrupt offset index file before the cache is asked for the entry.
  createCorruptRemoteIndexCacheOffsetFile()
  // The test fails here if the corruption surfaces as an exception from the lookup.
  val entry = cache.getIndexEntry(rlsMetadata)
  val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
  val offsetIndexFile = entry.offsetIndex.file().toPath

  // The returned index must use the expected file name for this segment.
  assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString)
  // The index file must live directly under the remote index cache directory.
  assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
    s"offsetIndex=$offsetIndexFile is created under incorrect parent")
  // Because the on-disk file was corrupt, the index must be fetched from remote storage once.
  verifyFetchIndexInvocation(count = 1)
}

private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
= RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
Expand Down Expand Up @@ -598,4 +615,13 @@ class RemoteIndexCacheTest {
timeIndex.flush()
}
}

// Writes a deliberately malformed offset index file for `rlsMetadata` into the remote index
// cache directory under `tpDir`, so that a later attempt to load it hits a corrupt file.
// Throws IOException if the file could not be written, since the test would otherwise run
// without the corrupt file actually being in place.
private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
  val corruptIndexFile = remoteOffsetIndexFile(new File(tpDir, RemoteIndexCache.DIR_NAME), rlsMetadata)
  val pw = new PrintWriter(corruptIndexFile)
  try {
    // The size of the string written in the file is 12 bytes,
    // but it should be multiple of Offset Index EntrySIZE which is equal to 8.
    pw.write("Hello, world")
  } finally {
    pw.close()
  }
  // PrintWriter never throws IOException from write/close; it only records the failure.
  // Surface it explicitly so a broken fixture does not lead to a misleading test result.
  if (pw.checkError())
    throw new IOException(s"Failed to write corrupt offset index file $corruptIndexFile")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.github.benmanes.caffeine.cache.RemovalCause;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
Expand Down Expand Up @@ -310,7 +309,7 @@ private <T> T loadIndexFile(File file, RemoteLogSegmentMetadata remoteLogSegment
if (Files.exists(indexFile.toPath())) {
try {
index = readIndex.apply(indexFile);
} catch (CorruptRecordException ex) {
} catch (CorruptIndexException ex) {
log.info("Error occurred while loading the stored index file {}", indexFile.getPath(), ex);
}
}
Expand Down

0 comments on commit 777dc9b

Please sign in to comment.