Observable `concatMap` / `mapTask` trigger invalid state warning on `raiseError` children #335

Closed
arlequin-nyc opened this issue Mar 23, 2017 · 5 comments

@arlequin-nyc

commented Mar 23, 2017

@alexandru

The context is that I have a flatMap on an Observable where, under specific circumstances, I return an Observable.raiseError(CustomException) to signal an error on the observable and therefore have it shut down.
I figured that using raiseError is a more sensible approach than throwing an exception, cost-wise.
Is this not the right approach to fail an Observable without throwing?

This is only for the scenario where I know I need to stop the stream and I know what is wrong, as opposed to an unforeseen exception propagating up the stack.
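
For readers following along, the pattern described above might look roughly like the sketch below. It is written against the Monix 2.x API (`runAsync` on `Task`), which is an assumption based on the 2.2.x release mentioned in this thread. The `TimeoutError` class, the `process` helper and the "element 3 fails" condition are hypothetical stand-ins, not the reporter's actual code.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for the reporter's TimeoutError.
final case class TimeoutError(message: String) extends RuntimeException(message)

object RaiseErrorInFlatMap extends App {
  // Hypothetical condition: pretend element 3 is where the query timed out.
  // Instead of throwing, the child observable signals the error itself.
  def process(i: Int): Observable[Int] =
    if (i == 3) Observable.raiseError(TimeoutError("Query taking too long"))
    else Observable.now(i * 10)

  val stream: Observable[Int] =
    Observable.fromIterable(1 to 5).flatMap(process)

  // Collect the stream into a list; wrapping in Try lets us inspect
  // whether the stream completed normally or ended with an error.
  val result = Try(Await.result(stream.toListL.runAsync, 5.seconds))
  println(result)
}
```

On a version containing the fix, one would expect the `TimeoutError` itself to reach the subscriber rather than an `IllegalStateException`.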

My exception is "TimeoutError", and when I pass it to raiseError inside a flatMap I get the following:

	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-03-23 14:43:31.585 [monix-computation-66] ERROR c.a.a.w.a.q.e.i.QueryObserver - Error received on [query] rxStream.
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long
	at com.agoda.adp.wf.api.queryengine.executors.internal.cassandra.CassandraQueryEngine.resultSet2IndexStoreColumnObs(CassandraQueryEngine.scala:136)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at com.agoda.adp.wf.api.queryengine.executors.internal.cassandra.CassandraQueryEngine$$anonfun$1$$anonfun$apply$3.apply(CassandraQueryEngine.scala:43)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at com.agoda.adp.wf.api.queryengine.executors.internal.cassandra.CassandraQueryEngine$$anonfun$1$$anonfun$apply$3.apply(CassandraQueryEngine.scala:42)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:125)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-03-23 14:43:31.586 [AS-akka.actor.default-dispatcher-14] ERROR c.a.a.w.a.q.e.i.InternalQueryExecutor - Query finished with a failure
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.IllegalStateException: State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid, please send a bug report! See https://monix.io
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.reportInvalidState(ConcatMapObservable.scala:332)
	at monix.reactive.internal.operators.ConcatMapObservable$ConcatMapSubscriber.onNext(ConcatMapObservable.scala:246)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$signalNext(AbstractBackPressuredBufferedSubscriber.scala:123)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.monix$reactive$observers$buffers$AbstractBackPressuredBufferedSubscriber$$anon$$fastLoop(AbstractBackPressuredBufferedSubscriber.scala:199)
	at monix.reactive.observers.buffers.AbstractBackPressuredBufferedSubscriber$$anon$1.run(AbstractBackPressuredBufferedSubscriber.scala:118)
	at monix.execution.internal.forkJoin.AdaptedForkJoinTask.exec(AdaptedForkJoinTask.scala:27)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Let me know if there is anything further I can assist with.
@alexandru

Member

commented Mar 23, 2017

@hrlqn sorry about this. The flatMap implementation was optimized around January / February, and the result is a difficult implementation. Optimizations are hard, since one leaves the safety of concurrency primitives that are easy to reason about :-)

Unfortunately you've hit a branch that I didn't predict and that wasn't caught in testing. It doesn't do much other than complain and stop the source, and when an error happens the source gets stopped anyway.

I will issue a fix and will trigger a release either today or tomorrow, since I have another bug fix that needs to be released.

Sorry about the inconvenience and thanks for reporting it 💌

@alexandru closed this in 3af70f1 on Mar 23, 2017

@alexandru changed the title from "State WaitComplete(Some(com.agoda.adp.wf.api.queryengine.dto.response.TimeoutError: Query taking too long),null) in the Monix ConcatMap.onNext implementation is invalid" to "Observable `concatMap` / `mapTask` trigger invalid state warning on `raiseError` children" on Mar 23, 2017

@alexandru

Member

commented Mar 23, 2017

@hrlqn I have pushed the fix to master and added tests to catch it in the future.

It will be available in the next release.

@arlequin-nyc

Author

commented Mar 23, 2017

Thank you @alexandru, you are super fast! Impressive turnaround!

@alexandru

Member

commented Mar 27, 2017

@hrlqn version 2.2.4 is out

@arlequin-nyc

Author

commented Mar 29, 2017

@alexandru many thanks!
