From 38ced8ad6f5f90ab659a0fce5b4573c68ff4b7bf Mon Sep 17 00:00:00 2001 From: swasti Date: Fri, 6 Apr 2018 11:43:56 +0530 Subject: [PATCH 1/2] removing blocking --- .../spark/fetchers/SparkFetcher.scala | 34 ++++++------- .../spark/fetchers/SparkRestClient.scala | 51 ++++++++----------- 2 files changed, 38 insertions(+), 47 deletions(-) diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala index cf14a29fb..31d046bf5 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala @@ -18,7 +18,7 @@ package com.linkedin.drelephant.spark.fetchers import java.util.concurrent.TimeoutException -import scala.concurrent.{Await, ExecutionContext, Future, blocking} +import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration.{Duration, SECONDS} import scala.util.{Failure, Success, Try} import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher} @@ -110,24 +110,20 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData) } private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = Future { - blocking { - val appId = analyticJob.getAppId - val restDerivedData = Await.result(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest), DEFAULT_TIMEOUT) - - val logDerivedData = eventLogSource match { - case EventLogSource.None => None - case EventLogSource.Rest => restDerivedData.logDerivedData - case EventLogSource.WebHdfs => - val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { - _.startTime - }.attemptId - Some(Await.result(sparkLogClient.fetchData(appId, lastAttemptId), DEFAULT_TIMEOUT)) - } - - SparkApplicationData(appId, restDerivedData, logDerivedData) + val appId = analyticJob.getAppId + val restDerivedData = Await.result(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest), DEFAULT_TIMEOUT) + + val logDerivedData = eventLogSource match { + case EventLogSource.None => None + case EventLogSource.Rest => restDerivedData.logDerivedData + case EventLogSource.WebHdfs => + val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { + _.startTime + }.attemptId + Some(Await.result(sparkLogClient.fetchData(appId, lastAttemptId), DEFAULT_TIMEOUT)) } + SparkApplicationData(appId, restDerivedData, logDerivedData) } - } object SparkFetcher { @@ -135,12 +131,16 @@ object SparkFetcher { sealed trait EventLogSource object EventLogSource { + /** Fetch event logs through REST API. */ case object Rest extends EventLogSource + /** Fetch event logs through WebHDFS. */ case object WebHdfs extends EventLogSource + /** Event logs are not available. */ case object None extends EventLogSource + } val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled" diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala index 8eb1144e6..7192a3b11 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala @@ -81,38 +81,29 @@ class SparkRestClient(sparkConf: SparkConf) { val (applicationInfo, attemptTarget) = getApplicationMetaData(appId) Future { - blocking { - val futureJobDatas = Future { - blocking { - getJobDatas(attemptTarget) - } - } - val futureStageDatas = Future { - blocking { - getStageDatas(attemptTarget) - } - } - val futureExecutorSummaries = Future { - blocking { - getExecutorSummaries(attemptTarget) - } + val futureJobDatas = Future { + getJobDatas(attemptTarget) + } + val futureStageDatas = Future { + getStageDatas(attemptTarget) + } + val futureExecutorSummaries = Future { + getExecutorSummaries(attemptTarget) + } + val futureLogData = if (fetchLogs) { + Future { + getLogData(attemptTarget) } - val futureLogData = if (fetchLogs) { - Future { - blocking { - getLogData(attemptTarget) - } - } - } else Future.successful(None) + } else Future.successful(None) + + SparkRestDerivedData( + applicationInfo, + Await.result(futureJobDatas, DEFAULT_TIMEOUT), + Await.result(futureStageDatas, DEFAULT_TIMEOUT), + Await.result(futureExecutorSummaries, Duration(5, SECONDS)), + Await.result(futureLogData, Duration(5, SECONDS)) + ) - SparkRestDerivedData( - applicationInfo, - Await.result(futureJobDatas, DEFAULT_TIMEOUT), - Await.result(futureStageDatas, DEFAULT_TIMEOUT), - Await.result(futureExecutorSummaries, Duration(5, SECONDS)), - Await.result(futureLogData, Duration(5, SECONDS)) - ) - } } } From a63a4077104a01aee928a6ee8b72334ba3f39074 Mon Sep 17 00:00:00 2001 From: swasti Date: Fri, 6 Apr 2018 14:35:37 +0530 Subject: [PATCH 2/2] removed blocking from logClient as well --- .../linkedin/drelephant/spark/fetchers/SparkLogClient.scala | 5 ++--- .../linkedin/drelephant/spark/fetchers/SparkRestClient.scala | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala index 2bdefa15c..2a6464627 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala @@ -19,7 +19,7 @@ package com.linkedin.drelephant.spark.fetchers import java.io.InputStream import java.security.PrivilegedAction -import scala.concurrent.{ExecutionContext, Future, blocking} +import scala.concurrent.{ExecutionContext, Future} import scala.io.Source import com.linkedin.drelephant.security.HadoopSecurity @@ -62,9 +62,8 @@ class SparkLogClient(hadoopConfiguration: Configuration, sparkConf: SparkConf, e val (eventLogPath, eventLogCodec) = sparkUtils.pathAndCodecforEventLog(sparkConf, eventLogFileSystem, baseEventLogPath, appId, attemptId) - Future { blocking { + Future { sparkUtils.withEventLog(eventLogFileSystem, eventLogPath, eventLogCodec)(findDerivedData(_)) - } } } } diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala index 7192a3b11..325db5d6d 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala @@ -25,7 +25,7 @@ import java.util.{Calendar, SimpleTimeZone} import com.linkedin.drelephant.spark.legacydata.LegacyDataConverters import org.apache.spark.deploy.history.SparkDataCollection -import scala.concurrent.{Await, ExecutionContext, Future, blocking} +import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.control.NonFatal import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import com.fasterxml.jackson.module.scala.DefaultScalaModule