New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add monix.catnap.CircuitBreaker and LiftFuture #744

Merged
merged 22 commits into from Oct 16, 2018

Conversation

Projects
None yet
3 participants
@alexandru
Member

alexandru commented Oct 15, 2018

Moved monix.eval.TaskCircuitBreaker to monix.catnap and made it generic, to work with any Sync or Async data type.

I also improved the implementation, adding a method for waiting on the circuit breaker to be in the Closed state, for more fine grained retry logic:

def awaitClose(implicit F: Async[F]): F[Unit]

This makes it possible to do this:

def protectWithRetry[F[_], A](task: F[A], cb: CircuitBreaker[F])
  (implicit F: Async[F]): F[A] = {
  
  cb.protect(task).recoverWith {
    case _: ExecutionRejectedException =>
      // Waiting for the CircuitBreaker to close, then retry
      cb.awaitClose.flatMap(_ => protectWithRetry(task, cb))
    }
}

This logic is optional, the CircuitBreaker working with plain Sync data types. So you can use it with Coeval now.


In support of CircuitBreaker's internal implementation I'm also introducing LiftFuture, a type class resembling LiftIO, but whose purpose is to convert F[Future[A]] values into F[A] values.

trait LiftFuture[F[_]] {
  def liftFuture[A](fa: F[Future[A]]): F[A]
}

Instances and plain functions are provided in the LiftFuture companion object. Also syntax:

import monix.catnap.syntax._

def fromFuture[F[_], A](fa: F[Future[A]])(implicit F: Async[F]): F[A] =
  fa.liftFuture

N.B. this works with CancelableFuture as well:

val task = Task(1)

IO(task.runToFuture).liftFuture

Given that the result of task.runToFuture is a CancelableFuture, the resulting IO[Int] is cancelable as well, using the source's cancellation token.

@Avasil

This comment has been minimized.

Collaborator

Avasil commented Oct 15, 2018

Can we also integrate #383 which I think is the last "forgotten" PR ?

@alexandru

This comment has been minimized.

Member

alexandru commented Oct 15, 2018

@Avasil I think that #383 is solved by these changes.

The request was for having some way to restart the task after the circuit breaker allows it. And we have such logic now, via the introduced awaitClose.

@codecov

This comment has been minimized.

codecov bot commented Oct 15, 2018

Codecov Report

Merging #744 into master will decrease coverage by 0.05%.
The diff coverage is 87.42%.

@@            Coverage Diff             @@
##           master     #744      +/-   ##
==========================================
- Coverage   90.51%   90.45%   -0.06%     
==========================================
  Files         398      401       +3     
  Lines       11256    11349      +93     
  Branches     2077     2079       +2     
==========================================
+ Hits        10188    10266      +78     
- Misses       1068     1083      +15
await.asInstanceOf[Deferred[F, Unit]].get
case _ =>
// $COVERAGE-OFF$
F.raiseError(new APIContractViolationException(

This comment has been minimized.

@oleg-py

oleg-py Oct 15, 2018

Collaborator

So, it requires you to know how exactly the CircuitBreaker was constructed in order to understand how this method would behave at runtime? I have a feeling like this is a bad idea.

This comment has been minimized.

@alexandru

alexandru Oct 15, 2018

Member

You'll have a hard time building a CircuitBreaker instance that reaches this code.

For IO and Task specifically you'd have to build your own Sync instance that's not Async. And then you'd have to pass it explicitly in that constructor, otherwise their own Async[IO] or Async[Task] instances would take precedence.

It's not something that I can see happening. In the wild I'm pretty sure that this branch will never be hit, because it cannot be hit accidentally.

This comment has been minimized.

@alexandru

alexandru Oct 15, 2018

Member

Also the gains justifies the means — in this way we keep CircuitBreaker working with plain Sync data types and then have this little extra just for Async data types. It's what parametric polymorphism is all about.

This comment has been minimized.

@alexandru

alexandru Oct 15, 2018

Member

An alternative would be to return an F[Boolean] instead of throwing an error. I would rather throw the error, because a Boolean could be easily ignored.

This comment has been minimized.

@alexandru

alexandru Oct 15, 2018

Member

I have added a test to prove it:

https://github.com/monix/monix/pull/744/files#diff-48d1b2884eabc0edcb1c6dbcc95a039cR372

The magic is in OrElse definition, which does NOT use subtyping. So even if you have a Sync[F] override in scope, if an Async[F] is available then it's going to get used.

It's hard to force that error to happen and I don't think it can happen by accident.

This comment has been minimized.

@oleg-py

oleg-py Oct 15, 2018

Collaborator

@alexandru it seems to be trivial with polymorphic code:

def mkCircuitBreaker[F[_]: Sync] = CircuitBreaker[F].of(...) // boom

This comment has been minimized.

@alexandru

alexandru Oct 15, 2018

Member

Sure, but such polymorphic code doesn't have access to an Async[F] instance for calling awaitClose either. And in case the built CircuitBreaker object leaks, like in your code, then the error is reasonable imo.

This comment has been minimized.

@alexandru

alexandru Oct 15, 2018

Member

I just made a change to also use sub-typing when determining the Async instance, so it now looks like this:

  private def buildAwait: Option[Deferred[F, Unit]] =
    F0.fold(
      // Async
      F => Some(Deferred.unsafeUncancelable[F, Unit](F)),
      // Sync
      {
        case ref: Async[F] @unchecked =>
          Some(Deferred.unsafeUncancelable[F, Unit](ref))
        case _ =>
          None
      })

Now this makes it impossible to hit that branch for IO and Task, unless you force the use of an alternative Sync[F] that also risks type class coherence issues.

Also added extra tests for it.

This comment has been minimized.

@oleg-py

oleg-py Oct 15, 2018

Collaborator

Sure, but such polymorphic code doesn't have access to an Async[F] instance for calling awaitClose either

Polymorphic code can have Sync[F] on lower level, but Async[F] on higher level. I do it quite often in my own code :)

I just made a change to also use sub-typing when determining the Async instance

This would be better. Not sure if it would work for transformer instances, however, as those are created on-demand IIRC.

TBH I'd prefer the whole thing to just use AsyncVar or something like that, and have Sync on creation and Async on this particular method.

This comment has been minimized.

@alexandru

alexandru Oct 15, 2018

Member

M'kay, let me think about it 🙂

I would have to hide State, to make it a private API, but on the other hand exposing a Deferred in State is an internals leak too.

alexandru added some commits Oct 15, 2018

@alexandru alexandru changed the title from Add monix.catnap.CircuitBreaker to Add monix.catnap.CircuitBreaker and LiftFuture Oct 15, 2018

@alexandru

This comment has been minimized.

Member

alexandru commented Oct 15, 2018

@oleg-py I've changed the implementation to use a plain Promise internally.

And now that the need for converting from Future happened, I've also introduced a LiftFuture type class 🙂

alexandru added some commits Oct 15, 2018

@alexandru alexandru merged commit baeda33 into monix:master Oct 16, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment