
Use InMemoryPersistenceStorage for persisting pod records #80

Merged
merged 28 commits into master from tga/use-inmemory-persistence on May 8, 2019

Conversation

@takirala (Contributor) commented Mar 29, 2019

Add a persistence storage interface and create a flow from it for use in the Scheduler graph. This PR adds the following changes:

  • Create a generic RecordRepository trait (a rough sketch follows this list). Currently, only PodRecordRepository is implemented.
  • Handle the initial snapshot loading from the PodRecordRepository.
  • Create a scheduler graph component that uses the PodRecordRepository to persist the SchedulerEvents. This component itself performs a materialization for every SchedulerEvents element.
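For orientation, a minimal sketch of what such a repository trait could look like, inferred from the update/readAll signatures quoted later in this conversation (the type members and the delete method are assumptions, not the actual USI code):

import scala.concurrent.Future

// Hypothetical sketch of a generic record repository; names are inferred
// from the signatures discussed in this PR and may not match the real code.
trait RecordRepository {
  type Record
  type RecordId

  // Persist (create or overwrite) a record, yielding its id.
  def update(record: Record): Future[RecordId]
  // Remove a record by id.
  def delete(recordId: RecordId): Future[RecordId]
  // Load the full snapshot of all records, e.g. during bootstrap.
  def readAll(): Future[Map[RecordId, Record]]
}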

Testing

  • Add a RepositoryBehavior that can be used to verify all implementors of RecordRepository. Currently it is used only for the in-memory storage, but it can/will be extended to other implementations (such as ZooKeeper).
  • Add the akka-stream-testkit library dependency to the test-utils module so that it can be used in other modules.
  • Add a unit test for the persistence flow component.
  • Add an integration test to verify the at-most-once launch guarantee.

@jeschkies made the initial PR #51 that laid some groundwork for this.

JIRA : DCOS-49274

@takirala self-assigned this Mar 29, 2019
@takirala force-pushed the tga/use-inmemory-persistence branch from cdccb11 to eccd7ff on March 29, 2019 01:18
private def persistenceFlow(
    podRecordRepository: PodRecordRepository): Flow[SchedulerEvents, SchedulerEvents, NotUsed] =
  Flow[SchedulerEvents].mapAsync(1) { events =>
    import CallerThreadExecutionContext.context
Contributor:

I don't quite understand the context. Why was that introduced?

Contributor Author:

This was introduced to perform cheap operations such as .map(_ => events) without switching the execution context. If we don't do any future transformations in this scope, we don't need it.
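For readers unfamiliar with the pattern, a caller-thread execution context is typically a trivial ExecutionContext that runs continuations synchronously on the completing thread. A minimal sketch, assuming this is roughly what USI's CallerThreadExecutionContext does:

import scala.concurrent.ExecutionContext

// Sketch of a caller-thread execution context: continuations run
// synchronously on whatever thread completes the future. Only safe for
// cheap, non-blocking transformations such as `.map(_ => events)`.
object CallerThreadExecutionContext {
  implicit val context: ExecutionContext = new ExecutionContext {
    override def execute(runnable: Runnable): Unit = runnable.run()
    override def reportFailure(cause: Throwable): Unit = throw cause
  }
}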

Contributor:

It had to be called something; we can't make the object itself be the execution context.

@@ -130,4 +147,25 @@ object Scheduler {
}
}
}

private def persistenceFlow(
Contributor:

I would have assumed that the repository would maybe provide this interface.

Contributor Author:

I don't understand how the repository can provide this interface. The core module should control how the persistence storage is read from and written to, right?

Flow[SchedulerEvents].mapAsync(1) { events =>
  import CallerThreadExecutionContext.context
  Future
    .sequence(events.stateEvents.map {
Contributor:

Why not use a stream here as well?

Contributor Author:

In the current flow of things:

[SchedulerEvents] ------> [Persistence] ------> [StateOutput, MesosCalls]

The SchedulerEvents are emitted by the scheduler logic. The persistence stage processes ALL the StateEvents in a single SchedulerEvents object and only THEN emits that object downstream; it backpressures the upstream until ALL the StateEvents in the SchedulerEvents are processed. We can use a stream here, but I am not sure what advantage it would add: we are already using `.mapAsync` with parallelism 1 to process each SchedulerEvents object, one at a time, with backpressure.

Currently the persistence layer emits the exact SchedulerEvents object that it receives. Are you perhaps suggesting that we don't need to do this and could emit a "split-down" SchedulerEvents based on the stream processing speed?
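Concretely, the stage under discussion boils down to something like the following (persistAll is a hypothetical helper standing in for the Future.sequence over store operations quoted above):

// Illustrative sketch: mapAsync(1) admits one SchedulerEvents element at a
// time, so the next element is not pulled until the previous future completes.
Flow[SchedulerEvents].mapAsync(parallelism = 1) { events =>
  // persistAll (hypothetical) persists every StateEvent in `events`.
  persistAll(events.stateEvents)
    .map(_ => events)(CallerThreadExecutionContext.context)
}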

Contributor:

In some places in Marathon we used Akka Streams instead of Future.sequence to avoid too many futures. Also, there is no backpressure for loading.

Contributor:

So, I think @zen-dog can help. I thought we had something like a Flow[StoreOp, Try] so that one could map the updates.

val persistenceFlow: Flow[SchedulerEvents, SchedulerEvents, NotUsed] = {
  Flow[SchedulerEvents]
    .mapConcat(_.stateEvents)
    .map {
      case ... => UpdateOp | DeleteOp
    }
    .via(repository.flow)
    ...
}

This might be a little too complicated, though; it also omits the mapping back to SchedulerEvents.
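One possible concretization of that sketch, purely hypothetical (StoreOp, UpdateOp, DeleteOp, repository.flow, and the extractor helpers are assumptions here, not the actual USI API):

// Hypothetical store-operation ADT mirroring the pseudocode above.
sealed trait StoreOp
final case class UpdateOp(record: PodRecord) extends StoreOp
final case class DeleteOp(podId: PodId) extends StoreOp

// Fan each SchedulerEvents out into store ops and pipe them through a
// repository-provided flow. Mapping the results back into SchedulerEvents
// (the complication noted above) is still omitted here.
val opsFlow: Flow[SchedulerEvents, Try[Done], NotUsed] =
  Flow[SchedulerEvents]
    .mapConcat(_.stateEvents)
    .collect {
      case e if isUpdate(e) => UpdateOp(recordOf(e)) // isUpdate/recordOf: hypothetical extractors
      case e if isDelete(e) => DeleteOp(podIdOf(e))  // isDelete/podIdOf: hypothetical extractors
    }
    .via(repository.flow) // assumed to be a Flow[StoreOp, Try[Done], NotUsed]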

}

override def preStart(): Unit =
  initialPodRecords.onComplete(startGraph.invoke)(CallerThreadExecutionContext.context)
Contributor:

It is kind of odd. For users, we ask them to deal with Source[(Snapshot, Source[StateEvent])], but internally we are not using that. Why are we not passing the state snapshot via a stream? Are we not losing backpressure here? Recovering state was slowing down Marathon in the past, so I would assume that we leverage backpressure right here so that the USI scheduler flow is not overloaded.

Contributor Author:

At least in the scope of this PR, I am not trying to handle ad-hoc snapshots. The assumption in this PR is that we receive the snapshot only once during bootstrap (or crash recovery), and we intentionally block all inlets until we can load the snapshot and are ready for processing. I initially had an approach using FanIn3Shape but decided this is simpler and cleaner for the initial snapshot loading.

Contributor:

I think @timcharper has to explain the interface to me once more, then. We do not expect ad-hoc snapshots, but it is odd that our internal storage uses plain futures when we prescribe the nested sources to users. Why would we not use streams here as well?

Contributor:

In the future, we will not be sending a podSpec snapshot. This is going away.

Contributor:

Cool. Could we add a longer comment above?

Contributor Author:

Not sure if/how I can pass an extra param to override def createLogic. One thought I had was to set an attribute on the graph component, but that feels like overkill. Do you have any other thoughts?

On the other hand, I don't understand why loading the pod records should not be part of the logic: we are creating a graph component which should not start pulling elements from upstream until a certain precondition is satisfied. In our case, the precondition is that the given future completes with an expected (Success) result. Since this future's completion is non-deterministic (it can even fail), I think it is fair to register the callback in the preStart method. Ultimately, this callback decides whether (and when) to start pulling elements from upstream.
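For illustration, here is roughly how that precondition gating looks in a GraphStage (only the preStart line is quoted from the actual diff; the callback body and the imports of scala.util.{Try, Success, Failure} are assumptions):

// Sketch: gate the stage on the future of initial pod records.
override def preStart(): Unit = {
  val startGraph = getAsyncCallback[Try[Map[PodId, PodRecord]]] {
    case Success(records) => // seed internal state with `records`, then start pulling
    case Failure(ex)      => failStage(ex) // precondition failed: fail the stage
  }
  initialPodRecords.onComplete(startGraph.invoke)(CallerThreadExecutionContext.context)
}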

Contributor:

I missed that createLogic is predefined. However, initialPodRecords could simply be the loaded record set. Why should it be a future?

Contributor Author:

By making the pod records a future, we have the option to either succeed in loading the snapshot and then start the graph, or mark the graph stage as failed to start. If it's not a future, then we have to handle the failed future before creating this graph component (which raises the question: if we fail to make the persistence call, should we fail to create the blueprint of the graph?) cc @meln1k

Contributor Author:

Also, note that the SchedulerLogicHandler has a Map instead of a Future[Map] (https://github.com/mesosphere/usi/pull/80/files#diff-704496199082f7d74e9d62d315f0a7abR66), and that makes sense to me: if we have a "handler", it can be expected to have a valid snapshot. But in the case of SchedulerLogicGraph, where we are "building" the graph, we can choose to verify a precondition (in this case, successful completion of the given future) before we choose to start the graph.

I have made initialPodRecords an anonymous function to delay the pod record loading. If you still think that having a future in the graph precondition is not ideal, I can reformat the code to complete the future in Scheduler and fail fast before a graph blueprint is generated. wdyt?

Contributor:

I'm just wondering if loading the snapshot in the scheduler factory would be simpler. We can always handle errors there. Ideally the factory method would fail if the snapshot loading failed; the caller would then not even have a failed graph stage. How should the user handle a failed graph stage anyway?

*/
def update(record: Record): Future[RecordId]
def readAll(): Future[Map[RecordId, Record]]
Contributor:

Should we provide a stream instead to leverage backpressure?

Contributor Author:

The current structure assumes we are intentionally blocking inlets until the state can be loaded. Depending on how that structure works out, we can choose to change this to be a stream instead.

Contributor:

Where are we blocking? I must be missing something.

} else {
  // TODO: batch transactions
  Source(storeRecords)
    .map(_.newRecord.get)
Contributor:

Is newRecord an Option? If so, I'd use collect:

Suggested change
- .map(_.newRecord.get)
+ .map(_.newRecord).collect { case Some(newRecord) => newRecord }

@meln1k previously requested changes Apr 8, 2019

  .map(_.newRecord.get)
  .via(podRecordRepository.storeFlow)
  .runWith(Sink.foreach(_ => {
    if (eventCounter.decrementAndGet() == 0) maybePushToPull()
Contributor:

maybePushToPull happens on another thread, so you should not call it like this.

@jeschkies self-requested a review April 9, 2019 16:17
Flow[SchedulerEvents].mapAsync(1) { events =>
  import CallerThreadExecutionContext.context
  Future
    .sequence(events.stateEvents.map {
Contributor:

I would go for something like:

mapAsync(1) { events =>
  val (updated, deleted) = events.stateEvents.partition(....)

  val updatedF: Future[Done] = Source(updated)
    .via(podRepository.storeFlow)
    .runWith(Sink.ignore)

  val deletedF: Future[Done] = Source(deleted)
    .via(podRepository.deleteFlow)
    .runWith(Sink.ignore)

  updatedF.flatMap(_ => deletedF)
}

It has the downside of 2x materialization, but that might be OK for now.

# environment!
# To get the best results, try combining this setting with a throughput
# of 1 on the corresponding dispatchers.
fuzzing-mode = on
Contributor Author:

I found this useful when I was playing around with a custom GraphStage approach I had at some point. I think retaining this is useful and makes testing more aggressive; I can revert it if there is something I missed.
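For context, this is Akka Streams' materializer fuzzing switch; the full path of the setting in the test configuration looks like this (quoting the standard Akka setting, not USI-specific configuration):

# Akka Streams debug fuzzing: randomizes event ordering in the materializer
# to surface race conditions in stages. Test-only; it slows streams down.
akka.stream.materializer.debug.fuzzing-mode = on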

@takirala marked this pull request as ready for review April 12, 2019 06:40
@takirala requested review from zen-dog and meln1k April 12, 2019 06:40

"Persistence flow pipelines writes" in {
Given("a slow persistence storage and a flow using it")
val delayPerElement = 50.millis // Delay precision is 10ms
@timcharper (Contributor) commented Apr 16, 2019:

Let's please avoid using fixed delays in tests.

@@ -101,4 +105,55 @@ class SchedulerTest extends AkkaUnitTest with Inside {
    output.cancel()
    mesosCompleted.futureValue shouldBe Done
  }

"Persistence flow pipelines writes" in {
@timcharper (Contributor) commented Apr 16, 2019:

After reading this test several times, I'm inclined to suggest that we just delete it. I don't think it's testing anything meaningful; it appears to be testing Akka itself, which already has tremendous test coverage.

Perhaps a more meaningful test would be a unit test for the persistence flow itself which asserts that delete and create operations are applied in order, and that the order of elements being processed is preserved (perhaps some fuzzing can be applied by causing the storage operation Future to complete in a non-linear order).

@takirala (Contributor Author) commented Apr 16, 2019:

I agree that this test does not create much value. It was useful when I had a custom graph stage logic, but since we got rid of that, we don't need this exact test anymore. I have rewritten the test using Random.nextInt(100) to apply "fuzzing"; let me know if that is not ideal.
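A sketch of the kind of fuzzing described, under stated assumptions (a hypothetical test-only helper that completes each storage future after a random delay so completions arrive in non-linear order; `system` is the test kit's ActorSystem):

import akka.actor.ActorSystem
import akka.pattern.after
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Random

// Hypothetical helper: completes the stored value after 0-99 ms, so a batch
// of store operations finishes out of order while the flow under test must
// still preserve element order downstream.
def fuzzedStore[T](value: T)(implicit system: ActorSystem, ec: ExecutionContext): Future[T] =
  after(Random.nextInt(100).millis, system.scheduler)(Future.successful(value))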

@takirala requested a review from zen-dog April 17, 2019 16:13
@takirala added the WIP label Apr 19, 2019
@takirala removed the WIP label Apr 19, 2019
}

override def preStart(): Unit =
  initialPodRecords.onComplete(startGraph.invoke)(CallerThreadExecutionContext.context)
Contributor:

Ah, because this loads the pod records, right? I still don't quite understand why it is a callback. What do you think of passing initialPodRecords to createLogic? This would simplify things a bit, and the loading of the pod records should not be part of the logic anyway, IMHO.

@takirala force-pushed the tga/use-inmemory-persistence branch from f22c19c to 265e7fd on April 23, 2019 22:46
private[core] def unconnectedGraph(
    mesosCallFactory: MesosCalls,
    podRecordRepository: PodRecordRepository): BidiFlow[SpecInput, StateOutput, MesosEvent, MesosCall, NotUsed] = {
  val schedulerLogicGraph = new SchedulerLogicGraph(mesosCallFactory, loadPodRecords(podRecordRepository))
@jeschkies (Contributor) commented Apr 26, 2019:

This is getting closer to it, but I don't understand why it cannot be async. The two main factory methods asFlow and asSinkAndSource return futures. So you could simply call:

  def asFlow(specsSnapshot: SpecsSnapshot, client: MesosClient, podRecordRepository: PodRecordRepository)(
      implicit materializer: Materializer): Future[(StateSnapshot, Flow[SpecUpdated, StateEvent, NotUsed])] = async {

    implicit val ec = scala.concurrent.ExecutionContext.Implicits.global // only for ultra-fast non-blocking onComplete

    val initialRecords = await(loadPodRecords(podRecordRepository))
    val (snap, source, sink) = asSourceAndSink(specsSnapshot, client, initialRecords)
    (await(snap), Flow.fromSinkAndSourceCoupled(sink, source))
  }

Contributor Author:

Discussed this offline. In an ideal world we should be using the async-await construct to do this in a non-blocking fashion, but the factory methods fromFlow and fromClient require this to be done in a blocking fashion. I guess we can find a cleaner solution once we settle on a limited number of factory methods.

Given the isolated fact that we can't make any progress until we load the records, I guess this is okay for now.
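A sketch of the blocking variant being accepted "for now" (the Await at the factory boundary is an assumption about the approach, not quoted code; the timeout and the PodId/PodRecord type binding are illustrative):

import scala.concurrent.Await
import scala.concurrent.duration._

// Block at the factory boundary: nothing can proceed before the records
// are loaded, so the blocking call is confined to scheduler construction.
val initialPodRecords: Map[PodId, PodRecord] =
  Await.result(podRecordRepository.readAll(), 10.seconds)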

Contributor:

fromClient and fromFlow could just be non-blocking. The signature of these methods is not set in stone.

@jeschkies dismissed their stale review April 26, 2019 09:03

Since I'll be out and do not want to block this PR.

@takirala (Contributor Author) commented:

@zen-dog @meln1k can you please re-review this :)

@zen-dog (Contributor) left a comment:

I left a few nits about naming and similar. My biggest question mark is the location of the InMemoryStore*-related files; see my comment. Once clarified, this PR is good with me!

import com.mesosphere.utils.UnitTest
import com.typesafe.scalalogging.StrictLogging

class InMemoryRepositoryTest extends UnitTest with RepositoryBehavior with StrictLogging {
Contributor:

I'm confused by the location of this file (and other in-memory-repository-related files). Why are those in the persistence-zookeeper sub-module? Why is InMemoryPodRecordRepository.scala in the persistence sub-module? Ideally:

usi
  |
  ...
   - persistence            # traits and interfaces, no implementation
   - persistence-zookeeper  # zookeeper implementation
   - test-utils             # in-memory implementation here (see DummyMetrics for an example)?
                            # It is not supposed to be used in production, so this seems appropriate.

Contributor Author:

I guess it was done so that we don't need to have

implementation project(':test-utils')

and can just do

implementation project(':persistence')
testCompile project(':test-utils')

in order to instantiate the persistence layer. I have moved InMemoryPodRecordRepository to test-utils and added a note to the build.gradle file to replace implementation project(':test-utils') with testCompile project(':test-utils') once we have a persistence-zookeeper implementation.

The only thing I am unsure about is the location of InMemoryRepositoryTest, which is currently in persistence-zookeeper. I cannot move it to test-utils as that would cause a cyclic dependency. I guess having it here is fine, as its intention is to verify the RepositoryBehavior, which will be used to verify a zk persistence layer in the future. wdyt?

@jeschkies (Contributor) left a comment:

Aside from the blocking call in the Scheduler factory methods I'm fine with this patch 🎉


@zen-dog (Contributor) left a comment:

🎉

@takirala merged commit dd4c04a into master May 8, 2019
@takirala deleted the tga/use-inmemory-persistence branch May 8, 2019 09:30
@takirala (Contributor Author) commented May 8, 2019

Thanks for the reviews @zen-dog @jeschkies @meln1k and @timcharper :)
