Async RxJava3 call adapter doesn't produce error event when request is canceled by timeout #3524

Open
alapshin opened this issue Feb 23, 2021 · 4 comments

@alapshin

Test case/sample:
https://gist.github.com/alapshin/a60540a1a128c0af042b7fe427b9de88

Description

  1. The OkHttp client is configured with an arbitrary call timeout
  2. Retrofit's RxJava3 call adapter is created via RxJava3CallAdapterFactory.create()

Result
When the call times out, the Rx stream doesn't receive any event and remains active

Expected result
When the call times out, the Rx stream receives an error event and terminates

Additional info
If the RxJava3 call adapter is created via RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()) instead, cancellation of the call by timeout produces an exception that is propagated to the stream:

java.io.InterruptedIOException: timeout
	at okhttp3.internal.connection.RealCall.timeoutExit(RealCall.kt:398)
	at okhttp3.internal.connection.RealCall.callDone(RealCall.kt:360)
	at okhttp3.internal.connection.RealCall.noMoreExchanges$okhttp(RealCall.kt:325)
	at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:209)
	at okhttp3.internal.connection.RealCall.execute(RealCall.kt:154)
	at retrofit2.OkHttpCall.execute(OkHttpCall.java:204)
	at retrofit2.adapter.rxjava3.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:46)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13127)
	at retrofit2.adapter.rxjava3.BodyObservable.subscribeActual(BodyObservable.java:35)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13127)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
	at io.reactivex.rxjava3.core.Scheduler$DisposeTask.run(Scheduler.java:644)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:65)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:56)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Canceled
	at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:72)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
	at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
	... 16 more
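
For reference, the setup described under "Additional info" can be sketched roughly as follows. This is only an illustrative configuration: the base URL, the 5-second timeout, and the class and method names are assumptions, not taken from the linked gist. As the stack trace above shows, this factory variant runs the call synchronously through CallExecuteObservable on the supplied scheduler.

```java
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava3.RxJava3CallAdapterFactory;

final class TimeoutWorkaroundSketch {
    // Hypothetical base URL and timeout value, chosen only for illustration.
    static Retrofit buildRetrofit() {
        // Whole-call timeout; when it elapses, OkHttp cancels the call.
        OkHttpClient client = new OkHttpClient.Builder()
            .callTimeout(5, TimeUnit.SECONDS)
            .build();

        return new Retrofit.Builder()
            .baseUrl("https://example.invalid/")
            .client(client)
            // Synchronous execution on the io scheduler instead of the
            // enqueue-based adapter returned by create().
            .addCallAdapterFactory(
                RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()))
            .build();
    }
}
```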

There is also a somewhat related discussion at #3453, with comment #3453 (comment) pointing to a possible cause of this difference in behavior.

@purnaPrasanth

It seems the issue happens when a thread switch occurs while onFailure is being executed. To demonstrate the case deterministically, I have created a sample snippet:

import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.Single
import io.reactivex.disposables.Disposable
import io.reactivex.exceptions.CompositeException
import io.reactivex.exceptions.Exceptions
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit


fun main(args: Array<String>) {
    val countDownLatch = CountDownLatch(1)

    val disposable = SampleObservable()
        .subscribeOn(Schedulers.io())
        .subscribe(
            {
                println("Emitted")
                countDownLatch.countDown()
            },
            {
                println("Errored")
                countDownLatch.countDown()
            }
        )

    Single.just(Unit)
        .delay(3000, TimeUnit.MILLISECONDS, Schedulers.io())
        .subscribe({ disposable.dispose() }, {})

    countDownLatch.await()
}

class SampleObservable : Observable<Unit>() {
    override fun subscribeActual(observer: Observer<in Unit>?) {
        observer?.onSubscribe(SampleCallback(observer))
    }

    private inner class SampleCallback(private val observer: Observer<in Unit>) : Disposable {
        @Volatile
        private var isDisposed: Boolean = false

        init {
            sendError(IllegalStateException("Sample Exception"))
        }

        override fun dispose() {
            isDisposed = true
        }

        override fun isDisposed(): Boolean = isDisposed

        private fun sendError(exception: Exception) {
            if (!isDisposed) {
                val executorService = Executors.newSingleThreadScheduledExecutor()
                executorService.schedule(
                    {
                        try {
                            observer.onError(exception)
                        } catch (inner: Throwable) {
                            Exceptions.throwIfFatal(inner)
                            RxJavaPlugins.onError(CompositeException(exception, inner))
                        }
                    },
                    5,
                    TimeUnit.SECONDS
                )
            }
        }
    }
}

The fix would be to synchronise reads and writes to isDisposed and observer.
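
One possible shape for such synchronisation is sketched below using only the JDK. It is not Retrofit's or RxJava's code: TerminalGateSketch and Gate are hypothetical names, and the sketch assumes a single atomic flag is enough to decide whether dispose() or the error delivery wins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class TerminalGateSketch {
    /** Hypothetical gate: exactly one of dispose() or sendError() can win. */
    static final class Gate {
        private final AtomicBoolean done = new AtomicBoolean(false);
        private final Consumer<Throwable> onError;

        Gate(Consumer<Throwable> onError) {
            this.onError = onError;
        }

        /** Returns true if this call moved the gate to its terminal state. */
        boolean dispose() {
            return done.compareAndSet(false, true);
        }

        /** True once the gate was disposed or an error was delivered. */
        boolean isDisposed() {
            return done.get();
        }

        /** Delivers the error only if nothing else reached the terminal state first. */
        boolean sendError(Throwable t) {
            if (!done.compareAndSet(false, true)) {
                return false; // dispose() or an earlier error won the race
            }
            onError.accept(t);
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Gate gate = new Gate(t -> seen.add(t.getMessage()));
        gate.dispose();
        boolean delivered = gate.sendError(new IllegalStateException("Sample Exception"));
        System.out.println("delivered=" + delivered + ", seen=" + seen);
    }
}
```

Because the check and the state change happen in one compareAndSet, an error scheduled on another thread cannot reach the observer after dispose() has returned.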

This is the cause of one of the top trending crashes in our product. Is a fix already in the pipeline? Please advise on any alternatives if possible. Thanks.

@dbriggsDFleet

dbriggsDFleet commented Jan 26, 2022

Is there any negative side effect of just using RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()) instead of the default .create()? It seems important that timeouts are sent downstream instead of leaving the stream hanging.

EDIT: It's also a significant regression from the RxJava2 call adapter, no?

@optisamit

Apparently this is happening due to this line:

@tomridder

In OkHttp, when the call timeout fires, the call is canceled.
So in CallEnqueueObservable.java, at

if (call.isCanceled()) return;

the callback returns early, and the exception is never delivered to the observer.
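
The effect of that guard can be modelled with a small JDK-only sketch. FakeCall and RecordingObserver are hypothetical stand-ins, not OkHttp or RxJava types, and the "disposed only" variant is an assumed alternative shown for contrast rather than a proposed patch.

```java
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;

class CanceledGuardSketch {
    /** Hypothetical stand-in for a cancelable call. */
    static final class FakeCall {
        private volatile boolean canceled;
        void cancel() { canceled = true; }
        boolean isCanceled() { return canceled; }
    }

    /** Records what a downstream observer would receive. */
    static final class RecordingObserver {
        final List<String> events = new ArrayList<>();
        void onError(Throwable t) { events.add("onError: " + t.getMessage()); }
    }

    /** Mirrors the quoted guard: failures of a canceled call are dropped. */
    static void onFailureWithCancelGuard(FakeCall call, RecordingObserver observer, Throwable t) {
        if (call.isCanceled()) return;
        observer.onError(t);
    }

    /** Assumed alternative: drop the failure only if downstream disposed. */
    static void onFailureDisposedOnly(boolean disposed, RecordingObserver observer, Throwable t) {
        if (disposed) return;
        observer.onError(t);
    }

    /** Simulates a timeout that cancels the call and then reports the failure. */
    static List<String> simulateTimeout(boolean useCancelGuard) {
        FakeCall call = new FakeCall();
        RecordingObserver observer = new RecordingObserver();
        call.cancel(); // the timeout cancels the call before onFailure runs
        Throwable failure = new InterruptedIOException("timeout");
        if (useCancelGuard) {
            onFailureWithCancelGuard(call, observer, failure);
        } else {
            onFailureDisposedOnly(false, observer, failure); // downstream still subscribed
        }
        return observer.events;
    }

    public static void main(String[] args) {
        System.out.println("cancel guard:  " + simulateTimeout(true));
        System.out.println("disposed only: " + simulateTimeout(false));
    }
}
```

With the cancel guard, the simulated observer records nothing, which matches the reported behavior of a stream that stays active with no event.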
