Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use InMemoryPersistenceStorage for persisting pod records #80

Merged
merged 28 commits into from May 8, 2019
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
19681f1
Update core module to use persistence module and use PodRecordRepository
takirala Mar 29, 2019
eb28b79
Update RecordRepository interface
takirala Mar 29, 2019
0e5c5c6
Add an integration test
takirala Mar 29, 2019
eccd7ff
Revert unintentional changes
takirala Mar 29, 2019
19cbb44
Emit StateSnapshot on crash-recovery
takirala Mar 30, 2019
9ac896d
Draft future-less persistence storage flow
takirala Apr 8, 2019
92c1c08
Use async call back for thread safe access
takirala Apr 8, 2019
f71b932
Add a unit test case to validate persistence graph stage
takirala Apr 11, 2019
3772c77
Make all methods in RecordRepository to be stream based
takirala Apr 11, 2019
d895c68
Cleanup tests
takirala Apr 11, 2019
3fd9cb3
Cleanup code
takirala Apr 12, 2019
5baccdd
Merge master and resolve conflicts
takirala Apr 12, 2019
21810fc
Address review comments
takirala Apr 14, 2019
6d2c943
Address review comments
takirala Apr 16, 2019
6d8faf8
Merge master and resolve conflicts
takirala Apr 16, 2019
88959c6
Use akka.pattern.after instead of Thread.sleep
takirala Apr 16, 2019
adcdd60
Change sleep max time from 1000 to 100
takirala Apr 16, 2019
fa19488
Revert unintentional formatting changes
takirala Apr 17, 2019
3a0e7a2
Address review comments
takirala Apr 18, 2019
4b6af6d
Add code comments
takirala Apr 18, 2019
1f6ec8e
changes from pairing
Apr 19, 2019
e30961e
Enforce pipelining limit across multiple SchedulerEvents
takirala Apr 19, 2019
265e7fd
Code reformat
takirala Apr 23, 2019
b41c1c8
Remove unused method - fromSnapshot
takirala Apr 24, 2019
827923a
Merge master and resolve conflicts
takirala Apr 24, 2019
f836891
Load the snapshot before a graph component is built
takirala Apr 26, 2019
059e0a4
Move InMemoryPodRecordRepository to test-utils
takirala May 2, 2019
a9389b5
Add comment on location of RepositoryBehavior
takirala May 8, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -7,7 +7,7 @@ language: scala
install:
- true
scala:
- 2.12.7
- 2.12.7
script:
- ./gradlew checkScalaFmtAll && sudo ./gradlew provision && ./gradlew ci --info
before_cache:
Expand Down
9 changes: 8 additions & 1 deletion core-models/src/main/resources/reference.conf
Expand Up @@ -4,4 +4,11 @@ akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "INFO"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
}
}

scheduler {
persistence {
pipeline-limit = 128
load-timeout = 60 # seconds
}
}
Expand Up @@ -12,14 +12,9 @@ sealed trait PodStateEvent extends StateEvent {
def id: PodId
}

case class StateSnapshot(
podStatuses: Seq[PodStatus],
podRecords: Seq[PodRecord],
agentRecords: Seq[AgentRecord],
reservationStatuses: Seq[ReservationStatus])
extends StateEvent
case class StateSnapshot(podRecords: Seq[PodRecord] = Nil, agentRecords: Seq[AgentRecord] = Nil) extends StateEvent
object StateSnapshot {
def empty = StateSnapshot(Nil, Nil, Nil, Nil)
val empty = StateSnapshot()
}

/**
Expand Down
1 change: 1 addition & 0 deletions core/build.gradle
@@ -1,5 +1,6 @@
dependencies {
compile project(':core-models')
compile project(':persistence')
compile project(':mesos-client')
testCompile project(':test-utils')
}
@@ -0,0 +1,12 @@
package com.mesosphere.usi.core

import java.util.concurrent.Executor
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContextExecutor

// TODO: This is duplicated in com.mesosphere.usi.async. Ideally, this should live in a shared module.
private[usi] object CallerThreadExecutionContext {

  /** Executor that runs every submitted task synchronously on the thread that calls `execute`. */
  val executor: Executor = new Executor {
    override def execute(command: Runnable): Unit = command.run()
  }

  /**
    * ExecutionContext backed by [[executor]]. Intended only for ultra-cheap, non-blocking
    * callbacks (e.g. trivial `Future.map`s) where hopping to another thread pool is wasteful.
    */
  implicit val context: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
}
Expand Up @@ -27,12 +27,14 @@ case class FrameResultBuilder(
if (schedulerEvents == SchedulerEvents.empty)
this
else {
val newDirty = dirtyPodIds ++ schedulerEvents.stateEvents.iterator.collect {
case podEvent: PodStateEvent => podEvent.id
case _: StateSnapshot =>
// We need to handle status snapshots and create a mechanism to signal that all cache should be recomputed
???
}
val newDirty: Set[PodId] = dirtyPodIds ++ schedulerEvents.stateEvents.iterator
.collect[Set[PodId]] {
case podEvent: PodStateEvent => Set(podEvent.id)
// We need to handle status snapshots with a mechanism to signal that all cache should be recomputed
case snapshot: StateSnapshot => snapshot.podRecords.map(_.podId).toSet
}
.flatten

copy(
state = state.applyStateIntents(schedulerEvents.stateEvents),
dirtyPodIds = newDirty,
Expand Down
117 changes: 86 additions & 31 deletions core/src/main/scala/com/mesosphere/usi/core/Scheduler.scala
Expand Up @@ -4,11 +4,24 @@ import akka.{Done, NotUsed}
import akka.stream.scaladsl.{BidiFlow, Broadcast, Flow, GraphDSL, Sink, SinkQueueWithCancel, Source}
import akka.stream.{BidiShape, FlowShape, KillSwitches, Materializer, OverflowStrategy, QueueOfferResult}
import com.mesosphere.mesos.client.{MesosCalls, MesosClient}
import com.mesosphere.usi.core.models.{SpecEvent, SpecUpdated, SpecsSnapshot, StateEvent, StateSnapshot}
import com.mesosphere.usi.core.conf.SchedulerSettings
import com.mesosphere.usi.core.models.PodId
import com.mesosphere.usi.core.models.PodRecord
import com.mesosphere.usi.core.models.{
PodRecordUpdated,
SpecEvent,
SpecUpdated,
SpecsSnapshot,
StateEvent,
StateSnapshot
}
import com.mesosphere.usi.repository.PodRecordRepository
import com.typesafe.config.ConfigFactory
import org.apache.mesos.v1.Protos.FrameworkInfo
import org.apache.mesos.v1.scheduler.Protos.{Call => MesosCall, Event => MesosEvent}

import scala.collection.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

Expand Down Expand Up @@ -56,20 +69,18 @@ object Scheduler {

type StateOutput = (StateSnapshot, Source[StateEvent, Any])

def fromSnapshot(specsSnapshot: SpecsSnapshot, client: MesosClient): Flow[SpecUpdated, StateOutput, NotUsed] =
Flow[SpecUpdated].prefixAndTail(0).map { case (_, rest) => specsSnapshot -> rest }.via(fromClient(client))
private val schedulerSettings = SchedulerSettings.fromConfig(ConfigFactory.load().getConfig("scheduler"))

def asFlow(specsSnapshot: SpecsSnapshot, client: MesosClient)(
def asFlow(specsSnapshot: SpecsSnapshot, client: MesosClient, podRecordRepository: PodRecordRepository)(
implicit materializer: Materializer): Future[(StateSnapshot, Flow[SpecUpdated, StateEvent, NotUsed])] = {

implicit val ec = scala.concurrent.ExecutionContext.Implicits.global //only for ultra-fast non-blocking onComplete
implicit val ec = scala.concurrent.ExecutionContext.Implicits.global //only for ultra-fast non-blocking map

val (snap, source, sink) = asSourceAndSink(specsSnapshot, client)
val (snap, source, sink) = asSourceAndSink(specsSnapshot, client, podRecordRepository)

snap.map { snapshot =>
(snapshot, Flow.fromSinkAndSourceCoupled(sink, source))
}

}

/**
Expand All @@ -80,9 +91,9 @@ object Scheduler {
* @param specsSnapshot Snapshot of the current specs
* @return Snapshot of the current state, as well as Source which produces StateEvents and Sink which accepts SpecEvents
*/
def asSourceAndSink(specsSnapshot: SpecsSnapshot, client: MesosClient)(
def asSourceAndSink(specsSnapshot: SpecsSnapshot, client: MesosClient, podRecordRepository: PodRecordRepository)(
implicit mat: Materializer): (Future[StateSnapshot], Source[StateEvent, NotUsed], Sink[SpecUpdated, NotUsed]) = {
val flow = fromClient(client)
val flow = fromClient(client, podRecordRepository)
asSourceAndSink(specsSnapshot, flow)(mat)
}

Expand Down Expand Up @@ -190,30 +201,32 @@ object Scheduler {

}

def fromClient(client: MesosClient): Flow[SpecInput, StateOutput, NotUsed] = {
def fromClient(
client: MesosClient,
podRecordRepository: PodRecordRepository): Flow[SpecInput, StateOutput, NotUsed] = {
if (!isMultiRoleFramework(client.frameworkInfo)) {
throw new IllegalArgumentException(
"USI scheduler provides support for MULTI_ROLE frameworks only. Please provide create MesosClient with FrameworkInfo that has capability MULTI_ROLE")
"USI scheduler provides support for MULTI_ROLE frameworks only. " +
"Please provide a MesosClient with FrameworkInfo that has capability MULTI_ROLE")
}
fromFlow(client.calls, Flow.fromSinkAndSource(client.mesosSink, client.mesosSource))
fromFlow(client.calls, podRecordRepository, Flow.fromSinkAndSource(client.mesosSink, client.mesosSource))
}

private def isMultiRoleFramework(frameworkInfo: FrameworkInfo): Boolean =
frameworkInfo.getCapabilitiesList.asScala.exists(_.getType == FrameworkInfo.Capability.Type.MULTI_ROLE)

def fromFlow(
mesosCallFactory: MesosCalls,
podRecordRepository: PodRecordRepository,
mesosFlow: Flow[MesosCall, MesosEvent, Any]): Flow[SpecInput, StateOutput, NotUsed] = {
Flow.fromGraph {
GraphDSL.create(unconnectedGraph(mesosCallFactory), mesosFlow)((_, _) => NotUsed) { implicit builder =>
{ (graph, mesos) =>
import GraphDSL.Implicits._
GraphDSL.create(unconnectedGraph(mesosCallFactory, podRecordRepository), mesosFlow)((_, _) => NotUsed) {
implicit builder =>
{ (graph, mesos) =>
import GraphDSL.Implicits._

mesos ~> graph.in2
graph.out2 ~> mesos
mesos ~> graph.in2
graph.out2 ~> mesos

FlowShape(graph.in1, graph.out1)
}
FlowShape(graph.in1, graph.out1)
}
}
}
}
Expand All @@ -230,30 +243,36 @@ object Scheduler {
rest.prepend(Source.single(snapshot))
}

// TODO (DCOS-47476) use actual prefixAndTail and expect first event to be a Snapshot; change the prefixAndTail param from 0 value to 1, let fail, etc.
private val stateOutputBreakoutFlow: Flow[StateEvent, StateOutput, NotUsed] = Flow[StateEvent].prefixAndTail(0).map {
case (_, stateEvents) =>
  /**
    * Splits the state-event stream into its snapshot head and the tail of incremental updates.
    *
    * Contract: the very first element MUST be a [[StateSnapshot]] and no later element may be one.
    * `prefixAndTail(1)` peels off exactly the first event; both directions of the contract are
    * enforced with IllegalStateException.
    */
  private val stateOutputBreakoutFlow: Flow[StateEvent, StateOutput, NotUsed] = Flow[StateEvent].prefixAndTail(1).map {
    case (Seq(snapshot), stateEvents) =>
      // The head element must be the snapshot emitted by the scheduler logic on startup.
      val stateSnapshot = snapshot match {
        case x: StateSnapshot => x
        case _ => throw new IllegalStateException("First event is allowed to be only a state snapshot")
      }
      // Guard the tail: a snapshot appearing later would indicate a protocol violation upstream.
      val stateUpdates = stateEvents.map {
        case _: StateSnapshot =>
          throw new IllegalStateException("Only the first event is allowed to be a state snapshot")
        case event => event
      }
      (stateSnapshot, stateUpdates)
  }

def unconnectedGraph(
mesosCallFactory: MesosCalls): BidiFlow[SpecInput, StateOutput, MesosEvent, MesosCall, NotUsed] = {
private[core] def unconnectedGraph(
mesosCallFactory: MesosCalls,
podRecordRepository: PodRecordRepository): BidiFlow[SpecInput, StateOutput, MesosEvent, MesosCall, NotUsed] = {
val schedulerLogicGraph = new SchedulerLogicGraph(mesosCallFactory, loadPodRecords(podRecordRepository))
Copy link
Contributor

@jeschkies jeschkies Apr 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is getting closer to it but I don't understand why I cannot be a async. The two main factory methods asFlow and asSinkAndSource return futures. So you could simply call

  def asFlow(specsSnapshot: SpecsSnapshot, client: MesosClient, podRecordRepository: PodRecordRepository)(
      implicit materializer: Materializer): Future[(StateSnapshot, Flow[SpecUpdated, StateEvent, NotUsed])] = async {

    implicit val ec = scala.concurrent.ExecutionContext.Implicits.global //only for ultra-fast non-blocking onComplete

    val initialRecords = await(loadRecords(repository))
    val (snap, source, sink) = asSourceAndSink(specsSnapshot, client, initialRecords)
    (await(snap), Flow.fromSinkAndSourceCoupled(sink, source))
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this offline. In an ideal world we should be using the async-await construct to do this in a non-blocking fashion. But the factory methods fromFlow and fromClient require this to be done in a blocking fashion. I guess we can find a cleaner solution once we finalize on a limited number of factory methods.

Based on the isolated fact that we can't make any progress until we load the records I guess this is okay for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fromClient and fromFlow could just be non-blocking. The signature of these methods is not set in stone.

BidiFlow.fromGraph {
GraphDSL.create(new SchedulerLogicGraph(mesosCallFactory)) { implicit builder => (schedulerLogic) =>
GraphDSL.create(schedulerLogicGraph) { implicit builder => (schedulerLogic) =>
{
import GraphDSL.Implicits._

val broadcast = builder.add(Broadcast[SchedulerEvents](2, eagerCancel = true))
val specInputFlattening = builder.add(specInputFlatteningFlow)
val stateOutputBreakout = builder.add(stateOutputBreakoutFlow)

val persistenceStorageFlow = builder.add(persistenceFlow(podRecordRepository))
specInputFlattening ~> schedulerLogic.in0
schedulerLogic.out ~> broadcast.in
schedulerLogic.out ~> persistenceStorageFlow ~> broadcast.in

val mesosCalls = broadcast.out(0).mapConcat { frameResult =>
frameResult.mesosCalls
Expand All @@ -269,4 +288,40 @@ object Scheduler {
}
}
}

  /**
    * Flow that persists pod-record changes before letting the triggering events propagate.
    *
    * Each [[SchedulerEvents]] is expanded (via [[persistEvents]]) into a list of deferred storage
    * calls followed by a final element that re-emits the original events, so downstream only sees
    * the events after their persistence operations have been issued. `mapAsync` bounds the number
    * of in-flight storage calls by `scheduler.persistence.pipeline-limit` while preserving order.
    *
    * @param podRecordRepository repository receiving the store/delete calls
    * @return flow emitting the same SchedulerEvents it receives, post-persistence
    */
  private[core] def persistenceFlow(
      podRecordRepository: PodRecordRepository): Flow[SchedulerEvents, SchedulerEvents, NotUsed] = {
    Flow[SchedulerEvents]
      .mapConcat(persistEvents(_, podRecordRepository))
      .mapAsync(schedulerSettings.persistencePipelineLimit)(call => call())
      // Storage calls resolve to None; only the trailing Some(events) element is emitted downstream.
      .collect { case Some(events) => events }
  }

  /**
    * Translates a frame's state events into an ordered list of deferred persistence operations.
    *
    * Every [[PodRecordUpdated]] becomes a thunk that stores (record present) or deletes (record
    * absent) the pod record and resolves to None. The original events are appended last, wrapped
    * in Some, so that when the list is run in order the events are only released after all
    * preceding storage calls. Thunks (rather than eager Futures) keep the storage calls from
    * starting before [[persistenceFlow]]'s mapAsync schedules them.
    *
    * @param events the frame result whose record updates should be persisted
    * @param podRecordRepository repository receiving the store/delete calls
    */
  private def persistEvents(
      events: SchedulerEvents,
      podRecordRepository: PodRecordRepository): List[() => Future[Option[SchedulerEvents]]] = {
    val ops: List[() => Future[Option[SchedulerEvents]]] = events.stateEvents.collect {
      case PodRecordUpdated(_, Some(podRecord)) =>
        () =>
          // CallerThreadExecutionContext: the map only discards the result, no thread hop needed.
          podRecordRepository.store(podRecord).map(_ => None)(CallerThreadExecutionContext.context)
      case PodRecordUpdated(podId, None) =>
        () =>
          podRecordRepository.delete(podId).map(_ => None)(CallerThreadExecutionContext.context)
    }
    // Trailing element re-emits the events themselves once all prior ops have completed in order.
    ops :+ (() => Future.successful(Some(events)))
  }

  /*
   * We don't start processing any commands until we've finished loading the entire set of podRecords
   * This code delays building a scheduler stage until this podRecord snapshot is available.
   *
   * Block for IO - If the IO call fails or a timeout occurs, we should not make any progress.
   *
   * NOTE(review): the timeout comes from `scheduler.persistence.load-timeout` (seconds). A failed
   * read or timeout surfaces here as a thrown exception from Await.result — intentional, since the
   * scheduler must not start with a partial record set.
   */
  private def loadPodRecords(podRecordRepository: PodRecordRepository): Map[PodId, PodRecord] = {
    // TODO: Add error handling (and maybe a retry mechanism).
    Await.result(podRecordRepository.readAll(), schedulerSettings.persistenceLoadTimeout.seconds)
  }

private def isMultiRoleFramework(frameworkInfo: FrameworkInfo): Boolean =
frameworkInfo.getCapabilitiesList.asScala.exists(_.getType == FrameworkInfo.Capability.Type.MULTI_ROLE)
}
Expand Up @@ -3,9 +3,8 @@ package com.mesosphere.usi.core
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FanInShape2, Inlet, Outlet}
import com.mesosphere.mesos.client.MesosCalls
import com.mesosphere.usi.core.models.SpecEvent
import com.mesosphere.usi.core.models.{PodId, PodRecord, SpecEvent, StateSnapshot}
import org.apache.mesos.v1.scheduler.Protos.{Event => MesosEvent}

import scala.collection.mutable

object SchedulerLogicGraph {
Expand Down Expand Up @@ -41,22 +40,24 @@ object SchedulerLogicGraph {
* Its existence is only warranted by forecasted future needs. It's kept as a graph with an internal buffer as we will
* likely need timers, other callbacks, and additional output ports (such as an offer event stream?).
*/
class SchedulerLogicGraph(mesosCallFactory: MesosCalls)
private[core] class SchedulerLogicGraph(mesosCallFactory: MesosCalls, initialPodRecords: Map[PodId, PodRecord])
extends GraphStage[FanInShape2[SpecEvent, MesosEvent, SchedulerEvents]] {
import SchedulerLogicGraph.BUFFER_SIZE

val mesosEventsInlet = Inlet[MesosEvent]("mesos-events")
val specEventsInlet = Inlet[SpecEvent]("specs")
val frameResultOutlet = Outlet[SchedulerEvents]("effects")
private val mesosEventsInlet = Inlet[MesosEvent]("mesos-events")
private val specEventsInlet = Inlet[SpecEvent]("specs")
private val frameResultOutlet = Outlet[SchedulerEvents]("effects")

// Define the shape of this stage, which is SourceShape with the port we defined above
override val shape: FanInShape2[SpecEvent, MesosEvent, SchedulerEvents] =
new FanInShape2(specEventsInlet, mesosEventsInlet, frameResultOutlet)

// This is where the actual (possibly stateful) logic will live
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
val handler = new SchedulerLogicHandler(mesosCallFactory)

new GraphStageLogic(shape) {
private[this] var handler: SchedulerLogicHandler = _

val pendingEffects: mutable.Queue[SchedulerEvents] = mutable.Queue.empty

setHandler(mesosEventsInlet, new InHandler {
Expand All @@ -83,9 +84,9 @@ class SchedulerLogicGraph(mesosCallFactory: MesosCalls)
})

override def preStart(): Unit = {
// Start the stream
pull(specEventsInlet)
pull(mesosEventsInlet)
handler = new SchedulerLogicHandler(mesosCallFactory, initialPodRecords)
// Publish the initial state snapshot event; podStatuses will not be in the snapshot in the future
pushOrQueueIntents(SchedulerEvents(List(StateSnapshot(podRecords = initialPodRecords.values.toSeq))))
}

def pushOrQueueIntents(effects: SchedulerEvents): Unit = {
Expand Down
Expand Up @@ -2,6 +2,7 @@ package com.mesosphere.usi.core

import com.mesosphere.mesos.client.MesosCalls
import com.mesosphere.usi.core.logic.{MesosEventsLogic, SpecLogic}
import com.mesosphere.usi.core.models.PodRecord
import com.mesosphere.usi.core.models.{PodId, PodInvalid, PodSpec, PodSpecUpdated, SpecEvent, SpecsSnapshot}
import org.apache.mesos.v1.scheduler.Protos.{Call, Event => MesosEvent}

Expand Down Expand Up @@ -62,7 +63,7 @@ import org.apache.mesos.v1.scheduler.Protos.{Call, Event => MesosEvent}
* through the application of several functions. The result of that event will lead to the emission of several
* stateEvents and Mesos intents, which will be accumulated and emitted via a data structure we call the FrameResult.
*/
private[core] class SchedulerLogicHandler(mesosCallFactory: MesosCalls) {
private[core] class SchedulerLogicHandler(mesosCallFactory: MesosCalls, initialPodRecords: Map[PodId, PodRecord]) {

private val schedulerLogic = new SpecLogic(mesosCallFactory)
private val mesosEventsLogic = new MesosEventsLogic(mesosCallFactory)
Expand All @@ -77,7 +78,7 @@ private[core] class SchedulerLogicHandler(mesosCallFactory: MesosCalls) {
* contain persistent, non-recoverable facts from Mesos, such as pod-launched time, agentId on which a pod was
* launched, agent information or the time at which a pod was first seen as unhealthy or unreachable.
*/
private var state: SchedulerState = SchedulerState.empty
private var state: SchedulerState = SchedulerState(podStatuses = Map.empty, podRecords = initialPodRecords)

/**
* Cached view of SchedulerState and SpecificationState. We incrementally update this computation at the end of each
Expand Down Expand Up @@ -110,6 +111,20 @@ private[core] class SchedulerLogicHandler(mesosCallFactory: MesosCalls) {
}
}

  /**
    * Process a Mesos event and update internal state.
    *
    * Runs a single frame: the Mesos event is folded into the current pod statuses / records via
    * [[MesosEventsLogic.processEvent]], with the cached pending-launch set supplied for matching.
    *
    * @param event the raw Mesos scheduler event to apply
    * @return The events describing state changes as Mesos call intents
    */
  def handleMesosEvent(event: MesosEvent): SchedulerEvents = {
    handleFrame { builder =>
      builder.process { (specs, state, _) =>
        mesosEventsLogic.processEvent(specs, state, cachedPendingLaunch.pendingLaunch)(event)
      }
    }
  }

/**
* Instantiate a frameResultBuilder instance, call the handler, then follow up with housekeeping:
*
Expand Down Expand Up @@ -171,20 +186,6 @@ private[core] class SchedulerLogicHandler(mesosCallFactory: MesosCalls) {
SchedulerEvents(mesosCalls = reviveCalls ++ suppressCalls)
}

/**
* Process a Mesos event and update internal state.
*
* @param event
* @return The events describing state changes as Mesos call intents
*/
def handleMesosEvent(event: MesosEvent): SchedulerEvents = {
handleFrame { builder =>
builder.process { (specs, state, _) =>
mesosEventsLogic.processEvent(specs, state, cachedPendingLaunch.pendingLaunch)(event)
}
}
}

/**
* We remove a task if it is not reachable and running, and it has no podSpec defined
*
Expand Down