Changed async to Future/blocking and changed the error to warn #353

Merged: 2 commits, Mar 19, 2018
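
The change replaces the scala-async async/await DSL in the fetchers with plain scala.concurrent primitives. A minimal sketch of the before/after style, assuming a stand-in fetchData call rather than the project's real clients:

import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration.{Duration, SECONDS}
import ExecutionContext.Implicits.global

// Stand-in for a slow REST call; the real code talks to the Spark history server.
def fetchData(appId: String): String = s"data for $appId"

// Old style (scala-async): async { await(someFuture) }
// New style: run blocking work inside Future { blocking { ... } } and, where a
// result is needed synchronously, join with Await.result plus a timeout.
def fetch(appId: String): Future[String] = Future {
  blocking {
    fetchData(appId)
  }
}

val data = Await.result(fetch("application_0001"), Duration(5, SECONDS))

The blocking marker hints to the default global execution context that the task may park a thread on I/O, so the pool can temporarily grow instead of starving other tasks.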
app/com/linkedin/drelephant/ElephantRunner.java (4 changes: 2 additions & 2 deletions)
@@ -192,11 +192,11 @@ public void run() {
logger.error(ExceptionUtils.getStackTrace(e));

if (_analyticJob != null && _analyticJob.retry()) {
logger.error("Add analytic job id [" + _analyticJob.getAppId() + "] into the retry list.");
logger.warn("Add analytic job id [" + _analyticJob.getAppId() + "] into the retry list.");
_analyticJobGenerator.addIntoRetries(_analyticJob);
} else if (_analyticJob != null && _analyticJob.isSecondPhaseRetry()) {
//Putting the job into a second retry queue which fetches jobs after some interval. Some spark jobs may need more time than usual to process, hence the queue.
logger.error("Add analytic job id [" + _analyticJob.getAppId() + "] into the second retry list.");
logger.warn("Add analytic job id [" + _analyticJob.getAppId() + "] into the second retry list.");
_analyticJobGenerator.addIntoSecondRetryQueue(_analyticJob);
} else {
if (_analyticJob != null) {
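The comment in this hunk explains the second retry queue: some Spark jobs need more time than usual, so instead of being retried immediately they are parked in a queue that is drained after an interval. A hypothetical sketch of that two-tier scheme (the Job type, queue names, and thresholds are illustrative, not the actual ElephantRunner fields; the real class is Java), which also shows why the retriable branches now log at warn while only the terminal branch keeps error:

import scala.collection.mutable

// Illustrative types and thresholds only.
case class Job(appId: String, attempts: Int)

val retries = mutable.Queue.empty[Job]       // drained on the next poll cycle (not shown)
val secondRetries = mutable.Queue.empty[Job] // drained after a longer interval (not shown)

def onFailure(job: Job): Unit =
  if (job.attempts < 3) {
    println(s"WARN  retrying ${job.appId}")                // recoverable: warn
    retries.enqueue(job)
  } else if (job.attempts < 6) {
    println(s"WARN  second-phase retry for ${job.appId}")  // still recoverable: warn
    secondRetries.enqueue(job)
  } else {
    println(s"ERROR giving up on ${job.appId}")            // terminal failure: error
  }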
app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala (40 changes: 21 additions & 19 deletions)
@@ -16,11 +16,9 @@

package com.linkedin.drelephant.spark.fetchers

import scala.async.Async
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration.{Duration, SECONDS}
import scala.util.{Try, Success, Failure}
import scala.util.control.NonFatal

import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
@@ -35,9 +33,9 @@ import org.apache.spark.SparkConf
* A fetcher that gets Spark-related data from a combination of the Spark monitoring REST API and Spark event logs.
*/
class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
extends ElephantFetcher[SparkApplicationData] {
extends ElephantFetcher[SparkApplicationData] {

import SparkFetcher._
import Async.{async, await}
import ExecutionContext.Implicits.global

private val logger: Logger = Logger.getLogger(classOf[SparkFetcher])
@@ -94,35 +92,39 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
Success(data)
},
e => {
logger.error(s"Failed fetching data for ${appId}", e)
logger.warn(s"Failed fetching data for ${appId}." + " I will retry after some time! " + "Exception Message is: " + e.getMessage)
Failure(e)
}
)
}

private def doFetchSparkApplicationData(analyticJob: AnalyticJob): Future[SparkApplicationData] = {
if (shouldProcessLogsLocally) {
async {
Future {
sparkRestClient.fetchEventLogAndParse(analyticJob.getAppId)
}
} else {
doFetchDataUsingRestAndLogClients(analyticJob)
}
}

private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = async {
val appId = analyticJob.getAppId
val restDerivedData = await(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest))

val logDerivedData = eventLogSource match {
case EventLogSource.None => None
case EventLogSource.Rest => restDerivedData.logDerivedData
case EventLogSource.WebHdfs =>
val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId
Some(await(sparkLogClient.fetchData(appId, lastAttemptId)))
}
private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = Future {
blocking {
val appId = analyticJob.getAppId
val restDerivedData = Await.result(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest), DEFAULT_TIMEOUT)

val logDerivedData = eventLogSource match {
case EventLogSource.None => None
case EventLogSource.Rest => restDerivedData.logDerivedData
case EventLogSource.WebHdfs =>
val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy {
_.startTime
}.attemptId
Some(Await.result(sparkLogClient.fetchData(appId, lastAttemptId), DEFAULT_TIMEOUT))
}

SparkApplicationData(appId, restDerivedData, logDerivedData)
SparkApplicationData(appId, restDerivedData, logDerivedData)
}
}

}
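Where the old doFetchDataUsingRestAndLogClients chained await calls, the new version runs the whole sequence inside one Future { blocking { ... } } and joins each inner future with a timeout. A reduced sketch of that shape, with stubbed clients standing in for sparkRestClient and sparkLogClient:

import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration.{Duration, SECONDS}
import ExecutionContext.Implicits.global

// Stubs standing in for the REST and WebHDFS log clients.
def fetchRestData(appId: String): Future[String] = Future(s"rest:$appId")
def fetchLogData(appId: String): Future[String] = Future(s"log:$appId")

val timeout = Duration(5, SECONDS)

def fetchAll(appId: String): Future[(String, String)] = Future {
  blocking {
    // Dependent, sequential steps: block on each inner future instead of await(...).
    val rest = Await.result(fetchRestData(appId), timeout)
    val log = Await.result(fetchLogData(appId), timeout)
    (rest, log)
  }
}

The trade-off is that the wrapping future now occupies a blocking-marked thread while it waits, whereas the async version suspended without holding one.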
app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala (78 changes: 53 additions & 25 deletions)
@@ -16,7 +16,7 @@

package com.linkedin.drelephant.spark.fetchers

import java.io.{InputStream, BufferedInputStream}
import java.io.{BufferedInputStream, InputStream}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.zip.ZipInputStream
@@ -25,8 +25,7 @@ import java.util.{Calendar, SimpleTimeZone}
import com.linkedin.drelephant.spark.legacydata.LegacyDataConverters
import org.apache.spark.deploy.history.SparkDataCollection

import scala.async.Async
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.util.control.NonFatal
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -41,15 +40,17 @@ import javax.ws.rs.core.MediaType
import org.apache.log4j.Logger
import org.apache.spark.SparkConf

import scala.concurrent.duration.{Duration, SECONDS}

/**
* A client for getting data from the Spark monitoring REST API, e.g. <https://spark.apache.org/docs/1.4.1/monitoring.html#rest-api>.
*
* Jersey classloading seems to be brittle (at least when testing in the console), so some of the implementation is non-lazy
* or synchronous when needed.
*/
class SparkRestClient(sparkConf: SparkConf) {

import SparkRestClient._
import Async.{async, await}

private val logger: Logger = Logger.getLogger(classOf[SparkRestClient])

@@ -58,8 +59,8 @@ class SparkRestClient(sparkConf: SparkConf) {
private val historyServerUri: URI = sparkConf.getOption(HISTORY_SERVER_ADDRESS_KEY) match {
case Some(historyServerAddress) =>
val baseUri: URI =
// Latest versions of CDH include http in their history server address configuration.
// However, it is not recommended by Spark documentation(http://spark.apache.org/docs/latest/running-on-yarn.html)
// Latest versions of CDH include http in their history server address configuration.
// However, it is not recommended by Spark documentation(http://spark.apache.org/docs/latest/running-on-yarn.html)
if (historyServerAddress.contains(s"http://")) {
new URI(historyServerAddress)
} else {
@@ -79,29 +80,49 @@ class SparkRestClient(sparkConf: SparkConf) {
val (applicationInfo, attemptTarget) = getApplicationMetaData(appId)

// Limit the scope of async.
async {
val futureJobDatas = async { getJobDatas(attemptTarget) }
val futureStageDatas = async { getStageDatas(attemptTarget) }
val futureExecutorSummaries = async { getExecutorSummaries(attemptTarget) }
val futureLogData = if (fetchLogs) {
async { getLogData(attemptTarget)}
} else Future.successful(None)

SparkRestDerivedData(
applicationInfo,
await(futureJobDatas),
await(futureStageDatas),
await(futureExecutorSummaries),
await(futureLogData)
)
Future {
blocking {
val futureJobDatas = Future {
blocking {
getJobDatas(attemptTarget)
}
}
val futureStageDatas = Future {
blocking {
getStageDatas(attemptTarget)
}
}
val futureExecutorSummaries = Future {
blocking {
getExecutorSummaries(attemptTarget)
}
}
val futureLogData = if (fetchLogs) {
Future {
blocking {
getLogData(attemptTarget)
}
}
} else Future.successful(None)

SparkRestDerivedData(
applicationInfo,
Await.result(futureJobDatas, DEFAULT_TIMEOUT),
Await.result(futureStageDatas, DEFAULT_TIMEOUT),
Await.result(futureExecutorSummaries, Duration(5, SECONDS)),
Await.result(futureLogData, Duration(5, SECONDS))
)
}
}
}

def fetchEventLogAndParse(appId: String): SparkApplicationData = {
val (_, attemptTarget) = getApplicationMetaData(appId)
val logTarget = attemptTarget.path("logs")
logger.info(s"creating SparkApplication by calling REST API at ${logTarget.getUri} to get eventlogs")
resource.managed { getApplicationLogs(logTarget) }.acquireAndGet { zipInputStream =>
resource.managed {
getApplicationLogs(logTarget)
}.acquireAndGet { zipInputStream =>
getLogInputStream(zipInputStream, logTarget) match {
case (None, _) => throw new RuntimeException(s"Failed to read log for application ${appId}")
case (Some(inputStream), fileName) => {
@@ -121,7 +142,9 @@

// These are pure and cannot fail, therefore it is safe to have
// them outside of the async block.
val lastAttemptId = applicationInfo.attempts.maxBy {_.startTime}.attemptId
val lastAttemptId = applicationInfo.attempts.maxBy {
_.startTime
}.attemptId
val attemptTarget = lastAttemptId.map(appTarget.path).getOrElse(appTarget)
(applicationInfo, attemptTarget)
}
@@ -140,7 +163,9 @@
private def getLogData(attemptTarget: WebTarget): Option[SparkLogDerivedData] = {
val target = attemptTarget.path("logs")
logger.info(s"calling REST API at ${target.getUri} to get eventlogs")
resource.managed { getApplicationLogs(target) }.acquireAndGet { zis =>
resource.managed {
getApplicationLogs(target)
}.acquireAndGet { zis =>
val (inputStream, _) = getLogInputStream(zis, target)
inputStream.map(SparkLogClient.findDerivedData(_))
}
@@ -174,7 +199,9 @@
throw new RuntimeException(s"Application for the log ${entryName} has not finished yet.")
}
val codec = SparkUtils.compressionCodecForLogName(sparkConf, entryName)
(Some(codec.map { _.compressedInputStream(zis)}.getOrElse(zis)), entryName)
(Some(codec.map {
_.compressedInputStream(zis)
}.getOrElse(zis)), entryName)
}
}

@@ -219,6 +246,7 @@ object SparkRestClient {
val HISTORY_SERVER_ADDRESS_KEY = "spark.yarn.historyServer.address"
val API_V1_MOUNT_PATH = "api/v1"
val IN_PROGRESS = ".inprogress"
val DEFAULT_TIMEOUT = Duration(5, SECONDS);

val SparkRestObjectMapper = {
val dateFormat = {
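In fetchData, the REST requests are now launched as independent Future { blocking { ... } } tasks and joined with Await.result; DEFAULT_TIMEOUT and the inline Duration(5, SECONDS) are both five seconds. A trimmed sketch of that fan-out/join shape, with stubbed calls in place of the real Jersey requests:

import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration.{Duration, SECONDS}
import ExecutionContext.Implicits.global

val DEFAULT_TIMEOUT = Duration(5, SECONDS)

// Stubs in place of the real history-server REST calls.
def getJobDatas(): Seq[String] = Seq("job")
def getStageDatas(): Seq[String] = Seq("stage")

// Fan out: each request runs as its own blocking-marked task, so they can proceed in parallel.
val futureJobDatas = Future { blocking { getJobDatas() } }
val futureStageDatas = Future { blocking { getStageDatas() } }

// Join: each Await.result throws a TimeoutException after the timeout instead of waiting forever.
val jobDatas = Await.result(futureJobDatas, DEFAULT_TIMEOUT)
val stageDatas = Await.result(futureStageDatas, DEFAULT_TIMEOUT)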