Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #380: Deferred, Semaphore and MVar #424

Merged
merged 16 commits into from
Nov 30, 2018

Conversation

alexandru
Copy link
Member

@alexandru alexandru commented Nov 24, 2018

Fixes #380.

TL;DR about the issue:

  1. when we have an acquire and release pair, like we have in Deferred (get / complete), Semaphore (acquire / release) and MVar (take / put), all of them being similar, the release part can block indefinitely in case the bind continuation of acquire is a never ending loop
  2. the fix would be for acquire to always fork after an await, however we do not have shift available because we don't have a ContextShift restriction specified
  3. F.unit.start.flatMap(_.join) is NOT a replacement for shift

The solution is, when notifying consumers that are waiting for a value, to start and then forget about the result, something like this:

def notifyReaders(a: A, r: Iterable[A => Unit]): F[Unit] = {
  r.foldLeft(F.unit) { (acc, cb) => 
    acc.flatMap { _ =>
      F.delay(cb(a)).start.map(_ => ())
    }
  }
}

This is the only reasonable solution for the Concurrent-enabled implementations. Some notes:

  • ordering callback calls is wrong, because a single callback that blocks will block all the rest, just like it blocks the thread calling release (this is related to a remark by @SystemFw that Semaphore relies on listeners waking up in order and I really hope you only rely on the priority of the listeners and not on how the actual callbacks are being called, because that would be very wrong)
  • we really need startAndForget in the type class hierarchy (Cats-Effect 2.0) ... unfortunately we cannot add it for Scala 2.11 without breaking binary compatibility, but maybe we can find alternative ways until then
  • I believe we need a suspendWithContext[A](f: ExecutionContext => F[A]): F[A] in ContextShift; this too breaks binary compatibility, plus it involves some work in IO's run-loop, but that part is I believe doable without binary compat changes

Unfortunately in fixing MVar I needed this function and it made sense to expose it:

object Concurrent {
  // ...

  def cancelableF[F[_], A](k: (Either[Throwable, A] => Unit) => F[CancelToken[F]])
    (implicit F: Concurrent[F]): F[A] = {

    F.asyncF { cb =>
      val state = new AtomicReference[Either[Throwable, Unit] => Unit](null)
      val cb1 = (a: Either[Throwable, A]) => {
        try {
          cb(a)
        } finally {
          // This CAS can only succeed in case the operation is already finished
          // and no cancellation token was installed yet
          if (!state.compareAndSet(null, Callback.dummy1)) {
            val cb2 = state.get()
            state.lazySet(null)
            cb2(Callback.rightUnit)
          }
        }
      }
      // Until we've got a cancellation token, the task needs to be evaluated
      // uninterruptedly, otherwise risking a leak, hence the bracket
      F.bracketCase(k(cb1)) { _ =>
        F.async[Unit] { cb =>
          if (!state.compareAndSet(null, cb)) {
            cb(Callback.rightUnit)
          }
        }
      } { (token, e) =>
        e match {
          case ExitCase.Canceled => token
          case _ => F.unit
        }
      }
    }
  }  
}

@alexandru alexandru changed the title Fix #380: Deferred, Semaphore and MVar WIP: Fix #380: Deferred, Semaphore and MVar Nov 24, 2018
@alexandru
Copy link
Member Author

I broke MVar apparently. Have to fix it.

@codecov-io
Copy link

codecov-io commented Nov 24, 2018

Codecov Report

Merging #424 into master will increase coverage by 2.63%.
The diff coverage is 96.61%.

@@            Coverage Diff             @@
##           master     #424      +/-   ##
==========================================
+ Coverage   87.03%   89.66%   +2.63%     
==========================================
  Files          69       70       +1     
  Lines        1905     2003      +98     
  Branches      199      199              
==========================================
+ Hits         1658     1796     +138     
+ Misses        247      207      -40

@alexandru alexandru changed the title WIP: Fix #380: Deferred, Semaphore and MVar Fix #380: Deferred, Semaphore and MVar Nov 24, 2018
val cursor = r.iterator
while (cursor.hasNext) {
val next = cursor.next()
val task = F.map(F.start(F.delay(next(a))))(mapUnit)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

F.void instead of F.map(..)(mapUnit)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

F.void is more wasteful, because it translates to F.map(task)(_ => ()) and the Scala compiler won't reuse the created _ => ().

When doing performance-sensitive work, in general I care about these issues in about this order:

  1. avoiding extraneous async or lazy boundaries
  2. avoiding extraneous compareAndSet operations
  3. avoiding extraneous memory allocations

Caching dummy functions like _ => () falls into point 3, and sometimes I screw up due to the code becoming unclear, but it doesn't happen that often 🙂

@@ -388,6 +388,80 @@ object Concurrent {
def timeout[F[_], A](fa: F[A], duration: FiniteDuration)(implicit F: Concurrent[F], timer: Timer[F]): F[A] =
timeoutTo(fa, duration, F.raiseError(new TimeoutException(duration.toString)))

/**
* Function that creates an async and cancelable `F[A]`, similar with
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What guarantees are provided w.r.t. cancelation of the returned action?

Copy link
Member Author

@alexandru alexandru Nov 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very good question.

I realized that the task generated by the provided generator function should be evaluated uninterruptedly, otherwise it can leak.

I fixed it and added a ScalaDoc comment about it.

It's now using bracketCase, which tbh isn't ideal for performance, but it does so only after the first put or take attempt failed, so it's not on the happy path.

@alexandru alexandru changed the title Fix #380: Deferred, Semaphore and MVar WIP: Fix #380: Deferred, Semaphore and MVar Nov 29, 2018
@alexandru alexandru changed the title WIP: Fix #380: Deferred, Semaphore and MVar Fix #380: Deferred, Semaphore and MVar Nov 30, 2018
@SystemFw SystemFw merged commit 390e2a7 into typelevel:master Nov 30, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Improve Semaphore.release behavior
4 participants