diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index f6a622a3b1f93..21a2c6a58b0c1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -139,7 +139,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
 
   test("WriteAheadLogManager - write rotating logs") {
     // Write data using manager
-    val dataToWrite = generateRandomData(10)
+    val dataToWrite = generateRandomData()
     val dir = pathForTest
     writeDataUsingManager(dir, dataToWrite)
 
@@ -154,7 +154,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
     // Write data manually for testing reading through manager
     val dir = pathForTest
     val writtenData = (1 to 10).map { i =>
-      val data = generateRandomData(10)
+      val data = generateRandomData()
       val file = dir + s"/log-$i-$i"
       writeDataManually(data, file)
       data
@@ -171,7 +171,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
 
   test("WriteAheadLogManager - recover past logs when creating new manager") {
     // Write data with manager, recover with new manager and verify
-    val dataToWrite = generateRandomData(100)
+    val dataToWrite = generateRandomData()
     val dir = pathForTest
     writeDataUsingManager(dir, dataToWrite)
     val logFiles = getLogFilesInDirectory(dir)
@@ -183,7 +183,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
   test("WriteAheadLogManager - cleanup old logs") {
     // Write data with manager, recover with new manager and verify
     val dir = pathForTest
-    val dataToWrite = generateRandomData(100)
+    val dataToWrite = generateRandomData()
     val fakeClock = new ManualClock
     val manager = new WriteAheadLogManager(dir, hadoopConf,
       rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
@@ -239,8 +239,10 @@ object WriteAheadLogSuite {
 
   def writeDataUsingManager(logDirectory: String, data: Seq[String]) {
     val fakeClock = new ManualClock
+    fakeClock.setTime(1000000)
     val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
       rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
+    // Ensure that 500 does not get sorted after 2000, so put a high base value.
     data.foreach { item =>
       fakeClock.addToTime(500)
       manager.writeToLog(item)
@@ -290,8 +292,8 @@ object WriteAheadLogSuite {
     data
   }
 
-  def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
-    (1 to numItems).map {
+  def generateRandomData(): Seq[String] = {
+    (1 to 50).map {
       _.toString
     }
   }
@@ -300,11 +302,8 @@ object WriteAheadLogSuite {
     val logDirectoryPath = new Path(directory)
     val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
 
-    implicit def fileStatusOrdering[A <: FileStatus]: Ordering[A] = Ordering
-      .by(f => f.getModificationTime)
-
     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
-      fileSystem.listStatus(logDirectoryPath).sorted.map {
+      fileSystem.listStatus(logDirectoryPath).map {
        _.getPath.toString
      }
    } else {
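
Note on the fakeClock.setTime(1000000) change above: the test helpers name log segments log-$startTime-$endTime (see the writeDataManually call in this suite), and a plain string sort of those names does not match numeric time order once timestamps differ in digit count. Starting the clock at a high base keeps every timestamp the same width, so lexicographic and numeric order agree. Below is a minimal, self-contained sketch of the failure mode; it is illustrative only and not part of the patch, with the naming scheme and 500 ms steps taken from the test code.

object LogNameSortSketch {
  def main(args: Array[String]): Unit = {
    // Timestamps stepped by addToTime(500) from a zero base: digit counts vary.
    val lowBase  = Seq(500L, 1000L, 1500L, 2000L).map(t => s"log-$t-$t")
    // Same steps from the 1000000 base set by the patch: fixed-width timestamps.
    val highBase = Seq(1000500L, 1001000L, 1001500L, 1002000L).map(t => s"log-$t-$t")

    // String sort misplaces "log-500-500" after "log-2000-2000".
    println(lowBase.sorted)
    // prints: List(log-1000-1000, log-1500-1500, log-2000-2000, log-500-500)

    // With the high base, string order matches write order.
    println(highBase.sorted)
    // prints: List(log-1000500-1000500, log-1001000-1001000, log-1001500-1001500, log-1002000-1002000)
  }
}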