Skip to content

Commit

Permalink
Merge 5c394d4 into 1f806c9
Browse files Browse the repository at this point in the history
  • Loading branch information
izeigerman committed Aug 19, 2019
2 parents 1f806c9 + 5c394d4 commit 13f01fc
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 24 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ script:

matrix:
include:
- jdk: oraclejdk8
- jdk: oraclejdk9
scala: 2.11.11
- jdk: oraclejdk8
- jdk: oraclejdk9
scala: 2.12.6

before_cache:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,14 @@ final class YarnLauncher(yarnConf: YarnConfiguration,

private val logger = LoggerFactory.getLogger(classOf[YarnLauncher])

private def withYarnClient[T](f: YarnLauncherClient => T): T = {
private def withYarnClient[T](f: YarnLauncherClient => Future[T]): Future[T] = {
val yarnClient = yarnClientCreator()
try {
val yarnClientStarted = Future {
yarnClient.init(yarnConf)
yarnClient.start()
f(yarnClient)
} finally {
yarnClient.stop()
}
val result = yarnClientStarted.flatMap(_ => f(yarnClient))
result.andThen { case _ => yarnClient.stop() }
}

private def retrieveMasterHost(yarnClient: YarnLauncherClient,
Expand Down Expand Up @@ -200,10 +199,8 @@ final class YarnLauncher(yarnConf: YarnConfiguration,
}

override def launch(config: Config, args: LaunchArguments): Future[LaunchResult] = {
Future {
withYarnClient { yarnClient =>
launchWithClient(yarnClient, config, args)
}
}.flatMap(identity) // There is no Future.flatten method in Scala 2.11.
withYarnClient { yarnClient =>
launchWithClient(yarnClient, config, args)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,27 +108,32 @@ class YarnLauncherSpec extends FlatSpec with Matchers with MockFactory with Befo
}

"A YARN Launcher" should "launch application master successfully" in {
val yarnConfig = new YarnConfiguration()
val yarnClient = mock[YarnLauncherClient]
(yarnClient.init _).expects(yarnConfig)
(yarnClient.start _).expects()
(yarnClient.stop _).expects()

val yarnClientApp = createYarnClientApplication(1)
val appContext = yarnClientApp.getApplicationSubmissionContext
val appId = appContext.getApplicationId
(yarnClient.createApplication _).expects().returns(yarnClientApp)
(yarnClient.submitApplication _).expects(appContext).returns(appId)

val reportGenerator = new ReportGeneratorBuilder(appId)
.reportState(YarnApplicationState.ACCEPTED, 1)
.reportState(YarnApplicationState.SUBMITTED, 1)
.reportState(YarnApplicationState.RUNNING, 1)
.build
(yarnClient.getApplicationReport _)
.expects(appId)
.onCall((_: ApplicationId) => reportGenerator.getNextReport())
.repeated(3)

val yarnConfig = new YarnConfiguration()
val yarnClient = mock[YarnLauncherClient]
inSequence {
(yarnClient.init _).expects(yarnConfig)
(yarnClient.start _).expects()

(yarnClient.createApplication _).expects().returns(yarnClientApp)
(yarnClient.submitApplication _).expects(appContext).returns(appId)

(yarnClient.getApplicationReport _)
.expects(appId)
.onCall((_: ApplicationId) => reportGenerator.getNextReport())
.repeated(3)

(yarnClient.stop _).expects()
}

val launcher = new YarnLauncher(yarnConfig, () => yarnClient)
val launchArgs = createTestLaunchArguments
Expand Down

0 comments on commit 13f01fc

Please sign in to comment.