Use InMemoryPersistenceStorage for persisting pod records #80

Merged — 28 commits, merged May 8, 2019
The changes shown below are from 10 of the 28 commits.

Commits
19681f1
Update core module to use persistence module and use PodRecordRepository
takirala Mar 29, 2019
eb28b79
Update RecordRepository interface
takirala Mar 29, 2019
0e5c5c6
Add an integration test
takirala Mar 29, 2019
eccd7ff
Revert unintentional changes
takirala Mar 29, 2019
19cbb44
Emit StateSnapshot on crash-recovery
takirala Mar 30, 2019
9ac896d
Draft future-less persistence storage flow
takirala Apr 8, 2019
92c1c08
Use async call back for thread safe access
takirala Apr 8, 2019
f71b932
Add a unit test case to validate persistence graph stage
takirala Apr 11, 2019
3772c77
Make all methods in RecordRepository to be stream based
takirala Apr 11, 2019
d895c68
Cleanup tests
takirala Apr 11, 2019
3fd9cb3
Cleanup code
takirala Apr 12, 2019
5baccdd
Merge master and resolve conflicts
takirala Apr 12, 2019
21810fc
Address review comments
takirala Apr 14, 2019
6d2c943
Address review comments
takirala Apr 16, 2019
6d8faf8
Merge master and resolve conflicts
takirala Apr 16, 2019
88959c6
Use akka.pattern.after instead of Thread.sleep
takirala Apr 16, 2019
adcdd60
Change sleep max time from 1000 to 100
takirala Apr 16, 2019
fa19488
Revert unintentional formatting changes
takirala Apr 17, 2019
3a0e7a2
Address review comments
takirala Apr 18, 2019
4b6af6d
Add code comments
takirala Apr 18, 2019
1f6ec8e
changes from pairing
Apr 19, 2019
e30961e
Enforce pipelining limit across multiple SchedulerEvents
takirala Apr 19, 2019
265e7fd
Code reformat
takirala Apr 23, 2019
b41c1c8
Remove unused method - fromSnapshot
takirala Apr 24, 2019
827923a
Merge master and resolve conflicts
takirala Apr 24, 2019
f836891
Load the snapshot before a graph component is built
takirala Apr 26, 2019
059e0a4
Move InMemoryPodRecordRepository to test-utils
takirala May 2, 2019
a9389b5
Add comment on location of RepositoryBehavior
takirala May 8, 2019
Files changed
2 changes: 1 addition & 1 deletion .travis.yml
@@ -7,7 +7,7 @@ language: scala
install:
- true
scala:
- 2.12.7
- 2.12.7
script:
- ./gradlew checkScalaFmtAll && sudo ./gradlew provision && ./gradlew ci --info
before_cache:
1 change: 1 addition & 0 deletions core/build.gradle
@@ -1,5 +1,6 @@
dependencies {
compile project(':core-models')
compile project(':persistence')
compile project(':mesos-client')
testCompile project(':test-utils')
}
@@ -0,0 +1,11 @@
package com.mesosphere.usi.core

import java.util.concurrent.Executor
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContextExecutor

private[usi] object CallerThreadExecutionContext {
val executor: Executor = (command: Runnable) => command.run()

implicit val context: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
}
@@ -27,12 +27,14 @@ case class FrameResultBuilder(
if (schedulerEvents == SchedulerEvents.empty)
this
else {
val newDirty = dirtyPodIds ++ schedulerEvents.stateEvents.iterator.collect {
case podEvent: PodStateEvent => podEvent.id
case _: StateSnapshot =>
// We need to handle status snapshots and create a mechanism to signal that all cache should be recomputed
???
}
val newDirty: Set[PodId] = dirtyPodIds ++ schedulerEvents.stateEvents.iterator
.collect[Set[PodId]] {
case podEvent: PodStateEvent => Set(podEvent.id)
// We need to handle status snapshots with a mechanism to signal that all cache should be recomputed
case snapshot: StateSnapshot => snapshot.podRecords.map(_.podId).toSet
}
.flatten

copy(
state = state.applyStateIntents(schedulerEvents.stateEvents),
dirtyPodIds = newDirty,
105 changes: 80 additions & 25 deletions core/src/main/scala/com/mesosphere/usi/core/Scheduler.scala
@@ -1,13 +1,24 @@
package com.mesosphere.usi.core

import akka.Done
import akka.NotUsed
import akka.stream.scaladsl.{BidiFlow, Broadcast, Flow, GraphDSL, Source}
import akka.stream.{BidiShape, FlowShape}
import akka.stream.{BidiShape, FlowShape, Materializer}
import akka.stream.scaladsl.{BidiFlow, Broadcast, Flow, GraphDSL, Sink, Source}
import com.mesosphere.mesos.client.{MesosCalls, MesosClient}
import com.mesosphere.usi.core.models.{SpecEvent, SpecUpdated, SpecsSnapshot, StateEvent, StateSnapshot, StateUpdated}
import com.mesosphere.usi.core.models.{
PodRecordUpdated,
SpecEvent,
SpecUpdated,
SpecsSnapshot,
StateEvent,
StateSnapshot,
StateUpdated
}
import com.mesosphere.usi.repository.PodRecordRepository
import org.apache.mesos.v1.Protos.FrameworkInfo
import org.apache.mesos.v1.scheduler.Protos.{Call => MesosCall, Event => MesosEvent}
import scala.collection.JavaConverters._
import scala.concurrent.Future

/*
* Provides the scheduler graph component. The component has two inputs, and two outputs:
@@ -51,30 +62,34 @@ object Scheduler {

type StateOutput = (StateSnapshot, Source[StateEvent, Any])

def fromClient(client: MesosClient): Flow[SpecInput, StateOutput, NotUsed] = {
def fromClient(
client: MesosClient,
podRecordRepository: PodRecordRepository
)(implicit materializer: Materializer): Flow[SpecInput, StateOutput, NotUsed] = {
if (!isMultiRoleFramework(client.frameworkInfo)) {
throw new IllegalArgumentException(
"USI scheduler provides support for MULTI_ROLE frameworks only. Please provide create MesosClient with FrameworkInfo that has capability MULTI_ROLE")
"USI scheduler provides support for MULTI_ROLE frameworks only. " +
"Please provide a MesosClient with FrameworkInfo that has capability MULTI_ROLE")
}
fromFlow(client.calls, Flow.fromSinkAndSource(client.mesosSink, client.mesosSource))
fromFlow(client.calls, podRecordRepository, Flow.fromSinkAndSource(client.mesosSink, client.mesosSource))
}

private def isMultiRoleFramework(frameworkInfo: FrameworkInfo): Boolean =
frameworkInfo.getCapabilitiesList.asScala.exists(_.getType == FrameworkInfo.Capability.Type.MULTI_ROLE)

def fromFlow(
mesosCallFactory: MesosCalls,
mesosFlow: Flow[MesosCall, MesosEvent, Any]): Flow[SpecInput, StateOutput, NotUsed] = {
podRecordRepository: PodRecordRepository,
mesosFlow: Flow[MesosCall, MesosEvent, Any]
)(implicit materializer: Materializer): Flow[SpecInput, StateOutput, NotUsed] = {
Flow.fromGraph {
GraphDSL.create(unconnectedGraph(mesosCallFactory), mesosFlow)((_, _) => NotUsed) { implicit builder =>
{ (graph, mesos) =>
import GraphDSL.Implicits._
GraphDSL.create(unconnectedGraph(mesosCallFactory, podRecordRepository), mesosFlow)((_, _) => NotUsed) {
implicit builder =>
{ (graph, mesos) =>
import GraphDSL.Implicits._

mesos ~> graph.in2
graph.out2 ~> mesos
mesos ~> graph.in2
graph.out2 ~> mesos

FlowShape(graph.in1, graph.out1)
}
FlowShape(graph.in1, graph.out1)
}
}
}
}
@@ -91,30 +106,38 @@ object Scheduler {
rest.prepend(Source.single(snapshot))
}

// TODO (DCOS-47476) use actual prefixAndTail and expect first event to be a Snapshot; change the prefixAndTail param from 0 value to 1, let fail, etc.
private val stateOutputBreakoutFlow: Flow[StateEvent, StateOutput, NotUsed] = Flow[StateEvent].prefixAndTail(0).map {
case (_, stateEvents) =>
private val stateOutputBreakoutFlow: Flow[StateEvent, StateOutput, NotUsed] = Flow[StateEvent].prefixAndTail(1).map {
case (Seq(snapshot), stateEvents) =>
val stateSnapshot = snapshot match {
case x: StateSnapshot => x
case _ => throw new IllegalStateException("First event is allowed to be only a state snapshot")
}
val stateUpdates = stateEvents.map {
case c: StateUpdated => c
case _: StateSnapshot =>
throw new IllegalStateException("Only the first event is allowed to be a state snapshot")
}
(StateSnapshot.empty, stateUpdates)
(stateSnapshot, stateUpdates)
}

def unconnectedGraph(
mesosCallFactory: MesosCalls): BidiFlow[SpecInput, StateOutput, MesosEvent, MesosCall, NotUsed] = {
private[core] def unconnectedGraph(
mesosCallFactory: MesosCalls,
podRecordRepository: PodRecordRepository
)(implicit materializer: Materializer): BidiFlow[SpecInput, StateOutput, MesosEvent, MesosCall, NotUsed] = {
BidiFlow.fromGraph {
GraphDSL.create(new SchedulerLogicGraph(mesosCallFactory)) { implicit builder => (schedulerLogic) =>
GraphDSL.create(
new SchedulerLogicGraph(mesosCallFactory, podRecordRepository.readAll().runWith(Sink.head)),
) { implicit builder => (schedulerLogic) =>
{
import GraphDSL.Implicits._

val broadcast = builder.add(Broadcast[SchedulerEvents](2, eagerCancel = true))
val specInputFlattening = builder.add(specInputFlatteningFlow)
val stateOutputBreakout = builder.add(stateOutputBreakoutFlow)

val persistenceStorageFlow = builder.add(persistenceFlow(podRecordRepository))
specInputFlattening ~> schedulerLogic.in0
schedulerLogic.out ~> broadcast.in
schedulerLogic.out ~> persistenceStorageFlow ~> broadcast.in

val mesosCalls = broadcast.out(0).mapConcat { frameResult =>
frameResult.mesosCalls
@@ -130,4 +153,36 @@
}
}
}

private[core] def persistenceFlow(
podRecordRepository: PodRecordRepository
)(implicit materializer: Materializer): Flow[SchedulerEvents, SchedulerEvents, NotUsed] = {
Flow[SchedulerEvents].mapAsync(1) { events =>
val (storeRecords, deleteRecords) = events.stateEvents.collect { case x: PodRecordUpdated => x }
.partition(_.newRecord.isDefined)
if (storeRecords.isEmpty && deleteRecords.isEmpty) {
Future.successful(events)
} else {
val storeResult = if (storeRecords.nonEmpty) {
Source(storeRecords).collect { case PodRecordUpdated(_, Some(record)) => record }
.via(podRecordRepository.storeFlow)
.grouped(storeRecords.size)
.map(_ => Done)
} else Source.single(Done)
val deleteResult = if (deleteRecords.nonEmpty) {
Source(deleteRecords)
.map(_.id)
.via(podRecordRepository.deleteFlow)
.grouped(deleteRecords.size)
.map(_ => Done)
} else Source.single(Done)
Source
.zipWithN[Done, SchedulerEvents](_ => events)(List(storeResult, deleteResult))
.runWith(Sink.head) // This materialization is cheap compared to the IO operation.
}
}
}

private def isMultiRoleFramework(frameworkInfo: FrameworkInfo): Boolean =
frameworkInfo.getCapabilitiesList.asScala.exists(_.getType == FrameworkInfo.Capability.Type.MULTI_ROLE)
}
@@ -3,10 +3,11 @@ package com.mesosphere.usi.core
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FanInShape2, Inlet, Outlet}
import com.mesosphere.mesos.client.MesosCalls
import com.mesosphere.usi.core.models.SpecEvent
import com.mesosphere.usi.core.models.{PodId, PodRecord, SpecEvent}
import org.apache.mesos.v1.scheduler.Protos.{Event => MesosEvent}

import scala.collection.mutable
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

object SchedulerLogicGraph {
val BUFFER_SIZE = 32
@@ -41,13 +42,16 @@
* It's existence is only warranted by forecasted future needs. It's kept as a graph with an internal buffer as we will
* likely need timers, other callbacks, and additional output ports (such as an offer event stream?).
*/
class SchedulerLogicGraph(mesosCallFactory: MesosCalls)
private[core] class SchedulerLogicGraph(
mesosCallFactory: MesosCalls,
initialPodRecords: => Future[Map[PodId, PodRecord]])
extends GraphStage[FanInShape2[SpecEvent, MesosEvent, SchedulerEvents]] {
import SchedulerLogicGraph.BUFFER_SIZE

val mesosEventsInlet = Inlet[MesosEvent]("mesos-events")
val specEventsInlet = Inlet[SpecEvent]("specs")
val frameResultOutlet = Outlet[SchedulerEvents]("effects")
private val mesosEventsInlet = Inlet[MesosEvent]("mesos-events")
private val specEventsInlet = Inlet[SpecEvent]("specs")
private val frameResultOutlet = Outlet[SchedulerEvents]("effects")

// Define the shape of this stage, which is SourceShape with the port we defined above
override val shape: FanInShape2[SpecEvent, MesosEvent, SchedulerEvents] =
new FanInShape2(specEventsInlet, mesosEventsInlet, frameResultOutlet)
@@ -82,12 +86,17 @@
}
})

override def preStart(): Unit = {
// Start the stream
pull(specEventsInlet)
pull(mesosEventsInlet)
val startGraph = this.getAsyncCallback[Try[Map[PodId, PodRecord]]] {
case Success(initialSnapshot) =>
pushOrQueueIntents(handler.handlePodRecordSnapshot(initialSnapshot))
maybePull()
case Failure(ex) =>
this.failStage(ex)
}

override def preStart(): Unit =
initialPodRecords.onComplete(startGraph.invoke)(CallerThreadExecutionContext.context)
Contributor:

It is kind of odd. For the user we ask them to deal with Source[(Snapshot, Source[StateEvent])], but internally we are not using that. Why are we not passing the state snapshot via a stream? Are we not losing backpressure here? Recovering state was slowing down Marathon in the past, so I would assume that we leverage backpressure right here so that the USI scheduler flow is not overloaded.

Contributor Author (takirala):

At least in the scope of this PR, I am not trying to handle ad-hoc snapshots. The assumption in this PR is that we receive the snapshot only once during bootstrap (or crash recovery), and we intentionally block all inlets until we can load the snapshot and be ready for processing. I initially had an approach using FanInShape3 but decided this is simpler and cleaner for the initial snapshot loading.
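To make the pattern being discussed concrete, here is a minimal, hypothetical sketch (not the PR's SchedulerLogicGraph): an identity stage that registers an async callback in preStart and pulls nothing from upstream until a precondition Future succeeds. All names and the Unit precondition type are placeholders.

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

// Placeholder stage, not the PR's SchedulerLogicGraph: it forwards elements unchanged,
// but refuses to pull anything from upstream until `precondition` completes successfully.
class GatedStage[T](precondition: () => Future[Unit]) extends GraphStage[FlowShape[T, T]] {
  private val in = Inlet[T]("gated.in")
  private val out = Outlet[T]("gated.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(attrs: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private var ready = false

    // Invoked on the stage's own thread once the future completes.
    private val onPrecondition = getAsyncCallback[Try[Unit]] {
      case Success(_) =>
        ready = true
        if (isAvailable(out) && !hasBeenPulled(in)) pull(in) // start pulling now
      case Failure(ex) =>
        failStage(ex) // precondition failed, so the stage fails instead of starting
    }

    override def preStart(): Unit =
      // Register the callback instead of pulling eagerly: no demand reaches
      // upstream until the precondition future has completed.
      precondition().onComplete(onPrecondition.invoke)(materializer.executionContext)

    setHandler(in, new InHandler {
      override def onPush(): Unit = push(out, grab(in))
    })
    setHandler(out, new OutHandler {
      override def onPull(): Unit = if (ready && !hasBeenPulled(in)) pull(in)
    })
  }
}

The PR applies the same idea with a FanInShape2 and a Future[Map[PodId, PodRecord]] precondition, as shown in the SchedulerLogicGraph diff above.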

Contributor:

I think @timcharper has to explain the interface once more to me then. We do not expect ad-hoc snapshots, but it is odd that our internal storage uses plain futures when we prescribe the nested sources to users. Why would we not use streams here as well?

Contributor:

In the future, we will not be sending a podSpec snapshot. This is going away.

Contributor:

Cool. Could we add a longer comment above?

Contributor Author (takirala):

I'm not sure if/how I can pass an extra parameter to override def createLogic. One thought I had was to set an attribute on the graph component, but that feels like overkill. Do you have any other thoughts?

On the other hand, I don't understand why loading the pod records should not be part of the logic: we are creating a graph component which should not start pulling elements from upstream until a certain precondition is satisfied. In our case, the precondition is that the given future completes with an expected (Success) result. Since this future completion is non-deterministic (it can even fail), I think it is fair to register the callback in the preStart method. Ultimately, this callback decides whether (if at all) to start pulling elements from upstream.

Contributor:

I missed that createLogic is predefined. However, initialPodRecords could simply be the loaded record set. Why should it be a future?

Contributor Author (takirala):

By making the pod records a future, we have the option to either succeed at loading the snapshot and then start the graph, or mark the graph stage as failed to start. If it's not a future, then we have to handle the failed future before creating this graph component (and that raises the question: if we fail to make the persistence call, should we fail to create the blueprint of the graph?) cc @meln1k

Contributor Author (takirala):

Also, note that the SchedulerLogicHandler has a Map instead of a Future[Map] - https://github.com/mesosphere/usi/pull/80/files#diff-704496199082f7d74e9d62d315f0a7abR66 - and that makes sense to me, because if we have a "handler", it can be expected to have a valid snapshot. But in the case of SchedulerLogicGraph, where we are "building" the graph, we can choose to verify a precondition (in this case, successful completion of the given Future) before we choose to start the graph.

I have made initialPodRecords an anonymous function to delay the pod record loading. If you still think that having a Future as the graph precondition is not ideal, I can reformat the code to complete the future in Scheduler and fail fast before a graph blueprint is generated. wdyt?

Contributor:

I'm just wondering if loading the snapshot in the scheduler factory would be simpler. We can always handle errors there. Ideally the factory method would fail if the snapshot loading fails; the caller would not even have a failed graph stage. How should the user handle a failed graph stage anyway?
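For comparison, a rough, self-contained sketch of that alternative under placeholder types (this is not the PR's API, and the Future-returning factory signature is an assumption): the factory resolves the snapshot first, so a failed read fails the returned Future and the caller never materializes a graph that is doomed to fail.

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{ExecutionContext, Future}

object FailFastFactorySketch {
  // Placeholder aliases standing in for the PR's PodId/PodRecord and flow element types.
  type PodId = String
  type PodRecord = String
  type SpecInput = String
  type StateOutput = String

  // Stand-in for PodRecordRepository.readAll(): a Source that emits the persisted snapshot once.
  trait Repository {
    def readAll(): Source[Map[PodId, PodRecord], NotUsed]
  }

  // Resolve the snapshot eagerly; only if that succeeds is the flow built from the
  // already-loaded records, so callers handle persistence failures as a failed Future.
  def scheduler(
      repo: Repository,
      buildFlow: Map[PodId, PodRecord] => Flow[SpecInput, StateOutput, NotUsed]
  )(implicit mat: Materializer, ec: ExecutionContext): Future[Flow[SpecInput, StateOutput, NotUsed]] =
    repo.readAll().runWith(Sink.head).map(buildFlow)
}

The trade-off is that callers receive a Future[Flow[...]] instead of a plain Flow.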


def pushOrQueueIntents(effects: SchedulerEvents): Unit = {
if (isAvailable(frameResultOutlet)) {
if (pendingEffects.nonEmpty) {
@@ -2,6 +2,8 @@ package com.mesosphere.usi.core

import com.mesosphere.mesos.client.MesosCalls
import com.mesosphere.usi.core.logic.{MesosEventsLogic, SpecLogic}
import com.mesosphere.usi.core.models.PodRecord
import com.mesosphere.usi.core.models.StateSnapshot
import com.mesosphere.usi.core.models.{PodId, SpecEvent}
import org.apache.mesos.v1.scheduler.Protos.{Event => MesosEvent}

@@ -95,6 +97,33 @@ private[core] class SchedulerLogicHandler(mesosCallFactory: MesosCalls) {
}
}

/**
* Process a Mesos event and update internal state.
*
* @param event
* @return The events describing state changes as Mesos call intents
*/
def handleMesosEvent(event: MesosEvent): SchedulerEvents = {
handleFrame { builder =>
builder.process { (specs, state, _) =>
mesosEventsLogic.processEvent(specs, state, cachedPendingLaunch.pendingLaunch)(event)
}
}
}

def handlePodRecordSnapshot(podRecords: Map[PodId, PodRecord]): SchedulerEvents = {
handleFrame { builder =>
builder.process { (specs, state, _) =>
if (state.podRecords.nonEmpty || specs.podSpecs.nonEmpty) {
throw new IllegalStateException(
s"Expected initial Scheduler state to be empty." +
s" Found ${state.podRecords.size} records and ${specs.podSpecs.size} statuses")
}
SchedulerEvents(stateEvents = List(StateSnapshot.empty.copy(podRecords = podRecords.values.toSeq)))
}
}
}

/**
* Instantiate a frameResultBuilder instance, call the handler, then follow up with housekeeping:
*
@@ -129,20 +158,6 @@ private[core] class SchedulerLogicHandler(mesosCallFactory: MesosCalls) {
SchedulerEvents.empty
}

/**
* Process a Mesos event and update internal state.
*
* @param event
* @return The events describing state changes as Mesos call intents
*/
def handleMesosEvent(event: MesosEvent): SchedulerEvents = {
handleFrame { builder =>
builder.process { (specs, state, _) =>
mesosEventsLogic.processEvent(specs, state, cachedPendingLaunch.pendingLaunch)(event)
}
}
}

/**
* We remove a task if it is not reachable and running, and it has no podSpec defined
*
@@ -37,7 +37,10 @@ case class SchedulerState(podRecords: Map[PodId, PodRecord], podStatuses: Map[Po
}
case agentRecordChange: AgentRecordUpdated => ???
case reservationStatusChange: ReservationStatusUpdated => ???
case statusSnapshot: StateSnapshot => ???
case statusSnapshot: StateSnapshot =>
// TODO (DCOS-47476) Implement cache invalidation and handle snapshot fully
newPodRecords = statusSnapshot.podRecords
.foldLeft(newPodRecords)((acc, record) => acc.updated(record.podId, record))
}

copy(podRecords = newPodRecords, podStatuses = newPodStatuses)
16 changes: 16 additions & 0 deletions core/src/test/resources/application.conf
@@ -0,0 +1,16 @@
akka {
stream {
materializer {
debug {
# Enables the fuzzing mode which increases the chance of race conditions
# by aggressively reordering events and making certain operations more
# concurrent than usual.
# This setting is for testing purposes, NEVER enable this in a production
# environment!
# To get the best results, try combining this setting with a throughput
# of 1 on the corresponding dispatchers.
fuzzing-mode = on
Contributor Author (takirala):

I found this useful when I was playing around with a custom GraphStage approach I had at some point. I think retaining this is useful and makes testing more aggressive; I can revert this if there is something I missed.

}
}
}
}