Fix log cleaner, add test.
Marcelo Vanzin committed Apr 24, 2015
1 parent 7c381ec commit 76a3651
Showing 2 changed files with 79 additions and 20 deletions.
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,16 +32,20 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /**
  * A class that provides application history from event logs stored in the file system.
  * This provider checks for new finished applications in the background periodically and
  * renders the history application UI by parsing the associated event logs.
  */
-private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
-  with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
+  extends ApplicationHistoryProvider with Logging {
+
+  def this(conf: SparkConf) = {
+    this(conf, new SystemClock())
+  }
 
   import FsHistoryProvider._
 
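The two-constructor arrangement above is plain clock injection: regular callers keep the one-argument constructor, which plugs in a SystemClock, while tests supply a controllable clock. A minimal sketch of both call sites, assuming only the signatures visible in this diff:

    import org.apache.spark.SparkConf
    import org.apache.spark.util.ManualClock

    // Sketch only: constructing the provider both ways after this change.
    val conf = new SparkConf()
    val production = new FsHistoryProvider(conf)                      // delegates to SystemClock
    val underTest = new FsHistoryProvider(conf, new ManualClock(0L))  // time advances only on demand
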
@@ -75,8 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()
 
-  // List of applications to be deleted by event log cleaner.
-  private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+  // List of application logs to be deleted by event log cleaner.
+  private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
 
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
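Note the element type change in the pending-deletion buffer: the cleaner now queues individual attempts rather than whole applications, which is what allows one attempt of a multi-attempt application to expire while the rest stay listed. FsApplicationAttemptInfo itself is not shown in this diff; a hypothetical minimal shape, inferred only from the fields the new code touches, would be:

    // Hypothetical minimal shape (not the real Spark class), inferred from this diff.
    case class AttemptShape(
      attemptId: String,   // e.g. "attempt2", asserted on in the test below
      logPath: String,     // the file the cleaner deletes
      lastUpdated: Long,   // compared against clock.getTimeMillis()
      completed: Boolean)  // in-progress attempts are never cleaned
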
@@ -289,42 +293,51 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
-  private def cleanLogs(): Unit = {
+  private[history] def cleanLogs(): Unit = {
     try {
       val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
 
-      val now = System.currentTimeMillis()
+      val now = clock.getTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
 
+      def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
+        now - attempt.lastUpdated > maxAge && attempt.completed
+      }
+
       // Scan all logs from the log directory.
       // Only completed applications older than the specified max age will be deleted.
-      applications.values.foreach { info =>
-        if (now - info.attempts.head.lastUpdated <= maxAge || !info.attempts.head.completed) {
-          appsToRetain += (info.id -> info)
-        } else {
-          appsToClean += info
+      applications.values.foreach { app =>
+        val toClean = app.attempts.filter(shouldClean)
+        attemptsToClean ++= toClean
+
+        if (toClean.isEmpty) {
+          appsToRetain += (app.id -> app)
+        } else if (toClean.size < app.attempts.size) {
+          appsToRetain += (app.id ->
+            new FsApplicationHistoryInfo(app.id, app.name,
+              app.attempts.filter(!shouldClean(_)).toList))
         }
       }
 
       applications = appsToRetain
 
-      val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
-      appsToClean.foreach { info =>
+      val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+      attemptsToClean.foreach { attempt =>
         try {
-          val path = new Path(logDir, info.logPath)
+          val path = new Path(logDir, attempt.logPath)
           if (fs.exists(path)) {
             fs.delete(path, true)
           }
         } catch {
           case e: AccessControlException =>
-            logInfo(s"No permission to delete ${info.logPath}, ignoring.")
+            logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
           case t: IOException =>
-            logError(s"IOException in cleaning logs of ${info.logPath}", t)
-            leftToClean += info
+            logError(s"IOException in cleaning ${attempt.logPath}", t)
+            leftToClean += attempt
         }
       }
 
-      appsToClean = leftToClean
+      attemptsToClean = leftToClean
     } catch {
       case t: Exception => logError("Exception in cleaning logs", t)
     }
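The policy the rewritten loop implements: delete every completed attempt older than maxAge, and keep an application in the listing only while at least one attempt survives. A self-contained sketch of that policy with toy case classes standing in for the provider's internal types (all names illustrative, not Spark API):

    // Toy model of the per-attempt clean policy (illustrative only).
    case class Attempt(id: String, lastUpdated: Long, completed: Boolean)
    case class App(id: String, attempts: List[Attempt])

    def clean(apps: List[App], now: Long, maxAge: Long): (List[App], List[Attempt]) = {
      def expired(a: Attempt) = now - a.lastUpdated > maxAge && a.completed
      val toDelete = apps.flatMap(_.attempts.filter(expired))
      // An app stays listed as long as one attempt remains, mirroring appsToRetain above.
      val retained = apps
        .map(a => a.copy(attempts = a.attempts.filterNot(expired)))
        .filter(_.attempts.nonEmpty)
      (retained, toDelete)
    }

    // One expired and one fresh attempt: the app survives with a single attempt.
    val apps = List(App("app1", List(
      Attempt("attempt1", lastUpdated = 0L, completed = true),
      Attempt("attempt2", lastUpdated = 15000L, completed = true))))
    val (kept, deleted) = clean(apps, now = 15000L, maxAge = 10000L)
    assert(kept.head.attempts.map(_.id) == List("attempt2"))
    assert(deleted.map(_.id) == List("attempt1"))
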
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.history
 
 import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
 import java.net.URI
+import java.util.concurrent.TimeUnit
 
 import scala.io.Source
 
@@ -30,7 +31,7 @@ import org.scalatest.Matchers
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.io._
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
 
 class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
 
@@ -283,6 +284,50 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     }
   }
 
+  test("log cleaner") {
+    val maxAge = TimeUnit.SECONDS.toMillis(10)
+    val clock = new ManualClock(maxAge / 2)
+    val provider = new FsHistoryProvider(
+      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+
+    val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+    writeFile(log1, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+      SparkListenerApplicationEnd(2L)
+      )
+    log1.setLastModified(0L)
+
+    val log2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+    writeFile(log2, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+      SparkListenerApplicationEnd(4L)
+      )
+    log2.setLastModified(clock.getTimeMillis())
+
+    updateAndCheck(provider) { list =>
+      list.size should be (1)
+      list.head.attempts.size should be (2)
+    }
+
+    // Move the clock forward so log1 exceeds the max age.
+    clock.advance(maxAge)
+
+    updateAndCheck(provider) { list =>
+      list.size should be (1)
+      list.head.attempts.size should be (1)
+      list.head.attempts.head.attemptId should be ("attempt2")
+    }
+    assert(!log1.exists())
+
+    // Do the same for the other log.
+    clock.advance(maxAge)
+
+    updateAndCheck(provider) { list =>
+      list.size should be (0)
+    }
+    assert(!log2.exists())
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
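Everything the test asserts is driven by ManualClock, so attempt expiry is deterministic instead of wall-clock dependent. A stripped-down illustration of the pattern with a toy clock (a stand-in for, not a copy of, org.apache.spark.util.ManualClock, so the snippet runs on its own):

    // Toy stand-in for a manually advanced clock (illustrative only).
    class ToyClock(private var now: Long) {
      def getTimeMillis(): Long = now
      def advance(ms: Long): Unit = { now += ms }
    }

    val maxAge = 10000L
    val clock = new ToyClock(maxAge / 2)  // mirrors new ManualClock(maxAge / 2)
    def expired(lastUpdated: Long): Boolean = clock.getTimeMillis() - lastUpdated > maxAge

    assert(!expired(0L))  // age 5000 ms: within maxAge, kept
    clock.advance(maxAge)
    assert(expired(0L))   // age 15000 ms: past maxAge, cleaned
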
@@ -294,6 +339,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   private def updateAndCheck(provider: FsHistoryProvider)
       (checkFn: Seq[ApplicationHistoryInfo] => Unit): Unit = {
     provider.checkForLogs()
+    provider.cleanLogs()
     checkFn(provider.getListing().toSeq)
   }
 
