Update #224 to add FSFetcher as a standalone fetcher #232

Merged
merged 13 commits into from Apr 14, 2017
20 changes: 10 additions & 10 deletions app-conf/FetcherConf.xml
@@ -37,18 +37,18 @@
</params>
</fetcher>
<!--
This is an experimental replacement for the MapReduceFetcherHadoop2 that attempts to burn
through queues of jobs faster by pulling data directly from HDFS rather than going through
the job history server.
This is an experimental replacement for the MapReduceFetcherHadoop2 that attempts to burn
through queues of jobs faster by pulling data directly from HDFS rather than going through
the job history server.

Increasing the param history_log_size_limit_in_mb allows this fetcher to accept larger log
files, but also increase the risk of OutOfMemory error. The default heap size of Dr. Elephant
is 1024MB. To increase this, e.g. to 2048MB, run this before start.sh:
export OPTS="-mem 2048"
Increasing the param history_log_size_limit_in_mb allows this fetcher to accept larger log
files, but also increase the risk of OutOfMemory error. The default heap size of Dr. Elephant
is 1024MB. To increase this, e.g. to 2048MB, run this before start.sh:
export OPTS="-mem 2048"

To work properly, this fetcher should use the same timezone with the job history server.
If not set, the local timezone will be used.
-->
To work properly, this fetcher should use the same timezone with the job history server.
If not set, the local timezone will be used.
-->
<!--
<fetcher>
<applicationtype>mapreduce</applicationtype>
53 changes: 31 additions & 22 deletions app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
@@ -19,12 +19,13 @@ package com.linkedin.drelephant.spark.fetchers
import scala.async.Async
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.{Duration, SECONDS}
import scala.util.Try
import scala.util.{Try, Success, Failure}
import scala.util.control.NonFatal

import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.legacyfetchers.FSFetcher
import com.linkedin.drelephant.util.SparkUtils
import org.apache.hadoop.conf.Configuration
import org.apache.log4j.Logger
@@ -37,6 +38,7 @@ import org.apache.spark.SparkConf
class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
extends ElephantFetcher[SparkApplicationData] {
import SparkFetcher._
import Async.{async, await}
import ExecutionContext.Implicits.global

private val logger: Logger = Logger.getLogger(classOf[SparkFetcher])
@@ -47,7 +49,7 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)

private[fetchers] lazy val sparkConf: SparkConf = {
val sparkConf = new SparkConf()
sparkUtils.getDefaultPropertiesFile(sparkUtils.defaultEnv) match {
sparkUtils.getDefaultPropertiesFile() match {
case Some(filename) => sparkConf.setAll(sparkUtils.getPropertiesFromFile(filename))
case None => throw new IllegalStateException("can't find Spark conf; please set SPARK_HOME or SPARK_CONF_DIR")
}
@@ -62,31 +64,31 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
}

override def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
doFetchData(analyticJob) match {
Contributor:

Maybe this match block is not needed? Just call the doFetchData method, which either returns SparkApplicationData or throws an exception if an error occurs.

Contributor (Author):
fixed

case Success(data) => data
case Failure(e) => throw e
}
}
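
For context on the review thread above, here is a minimal sketch of the alternative the reviewer describes, in which the fetch logic returns SparkApplicationData directly and simply rethrows on failure, so no match block is needed. It assumes the surrounding SparkFetcher members shown elsewhere in this diff (logger, DEFAULT_TIMEOUT, doFetchDataUsingRestAndLogClients) and is not the code that was merged.

```scala
// Sketch only; assumes the SparkFetcher class context from this diff.
override def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
  val appId = analyticJob.getAppId
  logger.info(s"Fetching data for ${appId}")
  try {
    // Block on the combined REST + event-log future and rethrow on failure.
    Await.result(doFetchDataUsingRestAndLogClients(analyticJob), DEFAULT_TIMEOUT)
  } catch {
    case NonFatal(e) =>
      logger.error(s"Failed fetching data for ${appId}", e)
      throw e
  }
}
```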

private def doFetchData(analyticJob: AnalyticJob): Try[SparkApplicationData] = {
val appId = analyticJob.getAppId
logger.info(s"Fetching data for ${appId}")
try {
Await.result(doFetchData(sparkRestClient, sparkLogClient, appId), DEFAULT_TIMEOUT)
} catch {
case NonFatal(e) =>
Try {
Await.result(doFetchDataUsingRestAndLogClients(analyticJob), DEFAULT_TIMEOUT)
}.transform(
data => {
logger.info(s"Succeeded fetching data for ${appId}")
Success(data)
},
e => {
logger.error(s"Failed fetching data for ${appId}", e)
throw e
}
Failure(e)
}
)
}
}

object SparkFetcher {
import Async.{async, await}

val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled"
val DEFAULT_TIMEOUT = Duration(30, SECONDS)

private def doFetchData(
sparkRestClient: SparkRestClient,
sparkLogClient: Option[SparkLogClient],
appId: String
)(
implicit ec: ExecutionContext
): Future[SparkApplicationData] = async {
private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = async {
val appId = analyticJob.getAppId
val restDerivedData = await(sparkRestClient.fetchData(appId))
val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId

@@ -98,4 +100,11 @@

SparkApplicationData(appId, restDerivedData, logDerivedData)
}

}

object SparkFetcher {

val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled"
val DEFAULT_TIMEOUT = Duration(60, SECONDS)
}
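
As an aside on the refactor above: doFetchDataUsingRestAndLogClients composes the REST and event-log fetches with scala-async. A self-contained sketch of that composition pattern with plain futures follows; the client names and data below are placeholders, not the fetcher's real types.

```scala
import scala.async.Async.{async, await}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.{Duration, SECONDS}

object AsyncComposeSketch {
  import ExecutionContext.Implicits.global

  // Stand-ins for the REST client and the optional event-log client.
  def fetchRestData(appId: String): Future[String] = Future(s"rest-derived-data-for-$appId")
  def fetchLogData(appId: String): Future[Option[String]] = Future(Some(s"log-derived-data-for-$appId"))

  // async { ... } yields a Future; each await suspends until the inner future completes.
  def fetchAll(appId: String): Future[(String, Option[String])] = async {
    val restDerivedData = await(fetchRestData(appId))
    val logDerivedData = await(fetchLogData(appId))
    (restDerivedData, logDerivedData)
  }

  def main(args: Array[String]): Unit =
    println(Await.result(fetchAll("application_1492000000000_0001"), Duration(60, SECONDS)))
}
```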
142 changes: 22 additions & 120 deletions app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala
@@ -16,82 +16,63 @@

package com.linkedin.drelephant.spark.fetchers

import java.io.{BufferedInputStream, FileNotFoundException, InputStream}
import java.net.URI
import java.io.InputStream
import java.security.PrivilegedAction

import scala.async.Async
import scala.collection.mutable.HashMap
import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source

import com.linkedin.drelephant.security.HadoopSecurity
import com.linkedin.drelephant.spark.data.SparkLogDerivedData
import com.linkedin.drelephant.util.SparkUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec, LZFCompressionCodec, SnappyCompressionCodec}
import org.apache.spark.scheduler.{SparkListenerEnvironmentUpdate, SparkListenerEvent}
import org.json4s.{DefaultFormats, JsonAST}
import org.json4s.jackson.JsonMethods


/**
* A client for getting data from the Spark event logs, using the location configured for spark.eventLog.dir.
*
* This client uses webhdfs to access the location, even if spark.eventLog.dir is provided as an hdfs URL.
*
* The codecs used by this client use JNI, which results in some weird classloading issues (at least when testing in the console),
* so some of the client's implementation is non-lazy or synchronous when needed.
* A client for getting data from the Spark event logs.
*/
class SparkLogClient(hadoopConfiguration: Configuration, sparkConf: SparkConf) {
import SparkLogClient._
import Async.async

private val logger: Logger = Logger.getLogger(classOf[SparkLogClient])

private[fetchers] val webhdfsEventLogUri: URI = {
val eventLogUri = sparkConf.getOption(SPARK_EVENT_LOG_DIR_KEY).map(new URI(_))
val dfsNamenodeHttpAddress = Option(hadoopConfiguration.get(HADOOP_DFS_NAMENODE_HTTP_ADDRESS_KEY))
(eventLogUri, dfsNamenodeHttpAddress) match {
case (Some(eventLogUri), _) if eventLogUri.getScheme == "webhdfs" =>
eventLogUri
case (Some(eventLogUri), Some(dfsNamenodeHttpAddress)) if eventLogUri.getScheme == "hdfs" =>
val dfsNamenodeHttpUri = new URI(null, dfsNamenodeHttpAddress, null, null, null)
new URI(s"webhdfs://${eventLogUri.getHost}:${dfsNamenodeHttpUri.getPort}${eventLogUri.getPath}")
case _ =>
throw new IllegalArgumentException(
s"""|${SPARK_EVENT_LOG_DIR_KEY} must be provided as webhdfs:// or hdfs://;
|if hdfs, ${HADOOP_DFS_NAMENODE_HTTP_ADDRESS_KEY} must also be provided for port""".stripMargin.replaceAll("\n", " ")
)
}
}
private lazy val security: HadoopSecurity = new HadoopSecurity()

private[fetchers] lazy val fs: FileSystem = FileSystem.get(webhdfsEventLogUri, hadoopConfiguration)
protected lazy val sparkUtils: SparkUtils = SparkUtils

private lazy val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", defaultValue = false)
private lazy val compressionCodec = if (shouldCompress) Some(compressionCodecFromConf(sparkConf)) else None
private lazy val compressionCodecShortName = compressionCodec.map(shortNameOfCompressionCodec)
def fetchData(appId: String, attemptId: Option[String])(implicit ec: ExecutionContext): Future[SparkLogDerivedData] =
doAsPrivilegedAction { () => doFetchData(appId, attemptId) }

def fetchData(appId: String, attemptId: Option[String])(implicit ec: ExecutionContext): Future[SparkLogDerivedData] = {
val logPath = getLogPath(webhdfsEventLogUri, appId, attemptId, compressionCodecShortName)
logger.info(s"looking for logs at ${logPath}")
protected def doAsPrivilegedAction[T](action: () => T): T =
security.doAs[T](new PrivilegedAction[T] { override def run(): T = action() })

val codec = compressionCodecForLogPath(sparkConf, logPath)
protected def doFetchData(
appId: String,
attemptId: Option[String]
)(
implicit ec: ExecutionContext
): Future[SparkLogDerivedData] = {
val (eventLogFileSystem, baseEventLogPath) =
sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration, sparkConf)
val (eventLogPath, eventLogCodec) =
sparkUtils.pathAndCodecforEventLog(sparkConf, eventLogFileSystem, baseEventLogPath, appId, attemptId)

// Limit scope of async.
async {
resource.managed { openEventLog(sparkConf, logPath, fs) }
.acquireAndGet { in => findDerivedData(codec.map { _.compressedInputStream(in) }.getOrElse(in)) }
sparkUtils.withEventLog(eventLogFileSystem, eventLogPath, eventLogCodec)(findDerivedData(_))
}
}
}
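
The new fetchData path wraps its work in doAsPrivilegedAction so the event-log read runs under the identity established by HadoopSecurity, which is project-specific and not shown in this diff. Below is a rough, self-contained sketch of the same thunk-to-PrivilegedAction wrapping using Hadoop's UserGroupInformation directly; treat it as an assumption about what the helper amounts to, not the project's implementation.

```scala
import java.security.PrivilegedAction

import org.apache.hadoop.security.UserGroupInformation

object PrivilegedReadSketch {
  // Run an arbitrary block as the current Hadoop user, mirroring how
  // doAsPrivilegedAction adapts a Scala thunk into a PrivilegedAction.
  def asCurrentUser[T](action: () => T): T =
    UserGroupInformation.getCurrentUser.doAs(new PrivilegedAction[T] {
      override def run(): T = action()
    })

  def main(args: Array[String]): Unit =
    println(asCurrentUser(() => s"reading event logs as ${UserGroupInformation.getCurrentUser.getShortUserName}"))
}
```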

object SparkLogClient {
import JsonAST._

val SPARK_EVENT_LOG_DIR_KEY = "spark.eventLog.dir"
val HADOOP_DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address"

private implicit val formats: DefaultFormats = DefaultFormats

def findDerivedData(in: InputStream, eventsLimit: Option[Int] = None): SparkLogDerivedData = {
@@ -123,85 +104,6 @@ object SparkLogClient {
// https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
// https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/util/Utils.scala
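
To make the JsonProtocol references above concrete: each event-log line is a JSON object whose "Event" field names the listener event, and findDerivedData keeps only the SparkListenerEnvironmentUpdate data. Here is a self-contained json4s sketch of that kind of line-level filtering; the sample line is abbreviated and illustrative, not taken from a real log.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods

object EnvUpdateParseSketch {
  private implicit val formats: DefaultFormats = DefaultFormats

  // One (abbreviated) event-log line; real logs carry one JSON event per line.
  val line =
    """{"Event":"SparkListenerEnvironmentUpdate","Spark Properties":{"spark.executor.memory":"2g"}}"""

  def main(args: Array[String]): Unit = {
    val json = JsonMethods.parse(line)
    if ((json \ "Event").extract[String] == "SparkListenerEnvironmentUpdate") {
      val sparkProperties = (json \ "Spark Properties").extract[Map[String, String]]
      println(sparkProperties) // Map(spark.executor.memory -> 2g)
    }
  }
}
```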

private val IN_PROGRESS = ".inprogress"
private val DEFAULT_COMPRESSION_CODEC = "snappy"

private val compressionCodecClassNamesByShortName = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
"lzf" -> classOf[LZFCompressionCodec].getName,
"snappy" -> classOf[SnappyCompressionCodec].getName
)

// A cache for compression codecs to avoid creating the same codec many times
private val compressionCodecMap = HashMap.empty[String, CompressionCodec]

private def compressionCodecFromConf(conf: SparkConf): CompressionCodec = {
val codecName = conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC)
loadCompressionCodec(conf, codecName)
}

private def loadCompressionCodec(conf: SparkConf, codecName: String): CompressionCodec = {
val codecClass = compressionCodecClassNamesByShortName.getOrElse(codecName.toLowerCase, codecName)
val classLoader = Option(Thread.currentThread().getContextClassLoader).getOrElse(getClass.getClassLoader)
val codec = try {
val ctor = Class.forName(codecClass, true, classLoader).getConstructor(classOf[SparkConf])
Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
} catch {
case e: ClassNotFoundException => None
case e: IllegalArgumentException => None
}
codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. "))
}

private def shortNameOfCompressionCodec(compressionCodec: CompressionCodec): String = {
val codecName = compressionCodec.getClass.getName
if (compressionCodecClassNamesByShortName.contains(codecName)) {
codecName
} else {
compressionCodecClassNamesByShortName
.collectFirst { case (k, v) if v == codecName => k }
.getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
}
}

private def getLogPath(
logBaseDir: URI,
appId: String,
appAttemptId: Option[String],
compressionCodecName: Option[String] = None
): Path = {
val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
val codec = compressionCodecName.map("." + _).getOrElse("")
if (appAttemptId.isDefined) {
new Path(base + "_" + sanitize(appAttemptId.get) + codec)
} else {
new Path(base + codec)
}
}

private def openEventLog(conf: SparkConf, logPath: Path, fs: FileSystem): InputStream = {
// It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
// IOException when a file does not exist, so try our best to throw a proper exception.
if (!fs.exists(logPath)) {
throw new FileNotFoundException(s"File ${logPath} does not exist.")
}

new BufferedInputStream(fs.open(logPath))
}

private def compressionCodecForLogPath(conf: SparkConf, logPath: Path): Option[CompressionCodec] = {
// Compression codec is encoded as an extension, e.g. app_123.lzf
// Since we sanitize the app ID to not include periods, it is safe to split on it
val logBaseName = logPath.getName.stripSuffix(IN_PROGRESS)
logBaseName.split("\\.").tail.lastOption.map { codecName =>
compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(conf, codecName))
}
}

private def sanitize(str: String): String = {
str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
}

private def sparkEventFromJson(json: JValue): Option[SparkListenerEvent] = {
val environmentUpdate = getFormattedClassName(SparkListenerEnvironmentUpdate)
