Commit 4705fff

Sort listed files by name. Use local files for WAL tests.

harishreedharan committed Oct 23, 2014
1 parent 82ce56e

Showing 5 changed files with 44 additions and 65 deletions.
pom.xml: 6 changes (0 additions & 6 deletions)
@@ -406,12 +406,6 @@
     <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
     <version>${akka.version}</version>
   </dependency>
-  <dependency>
-    <groupId>org.apache.hadoop</groupId>
-    <artifactId>hadoop-minicluster</artifactId>
-    <version>${hadoop.version}</version>
-    <scope>test</scope>
-  </dependency>
   <dependency>
     <groupId>${akka.group}</groupId>
     <artifactId>akka-testkit_${scala.binary.version}</artifactId>
streaming/pom.xml: 5 changes (0 additions & 5 deletions)
@@ -68,11 +68,6 @@
     <artifactId>junit-interface</artifactId>
     <scope>test</scope>
   </dependency>
-  <dependency>
-    <groupId>org.apache.hadoop</groupId>
-    <artifactId>hadoop-minicluster</artifactId>
-    <scope>test</scope>
-  </dependency>
 </dependencies>
 <build>
   <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -17,13 +17,12 @@
 package org.apache.spark.streaming.util

 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path}

 private[streaming] object HdfsUtils {

   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
     // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
-
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
     // If the file exists and we have append support, append instead of creating a new file
@@ -63,6 +62,10 @@ private[streaming] object HdfsUtils {
   }

   def getFileSystemForPath(path: Path, conf: Configuration) = synchronized {
-    path.getFileSystem(conf)
+    val fs = path.getFileSystem(conf)
+    fs match {
+      case localFs: LocalFileSystem => localFs.getRawFileSystem
+      case _ => fs
+    }
   }
 }
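
A note on the getFileSystemForPath change: for file:// paths, Hadoop's default LocalFileSystem is a checksumming wrapper whose output streams do not expose freshly written bytes to concurrent readers the way HDFS streams do, so the WAL's write-then-read-immediately pattern can misbehave against it. Unwrapping to the raw filesystem avoids that. The commit itself does not state the rationale; this is a plausible reading consistent with the test changes below. A minimal sketch of the unwrap, using only stock Hadoop APIs:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}

object RawFsSketch {
  // Mirror of the commit's unwrap: file:// paths resolve to a checksummed
  // LocalFileSystem; getRawFileSystem exposes the underlying RawLocalFileSystem.
  def rawFsFor(path: Path, conf: Configuration): FileSystem =
    path.getFileSystem(conf) match {
      case localFs: LocalFileSystem => localFs.getRawFileSystem
      case fs => fs
    }

  def main(args: Array[String]): Unit = {
    val p = new Path("file:///tmp/wal-sketch")
    // Prints org.apache.hadoop.fs.RawLocalFileSystem for local paths.
    println(rawFsFor(p, new Configuration()).getClass.getName)
  }
}
```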
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
@@ -73,6 +73,7 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura
   }

   private def flush() {
+    stream.getWrappedStream.flush()
     hadoopFlushMethod.foreach {
       _.invoke(stream)
     }
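
For context on flush(): stream is an FSDataOutputStream, which wraps a buffered stream, so flushing the wrapped stream first pushes JVM-buffered bytes down before the Hadoop-level flush is invoked. hadoopFlushMethod is not shown in this diff, but a reflective lookup is the usual way to bridge the Hadoop 1/2 API split, where the durability call is sync() in 1.x and hflush() in 2.x. A hedged sketch of what such a lookup might look like (the helper name is illustrative, not from the commit):

```scala
import java.lang.reflect.Method
import org.apache.hadoop.fs.FSDataOutputStream

object FlushMethodSketch {
  // Try hflush() (Hadoop 2.x) first, then sync() (Hadoop 1.x); both take no
  // arguments, so a reflective no-arg lookup covers either version.
  def lookupFlushMethod(): Option[Method] =
    Seq("hflush", "sync").flatMap { name =>
      scala.util.Try(classOf[FSDataOutputStream].getMethod(name)).toOption
    }.headOption

  def main(args: Array[String]): Unit =
    println(lookupFlushMethod().map(_.getName).getOrElse("no flush method found"))
}
```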
streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.util
 import java.io._
 import java.nio.ByteBuffer

-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path

 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -31,63 +31,53 @@ import WriteAheadLogSuite._
 import com.google.common.io.Files
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hdfs.MiniDFSCluster
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
+import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.apache.spark.util.Utils
 import org.scalatest.concurrent.Eventually._

-class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {

   val hadoopConf = new Configuration()
-  val dfsDir = Files.createTempDir()
-  val TEST_BUILD_DATA_KEY: String = "test.build.data"
-  val oldTestBuildDataProp = Option(System.getProperty(TEST_BUILD_DATA_KEY))
-  System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
-  val cluster = new MiniDFSCluster(new Configuration, 2, true, null)
-  val nnPort = cluster.getNameNode.getNameNodeAddress.getPort
-  val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/"
-  var pathForTest: String = null

-  override def beforeAll() {
-    cluster.waitActive()
-  }
+  var tempDir: File = null
+  var dirForTest: String = null
+  var fileForTest: String = null

   before {
-    pathForTest = hdfsUrl + getRandomString()
+    tempDir = Files.createTempDir()
+    dirForTest = "file:///" + tempDir.toString
+    fileForTest = "file:///" + new File(tempDir, getRandomString()).toString
   }

-  override def afterAll() {
-    cluster.shutdown()
-    FileUtils.deleteDirectory(dfsDir)
-    oldTestBuildDataProp.foreach(System.setProperty(TEST_BUILD_DATA_KEY, _))
+  after {
+    FileUtils.deleteDirectory(tempDir)
   }

   test("WriteAheadLogWriter - writing data") {
     val dataToWrite = generateRandomData()
-    val writer = new WriteAheadLogWriter(pathForTest, hadoopConf)
+    val writer = new WriteAheadLogWriter(fileForTest, hadoopConf)
     val segments = dataToWrite.map(data => writer.write(data))
     writer.close()
-    val writtenData = readDataManually(pathForTest, segments)
+    val writtenData = readDataManually(fileForTest, segments)
     assert(writtenData.toArray === dataToWrite.toArray)
   }

   test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
     val dataToWrite = generateRandomData()
-    val writer = new WriteAheadLogWriter(pathForTest, hadoopConf)
+    val writer = new WriteAheadLogWriter(fileForTest, hadoopConf)
     dataToWrite.foreach { data =>
-      val segment = writer.write(ByteBuffer.wrap(data.getBytes()))
-      val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf)
+      val segment = writer.write(stringToByteBuffer(data))
+      val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
       val dataRead = reader.read(segment)
-      assert(data === new String(dataRead.array()))
+      assert(data === byteBufferToString(dataRead))
     }
     writer.close()
   }

   test("WriteAheadLogReader - sequentially reading data") {
     // Write data manually for testing the sequential reader
     val writtenData = generateRandomData()
-    writeDataManually(writtenData, pathForTest)
-    val reader = new WriteAheadLogReader(pathForTest, hadoopConf)
+    writeDataManually(writtenData, fileForTest)
+    val reader = new WriteAheadLogReader(fileForTest, hadoopConf)
     val readData = reader.toSeq.map(byteBufferToString)
     assert(readData.toList === writtenData.toList)
     assert(reader.hasNext === false)
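
The setup above is the heart of the change: each test now gets a fresh local temp directory addressed through a file:/// URI instead of a path on a MiniDFSCluster. Because Hadoop's Path/FileSystem machinery resolves the scheme, the production code path through HdfsUtils is still exercised, just against the (raw) local filesystem; that is also why both hadoop-minicluster dependencies could be dropped from the POMs. A small sketch of the resolution, using only stock Hadoop and Guava calls:

```scala
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object LocalWalPathSketch {
  def main(args: Array[String]): Unit = {
    // Same construction the suite's before-block uses: temp dir + file:/// URI.
    val tempDir = Files.createTempDir()
    val logDir = new Path("file:///" + tempDir.toString)
    // Resolves through the same FileSystem lookup the WAL code uses on HDFS.
    val fs = logDir.getFileSystem(new Configuration())
    println(fs.getUri) // file:///
  }
}
```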
@@ -100,9 +90,9 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
test("WriteAheadLogReader - sequentially reading data written with writer") {
// Write data manually for testing the sequential reader
val dataToWrite = generateRandomData()
writeDataUsingWriter(pathForTest, dataToWrite)
writeDataUsingWriter(fileForTest, dataToWrite)
val iter = dataToWrite.iterator
val reader = new WriteAheadLogReader(pathForTest, hadoopConf)
val reader = new WriteAheadLogReader(fileForTest, hadoopConf)
reader.foreach { byteBuffer =>
assert(byteBufferToString(byteBuffer) === iter.next())
}
@@ -112,11 +102,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
test("WriteAheadLogRandomReader - reading data using random reader") {
// Write data manually for testing the random reader
val writtenData = generateRandomData()
val segments = writeDataManually(writtenData, pathForTest)
val segments = writeDataManually(writtenData, fileForTest)

// Get a random order of these segments and read them back
val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf)
val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
writtenDataAndSegments.foreach { case (data, segment) =>
assert(data === byteBufferToString(reader.read(segment)))
}
@@ -126,11 +116,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
// Write data using writer for testing the random reader
val data = generateRandomData()
val segments = writeDataUsingWriter(pathForTest, data)
val segments = writeDataUsingWriter(fileForTest, data)

// Read a random sequence of segments and verify read data
val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
val reader = new WriteAheadLogRandomReader(pathForTest, hadoopConf)
val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
dataAndSegments.foreach { case (data, segment) =>
assert(data === byteBufferToString(reader.read(segment)))
}
@@ -140,62 +130,58 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
test("WriteAheadLogManager - write rotating logs") {
// Write data using manager
val dataToWrite = generateRandomData()
val dir = pathForTest
writeDataUsingManager(dir, dataToWrite)
writeDataUsingManager(dirForTest, dataToWrite)

// Read data manually to verify the written data
val logFiles = getLogFilesInDirectory(dir)
val logFiles = getLogFilesInDirectory(dirForTest)
assert(logFiles.size > 1)
val writtenData = logFiles.flatMap { file => readDataManually(file)}
assert(writtenData.toList === dataToWrite.toList)
}

test("WriteAheadLogManager - read rotating logs") {
// Write data manually for testing reading through manager
val dir = pathForTest
val writtenData = (1 to 10).map { i =>
val data = generateRandomData()
val file = dir + s"/log-$i-$i"
val file = dirForTest + s"/log-$i-$i"
writeDataManually(data, file)
data
}.flatten

val logDirectoryPath = new Path(dir)
val logDirectoryPath = new Path(dirForTest)
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
assert(fileSystem.exists(logDirectoryPath) === true)

// Read data using manager and verify
val readData = readDataUsingManager(dir)
val readData = readDataUsingManager(dirForTest)
assert(readData.toList === writtenData.toList)
}

test("WriteAheadLogManager - recover past logs when creating new manager") {
// Write data with manager, recover with new manager and verify
val dataToWrite = generateRandomData()
val dir = pathForTest
writeDataUsingManager(dir, dataToWrite)
val logFiles = getLogFilesInDirectory(dir)
writeDataUsingManager(dirForTest, dataToWrite)
val logFiles = getLogFilesInDirectory(dirForTest)
assert(logFiles.size > 1)
val readData = readDataUsingManager(dir)
val readData = readDataUsingManager(dirForTest)
assert(dataToWrite.toList === readData.toList)
}

test("WriteAheadLogManager - cleanup old logs") {
// Write data with manager, recover with new manager and verify
val dir = pathForTest
val dataToWrite = generateRandomData()
val fakeClock = new ManualClock
val manager = new WriteAheadLogManager(dir, hadoopConf,
val manager = new WriteAheadLogManager(dirForTest, hadoopConf,
rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
dataToWrite.foreach { item =>
fakeClock.addToTime(500) // half second for each
manager.writeToLog(item)
}
val logFiles = getLogFilesInDirectory(dir)
val logFiles = getLogFilesInDirectory(dirForTest)
assert(logFiles.size > 1)
manager.cleanupOldLogs(fakeClock.currentTime() / 2)
eventually(timeout(1 second), interval(10 milliseconds)) {
assert(getLogFilesInDirectory(dir).size < logFiles.size)
assert(getLogFilesInDirectory(dirForTest).size < logFiles.size)
}
}
// TODO (Hari, TD): Test different failure conditions of writers and readers.
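
On the cleanup test: cleanupOldLogs(threshold) is expected to delete files whose time range falls entirely before the threshold. The manager is not shown in this diff, but WAL files evidently encode their time range in the name (the "read rotating logs" test fabricates names of the form log-$i-$i), so selection by parsing the name is the likely mechanism. A hedged sketch of that selection, with the pattern and helper assumed rather than taken from Spark's source:

```scala
object LogCleanupSketch {
  // Assumed naming scheme: log-<startTime>-<stopTime>, as fabricated by the
  // "read rotating logs" test above.
  private val LogPattern = """log-(\d+)-(\d+)""".r

  def isOlderThan(fileName: String, threshold: Long): Boolean =
    fileName match {
      case LogPattern(_, stop) => stop.toLong < threshold
      case _ => false // leave files we cannot parse alone
    }

  def main(args: Array[String]): Unit = {
    assert(isOlderThan("log-1000-2000", 2500))  // wholly before the threshold
    assert(!isOlderThan("log-3000-4000", 2500)) // still live
    println("cleanup selection sketch ok")
  }
}
```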
@@ -305,7 +291,7 @@ object WriteAheadLogSuite {
     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
       fileSystem.listStatus(logDirectoryPath).map {
         _.getPath.toString
-      }
+      }.sorted
     } else {
       Seq.empty
     }
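
The .sorted here is the "sort listed files by name" half of the commit message: FileSystem.listStatus makes no ordering guarantee, and the raw local filesystem can return entries in arbitrary directory order where the MiniDFSCluster happened to return them predictably. Sorting the path strings makes tests that replay logs sequentially deterministic. A trivial illustration with hypothetical paths:

```scala
// listStatus may hand back e.g. log-3 before log-1 on the local filesystem;
// sorting the names restores a stable, name-ordered listing.
val listed = Seq("file:/tmp/wal/log-3-3", "file:/tmp/wal/log-1-1", "file:/tmp/wal/log-2-2")
assert(listed.sorted ==
  Seq("file:/tmp/wal/log-1-1", "file:/tmp/wal/log-2-2", "file:/tmp/wal/log-3-3"))
```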