Skip to content

Commit

Permalink
Fixes #520 - fetch the frameworkId from the state store on MesosDrive…
Browse files Browse the repository at this point in the history
…r creation
  • Loading branch information
gkleiman committed Aug 17, 2015
1 parent dc22aba commit ef0976a
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,19 @@ import javax.inject.Named
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.util.Timeout
import com.codahale.metrics.MetricRegistry
import org.apache.mesos.chronos.notification.{JobNotificationObserver, MailClient, RavenClient, SlackClient, HttpClient}
import org.apache.mesos.chronos.scheduler.graph.JobGraph
import org.apache.mesos.chronos.scheduler.jobs.stats.JobStats
import org.apache.mesos.chronos.scheduler.jobs.{JobsObserver, JobMetrics, JobScheduler, TaskManager}
import org.apache.mesos.chronos.scheduler.mesos._
import org.apache.mesos.chronos.scheduler.state.PersistenceStore
import com.google.common.util.concurrent.{ListeningScheduledExecutorService, MoreExecutors, ThreadFactoryBuilder}
import com.google.inject.{AbstractModule, Inject, Provides, Singleton}
import mesosphere.chaos.http.HttpConf
import mesosphere.mesos.util.FrameworkIdUtil
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.mesos.Protos.FrameworkInfo
import org.apache.mesos.Scheduler
import org.apache.mesos.chronos.notification.{HttpClient, JobNotificationObserver, MailClient, RavenClient, SlackClient}
import org.apache.mesos.chronos.scheduler.graph.JobGraph
import org.apache.mesos.chronos.scheduler.jobs.stats.JobStats
import org.apache.mesos.chronos.scheduler.jobs.{JobMetrics, JobScheduler, JobsObserver, TaskManager}
import org.apache.mesos.chronos.scheduler.mesos._
import org.apache.mesos.chronos.scheduler.state.PersistenceStore
import org.joda.time.Seconds

import scala.concurrent.duration._
Expand All @@ -48,41 +47,10 @@ class MainModule(val config: SchedulerConfiguration with HttpConf)
bind(classOf[JobGraph]).asEagerSingleton()
}

/**
 * Provides the singleton `FrameworkInfo` describing this framework to Mesos.
 *
 * Name, checkpointing, role, failover timeout and user are read from `config`.
 * The principal is set only when one is configured. The web UI URL is the
 * explicitly supplied one, otherwise it is derived from the hostname and the
 * HTTPS port (when an SSL keystore is configured) or the HTTP port.
 *
 * @param frameworkIdUtil helper asked to set the framework id on the builder
 *                        (presumably a previously persisted id, if any - confirm
 *                        against `FrameworkIdUtil.setIdIfExists`)
 * @return the fully built framework info
 */
@Inject
@Singleton
@Provides
def provideFrameworkInfo(frameworkIdUtil: FrameworkIdUtil): FrameworkInfo = {
  import mesosphere.util.BackToTheFuture.Implicits.defaultTimeout
  import scala.concurrent.ExecutionContext.Implicits.global

  val builder = FrameworkInfo.newBuilder()
    .setName(config.mesosFrameworkName())
    .setCheckpoint(config.mesosCheckpoint())
    .setRole(config.mesosRole())
    .setFailoverTimeout(config.failoverTimeoutSeconds())
    .setUser(config.user())

  // Only set a principal when one has been configured.
  for (principal <- config.mesosAuthenticationPrincipal.get) {
    builder.setPrincipal(principal)
  }

  // An explicit URL wins; otherwise derive one, using https when SSL is enabled.
  val webuiUrl =
    if (config.webuiUrl.isSupplied) config.webuiUrl()
    else if (config.sslKeystorePath.isDefined) s"https://${config.hostname()}:${config.httpsPort()}"
    else s"http://${config.hostname()}:${config.httpPort()}"
  builder.setWebuiUrl(webuiUrl)

  frameworkIdUtil.setIdIfExists(builder)
  builder.build()
}


@Singleton
@Provides
def provideMesosSchedulerDriverFactory(mesosScheduler: Scheduler, frameworkInfo: FrameworkInfo): MesosDriverFactory =
new MesosDriverFactory(mesosScheduler, frameworkInfo, config)
def provideMesosSchedulerDriverFactory(mesosScheduler: Scheduler, frameworkIdUtil: FrameworkIdUtil): MesosDriverFactory =
new MesosDriverFactory(mesosScheduler, frameworkIdUtil, config)

@Singleton
@Provides
Expand Down
Original file line number Diff line number Diff line change
@@ -1,93 +1,59 @@
package org.apache.mesos.chronos.scheduler.mesos

import java.io.{ FileInputStream, IOException }
import java.nio.file.attribute.PosixFilePermission
import java.nio.file.{ Files, Paths }
import java.util.logging.Logger

import com.google.protobuf.ByteString
import org.apache.mesos.Protos.{ Credential, FrameworkInfo, Status }
import mesosphere.chaos.http.HttpConf
import mesosphere.mesos.util.FrameworkIdUtil
import org.apache.mesos.Protos.{ FrameworkID, Status }
import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.{ MesosSchedulerDriver, Scheduler, SchedulerDriver }

import scala.collection.JavaConverters.asScalaSetConverter
import org.apache.mesos.{ Scheduler, SchedulerDriver }

/**
* The Mesos scheduler driver doesn't allow calling the start() method after stop() has been called, thus we need a
* factory to create a new driver once we call stop() - which happens if the leader abdicates or loses leadership.
* @author Florian Leibert (flo@leibert.de)
*/
class MesosDriverFactory(val mesosScheduler: Scheduler, val frameworkInfo: FrameworkInfo, val config: SchedulerConfiguration) {
class MesosDriverFactory(
val scheduler: Scheduler,
val frameworkIdUtil: FrameworkIdUtil,
val config: SchedulerConfiguration with HttpConf,
val schedulerDriverBuilder: SchedulerDriverBuilder = new SchedulerDriverBuilder) {

private[this] val log = Logger.getLogger(getClass.getName)

var mesosDriver: Option[SchedulerDriver] = None

def start() {
def start() = {
val status = get().start()
if (status != Status.DRIVER_RUNNING) {
log.severe(s"MesosSchedulerDriver start resulted in status:$status. Committing suicide!")
log.severe(s"MesosSchedulerDriver start resulted in status: $status. Committing suicide!")
System.exit(1)
}
}

def get(): SchedulerDriver = {
if (mesosDriver.isEmpty) {
makeDriver()
mesosDriver = Some(makeDriver())
}
mesosDriver.get
}

def makeDriver() {

val driver = config.mesosAuthenticationPrincipal.get match {
case Some(principal) =>
val credential = buildMesosCredentials(principal, config.mesosAuthenticationSecretFile.get)
new MesosSchedulerDriver(mesosScheduler, frameworkInfo, config.master(), credential)
case None =>
new MesosSchedulerDriver(mesosScheduler, frameworkInfo, config.master())
}

mesosDriver = Option(driver)
}

def close() {
def close() = {
assert(mesosDriver.nonEmpty, "Attempted to close a non initialized driver")
if (mesosDriver.isEmpty) {
log.severe("Attempted to close a non initialized driver")
System.exit(1)
}

mesosDriver.get.stop(true)
mesosDriver = None
}

private[this] def makeDriver(): SchedulerDriver = {
import scala.concurrent.ExecutionContext.Implicits.global
import mesosphere.util.BackToTheFuture.Implicits.defaultTimeout

/**
* Create the optional credentials instance, used to authenticate calls from Chronos to Mesos.
*/
def buildMesosCredentials(principal: String, secretFile: Option[String]): Credential = {

val credentialBuilder = Credential.newBuilder()
.setPrincipal(principal)

secretFile foreach { file =>
try {
val secretBytes = ByteString.readFrom(new FileInputStream(file))

val filePermissions = Files.getPosixFilePermissions(Paths.get(file)).asScala
if (!(filePermissions & Set(PosixFilePermission.OTHERS_READ, PosixFilePermission.OTHERS_WRITE)).isEmpty)
log.warning(s"Secret file $file should not be globally accessible.")

credentialBuilder.setSecret(secretBytes)
}
catch {
case cause: Throwable =>
throw new IOException(s"Error reading authentication secret from file [$file]", cause)
}
}

credentialBuilder.build()
val maybeFrameworkID: Option[FrameworkID] = frameworkIdUtil.fetch
schedulerDriverBuilder.newDriver(config, maybeFrameworkID, scheduler)
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.apache.mesos.chronos.scheduler.mesos

import java.io.{ IOException, FileInputStream }
import java.nio.file.attribute.PosixFilePermission
import java.nio.file.{ Paths, Files }
import java.util.logging.Logger

import com.google.protobuf.ByteString
import mesosphere.chaos.http.HttpConf
import org.apache.mesos.Protos.{ Credential, FrameworkID, FrameworkInfo }
import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.{ MesosSchedulerDriver, Scheduler, SchedulerDriver }

import scala.collection.JavaConverters.asScalaSetConverter

/**
 * Builds Mesos `SchedulerDriver` instances from the Chronos configuration.
 *
 * The only state held is a logger; every call to [[newDriver]] assembles a
 * fresh `FrameworkInfo` (and, when configured, a `Credential`) and returns a
 * new `MesosSchedulerDriver`.
 */
class SchedulerDriverBuilder {
  private[this] val log = Logger.getLogger(getClass.getName)

  /**
   * Creates a new, not yet started, scheduler driver.
   *
   * @param config      scheduler and HTTP configuration; supplies framework name, role, user,
   *                    failover timeout, checkpointing, web UI URL, master address and the
   *                    optional authentication principal / secret file
   * @param frameworkId framework id to set on the `FrameworkInfo`; left unset when `None`
   * @param scheduler   the scheduler callbacks handed to the driver
   * @return a `MesosSchedulerDriver`, created with a credential only when both a principal
   *         and a secret file are configured
   * @throws IOException if the configured secret file cannot be read or inspected
   */
  def newDriver(config: SchedulerConfiguration with HttpConf,
                frameworkId: Option[FrameworkID],
                scheduler: Scheduler): SchedulerDriver = {
    import scala.util.control.NonFatal

    // Builds the credential used to authenticate calls from Chronos to Mesos.
    def buildCredentials(principal: String, secretFile: String): Credential = {
      val credentialBuilder = Credential.newBuilder().setPrincipal(principal)

      try {
        // Always close the stream, even when reading fails, so the descriptor is not leaked.
        val secretStream = new FileInputStream(secretFile)
        val secretBytes =
          try ByteString.readFrom(secretStream)
          finally secretStream.close()

        val filePermissions = Files.getPosixFilePermissions(Paths.get(secretFile)).asScala
        if ((filePermissions & Set(PosixFilePermission.OTHERS_READ, PosixFilePermission.OTHERS_WRITE)).nonEmpty)
          log.warning(s"Secret file $secretFile should not be globally accessible.")

        credentialBuilder.setSecret(secretBytes)
      }
      catch {
        // Fatal JVM errors (OutOfMemoryError, InterruptedException, ...) propagate untouched.
        case NonFatal(cause) =>
          throw new IOException(s"Error reading authentication secret from file [$secretFile]", cause)
      }

      credentialBuilder.build()
    }

    val frameworkInfoBuilder = FrameworkInfo.newBuilder()
      .setName(config.mesosFrameworkName())
      .setCheckpoint(config.mesosCheckpoint())
      .setRole(config.mesosRole())
      .setFailoverTimeout(config.failoverTimeoutSeconds())
      .setUser(config.user())

    // Set the ID, if provided
    frameworkId.foreach(frameworkInfoBuilder.setId)

    if (config.webuiUrl.isSupplied) {
      frameworkInfoBuilder.setWebuiUrl(config.webuiUrl())
    }
    else if (config.sslKeystorePath.isDefined) {
      // ssl enabled, use https
      frameworkInfoBuilder.setWebuiUrl(s"https://${config.hostname()}:${config.httpsPort()}")
    }
    else {
      // ssl disabled, use http
      frameworkInfoBuilder.setWebuiUrl(s"http://${config.hostname()}:${config.httpPort()}")
    }

    // set the authentication principal, if provided
    config.mesosAuthenticationPrincipal.get.foreach(frameworkInfoBuilder.setPrincipal)

    val frameworkInfo = frameworkInfoBuilder.build()

    // A credential is only built when both the principal and the secret file are configured.
    val credential: Option[Credential] = config.mesosAuthenticationPrincipal.get.flatMap { principal =>
      config.mesosAuthenticationSecretFile.get.map { secretFile =>
        buildCredentials(principal, secretFile)
      }
    }

    credential match {
      case Some(cred) =>
        new MesosSchedulerDriver(scheduler, frameworkInfo, config.master(), cred)

      case None =>
        new MesosSchedulerDriver(scheduler, frameworkInfo, config.master())
    }
  }
}
18 changes: 18 additions & 0 deletions src/test/scala/org/apache/mesos/chronos/ChronosSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.apache.mesos.chronos

import mesosphere.chaos.http.HttpConf
import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.rogach.scallop.ScallopConf
import org.specs2.mock.Mockito
import org.specs2.mutable.SpecificationWithJUnit

/**
 * Common base for Chronos specs: a JUnit-runnable specs2 mutable specification
 * with Mockito support and a helper for building test configurations.
 */
trait ChronosSpec extends SpecificationWithJUnit with Mockito {

  /**
   * Parses the given command line arguments into a scheduler + HTTP configuration.
   *
   * @param args command line style arguments; may be empty
   * @return the initialised configuration
   */
  def makeConfig(args: String*): SchedulerConfiguration with HttpConf = {
    val parsedConfig = new ScallopConf(args) with SchedulerConfiguration with HttpConf {
      // Rethrow parse errors: scallop's own handling would trigger a sys exit.
      override protected def onError(e: Throwable): Unit = throw e
    }
    parsedConfig.afterInit()
    parsedConfig
  }
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,14 @@
package org.apache.mesos.chronos.scheduler.jobs

import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import com.codahale.metrics.MetricRegistry
import com.google.common.util.concurrent.ListeningScheduledExecutorService
import org.apache.mesos.chronos.ChronosSpec
import org.apache.mesos.chronos.scheduler.graph.JobGraph
import org.apache.mesos.chronos.scheduler.mesos.MesosOfferReviver
import org.apache.mesos.chronos.scheduler.state.PersistenceStore
import com.codahale.metrics.MetricRegistry
import com.google.common.util.concurrent.ListeningScheduledExecutorService
import org.joda.time._
import org.rogach.scallop.ScallopConf
import org.specs2.mock._
import org.specs2.mutable._

class TaskManagerSpec extends SpecificationWithJUnit with Mockito {

private[this] def makeConfig(args: String*): SchedulerConfiguration = {
val opts = new ScallopConf(args) with SchedulerConfiguration {
// scallop will trigger sys exit
override protected def onError(e: Throwable): Unit = throw e
}
opts.afterInit()
opts
}

class TaskManagerSpec extends ChronosSpec {
"TaskManager" should {
"Calculate the correct time delay between scheduling and dispatching the job" in {
val taskManager = new TaskManager(mock[ListeningScheduledExecutorService], mock[PersistenceStore],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.apache.mesos.chronos.scheduler.mesos

import mesosphere.mesos.util.FrameworkIdUtil
import org.apache.mesos.Scheduler
import org.apache.mesos.chronos.ChronosSpec

/**
 * Checks that `MesosDriverFactory` consults `FrameworkIdUtil` when a driver is requested.
 */
class MesosDriverFactorySpec extends ChronosSpec {
  "MesosDriverFactorySpec" should {
    "always fetch the frameworkId from the state store before creating a driver" in {
      // No framework id is stored; the lookup itself is what gets verified below.
      val idUtil = mock[FrameworkIdUtil]
      idUtil.fetch(any, any).returns(None)

      // The driver builder is mocked so no real Mesos driver is created.
      val factory = new MesosDriverFactory(
        mock[Scheduler],
        idUtil,
        makeConfig(),
        mock[SchedulerDriverBuilder])

      factory.get()

      there was one(idUtil).fetch(any, any)
      ok
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,14 @@ package org.apache.mesos.chronos.scheduler.mesos
import mesosphere.mesos.protos._
import mesosphere.mesos.util.FrameworkIdUtil
import org.apache.mesos.Protos.Offer
import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.chronos.ChronosSpec
import org.apache.mesos.chronos.scheduler.jobs.{ BaseJob, JobScheduler, TaskManager }
import org.apache.mesos.{ Protos, SchedulerDriver }
import org.mockito.Mockito.{ doNothing, doReturn }
import org.rogach.scallop.ScallopConf
import org.specs2.mock._
import org.specs2.mutable._

import scala.collection.mutable

class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito {

private[this] def makeConfig(args: String*): SchedulerConfiguration = {
val opts = new ScallopConf(args) with SchedulerConfiguration {
// scallop will trigger sys exit
override protected def onError(e: Throwable): Unit = throw e
}
opts.afterInit()
opts
}

class MesosJobFrameworkSpec extends ChronosSpec {
"MesosJobFramework" should {
"Revive offers when registering" in {
val mockMesosOfferReviver = mock[MesosOfferReviver]
Expand Down
Loading

0 comments on commit ef0976a

Please sign in to comment.