Observer with observeOn on Schedulers.currentThread is not called on currentThread #370

Closed
Sagaris opened this issue Jan 4, 2014 · 9 comments

Comments

@Sagaris

Sagaris commented Jan 4, 2014

I'm new to RxJava so forgive me if I made the wrong assumption.
I assumed from the RxJava documentation that the Observer is called from the thread which is set by the observeOn call of an Observable.

In my case I set the Schedulers as follows:
.subscribeOn(Schedulers.threadPoolForIO()).observeOn(Schedulers.currentThread())

So I was expecting that the onNext is called from the current thread but it appears that onNext is called from the "Retrofit-Idle" thread instead.

It seems that the observeOn is completely ignored.
Is this as designed or is this an error?

@JakeWharton
Member

This is by design. It's your job to observe on whatever thread you want.

fooService.doSomething("Hi!")
  .map({ o -> o.getList() })
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe({ adapter.setList(it); })

This is documented on RestAdapter and on the website:

Observable requests are subscribed asynchronously and observed on the same thread that executed the HTTP request. To observe on a different thread (e.g. Android's main thread) call observeOn(Scheduler) on the returned Observable.

@JakeWharton
Member

Are you on the JVM? I'm not familiar with the semantics of exactly how Schedulers.currentThread() works since I've never used it.

Retrofit shouldn't exhibit any behaviors that are different from normal RxJava. Internally we are just calling Observable.create(..).observeOn(..) and then returning it.

@Sagaris
Author

Sagaris commented Jan 4, 2014

I dug a little deeper. If I do not use the Observable return type from a service call and instead wrap the response in an Observable myself, observeOn and subscribeOn are performed on the correct threads.

If I use the Observable return type from a service call then the onNext is not performed on the expected thread set by the observeOn(Schedulers.currentThread) but on the "Retrofit-idle" thread instead.

Maybe I'm doing something wrong.

@loganj
Collaborator

loganj commented Jan 4, 2014

Retrofit is behaving correctly, but RxJava can be confusing. Schedulers.currentThread() is badly named and does not do what you think it does. It does not refer to the thread on which you call observeOn(); it just means that no thread-hopping will be performed when the observer is called. I made the same mistake when starting with RxJava myself.

Generally speaking you want observeOn(AndroidSchedulers.mainThread()). If you're on a background thread when you make the call and you want to process the response on that same thread, try AndroidSchedulers.handlerThread(Handler).
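
The "no thread-hopping" behaviour described above can be illustrated without RxJava at all. The following is only a rough sketch using plain java.util.concurrent: the class DirectExecutorDemo and the method callbackThreadName are hypothetical names made up for this illustration, and the direct executor (Runnable::run) stands in for a scheduler that does not switch threads. It is not how Schedulers.currentThread() is implemented.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class DirectExecutorDemo {
    /**
     * Hypothetical helper: a thread named "worker" produces a result and hands
     * the callback to the given executor. Returns the name of the thread the
     * callback actually ran on.
     */
    static String callbackThreadName(Executor callbackExecutor) {
        AtomicReference<String> seen = new AtomicReference<>();
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        // The worker thread is the one that invokes the executor.
        worker.execute(() ->
                callbackExecutor.execute(() -> seen.set(Thread.currentThread().getName())));
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        // A direct executor runs the task on whichever thread calls execute().
        Executor direct = Runnable::run;
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("callback: " + callbackThreadName(direct));
    }
}
```

The callback reports the worker thread, not the thread that set everything up: "run on the current thread" is evaluated where the callback is delivered, not where the pipeline was assembled.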

@Sagaris
Author

Sagaris commented Jan 5, 2014

Thank you loganj. I did indeed make the same mistake.

@bhargavms

bhargavms commented Oct 20, 2016

@loganj what if you are on the JVM and therefore do not have access to AndroidSchedulers.mainThread()? Ideally I would like to create a scheduler that runs on the thread on which .subscribe is called.

@guelo

guelo commented Jul 17, 2017

To answer the JVM question, you need an Executor that executes the runnables back on the calling thread. One solution is posting to a BlockingQueue that is consumed on the caller's thread. Something like this (Kotlin):

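  // Imports assume RxJava 2.x (io.reactivex); adjust the package for other versions.
  import io.reactivex.Observable
  import io.reactivex.schedulers.Schedulers
  import java.util.concurrent.Executor
  import java.util.concurrent.LinkedBlockingQueue

  val tasks = LinkedBlockingQueue<Runnable>()

  Observable.fromCallable { "Hello" }
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.from(Executor { runnable -> tasks.add(runnable) }))
    .subscribe { println("Back on the original thread: " + Thread.currentThread().name) }

  // Block until the observer's runnable arrives, then run it on this thread.
  tasks.take().run()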

@bhargavms

@guelo so that basically just blocks the calling thread until tasks.take() returns, which is when the observable emits.

@guelo

guelo commented Jul 18, 2017

Correct. You need some kind of strategy to allow the thread to cede control and execute work asynchronously posted from another thread. In UI and game code that's usually provided for you as an event loop, which is what AndroidSchedulers.mainThread() hooks into. If you're doing your own asynchronous callbacks you have to come up with your own strategy.
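
One possible shape for such a strategy is a tiny hand-rolled event loop. This is a minimal sketch in plain Java, not anything RxJava or Android provides: CallerThreadLoop, post, quit, and demo are hypothetical names chosen for this illustration. The calling thread still gives up control while it waits, but it keeps serving posted work until it is told to stop, rather than handling a single take().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CallerThreadLoop {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    // Only written by tasks executed on the loop thread.
    private volatile boolean running = true;

    /** Safe to call from any thread: queues work for the loop thread. */
    void post(Runnable task) {
        tasks.add(task);
    }

    /** Queues a task that stops the loop after all earlier tasks have run. */
    void quit() {
        post(() -> running = false);
    }

    /** Blocks the calling thread, running posted tasks until quit() is processed. */
    void run() {
        try {
            while (running) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Demo: a worker posts two callbacks; returns the names of the threads that ran them. */
    static List<String> demo() {
        CallerThreadLoop loop = new CallerThreadLoop();
        List<String> handledOn = new ArrayList<>();
        Thread worker = new Thread(() -> {
            for (int i = 0; i < 2; i++) {
                loop.post(() -> handledOn.add(Thread.currentThread().getName()));
            }
            loop.quit();
        }, "worker");
        worker.start();
        loop.run(); // the caller cedes control here and processes the posted work
        return handledOn;
    }

    public static void main(String[] args) {
        System.out.println("handled on: " + demo());
    }
}
```

Assuming an RxJava version that offers Schedulers.from(Executor), passing loop::post as the Executor should route observer callbacks onto whichever thread is inside run().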
