SI-7336 Link flatMapped promises to avoid memory leaks #2674

Merged
merged 1 commit into from Jul 12, 2013

6 participants
@richdougherty
Contributor

richdougherty commented Jun 23, 2013

This patch addresses a bug (or missing optimisation) in Future.flatMap which can lead to memory leaks. We see these leaks in Play because Play iteratees use futures extensively.

Backward binary compatibility is preserved by this patch. Several new methods are introduced to DefaultPromise (theoretically a "private" class) to allow linking promises together. These new methods have been whitelisted for forward binary compatibility. Strictly speaking, only a single new method to link promises is really needed. But if we're going to introduce one new method, we may as well introduce three, right? 😉

The patch is split into two changes. The first change can be cherry-picked into master. The second change has changes to preserve backward binary compatibility and whitelist forward binary compatibility.

This fix is based on the Twitter promise implementation. The current Scala implementation uses onComplete handlers to propagate the ultimate value of a flatMap operation to its promise. Recursive calls to flatMap build a chain of onComplete handlers and promises. Unfortunately none of the handlers or promises in the chain can be collected until the handlers are called and detached, which only happens when the final flatMap future is completed. (In some situations, such as an infinite stream, this may never actually happen.) Because the promise implementation internally creates references between promises, and these references are invisible to user code, it is too easy for user code to accidentally build a large chain of promises and leak memory. See the included test, which quickly exhausts JVM memory with a simple recursive loop, but not once this patch is applied. Even knowing about the internal problem, it is difficult to avoid this kind of leak without avoiding flatMap entirely.
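
For illustration, the kind of recursive loop described above can be sketched as follows. This is a minimal sketch, not the test included in the patch: `RecursiveFlatMapLoop`, `loop`, and `run` are hypothetical names, and the iteration count is meant to be kept small so the example finishes regardless of how promises are implemented.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecursiveFlatMapLoop {
  // Hypothetical example: every step returns a future that is flatMapped
  // into the next step, so the future returned by loop(0, limit) is only
  // completed once the innermost future completes.
  def loop(i: Int, limit: Int): Future[Int] =
    if (i >= limit) Future.successful(i)
    else Future(i + 1).flatMap(next => loop(next, limit))

  // Blocks until the whole loop has finished and returns the final counter.
  def run(limit: Int): Int = Await.result(loop(0, limit), 30.seconds)
}
```

With handler-based propagation, each intermediate promise created by such a loop stays reachable until the last step completes, which is why an unbounded version of this loop can exhaust memory.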

Both the Twitter implementation and this patch solve the problem of leaks by automatically breaking these chains of promises, so that promises don't refer to each other in a long chain. This allows each promise to be individually collected. The idea is to "flatten" the chain of promises, so that instead of each promise pointing to its neighbour, they instead point directly to the promise at the root of the chain. This means that only the root promise is referenced, and all the other promises are available for garbage collection if they're not referenced by user code.

To make the chains flattenable, the concept of linking promises together becomes an explicit feature in the promise implementation. This allows the implementation to navigate and rewire links as needed. A DefaultPromise gets a new state: being linked to another DefaultPromise. See the scaladoc for more details.

In practice, flattening the chain cannot always be done perfectly. When a promise is added to the end of the chain, it scans the chain and links directly to the root ("canonical") promise. But the root promise for a chain can change, which will leave all previously-linked promises pointing at the old, now non-root, promise, rather than the new root promise. To mitigate the problem of the root promise changing, whenever any linked promise's methods are called, and it needs a reference to its root promise, it re-scans the promise chain and relinks itself directly to whatever the current root promise is, even if that promise has changed. Basically, rescanning and relinking occur at every possible opportunity. Unfortunately, even this eager relinking doesn't absolutely guarantee that the chain will be flattened and that leaks cannot occur. However, it does greatly reduce the chance that they will occur.
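
The effect of a changing root can be sketched with a simplified, single-threaded model. Everything here (`RootChangeSketch`, `Node`, `demo`) is a hypothetical stand-in chosen for illustration; the real DefaultPromise stores its link in an atomically updated state and is not shown here.

```scala
object RootChangeSketch {
  // Hypothetical model of a linked promise: `link` is None for a root.
  final class Node(val name: String) {
    var link: Option[Node] = None

    // Scan to the current root, then relink only this node directly to it.
    def compressedRoot(): Node = {
      var r: Node = this
      while (r.link.isDefined) r = r.link.get
      if (r ne this) link = Some(r)
      r
    }
  }

  // Returns the name of the node that `a` points at before and after rescanning.
  def demo(): (String, String) = {
    val a = new Node("a")
    val oldRoot = new Node("oldRoot")
    val newRoot = new Node("newRoot")
    a.link = Some(oldRoot)       // a was linked while oldRoot was the root
    oldRoot.link = Some(newRoot) // the root changes: a's link is now stale
    val before = a.link.get.name
    a.compressedRoot()           // any later use of a rescans and relinks it
    val after = a.link.get.name
    (before, after)
  }
}
```

In this model `a` keeps `oldRoot` reachable until something calls a method on `a`, which is the window in which a leak remains possible.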

To guarantee no leaks we'd probably need to retain backwards references when the root promise changed, and update all the promises pointing to the old root promise. These backward references would need to be weak references to prevent leaks. Changing the root promise would become a very expensive operation, which would be unfortunate, as most of the updated promises would be garbage anyway and updating them would be wasted work. Personally, I think maintaining backwards weak references is too expensive to justify. So we are left with a fast and cheap implementation that does a pretty good, but not perfect, job of flattening promise chains.

The re-scanning and relinking code in this patch actually differs from the code in the Twitter implementation. The Twitter implementation relinks all promises it encounters as it scans for the root promise, whereas this patch only relinks a single promise. The reason for this is that the Twitter implementation uses the stack to store promises as it updates them, and so risks a stack overflow. In other words, the Twitter code is slightly more aggressive about flattening the promise chain, but runs the (admittedly small) risk of overflowing the stack. The code in this patch does less flattening (so might leak memory in more cases) but cannot overflow the stack. There is a trade-off with no clear best answer. I'd be interested to hear other opinions.
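
The trade-off can be sketched with two hypothetical variants over a simplified, single-threaded node type. Neither is the Twitter code or the code in this patch; `RelinkStrategies`, `compressAll`, `compressOne`, and `chain` are names invented for this sketch.

```scala
object RelinkStrategies {
  // Hypothetical node: `link` is None for the root of a chain.
  final class Node(val name: String) { var link: Option[Node] = None }

  // Variant A: relink every node visited. The relinking happens after the
  // recursive call returns, so the call is not in tail position and the
  // stack depth grows with the length of the chain.
  def compressAll(n: Node): Node = n.link match {
    case None => n
    case Some(next) =>
      val r = compressAll(next)
      n.link = Some(r)
      r
  }

  // Variant B: find the root with a loop and relink only the starting node.
  // Constant stack usage, but nodes in the middle keep their old links.
  def compressOne(n: Node): Node = {
    var r = n
    while (r.link.isDefined) r = r.link.get
    if (r ne n) n.link = Some(r)
    r
  }

  // Builds n0 -> n1 -> ... -> n(length-1), where the last node is the root.
  def chain(length: Int): Vector[Node] = {
    val nodes = Vector.tabulate(length)(i => new Node(s"n$i"))
    nodes.zip(nodes.drop(1)).foreach { case (a, b) => a.link = Some(b) }
    nodes
  }
}
```

On a four-node chain, `compressAll` leaves every non-root node pointing at the root, while `compressOne` leaves the second node still pointing at the third.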

@richdougherty

src/library/scala/concurrent/Future.scala Outdated
f(v) match {
case dp: DefaultPromise[_] =>
// Link DefaultPromises to avoid space leaks
dp.asInstanceOf[DefaultPromise[S]].link(p.canonical())

@richdougherty

richdougherty Jun 23, 2013

Contributor

Could just have one method here (linkCanonical()) if we wanted to avoid adding more than one new method to DefaultPromise. I think two methods are fine, but I'm mentioning it in case we want to try extra hard to avoid adding new methods for some reason.

@viktorklang

viktorklang Jun 23, 2013

Contributor

Is there a case where you wouldn't want to link to the canonical? i.e. should the p.canonical() go inside of link?

@richdougherty

richdougherty Jun 23, 2013

Contributor

Yes, I think p.canonical() can go inside link(). It would reduce the public method surface of DefaultPromise.

@viktorklang

viktorklang Jun 23, 2013

Contributor

The one thing I'm still skeptical about is that this is in the Future interface instead of in the impl.
Why is it not in the impl? I assume that other implementations of Future don't want to have this here since they most likely won't use DefaultPromise as their Promise impl. What do you think?

@richdougherty

richdougherty Jun 26, 2013

Contributor

I'm not sure what you're suggesting should be in the impl. Maybe the flatMap method itself? Or the pattern match on the future returned by f(v)?

I suspect you're getting a bad feeling about seeing impl symbols like DefaultPromise inside the otherwise clean Future trait, but I'll await clarification...

@viktorklang

viktorklang Jun 26, 2013

Contributor

Yes, exactly. And Future is meant to be implemented by others. This may not be an issue though since today we are in practice returning DefaultPromises all over the place. Perhaps one solution would be to have a "createPromise" method on the Future trait and have that be called instead of Promise.apply. * thinking out loud *

@richdougherty

richdougherty Jun 26, 2013

Contributor

OK, thanks for explaining. My intuition is that the type of promise created inside map/flatMap/filter/etc is not something that you'd want to change when extending Future. The reason for my belief is that I view the type of promise created and returned by these methods as being solely driven by the needs of the logic inside the methods, rather than being driven by the type of future that the methods are called on.

The logic for each of these methods goes like this: return a promise, use onComplete to wait for the value of the underlying future, do some stuff with that value, then complete the promise. This logic remains unchanged even if the type of future changes. If the underlying future type changes then it will change the implementation of onComplete, but all the logic in the methods stays the same. Since the logic stays the same, the default type of promise we return will still work too. I don't think we need to provide a hook that lets people change the type of promise used by that logic. (And we don't currently provide a hook either.)
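
That shape can be sketched with only the public Future and Promise API. This is an illustrative sketch, not the library's implementation: `HandlerBasedFlatMap`, `flatMapViaHandlers`, and `demo` are hypothetical names, and the sketch uses plain onComplete propagation with no promise linking.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

object HandlerBasedFlatMap {
  // Hypothetical helper: depends only on `onComplete` of the underlying
  // future, so it works unchanged for any Future implementation.
  def flatMapViaHandlers[T, S](fut: Future[T])(f: T => Future[S])(
      implicit ec: ExecutionContext): Future[S] = {
    val p = Promise[S]()                          // the promise we return
    fut.onComplete {                              // wait for the underlying value
      case Success(v) =>
        try f(v).onComplete(r => p.complete(r))   // propagate the inner result
        catch { case NonFatal(t) => p.failure(t) }
      case Failure(t) => p.failure(t)             // propagate the outer failure
    }
    p.future
  }

  // Small usage example: 2 is mapped to a future of 42.
  def demo(): Int = {
    import ExecutionContext.Implicits.global
    val result = flatMapViaHandlers(Future.successful(2))(x => Future.successful(x * 21))
    Await.result(result, 5.seconds)
  }
}
```

Nothing in the helper depends on the concrete type of `fut`, which is the point made above about not needing a hook for the promise type.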

Now some types of future might conceivably have their own way to map/flatMap/etc. But if that's the case then they can go the whole way and override the methods entirely. That of course gives them the freedom to use whatever type of promise they wish. :)

The only thing we're not giving users is a way to do their own linking of Scala promises. We're keeping the promise linking API private. I'm happy to keep it private for the moment. We can always expose promise linking in the API in the future if we see that there is a need. I'd rather be conservative about adding to the API. It is much easier for us to add to the API in the future than it is for us to remove something once it has been added.

@viktorklang

viktorklang Jun 26, 2013

Contributor

Sounds good.

@richdougherty

src/library/scala/concurrent/impl/Promise.scala Outdated
*/
class DefaultPromise[T] extends AbstractPromise with Promise[T] { self =>
updateState(null, Nil) // Start at "No callbacks"
updateState(null, Nil)

@richdougherty

richdougherty Jun 23, 2013

Contributor

I'll re-add a comment here.

@richdougherty

src/library/scala/concurrent/impl/Promise.scala Outdated
/** Get the promise at the head of the chain of linked promises. Used by `canonical()`.
*/
@tailrec
private def headLinked: DefaultPromise[T] = {

@richdougherty

richdougherty Jun 23, 2013

Contributor

Could be a local function inside canonical if we prefer that style. But in the master version of this patch there are no local functions; the 2.10.x version has them only because we can't directly make value or isCompleted final.

@viktorklang

viktorklang Jun 23, 2013

Contributor

I'd prefer a local method or even a while loop inside canonical since traits cannot have private methods.

@richdougherty

richdougherty Jun 23, 2013

Contributor

Did you know that DefaultPromise is a class?

* listeners, or `null` if it is already completed.
*/
@tailrec
private def tryCompleteAndGetListeners(v: Try[T]): List[CallbackRunnable[T]] = {

@richdougherty

richdougherty Jun 23, 2013

Contributor

Again could be a local function.

@viktorklang

viktorklang Jun 23, 2013

Contributor

I don't know what's preferred, @retronym is it better to have a private method or a nested method?

@richdougherty

richdougherty Jun 25, 2013

Contributor

I'd prefer to use private methods because then there's only one promise in scope, so I can't accidentally use the wrong one!

@richdougherty

src/library/scala/concurrent/impl/Promise.scala Outdated
/** Tries to add the callback, if already completed, it dispatches the callback to be executed
*/
@tailrec
private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = {

@richdougherty

richdougherty Jun 23, 2013

Contributor

This method cannot be a local function, as it is used by both onComplete and link. The link method uses dispatchOrAddCallback to safely transfer callbacks to the other promise.

@viktorklang

viktorklang Jun 23, 2013

Contributor

Alright, might be good to add as a comment

@richdougherty

src/library/scala/concurrent/impl/Promise.scala Outdated
getState match {
case r: Try[_] =>
if (!target.tryComplete(r.asInstanceOf[Try[T]])) throw new IllegalStateException("Cannot link completed promises together")

@richdougherty

richdougherty Jun 23, 2013

Contributor

This state cannot be reached unless someone casts the Future returned by Future.flatMap into a Promise and then completes that Promise.

An alternative way to handle the problem of linking two completed promises would be to return a boolean expressing the linking result and let the caller (only Future.flatMap) handle the situation. However, lacking any use-case for recovery at the moment, my preference is to fail at the point the problem is discovered as this completely eliminates the possibility of forgetting to handle the problem.
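
The two options can be contrasted with a small, single-threaded stand-in. All names here (`LinkFailureHandling`, `Cell`, `linkOrThrow`, `tryLink`) are hypothetical and only illustrate the design choice; they are not part of DefaultPromise.

```scala
object LinkFailureHandling {
  // Hypothetical stand-in for a promise that can be completed at most once.
  final class Cell[T] {
    private var value: Option[T] = None
    def tryComplete(v: T): Boolean =
      if (value.isEmpty) { value = Some(v); true } else false
  }

  // Option 1: fail at the point the problem is discovered, so no caller
  // can forget to handle it.
  def linkOrThrow[T](result: T, target: Cell[T]): Unit =
    if (!target.tryComplete(result))
      throw new IllegalStateException("Cannot link completed promises together")

  // Option 2: report the outcome and leave the decision to the caller.
  def tryLink[T](result: T, target: Cell[T]): Boolean =
    target.tryComplete(result)
}
```

With the boolean variant, a caller that ignores the return value silently drops the conflict, which is the risk the fail-fast variant removes.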

@viktorklang

viktorklang Jun 23, 2013

Contributor

Do you have a test for this?

@richdougherty

richdougherty Jun 25, 2013

Contributor

I'll add one.

@richdougherty

src/library/scala/concurrent/impl/Promise.scala Outdated
* copying this promise's result to the target promise.
*/
@tailrec
final def link(target: DefaultPromise[T]): Unit = {

@richdougherty

richdougherty Jun 23, 2013

Contributor

This method (and canonical) is public because the DefaultPromise class is essentially private[concurrent]. However I can also make these methods private[concurrent] if it seems like a good idea.

@viktorklang

viktorklang Jun 23, 2013

Contributor

A LinkablePromise trait/interface perhaps? That is not exposed in the Promise interface, so it's only visible for DefaultPromise (extends Promise with LinkablePromise?)

@viktorklang

src/library/scala/concurrent/impl/Promise.scala Outdated
* `canonical()` will just be `this`. However for linked promises, this method will
* traverse each link until it locates the canonical promise at the head of the link chain.
*
* As a side effect of calling this method, any link from this promise back to the

@viktorklang

viktorklang Jun 23, 2013

Contributor

I don't like that it looks like a getter but it actually side-effects. Can we / should we do something about that?

@richdougherty

richdougherty Jun 23, 2013

Contributor

The Twitter version of this is called compress(). Do you like that better? Mind you, that mentions the side effect, but not the getting. I'd prefer something that isn't too long.

@viktorklang

src/library/scala/concurrent/impl/Promise.scala Outdated
@tailrec
def value0(p: DefaultPromise[T]): Option[Try[T]] = getState match {
case c: Try[_] => Some(c.asInstanceOf[Try[T]])
case _: DefaultPromise[_] => value0(canonical())

@viktorklang

viktorklang Jun 23, 2013

Contributor

Alright, so one thing that will trip people up is that getState also operates on the linked promise, but you have to agree that it looks weird to call getState on this, and technically not use p at all.

@richdougherty

richdougherty Jun 23, 2013

Contributor

Good catch. That should be p.canonical(). (That's part of the reason I've been preferring private methods rather than local functions: there's only one DefaultPromise in scope to operate on, so no mistakes.)

@viktorklang

viktorklang Jun 23, 2013

Contributor

Is there a regression test for this?

@richdougherty

richdougherty Jun 25, 2013

Contributor

p.canonical() and this.canonical() both traverse the chain of linked promises and yield the same result, but the former traverses a slightly shorter chain. So unfortunately there's no observable difference between the two that I can test without exposing the traversal logic (which would be slow).

@viktorklang

viktorklang Jun 25, 2013

Contributor

p.canonical() would definitely be preferable to me

@richdougherty

richdougherty Jun 26, 2013

Contributor

:) Yes, I'll fix it up so we get the faster one!

@viktorklang

src/library/scala/concurrent/impl/Promise.scala Outdated
@tailrec
def isCompleted0(p: DefaultPromise[T]): Boolean = getState match {
case _: Try[_] => true
case _: DefaultPromise[_] => isCompleted0(canonical())

@viktorklang

viktorklang Jun 23, 2013

Contributor

Alright, so one thing that will trip people up is that getState also operates on the linked promise, but you have to agree that it looks weird to call getState on this, and technically not use p at all.

@richdougherty

richdougherty Jun 23, 2013

Contributor

Ditto.

@viktorklang

viktorklang Jun 23, 2013

Contributor

Ditto.

@retronym

Member

retronym commented Jun 23, 2013

Can you please move some parts of your commentary into the commit message and/or doc comments in the code? It is too good to be effectively lost to history as a PR comment.

We also require all commits to pass the build, so you'll need to organize the work differently. You could submit this as one commit prefixed with [nomaster] SI-1234 ..., and submit a separate PR to master without the binary compatibility considerations. (But, remember that 2.11.0 should be source compatible with 2.10.3)

@gkossakowski

test/files/run/t7336.scala Outdated
import scala.concurrent.Future
import scala.concurrent.duration.Duration
object Test {

@gkossakowski

gkossakowski Jun 25, 2013

Member

How about making this test a junit test?

See 25a8e70 and 173e709.

The advantages of junit tests:

  • they execute faster
  • they have proper IDE support

@richdougherty

richdougherty Jun 26, 2013

Contributor

I'm happy to do that, but one consideration is that this particular test doesn't play very nice with other tests. If it fails, it causes an OutOfMemoryError.

I am going to add some other tests though, and I'll make these JUnit tests.

@adriaanm

adriaanm Jun 26, 2013

Member

As a general note (only tangentially related), our test suite runs on machines that are running at extreme loads, so anything that's time sensitive or might run for a long time/fail under these conditions is problematic.

@richdougherty

richdougherty Jun 26, 2013

Contributor

Thanks for the heads up. This test should still work fine under load. All it does is allocate (heap size / 1 MiB) arrays, each 1 MiB in size, and then exit. Prior to this patch it would crash the VM in <1s. After this patch the arrays can be collected straight out of the nursery (I assume), so the test runs nice and fast and also shouldn't grow the JVM process heap.
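
The idea behind such a test can be sketched as follows. This is a hypothetical reconstruction for illustration, not the test file from the patch: `ArrayLoopSketch`, `loop`, and `run` are invented names, and the caller picks a small, fixed number of arrays instead of deriving the count from the heap size.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ArrayLoopSketch {
  // Each iteration allocates an array and captures it in the flatMap
  // closure, so the array stays reachable for as long as that closure
  // (and the promise holding it) stays reachable.
  def loop(remaining: Int, arraySize: Int): Future[Int] = {
    val array = new Array[Byte](arraySize)
    Future.successful(remaining).flatMap { n =>
      if (n == 0) Future.successful(array.length)
      else loop(n - 1, arraySize)
    }
  }

  // Runs the loop to completion and returns the size of the last array.
  def run(arrays: Int, arraySize: Int): Int =
    Await.result(loop(arrays, arraySize), 60.seconds)
}
```

For example, `ArrayLoopSketch.run(16, 1 << 20)` allocates seventeen 1 MiB arrays in total. If the count is chosen so the total exceeds the heap, the loop only completes when earlier arrays can be collected along the way.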

@soc

soc Jul 1, 2013

Member

Not specific to this issue, but it would make sense to throw away the JVM process anyway if an OOME or an SOE is encountered, because there is no guarantee that the runtime is in a consistent state afterwards.

@@ -473,6 +473,14 @@ filter {
problemName=MissingClassProblem
},
{
matchName="scala.concurrent.impl.Promise#DefaultPromise.linkRootOf"

@adriaanm

adriaanm Jul 5, 2013

Member

I'm okay with breaking forward compatibility in internal/impl packages when a bugfix is impossible otherwise.
It would still be good to ensure these methods won't (accidentally) be called by users, as that would create a dependency on this particular version of Scala. So, could you mark this method protected[scala.concurrent]? It's still against the letter of the law, but in the right spirit :-) (Perhaps we should make MiMa more flexible to take scala visibility into account when checking forward compatibility. Even better, generate the right access modifiers in bytecode so Java clients are protected as well.)

@richdougherty

richdougherty Jul 6, 2013

Contributor

OK - now marked as protected[concurrent].

@soc

soc Jul 6, 2013

Member

I'm wondering how the futures in Java 8 handle this issue...

@adriaanm

src/library/scala/concurrent/impl/Promise.scala Outdated
/** Link this promise to the root of another promise using `link()`. Should only be
* called by Future.flatMap.
*/
final def linkRootOf(target: DefaultPromise[T]): Unit = link(target.compressedRoot())

@adriaanm

adriaanm Jul 5, 2013

Member

Please make this protected[scala.concurrent].

@adriaanm

Member

adriaanm commented Jul 5, 2013

Excellent work -- thanks, @richdougherty! Awesome docs and tests. (And code, of course :-))

@adriaanm

Member

adriaanm commented Jul 12, 2013

Yay, 🍰 all around!

adriaanm added a commit that referenced this pull request Jul 12, 2013

Merge pull request #2674 from richdougherty/2.10.x-si7336-try2
SI-7336 Link flatMapped promises to avoid memory leaks

@adriaanm adriaanm merged commit 2247593 into scala:2.10.x Jul 12, 2013

1 check passed

default pr-checkin-per-commit Took 44 min.
Details

@richdougherty richdougherty deleted the richdougherty:2.10.x-si7336-try2 branch Nov 11, 2014
