Acquire lock per subscription instead of observable-wide lock #794

Merged
merged 10 commits into from Jan 12, 2019

@rfkm
Contributor

commented Dec 14, 2018

Hi,

I've been investigating a deadlock caused by Monix.

Stack trace:

Found one Java-level deadlock:
=============================
"scala-execution-context-global-2252":
  waiting to lock monitor 0x00007ff024054548 (object 0x00000000891aa608, a java.lang.Class),
  which is held by "scala-execution-context-global-2251"
"scala-execution-context-global-2251":
  waiting to lock monitor 0x00007ff024086678 (object 0x0000000089625140, a monix.reactive.internal.operators.SwitchMapObservable$$anon$1),
  which is held by "scala-execution-context-global-2250"
"scala-execution-context-global-2250":
  waiting to lock monitor 0x00007ff024054548 (object 0x00000000891aa608, a java.lang.Class),
  which is held by "scala-execution-context-global-2251"

Java stack information for the threads listed above:
===================================================
"scala-execution-context-global-2252":
        at monix.reactive.internal.builders.CombineLatest2Observable.monix$reactive$internal$builders$CombineLatest2Observable$$signalOnError$1(CombineLatest2Observable.scala:86)
        - waiting to lock <0x00000000891aa608> (a java.lang.Class for monix.reactive.internal.builders.CombineLatest2Observable)
        at monix.reactive.internal.builders.CombineLatest2Observable$$anon$2.onError(CombineLatest2Observable.scala:160)
        at monix.reactive.internal.operators.MapOperator$$anon$1.onError(MapOperator.scala:55)
        at monix.reactive.internal.operators.DistinctUntilChangedByKeyOperator$$anon$1.onError(DistinctUntilChangedByKeyOperator.scala:75)
        at monix.reactive.internal.builders.AsyncStateActionObservable.$anonfun$loop$1(AsyncStateActionObservable.scala:52)
        at monix.reactive.internal.builders.AsyncStateActionObservable$$Lambda$1872/24786665.apply(Unknown Source)
        at monix.eval.internal.StackFrame$RedeemWith.recover(StackFrame.scala:38)
        at monix.eval.internal.TaskRunLoop$.startFull(TaskRunLoop.scala:108)
        at monix.eval.internal.TaskRunLoop$.$anonfun$restartAsync$1(TaskRunLoop.scala:192)
        at monix.eval.internal.TaskRunLoop$$$Lambda$554/999999316.run(Unknown Source)
        at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
"scala-execution-context-global-2251":
        at monix.reactive.internal.operators.SwitchMapObservable$$anon$1$$anon$2.onError(SwitchMapObservable.scala:74)
        - waiting to lock <0x0000000089625140> (a monix.reactive.internal.operators.SwitchMapObservable$$anon$1)
        at monix.reactive.observers.Subscriber$Implementation.onError(Subscriber.scala:207)
        at monix.reactive.internal.operators.MapOperator$$anon$1.onError(MapOperator.scala:55)
        at monix.reactive.internal.operators.CollectOperator$$anon$1.onError(CollectOperator.scala:60)
        at monix.reactive.internal.operators.MapTaskObservable$MapAsyncSubscriber.signalFinish(MapTaskObservable.scala:310)
        at monix.reactive.internal.operators.MapTaskObservable$MapAsyncSubscriber.onError(MapTaskObservable.scala:346)
        at monix.reactive.internal.operators.MapOperator$$anon$1.onError(MapOperator.scala:55)
        at monix.reactive.internal.builders.CombineLatest2Observable.monix$reactive$internal$builders$CombineLatest2Observable$$signalOnError$1(CombineLatest2Observable.scala:88)
        - locked <0x00000000891aa608> (a java.lang.Class for monix.reactive.internal.builders.CombineLatest2Observable)
        at monix.reactive.internal.builders.CombineLatest2Observable$$anon$1.onError(CombineLatest2Observable.scala:139)
        at monix.reactive.internal.builders.CombineLatest2Observable.monix$reactive$internal$builders$CombineLatest2Observable$$signalOnError$1(CombineLatest2Observable.scala:88)
        - locked <0x00000000891aa608> (a java.lang.Class for monix.reactive.internal.builders.CombineLatest2Observable)
        at monix.reactive.internal.builders.CombineLatest2Observable$$anon$1.onError(CombineLatest2Observable.scala:139)
        at monix.reactive.internal.builders.CombineLatest2Observable.monix$reactive$internal$builders$CombineLatest2Observable$$signalOnError$1(CombineLatest2Observable.scala:88)
        - locked <0x00000000891aa608> (a java.lang.Class for monix.reactive.internal.builders.CombineLatest2Observable)
        at monix.reactive.internal.builders.CombineLatest2Observable$$anon$1.onError(CombineLatest2Observable.scala:139)
        at monix.reactive.internal.operators.MapOperator$$anon$1.onError(MapOperator.scala:55)
        at monix.reactive.internal.operators.MapOperator$$anon$1.onError(MapOperator.scala:55)
        at monix.reactive.internal.builders.CombineLatest2Observable.monix$reactive$internal$builders$CombineLatest2Observable$$signalOnError$1(CombineLatest2Observable.scala:88)
        - locked <0x00000000891aa608> (a java.lang.Class for monix.reactive.internal.builders.CombineLatest2Observable)
        at monix.reactive.internal.builders.CombineLatest2Observable$$anon$2.onError(CombineLatest2Observable.scala:160)
        at monix.reactive.internal.operators.MapOperator$$anon$1.onError(MapOperator.scala:55)
        at monix.reactive.internal.operators.DistinctUntilChangedByKeyOperator$$anon$1.onError(DistinctUntilChangedByKeyOperator.scala:75)
        at monix.reactive.internal.builders.AsyncStateActionObservable.$anonfun$loop$1(AsyncStateActionObservable.scala:52)
        at monix.reactive.internal.builders.AsyncStateActionObservable$$Lambda$1872/24786665.apply(Unknown Source)
        at monix.eval.internal.StackFrame$RedeemWith.recover(StackFrame.scala:38)
        at monix.eval.internal.TaskRunLoop$.startFull(TaskRunLoop.scala:108)
        at monix.eval.internal.TaskRunLoop$.$anonfun$restartAsync$1(TaskRunLoop.scala:192)
        at monix.eval.internal.TaskRunLoop$$$Lambda$554/999999316.run(Unknown Source)
        at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
"scala-execution-context-global-2250":
        at monix.reactive.internal.builders.CombineLatest2Observable.monix$reactive$internal$builders$CombineLatest2Observable$$signalOnError$1(CombineLatest2Observable.scala:86)
        - waiting to lock <0x00000000891aa608> (a java.lang.Class for monix.reactive.internal.builders.CombineLatest2Observable)
        at monix.reactive.internal.builders.CombineLatest2Observable$$anon$2.onError(CombineLatest2Observable.scala:160)
        at monix.reactive.internal.operators.MapTaskObservable$MapAsyncSubscriber.signalFinish(MapTaskObservable.scala:310)
        at monix.reactive.internal.operators.MapTaskObservable$MapAsyncSubscriber.onError(MapTaskObservable.scala:346)
        at monix.reactive.internal.operators.MapOperator$$anon$1.onError(MapOperator.scala:55)
        at monix.reactive.internal.operators.MapOperator$$anon$1.onError(MapOperator.scala:55)
        at monix.reactive.internal.operators.MapOperator$$anon$1.onError(MapOperator.scala:55)
        at monix.reactive.internal.operators.SwitchMapObservable$$anon$1.onError(SwitchMapObservable.scala:98)
        - locked <0x0000000089625140> (a monix.reactive.internal.operators.SwitchMapObservable$$anon$1)
        at monix.reactive.internal.operators.MapTaskObservable$MapAsyncSubscriber.signalFinish(MapTaskObservable.scala:310)
        at monix.reactive.internal.operators.MapTaskObservable$MapAsyncSubscriber.onError(MapTaskObservable.scala:346)
        at monix.reactive.internal.operators.MapOperator$$anon$1.onError(MapOperator.scala:55)
        at monix.reactive.internal.operators.MapOperator$$anon$1.onError(MapOperator.scala:55)
        at monix.reactive.internal.operators.DistinctUntilChangedByKeyOperator$$anon$1.onError(DistinctUntilChangedByKeyOperator.scala:75)
        at monix.reactive.internal.builders.AsyncStateActionObservable.$anonfun$loop$1(AsyncStateActionObservable.scala:52)
        at monix.reactive.internal.builders.AsyncStateActionObservable$$Lambda$1872/24786665.apply(Unknown Source)
        at monix.eval.internal.StackFrame$RedeemWith.recover(StackFrame.scala:38)
        at monix.eval.internal.TaskRunLoop$.startFull(TaskRunLoop.scala:108)
        at monix.eval.internal.TaskRunLoop$.$anonfun$restartAsync$1(TaskRunLoop.scala:192)
        at monix.eval.internal.TaskRunLoop$$$Lambda$554/999999316.run(Unknown Source)
        at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Found 1 deadlock.

The interesting part of the stack trace above is that CombineLatest2Observable appears to lock the java.lang.Class object for monix.reactive.internal.builders.CombineLatest2Observable rather than an instance of it.

According to the following code, CombineLatest2Observable uses self as a lock: https://github.com/monix/monix/blob/master/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/CombineLatest2Observable.scala#L85

I don't yet understand why this ends up locking a java.lang.Class, but I can reproduce it with the following code:

object Demo {
  def main(args: Array[String]): Unit = {
    import monix.eval.Task
    import monix.reactive.Observable
    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.duration._

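    // Left side: emits 1 and 2 after a delay, then fails with "boom".
    val l = (Observable.range(1, 3).delayExecution(1.second) ++
      Observable.raiseError(new RuntimeException("boom"))).dump("L")

    // Right side: emits 1 and 2 immediately, then completes.
    val r = Observable(1, 2).dump("R")
    // Print the monitors held by each thread on every onNext and on onError.
    l.combineLatest(r)
      .dump("U")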
      .mapEval { _ =>
        Task(dumpLocks())
      }
      .doOnError { _ =>
        Task(dumpLocks())
      }
      .completedL
      .runSyncUnsafe()
  }

  private def dumpLocks(): Unit = {
    import java.lang.management.ManagementFactory
    val bean = ManagementFactory.getThreadMXBean
    val info = bean.getThreadInfo(bean.getAllThreadIds, true, true).toList
    info.map(x => x.getThreadName -> x.getLockedMonitors.toList).foreach {
      case (thread, monitors) if monitors.nonEmpty =>
        val locks = monitors.map { info =>
          info.toString + "(" + info.getLockedStackFrame.getFileName + ":" + info.getLockedStackFrame.getLineNumber + ")"
        }
        println(s"'$thread' holds [ ${locks.mkString(", ")} ] ")
      case _ =>
    }

  }
}

Output:

0: R --> 1
1: R --> 2
2: R completed
0: L --> 1
0: U --> (1,2)
'scala-execution-context-global-13' holds [ monix.reactive.internal.builders.CombineLatest2Observable@1eb6fd3b(CombineLatest2Observable.scala:132) ] 
'Monitor Ctrl-Break' holds [ java.io.InputStreamReader@d6871bd(StreamDecoder.java:178), java.io.InputStreamReader@d6871bd(BufferedReader.java:324) ] 
1: L --> 2
1: U --> (2,2)
'scala-execution-context-global-13' holds [ monix.reactive.internal.builders.CombineLatest2Observable@1eb6fd3b(CombineLatest2Observable.scala:132) ] 
'Monitor Ctrl-Break' holds [ java.io.InputStreamReader@d6871bd(StreamDecoder.java:178), java.io.InputStreamReader@d6871bd(BufferedReader.java:324) ] 
2: L --> java.lang.RuntimeException: boom
2: U --> java.lang.RuntimeException: boom
'scala-execution-context-global-13' holds [ java.lang.Class@cd3deb2(CombineLatest2Observable.scala:88) ] 
'Monitor Ctrl-Break' holds [ java.io.InputStreamReader@d6871bd(StreamDecoder.java:178), java.io.InputStreamReader@d6871bd(BufferedReader.java:324) ] 
Exception in thread "main" java.lang.RuntimeException: boom
	at Demo$.$anonfun$main$1(Demo.scala:75)
	at monix.reactive.internal.builders.DeferObservable.unsafeSubscribeFn(DeferObservable.scala:44)
	at monix.reactive.observables.ChainedObservable$.$anonfun$subscribe$1(ChainedObservable.scala:72)
	at monix.execution.internal.Trampoline.monix$execution$internal$Trampoline$$immediateLoop(Trampoline.scala:65)
	at monix.execution.internal.Trampoline.startLoop(Trampoline.scala:31)
	at monix.execution.schedulers.TrampolineExecutionContext$JVMOptimalTrampoline.startLoop(TrampolineExecutionContext.scala:146)
	at monix.execution.internal.Trampoline.execute(Trampoline.scala:38)
	at monix.execution.schedulers.TrampolineExecutionContext.execute(TrampolineExecutionContext.scala:65)
	at monix.execution.schedulers.BatchingScheduler.execute(BatchingScheduler.scala:50)
	at monix.execution.schedulers.BatchingScheduler.execute$(BatchingScheduler.scala:47)
	at monix.execution.schedulers.AsyncScheduler.execute(AsyncScheduler.scala:29)
	at monix.execution.schedulers.ExecuteExtensions.executeTrampolined(ExecuteExtensions.scala:86)
	at monix.execution.schedulers.ExecuteExtensions.executeTrampolined$(ExecuteExtensions.scala:85)
	at monix.execution.Scheduler$Extensions.executeTrampolined(Scheduler.scala:255)
	at monix.reactive.observables.ChainedObservable$.subscribe(ChainedObservable.scala:69)
	at monix.reactive.internal.operators.ConcatObservable$$anon$1.$anonfun$onComplete$1(ConcatObservable.scala:50)
	at monix.execution.Ack$AckExtensions$.syncOnContinue$extension(Ack.scala:111)
	at monix.reactive.internal.operators.ConcatObservable$$anon$1.onComplete(ConcatObservable.scala:50)
	at monix.reactive.internal.builders.RangeObservable.loop(RangeObservable.scala:57)
	at monix.reactive.internal.builders.RangeObservable.unsafeSubscribeFn(RangeObservable.scala:43)
	at monix.reactive.internal.operators.DelayExecutionByTimespanObservable$$anon$1.run(DelayExecutionByTimespanObservable.scala:36)
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

This seems to happen only when onError is called; onNext locks an instance of monix.reactive.internal.builders.CombineLatest2Observable, as I expected. In any case, IIUC it is not a good idea to acquire a lock per observable rather than per subscription, so I changed the code to avoid observable-wide locks.
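The idea of locking per subscription can be sketched roughly as follows. This is only an illustration under stated assumptions, not the actual patch: `Combine2`, `State`, `pushLeft`, `pushRight` and `fail` are hypothetical names, and plain callbacks stand in for Monix's `Subscriber`. The point is that the monitor is allocated together with the subscription's state, so two subscriptions to the same source never share a lock.

```scala
// Hypothetical sketch; `Combine2` and `State` are illustrative names, not Monix code.
final class Combine2[A, B] {

  // All mutable state and its monitor are created once per subscription.
  final class State(onNext: ((A, B)) => Unit, onError: Throwable => Unit) {
    private val lock = new AnyRef
    private var left: Option[A] = None
    private var right: Option[B] = None
    private var isDone = false

    def pushLeft(a: A): Unit = lock.synchronized {
      if (!isDone) { left = Some(a); emit() }
    }

    def pushRight(b: B): Unit = lock.synchronized {
      if (!isDone) { right = Some(b); emit() }
    }

    def fail(ex: Throwable): Unit = lock.synchronized {
      if (!isDone) { isDone = true; onError(ex) }
    }

    // Emits only once both sides have produced a value; the caller holds `lock`.
    private def emit(): Unit =
      for (a <- left; b <- right) onNext((a, b))
  }

  // Each call returns a fresh State, and therefore a fresh monitor.
  def subscribe(onNext: ((A, B)) => Unit, onError: Throwable => Unit): State =
    new State(onNext, onError)
}

object LockScopeDemo {
  def main(args: Array[String]): Unit = {
    val source = new Combine2[Int, String]
    val a = source.subscribe(p => println(s"a: $p"), e => println(s"a failed: $e"))
    val b = source.subscribe(p => println(s"b: $p"), e => println(s"b failed: $e"))
    a.pushLeft(1); a.pushRight("x")
    b.fail(new RuntimeException("boom")) // only subscription b is terminated
    a.pushLeft(2)                        // a keeps emitting
  }
}
```

Downstream callbacks are still invoked while the lock is held in this sketch, so it narrows the scope of the lock rather than removing every way to deadlock.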

@rfkm rfkm force-pushed the rfkm:patch branch from f0ec892 to ed819fe Dec 14, 2018

@rfkm

Contributor Author

commented Dec 14, 2018

I've checked the bytecode of CombineLatest2Observable and found that signalOnError is marked as static:

  public static final synchronized void monix$reactive$internal$builders$CombineLatest2Observable$$signalOnError$1(java.lang.Throwable, monix.reactive.observers.Subscriber, scala.runtime.BooleanRef, scala.runtime.ObjectRef);

though other methods are not static:

  public final synchronized void monix$reactive$internal$builders$CombineLatest2Observable$$signalOnComplete$1(monix.reactive.observers.Subscriber, scala.runtime.BooleanRef, scala.runtime.ObjectRef, scala.runtime.IntRef);
  public final scala.concurrent.Future monix$reactive$internal$builders$CombineLatest2Observable$$signalOnNext$1(java.lang.Object, java.lang.Object, monix.reactive.observers.Subscriber, scala.runtime.BooleanRef, scala.runtime.ObjectRef);

I'm not familiar with Scala compiler/language internals, but this might be a compiler bug. The compiler does not seem to take self.synchronized into account when optimizing a nested method. In fact, I've confirmed that adding a reference to the value of self (like println(self)) inside signalOnError makes it non-static.
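A minimal way to look at this outside Monix might be the sketch below. `Holder`, `subscribe` and `InspectLifted` are hypothetical names; `Holder` only mirrors the shape of the pattern (a self alias, and a nested method whose whole body is `self.synchronized` with no other use of the instance). The reflection code prints the modifiers the compiler gave to the lifted method. What it prints depends on the Scala compiler version, so no expected output is shown.

```scala
import java.lang.reflect.Modifier

// Hypothetical class mirroring the pattern: the enclosing instance is used
// only as the lock inside the nested method.
class Holder { self =>
  def subscribe(onError: Throwable => Unit): Throwable => Unit = {
    var isDone = false
    def signalOnError(ex: Throwable): Unit = self.synchronized {
      if (!isDone) { isDone = true; onError(ex) }
    }
    ex => signalOnError(ex)
  }
}

object InspectLifted {
  // (name, isStatic, isSynchronized) for each compiled method whose name
  // contains "signalOnError".
  def report(): List[(String, Boolean, Boolean)] =
    classOf[Holder].getDeclaredMethods.toList
      .filter(_.getName.contains("signalOnError"))
      .map { m =>
        val mods = m.getModifiers
        (m.getName, Modifier.isStatic(mods), Modifier.isSynchronized(mods))
      }

  def main(args: Array[String]): Unit =
    report().foreach { case (name, isStatic, isSync) =>
      println(s"$name static=$isStatic synchronized=$isSync")
    }
}
```

A method reported as both static and synchronized takes the monitor of the Class object, which is shared by every instance and every subscription. That matches the `java.lang.Class` lock seen in the thread dump.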

@alexandru

Member

commented Dec 14, 2018

Wow, thanks for investigating it. We need to fix it if that's the case.

scala-steward and others added some commits Dec 14, 2018

Update intervalWithFixedDelay scaladoc (#798)
* Update intervalWithFixedDelay scaladoc

* Update Observable.scala

fix scalacOptions (#796)
* fix scalacOptions

"-Ywarn-unused-import" removed since Scala 2.13.x

scala/scala@ad25805

* fix scalacOptions for Scala 2.13.x

some options removed since Scala 2.13.x

@alexandru

Member

commented Dec 27, 2018

For some reason the Travis build is freezing. I've restarted it multiple times already.

Needs to be investigated.

@Avasil

Collaborator

commented Dec 27, 2018

@alexandru I was looking into this a couple of days ago because master was failing. It always happens during the "Iterant.resource does not require non-strict use" test with test coverage enabled, but I couldn't replicate it locally.

iravid and others added some commits Jan 8, 2019

Observable buffers refactoring (#801)
* Refactoring of Observable buffers and OverflowStrategy

* Observable.create change signature

* Fix Mima reports

* Small adjustments

* Add forgotten default value for maxBatch

* Make OverflowStrategy.Unbounded final

* Fix Mima

@codecov

commented Jan 12, 2019

Codecov Report

No coverage uploaded for pull request base (master@1096549).
The diff coverage is 89.8%.

@@            Coverage Diff            @@
##             master     #794   +/-   ##
=========================================
  Coverage          ?   90.41%           
=========================================
  Files             ?      426           
  Lines             ?    12323           
  Branches          ?     2273           
=========================================
  Hits              ?    11142           
  Misses            ?     1181           
  Partials          ?        0

@alexandru alexandru merged commit 77b504b into monix:master Jan 12, 2019

1 check passed

continuous-integration/travis-ci/pr: The Travis CI build passed

@rfkm

Contributor Author

commented Jan 12, 2019

Thank you for merging!
JFYI, I also sent a patch to Scala itself: scala/scala#7593

@rfkm rfkm deleted the rfkm:patch branch Jan 12, 2019

@alexandru

Member

commented Jan 12, 2019

Awesome.
