
Commit

Update linkedin#224 (credits: rayortigas) to add FSFetcher as a standalone fetcher (linkedin#232)

Remove the backup for the Rest fetcher and make the legacy FSFetcher a top-level fetcher. Change the default fetcher in the config.
shankar37 authored and akshayrai committed Apr 14, 2017
1 parent f7b8bf6 commit 6ff4132
Showing 33 changed files with 3,060 additions and 286 deletions.
38 changes: 27 additions & 11 deletions app-conf/FetcherConf.xml
@@ -37,18 +37,18 @@
</params>
</fetcher>
<!--
This is an experimental replacement for the MapReduceFetcherHadoop2 that attempts to burn
through queues of jobs faster by pulling data directly from HDFS rather than going through
the job history server.
Increasing the param history_log_size_limit_in_mb allows this fetcher to accept larger log
files, but also increases the risk of an OutOfMemory error. The default heap size of Dr. Elephant
is 1024MB. To increase this, e.g. to 2048MB, run this before start.sh:
export OPTS="-mem 2048"
To work properly, this fetcher should use the same timezone as the job history server.
If not set, the local timezone will be used.
-->
<!--
<fetcher>
<applicationtype>mapreduce</applicationtype>
@@ -61,8 +61,24 @@
</fetcher>
-->

<!--
FSFetcher for Spark. Loads the event log from HDFS and replays it to get the metrics and application properties.
Param Description:
*event_log_size_limit_in_mb* sets the threshold for the size of the event log. Increasing it will necessitate
an increase in heap size. The default is 100.
*event_log_location_uri* can be used to specify the fully qualified URI of the event log location in HDFS.
If this is not specified, the fetcher will try to deduce it from the Spark conf.
e.g.:
<params>
<event_log_size_limit_in_mb>500</event_log_size_limit_in_mb>
<event_log_location_uri>webhdfs://localhost:50070/system/spark-history</event_log_location_uri>
</params>
-->
<fetcher>
<applicationtype>spark</applicationtype>
<classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
<classname>com.linkedin.drelephant.spark.fetchers.FSFetcher</classname>
</fetcher>
</fetchers>
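
Note: the <classname> element above is how Dr. Elephant selects the fetcher implementation at runtime. As a rough, hypothetical sketch of how such a classname-driven entry can be resolved via reflection (illustrative only, not the project's actual loader; it assumes a single-argument constructor taking FetcherConfigurationData, which FSFetcher and SparkFetcher both have):

import com.linkedin.drelephant.analysis.ElephantFetcher
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData

object FetcherLoaderSketch {
  // Hypothetical helper: instantiate the class named in <classname> via reflection,
  // assuming it exposes a one-argument constructor taking the fetcher's configuration data.
  def loadFetcher(className: String, configData: FetcherConfigurationData): ElephantFetcher[_] = {
    val constructor = Class.forName(className).getConstructor(classOf[FetcherConfigurationData])
    constructor.newInstance(configData).asInstanceOf[ElephantFetcher[_]]
  }
}

// e.g. FetcherLoaderSketch.loadFetcher("com.linkedin.drelephant.spark.fetchers.FSFetcher", configData)
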
40 changes: 40 additions & 0 deletions app/com/linkedin/drelephant/spark/fetchers/FSFetcher.scala
@@ -0,0 +1,40 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.spark.fetchers

import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.legacydata.LegacyDataConverters
import org.apache.spark.deploy.history.SparkFSFetcher

/**
* Wraps the legacy SparkFSFetcher, which contains the actual fetching logic, and adapts its output to the new SparkApplicationData interface.
* @param fetcherConfigurationData the fetcher configuration from FetcherConf.xml
*/
class FSFetcher(fetcherConfigurationData: FetcherConfigurationData)
extends ElephantFetcher[SparkApplicationData] {
lazy val legacyFetcher = new SparkFSFetcher(fetcherConfigurationData)

override def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
val legacyData = legacyFetcher.fetchData(analyticJob)
LegacyDataConverters.convert(legacyData)
}
}

object FSFetcher {
}
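
FSFetcher above is a thin adapter: the legacy SparkFSFetcher does the actual event log fetching and replay, and LegacyDataConverters.convert adapts its result to the new SparkApplicationData interface. A minimal, self-contained sketch of that delegate-and-convert pattern, using stand-in types rather than Dr. Elephant's classes:

// Stand-in types for illustration only; these are not Dr. Elephant classes.
trait NewStyleFetcher[T] { def fetchData(jobId: String): T }

class LegacyStyleFetcher { def fetch(jobId: String): String = s"legacy-data-for-$jobId" }

object DataConverter { def convert(legacy: String): Int = legacy.length }

class LegacyBackedFetcher extends NewStyleFetcher[Int] {
  // Constructed lazily, so the legacy fetcher only exists if data is actually requested.
  lazy val legacyFetcher = new LegacyStyleFetcher
  override def fetchData(jobId: String): Int =
    DataConverter.convert(legacyFetcher.fetch(jobId))
}

Keeping legacyFetcher as a lazy val, as FSFetcher does, also leaves room to override it with a stub when testing the adapter in isolation.
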
74 changes: 42 additions & 32 deletions app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
@@ -19,6 +19,7 @@ package com.linkedin.drelephant.spark.fetchers
import scala.async.Async
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.{Duration, SECONDS}
import scala.util.{Try, Success, Failure}
import scala.util.control.NonFatal

import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
@@ -36,17 +36,21 @@ import org.apache.spark.SparkConf
class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
extends ElephantFetcher[SparkApplicationData] {
import SparkFetcher._
import Async.{async, await}
import ExecutionContext.Implicits.global

private val logger: Logger = Logger.getLogger(classOf[SparkFetcher])

val eventLogUri = Option(fetcherConfigurationData.getParamMap.get(LOG_LOCATION_URI_XML_FIELD))
logger.info("The event log location of Spark application is set to " + eventLogUri)

private[fetchers] lazy val hadoopConfiguration: Configuration = new Configuration()

private[fetchers] lazy val sparkUtils: SparkUtils = SparkUtils

private[fetchers] lazy val sparkConf: SparkConf = {
val sparkConf = new SparkConf()
sparkUtils.getDefaultPropertiesFile(sparkUtils.defaultEnv) match {
sparkUtils.getDefaultPropertiesFile() match {
case Some(filename) => sparkConf.setAll(sparkUtils.getPropertiesFromFile(filename))
case None => throw new IllegalStateException("can't find Spark conf; please set SPARK_HOME or SPARK_CONF_DIR")
}
@@ -65,25 +70,51 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
private[fetchers] lazy val sparkRestClient: SparkRestClient = new SparkRestClient(sparkConf)

private[fetchers] lazy val sparkLogClient: SparkLogClient = {
new SparkLogClient(hadoopConfiguration, sparkConf)
new SparkLogClient(hadoopConfiguration, sparkConf, eventLogUri)
}

override def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
doFetchData(analyticJob) match {
case Success(data) => data
case Failure(e) => throw e
}
}

private def doFetchData(analyticJob: AnalyticJob): Try[SparkApplicationData] = {
val appId = analyticJob.getAppId
logger.info(s"Fetching data for ${appId}")
try {
Await.result(doFetchData(sparkRestClient, sparkLogClient, appId, eventLogSource),
DEFAULT_TIMEOUT)
} catch {
case NonFatal(e) =>
Try {
Await.result(doFetchDataUsingRestAndLogClients(analyticJob), DEFAULT_TIMEOUT)
}.transform(
data => {
logger.info(s"Succeeded fetching data for ${appId}")
Success(data)
},
e => {
logger.error(s"Failed fetching data for ${appId}", e)
throw e
Failure(e)
}
)
}

private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = async {
val appId = analyticJob.getAppId
val restDerivedData = await(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest))

val logDerivedData = eventLogSource match {
case EventLogSource.None => None
case EventLogSource.Rest => restDerivedData.logDerivedData
case EventLogSource.WebHdfs =>
val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId
Some(await(sparkLogClient.fetchData(appId, lastAttemptId)))
}

SparkApplicationData(appId, restDerivedData, logDerivedData)
}

}

object SparkFetcher {
import Async.{async, await}

sealed trait EventLogSource

@@ -97,27 +128,6 @@ object SparkFetcher {
}

val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled"
val DEFAULT_TIMEOUT = Duration(30, SECONDS)

private def doFetchData(
sparkRestClient: SparkRestClient,
sparkLogClient: SparkLogClient,
appId: String,
eventLogSource: EventLogSource
)(
implicit ec: ExecutionContext
): Future[SparkApplicationData] = async {
val restDerivedData = await(sparkRestClient.fetchData(
appId, eventLogSource == EventLogSource.Rest))

val logDerivedData = eventLogSource match {
case EventLogSource.None => None
case EventLogSource.Rest => restDerivedData.logDerivedData
case EventLogSource.WebHdfs =>
val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId
Some(await(sparkLogClient.fetchData(appId, lastAttemptId)))
}

SparkApplicationData(appId, restDerivedData, logDerivedData)
}
val DEFAULT_TIMEOUT = Duration(60, SECONDS)
val LOG_LOCATION_URI_XML_FIELD = "event_log_location_uri"
}
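
The refactored fetchData above separates the blocking wait from the outcome handling: Await.result is wrapped in a Try, transform logs success or failure without altering the result, and the caller then returns the data or rethrows the error. A minimal, self-contained sketch of that pattern with placeholder names (not the project's code):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{Duration, SECONDS}
import scala.util.{Failure, Success, Try}

object FetchPatternSketch {
  val timeout = Duration(60, SECONDS)

  // Placeholder for an asynchronous fetch, e.g. a REST call.
  def doFetch(appId: String): Future[String] = Future(s"data-for-$appId")

  def fetchData(appId: String): String = {
    val result: Try[String] = Try {
      // Block until the future completes or the timeout expires.
      Await.result(doFetch(appId), timeout)
    }.transform(
      data => { println(s"Succeeded fetching data for $appId"); Success(data) },
      e => { println(s"Failed fetching data for $appId: $e"); Failure(e) }
    )
    // Surface the data, or rethrow the original failure.
    result match {
      case Success(data) => data
      case Failure(e) => throw e
    }
  }
}
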