Removing blocking keyword so as to prevent a large number of threads being spawned (#362)
skakker authored and akshayrai committed Apr 6, 2018
1 parent 3436b55 commit 23f4af9
Showing 3 changed files with 53 additions and 65 deletions.
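
Background for this change (an editor's sketch, not part of the commit): `scala.concurrent.blocking` tells the enclosing `ExecutionContext` that a task is about to block, and the default global fork-join pool reacts by spawning compensation threads to preserve its target parallelism. When every fetch wraps a long REST/HDFS call in `blocking`, a burst of jobs can therefore create roughly one thread per in-flight fetch. The demo object below is hypothetical and only illustrates that mechanism:

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, blocking}

// Hypothetical demo (not from this repo): `blocking` lets the global
// fork-join pool spawn a compensation thread for each parked task, so
// 100 stalled fetches can mean ~100 extra threads. Remove the `blocking`
// wrapper and the pool stays near its target parallelism (# of cores),
// with the remaining tasks simply queuing.
object BlockingThreadDemo extends App {
  val gate = new CountDownLatch(1)

  (1 to 100).foreach { _ =>
    Future {
      blocking {     // delete this wrapper and the thread count stays flat
        gate.await() // stands in for a slow REST/WebHDFS call
      }
    }
  }

  Thread.sleep(2000) // give the pool time to react
  println(s"live threads: ${Thread.activeCount()}")
  gate.countDown()
}
```
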
34 changes: 18 additions & 16 deletions app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
@@ -18,7 +18,7 @@ package com.linkedin.drelephant.spark.fetchers

import java.util.concurrent.TimeoutException

-import scala.concurrent.{Await, ExecutionContext, Future, blocking}
+import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.{Duration, SECONDS}
import scala.util.{Failure, Success, Try}
import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
@@ -110,22 +110,20 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
  }

  private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = Future {
-    blocking {
-      val appId = analyticJob.getAppId
-      val restDerivedData = Await.result(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest), DEFAULT_TIMEOUT)
+    val appId = analyticJob.getAppId
+    val restDerivedData = Await.result(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest), DEFAULT_TIMEOUT)

-      val logDerivedData = eventLogSource match {
-        case EventLogSource.None => None
-        case EventLogSource.Rest => restDerivedData.logDerivedData
-        case EventLogSource.WebHdfs =>
-          val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy {
-            _.startTime
-          }.attemptId
-          Some(Await.result(sparkLogClient.fetchData(appId, lastAttemptId), DEFAULT_TIMEOUT))
-      }
-
-      SparkApplicationData(appId, restDerivedData, logDerivedData)
-    }
+    val logDerivedData = eventLogSource match {
+      case EventLogSource.None => None
+      case EventLogSource.Rest => restDerivedData.logDerivedData
+      case EventLogSource.WebHdfs =>
+        val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy {
+          _.startTime
+        }.attemptId
+        Some(Await.result(sparkLogClient.fetchData(appId, lastAttemptId), DEFAULT_TIMEOUT))
+    }
+    SparkApplicationData(appId, restDerivedData, logDerivedData)
+
  }

}
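
A side note on the hunk above (editorial, hedged): with `blocking` gone, the thread count is capped by whatever `ExecutionContext` the fetcher runs on, and a blocked `Await.result` now simply occupies a pool slot until it finishes. If an explicit, tunable cap were wanted, a dedicated fixed-size pool would make it visible; the names below are illustrative only, not from this codebase:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Illustrative sketch only: a bounded pool for fetch work. Tasks beyond
// `poolSize` in flight queue inside the executor instead of forcing new
// threads into existence the way `blocking` on the global pool can.
object FetcherPool {
  private val poolSize = 5
  private val executor = Executors.newFixedThreadPool(poolSize)
  implicit val fetchContext: ExecutionContext =
    ExecutionContext.fromExecutorService(executor)

  def submit[T](work: => T): Future[T] = Future(work) // runs on the bounded pool
}
```
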
@@ -135,12 +133,16 @@ object SparkFetcher {
  sealed trait EventLogSource

  object EventLogSource {
+
    /** Fetch event logs through REST API. */
    case object Rest extends EventLogSource
+
    /** Fetch event logs through WebHDFS. */
    case object WebHdfs extends EventLogSource
+
    /** Event logs are not available. */
    case object None extends EventLogSource
+
  }

  val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled"
5 changes: 2 additions & 3 deletions app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala
@@ -19,7 +19,7 @@ package com.linkedin.drelephant.spark.fetchers
import java.io.InputStream
import java.security.PrivilegedAction

-import scala.concurrent.{ExecutionContext, Future, blocking}
+import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source

import com.linkedin.drelephant.security.HadoopSecurity
@@ -62,9 +62,8 @@ class SparkLogClient(hadoopConfiguration: Configuration, sparkConf: SparkConf, e
    val (eventLogPath, eventLogCodec) =
      sparkUtils.pathAndCodecforEventLog(sparkConf, eventLogFileSystem, baseEventLogPath, appId, attemptId)

-    Future { blocking {
+    Future {
      sparkUtils.withEventLog(eventLogFileSystem, eventLogPath, eventLogCodec)(findDerivedData(_))
-    }
    }
  }
}
79 changes: 33 additions & 46 deletions app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
@@ -25,7 +25,7 @@ import java.util.{Calendar, SimpleTimeZone}
import com.linkedin.drelephant.spark.legacydata.LegacyDataConverters
import org.apache.spark.deploy.history.SparkDataCollection

-import scala.concurrent.{Await, ExecutionContext, Future, blocking}
+import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.control.NonFatal
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -81,53 +81,40 @@ class SparkRestClient(sparkConf: SparkConf) {
    val (applicationInfo, attemptTarget) = getApplicationMetaData(appId)

    Future {
-      blocking {
-        val futureJobDatas = Future {
-          blocking {
-            getJobDatas(attemptTarget)
-          }
-        }
-        val futureStageDatas = Future {
-          blocking {
-            getStageDatas(attemptTarget)
-          }
-        }
-        val futureExecutorSummaries = Future {
-          blocking {
-            getExecutorSummaries(attemptTarget)
-          }
-        }
-        val futureLogData = if (fetchLogs) {
-          Future {
-            blocking {
-              getLogData(attemptTarget)
-            }
-          }
-        } else Future.successful(None)
-
-        if (fetchFailedTasks) {
-          val futureFailedTasksDatas = Future {
-            blocking {
-              getStagesWithFailedTasks(attemptTarget)
-            }
-          }
-          SparkRestDerivedData(
-            applicationInfo,
-            Await.result(futureJobDatas, DEFAULT_TIMEOUT),
-            Await.result(futureStageDatas, DEFAULT_TIMEOUT),
-            Await.result(futureExecutorSummaries, DEFAULT_TIMEOUT),
-            Await.result(futureFailedTasksDatas, DEFAULT_TIMEOUT),
-            Await.result(futureLogData, DEFAULT_TIMEOUT))
-        } else {
-          SparkRestDerivedData(
-            applicationInfo,
-            Await.result(futureJobDatas, DEFAULT_TIMEOUT),
-            Await.result(futureStageDatas, DEFAULT_TIMEOUT),
-            Await.result(futureExecutorSummaries, DEFAULT_TIMEOUT),
-            Seq.empty,
-            Await.result(futureLogData, DEFAULT_TIMEOUT)
-          )
-        }
-      }
+      val futureJobDatas = Future {
+        getJobDatas(attemptTarget)
+      }
+      val futureStageDatas = Future {
+        getStageDatas(attemptTarget)
+      }
+      val futureExecutorSummaries = Future {
+        getExecutorSummaries(attemptTarget)
+      }
+      val futureLogData = if (fetchLogs) {
+        Future {
+          getLogData(attemptTarget)
+        }
+      } else Future.successful(None)
+      if (fetchFailedTasks) {
+        val futureFailedTasksDatas = Future {
+          getStagesWithFailedTasks(attemptTarget)
+        }
+        SparkRestDerivedData(
+          applicationInfo,
+          Await.result(futureJobDatas, DEFAULT_TIMEOUT),
+          Await.result(futureStageDatas, DEFAULT_TIMEOUT),
+          Await.result(futureExecutorSummaries, DEFAULT_TIMEOUT),
+          Await.result(futureFailedTasksDatas, DEFAULT_TIMEOUT),
+          Await.result(futureLogData, DEFAULT_TIMEOUT))
+      } else {
+        SparkRestDerivedData(
+          applicationInfo,
+          Await.result(futureJobDatas, DEFAULT_TIMEOUT),
+          Await.result(futureStageDatas, DEFAULT_TIMEOUT),
+          Await.result(futureExecutorSummaries, DEFAULT_TIMEOUT),
+          Seq.empty,
+          Await.result(futureLogData, DEFAULT_TIMEOUT)
+        )
+      }
      }
    }
  }
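
One more editorial observation on the new body above: the sub-futures still fan out on the shared pool and are joined with `Await.result`, which parks the outer future's thread until all of them complete. A hedged alternative, shown only as a sketch with simplified placeholder types, is to compose the futures so nothing blocks:

```scala
import scala.concurrent.{ExecutionContext, Future}

// Simplified sketch (types are placeholders, not the repo's real ones):
// the futures start eagerly when created; the for-comprehension then
// chains callbacks instead of parking a thread on Await.result.
def joinFetches[A, B, C](jobs: Future[A], stages: Future[B], executors: Future[C])
                        (implicit ec: ExecutionContext): Future[(A, B, C)] =
  for {
    j <- jobs
    s <- stages
    e <- executors
  } yield (j, s, e)
```
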
