
Is async macro leaking? #265

Open
hurlebouc opened this issue May 6, 2021 · 9 comments · May be fixed by scala/scala#10783

@hurlebouc

Hi,

when you run code

import scala.async.Async.{async, await}
import scala.concurrent.{ExecutionContext, Future}

object Main {

  implicit val syncExecutor: ExecutionContext = new ExecutionContext {
    override def execute(runnable: Runnable): Unit = runnable.run()

    override def reportFailure(cause: Throwable): Unit = throw cause
  }

  def stop: Unit = {
    Thread.sleep(60000)
    println("stop")
  }

  class Witness

  def main(args: Array[String]): Unit = {
    async {
      await(Future { new Witness })
      stop
    }
  }
}

and then, during the sleep, you take a heap dump with VisualVM, you see that there is still an object of type Witness in the heap. Is this normal?

@hurlebouc changed the title from "is async macro leaking?" to "Is async macro leaking?" on May 6, 2021
@viktorklang
Contributor

@hurlebouc While I realize that this is just a reproducer, that syncExecutor implementation is completely broken. I think you can just replace all of it with ExecutionContext.parasitic and make the reproducer shorter in the process.
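
For reference, a minimal sketch of the shortened reproducer using ExecutionContext.parasitic (available since Scala 2.13); it exhibits the same heap-dump behaviour described above:

import scala.async.Async.{async, await}
import scala.concurrent.{ExecutionContext, Future}

object Main {
  // parasitic runs callbacks on the calling thread, trampolining to keep the stack bounded.
  implicit val ec: ExecutionContext = ExecutionContext.parasitic

  class Witness

  def main(args: Array[String]): Unit = {
    async {
      await(Future(new Witness))
      Thread.sleep(60000) // take the heap dump during this sleep
      println("stop")
    }
  }
}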

@hurlebouc
Author

The syncExecutor runs futures in the current thread. It's by design.

@viktorklang
Copy link
Contributor

@hurlebouc parasitic is a correct implementation of a current-thread ExecutionContext. The syncExecutor above has several problems (execute will lead to stack issues, and reportFailure is not supposed to throw exceptions).
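
To illustrate the stack issue (a sketch, not from the thread): build a long chain of map calls on a not-yet-completed Future, then complete it. With a naive run-on-caller executor each callback nests another stack frame, which typically ends in a StackOverflowError; ExecutionContext.parasitic batches the callbacks on a trampoline and keeps the depth bounded. The chain length needed to overflow depends on JVM stack size.

import scala.concurrent.{ExecutionContext, Promise}

object StackIssueSketch {
  // Naive "current thread" executor: every callback runs directly in the caller's frame.
  val unsafeSync: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = unsafeSync // swap for ExecutionContext.parasitic
    val p = Promise[Int]()
    val chained = (1 to 100000).foldLeft(p.future)((f, _) => f.map(_ + 1))
    // Completing the promise fires the whole chain as nested calls on this thread.
    p.success(0)
    println(chained.value) // reached only if the stack survives (it does with parasitic)
  }
}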

@hurlebouc
Author

Ok.
@viktorklang: do you think this is related to the leaking problem?

@viktorklang
Contributor

@hurlebouc No idea, but it's a bit of a two-for-one to reduce the size of the reproducer AND help people avoid problems with their sync ExecutionContexts. :)

@hurlebouc
Author

@viktorklang Thank you for the advice.

But even replacing syncExecutor with parasitic leads to the same problem. My hypothesis is that the line await(Future{new Witness}) is converted to val tmp = await(Future{new Witness}) by the ANF transformation, which leaks memory because tmp is not freed during the execution of stop.
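
Illustratively, the hypothesis is that the expansion behaves as if one had written (tmp is a made-up name; the real expansion uses a synthetic one):

async {
  val tmp = await(Future { new Witness }) // tmp keeps the Witness reachable...
  stop                                    // ...for the whole duration of stop
}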

@retronym
Member

This is the translation:

         final class stateMachine$async extends scala.async.FutureStateMachine {
            def <init>(): stateMachine$async = {
              stateMachine$async.super.<init>(ec);
              ()
            };
            override def apply(tr$async: scala.util.Try): Unit = while$(){
              try {
                stateMachine$async.this.state() match {
                  case 0 => {
                    val awaitable$async: scala.concurrent.Future = scala.concurrent.Future.apply({
                      final <artifact> def $anonfun$apply(): scala.async.Leak.Witness = new scala.async.Leak.Witness();
                      (() => $anonfun$apply())
                    }, ec);
                    tr$async = stateMachine$async.this.getCompleted(awaitable$async);
                    stateMachine$async.this.state_=(1);
                    if (null.!=(tr$async))
                      while$()
                    else
                      {
                        stateMachine$async.this.onComplete(awaitable$async);
                        return ()
                      }
                  }
                  case 1 => {
                    <synthetic> val await$1: Object = {
                      val tryGetResult$async: Object = stateMachine$async.this.tryGet(tr$async);
                      if (stateMachine$async.this.==(tryGetResult$async))
                        return ()
                      else
                        tryGetResult$async.$asInstanceOf[Object]()
                    };
                    await$1;
                    Leak.this.stop();
                    stateMachine$async.this.completeSuccess(scala.runtime.BoxedUnit.UNIT);
                    return ()
                  }
                  case _ => throw new IllegalStateException(java.lang.String.valueOf(stateMachine$async.this.state()))
                }
              } catch {
                case (throwable$async @ (_: Throwable)) => {
                  stateMachine$async.this.completeFailure(throwable$async);
                  return ()
                }
              };
              while$()
            };
            override <bridge> <artifact> def apply(v1: Object): Object = {
              stateMachine$async.this.apply(v1.$asInstanceOf[scala.util.Try]());
              scala.runtime.BoxedUnit.UNIT
            }
          };

Your analysis is correct: the ANF transform introduces the <synthetic> val await$1 to hold the result of the future, even though you don't use it subsequently.

The workaround would be to manually discard the result of the future: await(Future{new Witness; ()}). It would be preferable to make the ANF transform smart enough to avoid the problem, but I don't see a straightforward way to implement that yet.
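
A minimal sketch of that workaround, reusing the Main object from the original report:

async {
  // Discard the Witness inside the Future body so the awaited value is Unit
  // and nothing is retained across the await.
  await(Future { new Witness; () })
  stop
}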

@djspiewak
Member

You would have the same issue using map or flatMap and an executor that doesn't bounce the stack frame:

import scala.concurrent.{ExecutionContext, Future}

object Main {
  // Any executor that runs callbacks on the completing thread reproduces this.
  implicit val ec: ExecutionContext = ExecutionContext.parasitic

  def stop: Unit = {
    Thread.sleep(60000)
    println("stop")
  }

  class Witness

  def main(args: Array[String]): Unit = {
    Future(new Witness).map(_ => stop)
    ()
  }
}

You'll see the same issue as above. The problem isn't really the async macro; the problem is that Future memoizes its result. As long as a Future is on the heap, its result is too (once evaluated).
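
A small sketch of that memoization (names are illustrative): any live reference to a completed Future keeps its result alive as well.

import scala.concurrent.{ExecutionContext, Future}

object MemoSketch {
  class Witness

  def main(args: Array[String]): Unit = {
    // parasitic runs the body on the calling thread, so f is already complete here.
    val f: Future[Witness] = Future(new Witness)(ExecutionContext.parasitic)
    // The completed Future stores its result: as long as f is reachable, so is the Witness.
    println(f.value) // Some(Success(Witness@...))
  }
}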

You're basically asking Future to behave a bit like IO. For example, if we port your example to Cats Effect:

import cats.effect._
import cats.effect.cps._

import scala.concurrent.duration._

object Main extends IOApp.Simple {

  val stop: IO[Unit] =
    IO.sleep(1.minute) >> IO.println("stop")

  class Witness

  val run = 
    async[IO] {
      IO(new Witness).await
      stop.await
    }
}

If you run the above and take a heap dump during stop, you'll find no Witness anywhere. And just to prove nothing strange is going on, we can still run the Witness creation through Future and observe the same effect:

import cats.effect._
import cats.effect.cps._

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Main extends IOApp.Simple {

  val stop: IO[Unit] =
    IO.sleep(1.minute) >> IO.println("stop")

  class Witness

  val run = 
    async[IO] {
      IO.fromFuture(IO(Future(new Witness))).await
      stop.await
    }
}

Same idea. Since Future retains its results, anything that retains the future also retains the results. IO doesn't retain its results, so it doesn't exhibit the same issue. The async macro itself has no real impact one way or another aside from obscuring the val which is holding the Future (which would otherwise be more apparent in a direct construction).

@som-snytt
Contributor

The PR turns the capturing val into a var, and if we see the await$1 all by its lonesome in "statement position", we know it wasn't "consumed" by an enclosing expression or definition (which possibly would have to be a val or a method arg); at that point, null out the var.

At stage or round N of the state machine, we receive the result of the F at N-1, so presumably there is no live reference to the F.

F itself should be off the hook.
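
A rough, hand-written sketch of that idea (illustrative only, not the macro's actual output): store the awaited value in a var and null it out once it is known not to be consumed, so the state machine no longer pins it while the continuation runs.

// Stripped-down analogue of the state machine for
//   async { await(Future(new Witness)); stop }
class StateMachineSketch(stop: () => Unit) {
  private var state = 1
  private var awaited: AnyRef = null // the PR turns the capturing val into a var

  // Called when the awaited Future completes with `result`.
  def resume(result: AnyRef): Unit = state match {
    case 1 =>
      awaited = result // value received from the previous round's Future
      // awaited sits alone in statement position and is never consumed,
      // so release it before the long-running continuation executes.
      awaited = null
      stop()
    case other =>
      throw new IllegalStateException(other.toString)
  }
}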
