Skip to content

Commit

Permalink
[SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing
Browse files Browse the repository at this point in the history
Revise HDFSMetadataLog API such that metadata object serialization and final batch file write are separated. This will allow serialization checks without worrying about batch file name formats. marmbrus zsxwing

Existing tests already ensure this API faithfully supports core functionality, i.e., the creation of batch files.

Author: Tyson Condie <tcondie@gmail.com>

Closes apache#15924 from tcondie/SPARK-18498.

Signed-off-by: Michael Armbrust <michael@databricks.com>
  • Loading branch information
tcondie authored and Robert Kruszewski committed Dec 2, 2016
1 parent 31a8c8a commit 157430e
Showing 1 changed file with 66 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,48 +138,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
}
}

/**
* Write a batch to a temp file then rename it to the batch file.
*
* There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
* valid behavior, we still need to prevent it from destroying the files.
*/
private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
// Use nextId to create a temp file
def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
var nextId = 0
while (true) {
val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
try {
val output = fileManager.create(tempPath)
try {
writer(metadata, output)
return Some(tempPath)
} finally {
IOUtils.closeQuietly(output)
}
try {
// Try to commit the batch
// It will fail if there is an existing file (someone has committed the batch)
logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
fileManager.rename(tempPath, batchIdToPath(batchId))

// SPARK-17475: HDFSMetadataLog should not leak CRC files
// If the underlying filesystem didn't rename the CRC file, delete it.
val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
return
} catch {
case e: IOException if isFileAlreadyExistsException(e) =>
// If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
// So throw an exception to tell the user this is not a valid behavior.
throw new ConcurrentModificationException(
s"Multiple HDFSMetadataLog are using $path", e)
case e: FileNotFoundException =>
// Sometimes, "create" will succeed when multiple writers are calling it at the same
// time. However, only one writer can call "rename" successfully, others will get
// FileNotFoundException because the first writer has removed it.
throw new ConcurrentModificationException(
s"Multiple HDFSMetadataLog are using $path", e)
}
} catch {
case e: IOException if isFileAlreadyExistsException(e) =>
// Failed to create "tempPath". There are two cases:
Expand All @@ -195,10 +165,45 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
// metadata path. In addition, the old Streaming also have this issue, people can create
// malicious checkpoint files to crash a Streaming application too.
nextId += 1
} finally {
fileManager.delete(tempPath)
}
}
None
}

/**
 * Write a batch to a temp file then rename it to the batch file.
 *
 * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
 * valid behavior, we still need to prevent it from destroying the files.
 *
 * @param batchId  id of the batch being committed; the final file name is derived via
 *                 `batchIdToPath(batchId)`
 * @param metadata the metadata object to serialize into the batch file
 * @param writer   function that serializes `metadata` onto the opened output stream
 * @throws IllegalStateException when `writeTempBatch` could not create a temp file
 * @throws ConcurrentModificationException when another HDFSMetadataLog has already committed
 *                                         this batch (detected through the rename failure)
 */
private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
  // Serialize into a uniquely-named temp file first so the final batch file appears atomically.
  val tempPath = writeTempBatch(metadata, writer).getOrElse(
    throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
  try {
    // Try to commit the batch
    // It will fail if there is an existing file (someone has committed the batch)
    logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
    fileManager.rename(tempPath, batchIdToPath(batchId))

    // SPARK-17475: HDFSMetadataLog should not leak CRC files
    // If the underlying filesystem didn't rename the CRC file, delete it.
    val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
    if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
  } catch {
    case e: IOException if isFileAlreadyExistsException(e) =>
      // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
      // So throw an exception to tell the user this is not a valid behavior.
      throw new ConcurrentModificationException(
        s"Multiple HDFSMetadataLog are using $path", e)
    case e: FileNotFoundException =>
      // Sometimes, "create" will succeed when multiple writers are calling it at the same
      // time. However, only one writer can call "rename" successfully, others will get
      // FileNotFoundException because the first writer has removed it.
      throw new ConcurrentModificationException(
        s"Multiple HDFSMetadataLog are using $path", e)
  } finally {
    // After a successful rename the temp path should no longer exist, so this delete is
    // presumably a no-op in that case — NOTE(review): confirm fileManager.delete tolerates
    // a missing path.
    fileManager.delete(tempPath)
  }
}

private def isFileAlreadyExistsException(e: IOException): Boolean = {
Expand All @@ -208,6 +213,22 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
(e.getMessage != null && e.getMessage.startsWith("File already exists: "))
}

/**
 * Look up the metadata stored in the given batch file.
 *
 * @param batchFile path that is expected to name a batch file
 * @return the deserialized metadata in the batch file, or None if the file does not exist
 * @throws IllegalArgumentException when the path does not point to a batch file
 */
def get(batchFile: Path): Option[T] = {
  if (!fileManager.exists(batchFile)) {
    // Nothing on disk at this path: treat the batch as absent.
    None
  } else if (!isBatchFile(batchFile)) {
    // The file exists but its name does not follow the batch-file naming scheme.
    throw new IllegalArgumentException(s"File ${batchFile} is not a batch file!")
  } else {
    // Delegate to the id-based lookup, which performs the deserialization.
    get(pathToBatchId(batchFile))
  }
}

override def get(batchId: Long): Option[T] = {
val batchMetadataFile = batchIdToPath(batchId)
if (fileManager.exists(batchMetadataFile)) {
Expand Down Expand Up @@ -250,6 +271,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
None
}

/**
 * List the batch files under the metadata path, ordered from the most recent
 * batch file to the oldest one.
 *
 * @return an array of [[FileStatus]] sorted by batch id in descending order
 */
def getOrderedBatchFiles(): Array[FileStatus] = {
  val batchFiles = fileManager.list(metadataPath, batchFilesFilter)
  // Sorting with a reversed Ordering is equivalent to an ascending sort followed by .reverse,
  // since each batch file maps to a distinct batch id.
  batchFiles.sortBy(f => pathToBatchId(f.getPath))(Ordering[Long].reverse)
}

/**
* Removes all the log entry earlier than thresholdBatchId (exclusive).
*/
Expand Down

0 comments on commit 157430e

Please sign in to comment.