Skip to content

Commit

Permalink
Use our execution context and the configured ZK timeout everywhere.
Browse files Browse the repository at this point in the history
Conflicts:
	project/build.scala
	src/main/scala/mesosphere/marathon/Main.scala
	src/main/scala/mesosphere/marathon/MarathonModule.scala
	src/main/scala/mesosphere/marathon/MarathonScheduler.scala
	src/main/scala/mesosphere/marathon/MarathonSchedulerService.scala
	src/main/scala/mesosphere/marathon/api/v2/AppsResource.scala
	src/main/scala/mesosphere/marathon/state/AppRepository.scala
	src/main/scala/mesosphere/marathon/state/MarathonStore.scala
	src/main/scala/mesosphere/marathon/tasks/TaskTracker.scala
  • Loading branch information
Tobi Knaup committed Jun 5, 2014
1 parent c53e052 commit b28d684
Show file tree
Hide file tree
Showing 15 changed files with 96 additions and 101 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<chaos.version>0.5.6</chaos.version>
<jackson-ccm.version>0.1.0</jackson-ccm.version>
<mesos.version>0.18.2</mesos.version>
<mesos-utils.version>0.18.2-1</mesos-utils.version>
<mesos-utils.version>0.18.2-2</mesos-utils.version>
<akka.version>2.2.4</akka.version>
<spray.version>1.2.1</spray.version>
<json4s.version>3.2.5</json4s.version>
Expand Down
4 changes: 1 addition & 3 deletions src/main/scala/mesosphere/marathon/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ object Main extends App {
}

//TOOD(FL): Make Events optional / wire up.
lazy val conf = new ScallopConf(args)
with HttpConf with MarathonConf with AppConfiguration
with EventConfiguration with HttpEventConfiguration with ZookeeperConf
lazy val conf = new ScallopConf(args) with HttpConf with MarathonConf with AppConfiguration with EventConfiguration with HttpEventConfiguration

run(
classOf[HttpService],
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/mesosphere/marathon/MarathonConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import scala.sys.SystemProperties
* @author Tobi Knaup
*/

trait MarathonConf extends ScallopConf {
trait MarathonConf extends ScallopConf with ZookeeperConf {

lazy val mesosMaster = opt[String]("master",
descr = "The URL of the Mesos master",
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/mesosphere/marathon/MarathonModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ object ModuleNames {
final val NAMED_SERVER_SET_PATH = "SERVER_SET_PATH"
}

class MarathonModule(conf: MarathonConf with ZookeeperConf, zk: ZooKeeperClient)
extends AbstractModule {
class MarathonModule(conf: MarathonConf, zk: ZooKeeperClient)
extends AbstractModule {

val log = Logger.getLogger(getClass.getName)

Expand Down Expand Up @@ -60,7 +60,7 @@ class MarathonModule(conf: MarathonConf with ZookeeperConf, zk: ZooKeeperClient)
def provideMesosState(): State = {
new ZooKeeperState(
conf.zkHosts,
conf.zooKeeperTimeout.get.get,
conf.zkTimeoutDuration.toMillis,
TimeUnit.MILLISECONDS,
conf.zooKeeperStatePath
)
Expand Down
27 changes: 13 additions & 14 deletions src/main/scala/mesosphere/marathon/MarathonScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import mesosphere.mesos.TaskBuilder
import mesosphere.marathon.api.v1.AppDefinition
import mesosphere.marathon.api.v2.AppUpdate
import mesosphere.marathon.state.AppRepository
import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.Future
import com.google.common.collect.Lists
import javax.inject.{Named, Inject}
import com.google.common.eventbus.EventBus
Expand Down Expand Up @@ -40,25 +40,24 @@ object MarathonScheduler {
val callbacks: SchedulerCallbacks = new MarathonSchedulerCallbacksImpl(Some(Main.injector.getInstance(classOf[MarathonSchedulerService])))
}

/**
* @author Tobi Knaup
*/
class MarathonScheduler @Inject()(
@Named(EventModule.busName) eventBus: Option[EventBus],
@Named("restMapper") mapper: ObjectMapper,
appRepository: AppRepository,
healthCheckManager: HealthCheckManager,
taskTracker: TaskTracker,
taskQueue: TaskQueue,
frameworkIdUtil: FrameworkIdUtil,
rateLimiters: RateLimiters
) extends Scheduler {
class MarathonScheduler @Inject() (
@Named(EventModule.busName) eventBus: Option[EventBus],
@Named("restMapper") mapper: ObjectMapper,
appRepository: AppRepository,
healthCheckManager: HealthCheckManager,
taskTracker: TaskTracker,
taskQueue: TaskQueue,
frameworkIdUtil: FrameworkIdUtil,
rateLimiters: RateLimiters,
config: MarathonConf) extends Scheduler {

private val log = Logger.getLogger(getClass.getName)

import ThreadPoolContext.context
import mesosphere.mesos.protos.Implicits._

implicit val zkTimeout = config.zkFutureTimeout

/**
* Returns a future containing the optional most recent version
* of the specified app from persistent storage.
Expand Down
33 changes: 15 additions & 18 deletions src/main/scala/mesosphere/marathon/MarathonSchedulerService.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package mesosphere.marathon

import org.apache.mesos.Protos.{TaskID, FrameworkInfo}
import org.apache.mesos.MesosSchedulerDriver
import org.apache.mesos.Protos.TaskID
import org.apache.log4j.Logger
import mesosphere.marathon.api.v1.AppDefinition
import mesosphere.marathon.api.v2.AppUpdate
import mesosphere.marathon.state.{AppRepository, Timestamp}
import com.google.common.util.concurrent.AbstractExecutionThreadService
import javax.inject.{Named, Inject}
import java.util.{TimerTask, Timer}
import scala.concurrent.{Future, ExecutionContext, Await}
import scala.concurrent.duration.{Duration, MILLISECONDS}
import javax.inject.{ Named, Inject }
import java.util.{ TimerTask, Timer }
import scala.concurrent.{ Future, Await }
import scala.concurrent.duration.MILLISECONDS
import java.util.concurrent.atomic.AtomicBoolean
import com.twitter.common.base.ExceptionalCommand
import com.twitter.common.zookeeper.Group.JoinException
Expand All @@ -23,7 +22,7 @@ import mesosphere.marathon.Protos.MarathonTask
import mesosphere.marathon.health.HealthCheckManager
import scala.concurrent.duration._
import java.util.concurrent.CountDownLatch
import mesosphere.util.ThreadPoolContext
import mesosphere.util.{ BackToTheFuture, ThreadPoolContext }

/**
* Wrapper class for the scheduler
Expand All @@ -42,6 +41,8 @@ class MarathonSchedulerService @Inject()(

import ThreadPoolContext.context

implicit val zkTimeout = config.zkFutureTimeout

val latch = new CountDownLatch(1)

// Time to wait before trying to reconcile app tasks after driver starts
Expand All @@ -56,7 +57,7 @@ class MarathonSchedulerService @Inject()(

val log = Logger.getLogger(getClass.getName)

val frameworkId = frameworkIdUtil.fetch()
val frameworkId = frameworkIdUtil.fetch
frameworkId match {
case Some(id) =>
log.info(s"Setting framework ID to ${id.getValue}")
Expand All @@ -69,10 +70,6 @@ class MarathonSchedulerService @Inject()(
// we have to allocate a new driver before each run or after each stop.
var driver = MarathonSchedulerDriver.newDriver(config, scheduler, frameworkId)

def defaultWait = {
appRepository.defaultWait
}

def startApp(app: AppDefinition): Future[_] = {
// Backwards compatibility
val oldPorts = app.ports
Expand All @@ -96,17 +93,17 @@ class MarathonSchedulerService @Inject()(
}

def listApps(): Iterable[AppDefinition] =
Await.result(appRepository.apps, defaultWait)
Await.result(appRepository.apps, config.zkTimeoutDuration)

def listAppVersions(appName: String): Iterable[Timestamp] =
Await.result(appRepository.listVersions(appName), defaultWait)
Await.result(appRepository.listVersions(appName), config.zkTimeoutDuration)

def getApp(appName: String): Option[AppDefinition] = {
Await.result(appRepository.currentVersion(appName), defaultWait)
Await.result(appRepository.currentVersion(appName), config.zkTimeoutDuration)
}

def getApp(appName: String, version: Timestamp) : Option[AppDefinition] = {
Await.result(appRepository.app(appName, version), defaultWait)
def getApp(appName: String, version: Timestamp): Option[AppDefinition] = {
Await.result(appRepository.app(appName, version), config.zkTimeoutDuration)
}

def killTasks(
Expand All @@ -117,7 +114,7 @@ class MarathonSchedulerService @Inject()(
if (scale) {
getApp(appName) foreach { app =>
val appUpdate = AppUpdate(instances = Some(app.instances - tasks.size))
Await.result(scheduler.updateApp(driver, appName, appUpdate), defaultWait)
Await.result(scheduler.updateApp(driver, appName, appUpdate), config.zkTimeoutDuration)
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/mesosphere/marathon/ZookeeperConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package mesosphere.marathon

import org.rogach.scallop.ScallopConf
import java.net.InetSocketAddress
import mesosphere.util.BackToTheFuture
import scala.concurrent.duration._

/**
* @author Tobi Knaup
Expand Down Expand Up @@ -51,4 +53,6 @@ trait ZookeeperConf extends ScallopConf {
def zkURL = zooKeeperUrl.get.getOrElse(s"zk://${zooKeeperHostString()}${zooKeeperPath()}")
lazy val zkHosts = zkURL match { case zkURLPattern(server, _) => server }
lazy val zkPath = zkURL match { case zkURLPattern(_, path) => path }
lazy val zkTimeoutDuration = Duration(zooKeeperTimeout(), MILLISECONDS)
lazy val zkFutureTimeout = BackToTheFuture.Timeout(zkTimeoutDuration)
}
11 changes: 6 additions & 5 deletions src/main/scala/mesosphere/marathon/api/v1/AppsResource.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package mesosphere.marathon.api.v1

import mesosphere.marathon.MarathonSchedulerService
import mesosphere.marathon.{ MarathonConf, MarathonSchedulerService }
import mesosphere.marathon.tasks.TaskTracker
import mesosphere.marathon.api.v2.AppUpdate
import mesosphere.marathon.event.{EventModule, ApiPostEvent}
Expand All @@ -23,7 +23,8 @@ import mesosphere.marathon.api.Responses
class AppsResource @Inject()(
@Named(EventModule.busName) eventBus: Option[EventBus],
service: MarathonSchedulerService,
taskTracker: TaskTracker) {
taskTracker: TaskTracker,
config: MarathonConf) {

val log = Logger.getLogger(getClass.getName)

Expand All @@ -36,7 +37,7 @@ class AppsResource @Inject()(
@Timed
def start(@Context req: HttpServletRequest, @Valid app: AppDefinition): Response = {
maybePostEvent(req, app)
Await.result(service.startApp(app), service.defaultWait)
Await.result(service.startApp(app), config.zkTimeoutDuration)
Response.noContent.build
}

Expand All @@ -45,7 +46,7 @@ class AppsResource @Inject()(
@Timed
def stop(@Context req: HttpServletRequest, app: AppDefinition): Response = {
maybePostEvent(req, app)
Await.result(service.stopApp(app), service.defaultWait)
Await.result(service.stopApp(app), config.zkTimeoutDuration)
Response.noContent.build
}

Expand All @@ -55,7 +56,7 @@ class AppsResource @Inject()(
def scale(@Context req: HttpServletRequest, @Valid app: AppDefinition): Response = {
maybePostEvent(req, app)
val appUpdate = AppUpdate(instances = Some(app.instances))
Await.result(service.updateApp(app.id, appUpdate), service.defaultWait)
Await.result(service.updateApp(app.id, appUpdate), config.zkTimeoutDuration)
Response.noContent.build
}

Expand Down
17 changes: 7 additions & 10 deletions src/main/scala/mesosphere/marathon/api/v2/AppsResource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,27 @@ import javax.ws.rs.core.{Response, Context, MediaType}
import javax.inject.{Named, Inject}
import mesosphere.marathon.event.EventModule
import com.google.common.eventbus.EventBus
import mesosphere.marathon.{MarathonSchedulerService, BadRequestException}
import mesosphere.marathon.{ BadRequestException, MarathonConf, MarathonSchedulerService }
import mesosphere.marathon.tasks.TaskTracker
import com.codahale.metrics.annotation.Timed
import com.sun.jersey.api.NotFoundException
import javax.servlet.http.HttpServletRequest
import javax.validation.Valid
import mesosphere.marathon.api.v1.AppDefinition
import scala.concurrent.Await
import mesosphere.marathon.event.ApiPostEvent
import java.net.URI
import mesosphere.marathon.health.HealthCheckManager
import mesosphere.marathon.api.{PATCH, Responses}
import mesosphere.marathon.api.Responses

/**
* @author Tobi Knaup
*/

@Path("v2/apps")
@Consumes(Array(MediaType.APPLICATION_JSON))
class AppsResource @Inject()(
@Named(EventModule.busName) eventBus: Option[EventBus],
service: MarathonSchedulerService,
taskTracker: TaskTracker,
healthCheckManager: HealthCheckManager) {
healthCheckManager: HealthCheckManager,
config: MarathonConf) {

@GET
@Timed
Expand All @@ -52,7 +49,7 @@ class AppsResource @Inject()(
validateContainerOpts(app)

maybePostEvent(req, app)
Await.result(service.startApp(app), service.defaultWait)
Await.result(service.startApp(app), config.zkTimeoutDuration)
Response.created(new URI(s"${app.id}")).build
}

Expand Down Expand Up @@ -81,7 +78,7 @@ class AppsResource @Inject()(
case Some(app) =>
validateContainerOpts(updatedApp)
maybePostEvent(req, updatedApp)
Await.result(service.updateApp(id, appUpdate), service.defaultWait)
Await.result(service.updateApp(id, appUpdate), config.zkTimeoutDuration)
Response.noContent.build

case None => create(req, updatedApp)
Expand All @@ -94,7 +91,7 @@ class AppsResource @Inject()(
def delete(@Context req: HttpServletRequest, @PathParam("id") id: String): Response = {
val app = AppDefinition(id = id)
maybePostEvent(req, app)
Await.result(service.stopApp(app), service.defaultWait)
Await.result(service.stopApp(app), config.zkTimeoutDuration)
Response.noContent.build
}

Expand Down
5 changes: 0 additions & 5 deletions src/main/scala/mesosphere/marathon/state/AppRepository.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ class AppRepository(store: PersistenceStore[AppDefinition]) {

protected val ID_DELIMITER = ":"

val defaultWait = store match {
case m: MarathonStore[_] => m.defaultWait
case _ => Duration(3, SECONDS)
}

/**
* Returns the most recently stored app with the supplied id.
*/
Expand Down
18 changes: 8 additions & 10 deletions src/main/scala/mesosphere/marathon/state/MarathonStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@ import com.google.protobuf.InvalidProtocolBufferException
import org.apache.mesos.state.State
import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration.Duration
import mesosphere.marathon.StorageException
import com.google.common.cache.{CacheLoader, CacheBuilder}
import java.util.concurrent.Semaphore
import mesosphere.util.{ ThreadPoolContext, BackToTheFuture }

/**
* @author Tobi Knaup
*/

class MarathonStore[S <: MarathonState[_, S]](state: State,
newState: () => S, prefix:String = "app:") extends PersistenceStore[S] {
newState: () => S,
prefix: String = "app:",
implicit val timeout: BackToTheFuture.Timeout = BackToTheFuture.Implicits.defaultTimeout) extends PersistenceStore[S] {

val defaultWait = Duration(3, "seconds")
private [this] val locks = {
import ThreadPoolContext.context
import BackToTheFuture.futureToFutureOption

private[this] val locks = {
CacheBuilder
.newBuilder()
.weakValues()
Expand All @@ -28,9 +29,6 @@ class MarathonStore[S <: MarathonState[_, S]](state: State,
)
}

import mesosphere.util.ThreadPoolContext.context
import mesosphere.util.BackToTheFuture.futureToFutureOption

def fetch(key: String): Future[Option[S]] = {
state.fetch(prefix + key) map {
case Some(variable) => stateFromBytes(variable.value)
Expand Down
14 changes: 7 additions & 7 deletions src/main/scala/mesosphere/marathon/tasks/TaskTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@ import org.apache.mesos.Protos.{TaskID, TaskStatus}
import javax.inject.Inject
import org.apache.mesos.state.{Variable, State}
import mesosphere.marathon.Protos._
import mesosphere.marathon.Main
import mesosphere.marathon.{ MarathonConf, Main }
import java.io._
import scala.Some
import scala.concurrent.Future
import org.apache.log4j.Logger
import mesosphere.util.{ ThreadPoolContext, BackToTheFuture }

/**
* @author Tobi Knaup
*/

class TaskTracker @Inject()(state: State) {
class TaskTracker @Inject() (state: State, config: MarathonConf) {

import TaskTracker.App
import mesosphere.util.ThreadPoolContext.context
import mesosphere.util.BackToTheFuture.futureToFuture
import ThreadPoolContext.context
import BackToTheFuture.futureToFuture

implicit val timeout = config.zkFutureTimeout

private[this] val log = Logger.getLogger(getClass.getName)

Expand Down
Loading

0 comments on commit b28d684

Please sign in to comment.