diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
index e3ea982c1..d9913851b 100644
--- a/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
+++ b/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
@@ -18,7 +18,7 @@ package com.linkedin.drelephant.spark.fetchers
 
 import java.util.concurrent.TimeoutException
 
-import scala.concurrent.{Await, ExecutionContext, Future, blocking}
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration.{Duration, SECONDS}
 import scala.util.{Failure, Success, Try}
 import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
@@ -110,22 +110,20 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
   }
 
   private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = Future {
-    blocking {
-      val appId = analyticJob.getAppId
-      val restDerivedData = Await.result(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest), DEFAULT_TIMEOUT)
-
-      val logDerivedData = eventLogSource match {
-        case EventLogSource.None => None
-        case EventLogSource.Rest => restDerivedData.logDerivedData
-        case EventLogSource.WebHdfs =>
-          val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy {
-            _.startTime
-          }.attemptId
-          Some(Await.result(sparkLogClient.fetchData(appId, lastAttemptId), DEFAULT_TIMEOUT))
-      }
-
-      SparkApplicationData(appId, restDerivedData, logDerivedData)
+    val appId = analyticJob.getAppId
+    val restDerivedData = Await.result(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest), DEFAULT_TIMEOUT)
+
+    val logDerivedData = eventLogSource match {
+      case EventLogSource.None => None
+      case EventLogSource.Rest => restDerivedData.logDerivedData
+      case EventLogSource.WebHdfs =>
+        val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy {
+          _.startTime
+        }.attemptId
+        Some(Await.result(sparkLogClient.fetchData(appId, lastAttemptId), DEFAULT_TIMEOUT))
     }
+    SparkApplicationData(appId, restDerivedData, logDerivedData)
+
   }
 }
@@ -135,12 +133,16 @@ object SparkFetcher {
   sealed trait EventLogSource
 
   object EventLogSource {
+    /** Fetch event logs through REST API. */
     case object Rest extends EventLogSource
+    /** Fetch event logs through WebHDFS. */
     case object WebHdfs extends EventLogSource
+    /** Event logs are not available. */
     case object None extends EventLogSource
+
   }
 
   val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled"
diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala
index 2bdefa15c..71023fab4 100644
--- a/app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala
+++ b/app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala
@@ -19,7 +19,7 @@ package com.linkedin.drelephant.spark.fetchers
 import java.io.InputStream
 import java.security.PrivilegedAction
 
-import scala.concurrent.{ExecutionContext, Future, blocking}
+import scala.concurrent.{ExecutionContext, Future}
 import scala.io.Source
 
 import com.linkedin.drelephant.security.HadoopSecurity
@@ -62,9 +62,8 @@ class SparkLogClient(hadoopConfiguration: Configuration, sparkConf: SparkConf, e
     val (eventLogPath, eventLogCodec) =
       sparkUtils.pathAndCodecforEventLog(sparkConf, eventLogFileSystem, baseEventLogPath, appId, attemptId)
 
-    Future { blocking {
+    Future {
       sparkUtils.withEventLog(eventLogFileSystem, eventLogPath, eventLogCodec)(findDerivedData(_))
-    } }
+    }
   }
 }
diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
index 5f222e5d8..419c0bb39 100644
--- a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
+++ b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
@@ -25,7 +25,7 @@ import java.util.{Calendar, SimpleTimeZone}
 import com.linkedin.drelephant.spark.legacydata.LegacyDataConverters
 import org.apache.spark.deploy.history.SparkDataCollection
 
-import scala.concurrent.{Await, ExecutionContext, Future, blocking}
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.util.control.NonFatal
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -81,53 +81,40 @@ class SparkRestClient(sparkConf: SparkConf) {
     val (applicationInfo, attemptTarget) = getApplicationMetaData(appId)
 
     Future {
-      blocking {
-        val futureJobDatas = Future {
-          blocking {
-            getJobDatas(attemptTarget)
-          }
-        }
-        val futureStageDatas = Future {
-          blocking {
-            getStageDatas(attemptTarget)
-          }
-        }
-        val futureExecutorSummaries = Future {
-          blocking {
-            getExecutorSummaries(attemptTarget)
-          }
+      val futureJobDatas = Future {
+        getJobDatas(attemptTarget)
+      }
+      val futureStageDatas = Future {
+        getStageDatas(attemptTarget)
+      }
+      val futureExecutorSummaries = Future {
+        getExecutorSummaries(attemptTarget)
+      }
+      val futureLogData = if (fetchLogs) {
+        Future {
+          getLogData(attemptTarget)
         }
-        val futureLogData = if (fetchLogs) {
-          Future {
-            blocking {
-              getLogData(attemptTarget)
-            }
-          }
-        } else Future.successful(None)
-
-        if (fetchFailedTasks) {
-          val futureFailedTasksDatas = Future {
-            blocking {
-              getStagesWithFailedTasks(attemptTarget)
-            }
-          }
-          SparkRestDerivedData(
-            applicationInfo,
-            Await.result(futureJobDatas, DEFAULT_TIMEOUT),
-            Await.result(futureStageDatas, DEFAULT_TIMEOUT),
-            Await.result(futureExecutorSummaries, DEFAULT_TIMEOUT),
-            Await.result(futureFailedTasksDatas, DEFAULT_TIMEOUT),
-            Await.result(futureLogData, DEFAULT_TIMEOUT))
-        } else {
-          SparkRestDerivedData(
-            applicationInfo,
-            Await.result(futureJobDatas, DEFAULT_TIMEOUT),
-            Await.result(futureStageDatas, DEFAULT_TIMEOUT),
-            Await.result(futureExecutorSummaries, DEFAULT_TIMEOUT),
-            Seq.empty,
-            Await.result(futureLogData, DEFAULT_TIMEOUT)
-          )
+      } else Future.successful(None)
+      if (fetchFailedTasks) {
+        val futureFailedTasksDatas = Future {
+          getStagesWithFailedTasks(attemptTarget)
         }
+        SparkRestDerivedData(
+          applicationInfo,
+          Await.result(futureJobDatas, DEFAULT_TIMEOUT),
+          Await.result(futureStageDatas, DEFAULT_TIMEOUT),
+          Await.result(futureExecutorSummaries, DEFAULT_TIMEOUT),
+          Await.result(futureFailedTasksDatas, DEFAULT_TIMEOUT),
+          Await.result(futureLogData, DEFAULT_TIMEOUT))
+      } else {
+        SparkRestDerivedData(
+          applicationInfo,
+          Await.result(futureJobDatas, DEFAULT_TIMEOUT),
+          Await.result(futureStageDatas, DEFAULT_TIMEOUT),
+          Await.result(futureExecutorSummaries, DEFAULT_TIMEOUT),
+          Seq.empty,
+          Await.result(futureLogData, DEFAULT_TIMEOUT)
+        )
+      }
     }
   }
 }
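For context, `scala.concurrent.blocking` is only a hint to `BlockContext`-aware pools such as `ExecutionContext.global`, which may spawn compensation threads while a worker sits in blocking I/O; without it, the REST and WebHDFS calls and the `Await.result`s above simply occupy threads of whatever `ExecutionContext` the caller supplies. Below is a minimal sketch of running such blocking fetches on a bounded pool instead; the object name, pool size, and `slowFetch` stand-in are illustrative assumptions, not part of this patch.

```scala
import java.util.concurrent.Executors

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object BlockingIoPoolSketch {
  // Hypothetical fixed-size pool: blocking fetches occupy at most 4 threads,
  // so no pool is ever asked to grow to compensate for parked workers.
  private val pool = Executors.newFixedThreadPool(4)
  implicit val blockingIoEc: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  // Stand-in for a blocking client call such as sparkRestClient.fetchData.
  private def slowFetch(appId: String): String = {
    Thread.sleep(100) // simulate REST/HDFS latency
    s"data for $appId"
  }

  def main(args: Array[String]): Unit = {
    // Without a blocking {} hint, the Future just holds one pool thread
    // until slowFetch returns; Await bounds how long the caller waits.
    val result = Future(slowFetch("application_1499999999999_0001"))
    println(Await.result(result, 5.seconds))
    pool.shutdown()
  }
}
```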