Skip to content

Commit

Permalink
Adding tests for legacy store offset file (apache#1019)
Browse files Browse the repository at this point in the history
* Adding parameterized tests for legacy-offset file

* Adding test for checking precedence between offset files
  • Loading branch information
rmatharu-zz authored and cameronlee314 committed May 2, 2019
1 parent 12e4263 commit fca8437
Showing 1 changed file with 82 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import org.apache.samza.system._
import org.apache.samza.task.TaskInstanceCollector
import org.apache.samza.util.{FileUtil, SystemClock}
import org.junit.Assert._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
import org.junit.{After, Before, Test}
import org.mockito.Matchers._
import org.mockito.{Matchers, Mockito}
Expand All @@ -47,7 +50,14 @@ import scala.collection.JavaConverters._
import scala.collection.immutable.HashMap
import scala.collection.mutable

class TestTaskStorageManager extends MockitoSugar {
/**
* This test is parameterized on the offsetFileName and is run for both
* StorageManagerUtil.OFFSET_FILE_NAME_LEGACY and StorageManagerUtil.OFFSET_FILE_NAME_NEW.
*
* @param offsetFileName the name of the offset file.
*/
@RunWith(value = classOf[Parameterized])
class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {

val store = "store1"
val loggedStore = "loggedStore1"
Expand Down Expand Up @@ -84,7 +94,7 @@ class TestTaskStorageManager extends MockitoSugar {
val ssp = new SystemStreamPartition(ss, partition)
val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
val storeFile = new File(storeDirectory, "store.sst")
val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
val offsetFile = new File(storeDirectory, offsetFileName)

val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, storeFile)

Expand Down Expand Up @@ -125,14 +135,14 @@ class TestTaskStorageManager extends MockitoSugar {
// Test 2: flush should update the offset file
taskManager.flush()
assertTrue(offsetFile.exists())
assertEquals("{\"kafka.testStream-loggedStore1.0\":\"50\"}", FileUtil.readWithChecksum(offsetFile))
validateOffsetFileContents(offsetFile, "kafka.testStream-loggedStore1.0", "50")

// Test 3: Update sspMetadata before shutdown and verify that offset file is updated correctly
when(mockSSPMetadataCache.getMetadata(ssp)).thenReturn(new SystemStreamPartitionMetadata("0", "100", "101"))
taskManager.stop()
assertTrue(storeFile.exists())
assertTrue(offsetFile.exists())
assertEquals("{\"kafka.testStream-loggedStore1.0\":\"100\"}", FileUtil.readWithChecksum(offsetFile))
validateOffsetFileContents(offsetFile, "kafka.testStream-loggedStore1.0", "100")

// Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the correct offset
sspMetadata = new SystemStreamPartitionMetadata("0", "150", "151")
Expand All @@ -151,7 +161,7 @@ class TestTaskStorageManager extends MockitoSugar {
.setStreamMetadataCache(mockStreamMetadataCache)
.setSSPMetadataCache(mockSSPMetadataCache)
.setSystemAdmin("kafka", mockSystemAdmin)
.initializeContainerStorageManager()
.initializeContainerStorageManager()
.build

taskManager.init
Expand Down Expand Up @@ -263,7 +273,7 @@ class TestTaskStorageManager extends MockitoSugar {

@Test
def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() {
val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), StorageManagerUtil.OFFSET_FILE_NAME_NEW)
val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), offsetFileName)
FileUtil.writeWithChecksum(offsetFilePath, "100")

val taskStorageManager = new TaskStorageManagerBuilder()
Expand All @@ -281,7 +291,7 @@ class TestTaskStorageManager extends MockitoSugar {
// is older than deletionRetention of the changeLog.
val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
storeDirectory.setLastModified(0)
val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
val offsetFile = new File(storeDirectory, offsetFileName)
offsetFile.createNewFile()
FileUtil.writeWithChecksum(offsetFile, "Test Offset Data")
offsetFile.setLastModified(0)
Expand All @@ -298,7 +308,7 @@ class TestTaskStorageManager extends MockitoSugar {

@Test
def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), StorageManagerUtil.OFFSET_FILE_NAME_NEW)
val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), offsetFileName)
FileUtil.writeWithChecksum(offsetFilePath, "100")

val taskStorageManager = new TaskStorageManagerBuilder()
Expand All @@ -315,7 +325,7 @@ class TestTaskStorageManager extends MockitoSugar {
val partition = new Partition(0)

val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
val offsetFile = new File(storeDirectory, offsetFileName)

val sspMetadataCache = mock[SSPMetadataCache]
val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
Expand Down Expand Up @@ -345,7 +355,7 @@ class TestTaskStorageManager extends MockitoSugar {

//Check conditions
assertTrue("Offset file doesn't exist!", offsetFile.exists())
assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"100\"}", FileUtil.readWithChecksum(offsetFile))
validateOffsetFileContents(offsetFile, "kafka.testStream-loggedStore1.0", "100")
}

/**
Expand All @@ -355,9 +365,9 @@ class TestTaskStorageManager extends MockitoSugar {
def testFlushCreatesOffsetFileForLoggedStore() {
val partition = new Partition(0)

val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
val anotherOffsetPath = new File(
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName, TaskMode.Active) + File.separator + offsetFileName)

val sspMetadataCache = mock[SSPMetadataCache]
val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
Expand All @@ -381,7 +391,7 @@ class TestTaskStorageManager extends MockitoSugar {

//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "100")

assertTrue("Offset file got created for a store that is not persisted to the disk!!", !anotherOffsetPath.exists())
}
Expand All @@ -393,7 +403,7 @@ class TestTaskStorageManager extends MockitoSugar {
def testFlushDeletesOffsetFileForLoggedStoreForEmptyPartition() {
val partition = new Partition(0)

val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)

val sspMetadataCache = mock[SSPMetadataCache]
val sspMetadata = new SystemStreamPartitionMetadata("0", "100", "101")
Expand Down Expand Up @@ -426,7 +436,7 @@ class TestTaskStorageManager extends MockitoSugar {

//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "100")

//Invoke test method again
taskStorageManager.flush()
Expand All @@ -440,7 +450,7 @@ class TestTaskStorageManager extends MockitoSugar {
val partition = new Partition(0)
val ssp = new SystemStreamPartition("kafka", getStreamName(loggedStore), partition)

val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
FileUtil.writeWithChecksum(offsetFilePath, "100")

val sspMetadataCache = mock[SSPMetadataCache]
Expand Down Expand Up @@ -470,7 +480,7 @@ class TestTaskStorageManager extends MockitoSugar {

//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"139\"}", FileUtil.readWithChecksum(offsetFilePath))
validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "139")

// Flush again
when(sspMetadataCache.getMetadata(ssp)).thenReturn(new SystemStreamPartitionMetadata("20", "193", "194"))
Expand All @@ -480,14 +490,32 @@ class TestTaskStorageManager extends MockitoSugar {

//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"193\"}", FileUtil.readWithChecksum(offsetFilePath))
validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "193")
}

/**
* Validates the contents of the offsetFile against the given ssp and offset.
* The legacy offset file only contains the offset as a string, while the new offset file contains a map of
* ssp to offset in json format.
* The name of the two offset files are given in {@link StorageManagerUtil.OFFSET_FILE_NAME_NEW} and
* {@link StorageManagerUtil.OFFSET_FILE_LEGACY}.
*/
private def validateOffsetFileContents(offsetFile: File, ssp: String, offset: String): Unit = {

if (offsetFile.getCanonicalFile.getName.equals(StorageManagerUtil.OFFSET_FILE_NAME_NEW)) {
assertEquals("Found incorrect value in offset file!", "{\"" + ssp + "\":\"" + offset + "\"}", FileUtil.readWithChecksum(offsetFile))
} else if (offsetFile.getCanonicalFile.getName.equals(StorageManagerUtil.OFFSET_FILE_NAME_LEGACY)) {
assertEquals("Found incorrect value in offset file!", offset, FileUtil.readWithChecksum(offsetFile))
} else {
throw new IllegalArgumentException("Invalid offset file name");
}
}

@Test
def testStopShouldNotCreateOffsetFileForEmptyStore() {
val partition = new Partition(0)

val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)


val sspMetadataCache = mock[SSPMetadataCache]
Expand Down Expand Up @@ -567,7 +595,7 @@ class TestTaskStorageManager extends MockitoSugar {
// Create a file in old single-offset format, with a sample offset
val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
val storeFile = new File(storeDirectory, "store.sst")
val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
val offsetFile = new File(storeDirectory, offsetFileName)
val sampleOldOffset = "912321"
FileUtil.writeWithChecksum(offsetFile, sampleOldOffset)

Expand All @@ -578,6 +606,28 @@ class TestTaskStorageManager extends MockitoSugar {
assertTrue(offsets.get(ssp).equals(sampleOldOffset))
}

@Test
def testReadOfOffsetInCaseOfBothFilesPresent(): Unit = {
// Create a file in old single-offset format, with a sample offset, and another with the new-offset format
val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
val storeFile = new File(storeDirectory, "store.sst")
val sampleOldOffset = "100000001"
val sampleNewOffset = "{\"kafka.test-stream.0\":\"200000002\"}"
FileUtil.writeWithChecksum(new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_LEGACY), sampleOldOffset)
FileUtil.writeWithChecksum(new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW), sampleNewOffset)

// Ensure that the files exist
assertTrue(new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_LEGACY).exists())
assertTrue(new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW).exists())

// read offset against a given ssp from the file, and check that the one in the new file should be read
var ssp = new SystemStreamPartition("kafka", "test-stream", new Partition(0))
val offsets = StorageManagerUtil.readOffsetFile(storeDirectory, Set(ssp).asJava, false)

assertEquals(1, offsets.size())
assertEquals("200000002", offsets.get(ssp))
}

private def testChangelogConsumerOffsetRegistration(oldestOffset: String, newestOffset: String, upcomingOffset: String, expectedRegisteredOffset: String, fileOffset: String, writeOffsetFile: Boolean): Unit = {
val systemName = "kafka"
val streamName = getStreamName(loggedStore)
Expand All @@ -590,7 +640,7 @@ class TestTaskStorageManager extends MockitoSugar {
val storeFile = new File(storeDirectory, "store.sst")

if (writeOffsetFile) {
val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
val offsetFile = new File(storeDirectory, offsetFileName)
if (fileOffset != null) {
FileUtil.writeWithChecksum(offsetFile, fileOffset)
} else {
Expand Down Expand Up @@ -711,6 +761,17 @@ class TestTaskStorageManager extends MockitoSugar {
}
}

object TestTaskStorageManager {

@Parameters def parameters: util.Collection[Array[String]] = {
val offsetFileNames = new util.ArrayList[Array[String]]()
offsetFileNames.add(Array(StorageManagerUtil.OFFSET_FILE_NAME_NEW))
offsetFileNames.add(Array(StorageManagerUtil.OFFSET_FILE_NAME_LEGACY))
offsetFileNames
}
}


object TaskStorageManagerBuilder {
val defaultStoreBaseDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "store")
val defaultLoggedStoreBaseDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "loggedStore")
Expand Down

0 comments on commit fca8437

Please sign in to comment.