Re-implement scheduler's related code #277

Closed
victimsnino opened this issue Sep 25, 2022 · 3 comments
Labels: enhancement (New feature or request)

@victimsnino
Owner

victimsnino commented Sep 25, 2022

UPD:

  1. Split TrampolineScheduler into two:
    1. TrampolineScheduler, which ONLY trampolines: it uses a LOCAL queue to run a schedulable after the current schedulable has completed
    2. CurrentThreadScheduler = TrampolineScheduler with thread_local queues
  2. Re-implement scheduler-based operators in the following way:
    1. Instead of "just scheduling" each emission to the scheduler, the operator needs its own local queue
    2. On each new emission, push the emission to the queue
    3. If the queue was empty before the push, schedule draining of the queue to the scheduler
    4. If the queue was not empty, do nothing, because "someone else" has already scheduled draining
    5. Recursively re-schedule the same schedulable to drain the queue while the queue is not empty
  3. (????) Should NewThread/RunLoop etc. store their queues the way "CurrentThread" does, or not?
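
A minimal sketch of the split described in item 1, to make the difference concrete. All names here (`action_queue`, `local_trampoline`, `current_thread`, `run_nested`) are hypothetical and are not RPP's actual API; the sketch also ignores workers, time points and exception safety. It only shows that a per-instance queue trampolines within one scheduler instance, while a thread_local queue trampolines across every instance on the same thread.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

using action = std::function<void()>;

// Hypothetical shared drain logic: run queued actions one at a time.
// `draining` makes a nested schedule() call only enqueue and return.
struct action_queue
{
    std::queue<action> actions;
    bool               draining = false;

    void schedule(action a)
    {
        assert(a && "action must be callable");
        actions.push(std::move(a));
        if (draining)
            return; // an outer schedule() call is already draining this queue

        draining = true;
        while (!actions.empty())
        {
            action next = std::move(actions.front());
            actions.pop();
            next();
        }
        draining = false;
    }
};

// Variant 1 (assumed shape): the queue is a member, so each instance
// trampolines independently of every other instance.
class local_trampoline
{
public:
    void schedule(action a) { m_queue.schedule(std::move(a)); }

private:
    action_queue m_queue;
};

// Variant 2 (assumed shape): all instances used on one thread share a
// single thread_local queue.
class current_thread
{
public:
    void schedule(action a) { queue().schedule(std::move(a)); }

private:
    static action_queue& queue()
    {
        thread_local action_queue q;
        return q;
    }
};

// Schedules an outer action that schedules an inner action on a *different*
// scheduler instance, and records the order in which things run.
template<typename Scheduler>
std::vector<std::string> run_nested(Scheduler outer, Scheduler inner)
{
    std::vector<std::string> log;
    outer.schedule([&] {
        log.push_back("outer start");
        inner.schedule([&] { log.push_back("inner"); });
        log.push_back("outer end");
    });
    return log;
}
```

With two separate `local_trampoline` instances the inner action runs immediately inside the outer one, because the inner instance's queue is idle. With two `current_thread` instances the inner action is deferred until the outer action has completed, because both share the thread's queue.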
@victimsnino
Owner Author

victimsnino commented Nov 30, 2022

Forbid using the trampoline scheduler as a scheduler for operators!

An operator's scheduler can't be trampoline! Why? Because trampoline is a "thread local" scheduler, but values in an observable can be sent from multiple threads: serialized, but multithreaded. If you schedule those emissions on a "thread local" scheduler, each thread drains its own queue, so the emissions become multithreaded and non-serialized again.

That is why I mentioned it in the RxCpp repository: ReactiveX/RxCpp#593

For example:

 rpp::source::just(rpp::schedulers::trampoline{}, 1, 1, 1)
            .merge_with(rpp::source::just(rpp::schedulers::new_thread{}, 2, 2, 2))
            .observe_on(rpp::schedulers::trampoline{})
            .as_blocking()
            .subscribe([](int v)
            {
                std::cout << "==================\n" << std::this_thread::get_id() << " START " << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds{1});
                std::cout << v << std::endl;
                std::cout << std::this_thread::get_id() << " END " << std::endl << "==================\n\n";
            });

Can produce something like:

==================
12872 START
==================
36576 START
2
36576 END
==================

1
12872 END
==================

==================
36576 START
2
36576 END
==================

==================
36576 START
==================
12872 START
1
12872 END
==================

2
36576 END
==================

==================
12872 START
1
12872 END
==================

So, using the trampoline scheduler in operators would be forbidden!

If you have good arguments why this is incorrect, feel free to submit an issue mentioning this one =)

UPD:
Looks like this case is valid:

rpp::source::just(1,2,3,4,5,6)
                .delay(delay_duration, rpp::schedulers::trampoline{})

but this one is not =C

rpp::source::just(1,2,3,4,5,6)
                .merge_with(rpp::source::just(rpp::schedulers::new_thread{}, 1,2,3,4))
                .delay(delay_duration, rpp::schedulers::trampoline{})

@victimsnino victimsnino changed the title Add "default schedulers" for each scheduler dependent operator Add "default schedulers" and forbid trampoline scheduler for operators Nov 30, 2022
@tcw165
Contributor

tcw165 commented Nov 30, 2022

I see the problem, but I feel it is probably inside the trampoline implementation that I initially wrote. Let me explain:

rpp::source::just(1,2,3,4,5,6)
                .merge_with(rpp::source::just(rpp::schedulers::new_thread{}, 1,2,3,4))
                .delay(delay_duration, rpp::schedulers::trampoline{})

In the existing code, the trampoline schedules the emissions of the observable passed to merge_with() on the thread created by ::new_thread{}, which breaks the rule of serialization.

Ideally, though, the trampoline scheduler should remember the thread-local queue of the thread on which it was created. In this case, that is the host thread of .delay(delay_duration, rpp::schedulers::trampoline{}). Then, no matter which thread the upstream emits on, the trampoline scheduler should schedule the emission in that remembered host thread's message queue.

That way the trampoline follows the rule of serialized output and stays useful for unit tests.

@victimsnino
Owner Author

victimsnino commented Dec 1, 2022

Looks like I've found a solution!
rpp::schedulers::trampoline should hold a kind of weak_ptr to a queue, and the algorithm should be as follows:

  1. When we schedule an action to the trampoline, we check the weak_ptr. If it is still alive, we schedule to that queue and return, because "someone else already owns this queue"
  2. If there is no active weak_ptr, we check for a thread_local queue. If it exists, we set the weak_ptr to it and return, because "the current thread already owns this queue"
  3. If there is neither a weak_ptr nor a thread_local queue, we create the thread_local queue, set the weak_ptr to it, and start draining the queue till the end

Then in this case:

rpp::source::just(1,2,3,4,5,6)
                .delay(delay_duration, rpp::schedulers::trampoline{})

the queue would be thread_local every time.
While in this case:

rpp::source::just(1,2,3,4,5,6)
                .merge_with(rpp::source::just(rpp::schedulers::new_thread{}, 1,2,3,4)
                .delay(delay_duration, rpp::schedulers::trampoline{})

it can be any of the following:

  1. The thread_local queue of the first just, for example if the first just reaches delay first. Then schedulables from any thread would be processed by the "thread_local" queue of the first thread.
  2. The queue of the new_thread used by the "merge_with just" (this queue should be considered "thread_local"), for example if the just from "merge_with" reaches delay first. Then schedulables from any thread would be processed by the "new_thread" queue of the new thread.
  3. The queue can even change between schedulings. For example, if all emissions from the first thread happen before any emission from "new_thread", the weak_ptr expires once there are no schedulables left, and it is set to the queue from new_thread when that emission arrives.

@tcw165, what do you think?

BTW @kirkshoop, if you see this, I'm also interested in your opinion =)

UPD: After a lot of investigation of this issue, it looks like I've found the correct solution:
delay/observe_on and other scheduler-dependent operators should not "just schedule emissions to the provided scheduler". Instead they should:

  1. Have a local queue (saved in the worker)
  2. When a new emission arrives, push it to the queue
  3. If the queue was empty before the emission, schedule an action on the scheduler to drain one emission from the queue
  4. If the queue was not empty, then "someone" has already scheduled an action to drain the queue, so no extra action is needed
  5. The schedulable extracts just 1 emission, processes it, and checks whether any other items are in the queue:
    a) if yes, it re-schedules itself to process the next emission

As a result, while the queue is non-empty, we keep using the same scheduler. Once the queue becomes empty, the next "scheduling" really goes through the scheduler and can change thread if needed
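
The five steps above can be sketched roughly as follows. This is only an illustration under stated assumptions, not RPP's implementation: `serialized_emitter`, `schedule_fn` and `drain_demo` are hypothetical names, the scheduler is reduced to "a function that runs a task later", and the emitter is assumed to outlive every task it schedules. One detail is my own assumption rather than something stated in the issue: the front item stays in the queue while it is being processed, so that a concurrent push still sees a non-empty queue and does not schedule a second drain.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

using task        = std::function<void()>;
using schedule_fn = std::function<void(task)>; // hypothetical "run this later"

// Hypothetical helper that serializes emissions of type T onto a scheduler.
template<typename T>
class serialized_emitter
{
public:
    serialized_emitter(schedule_fn schedule, std::function<void(const T&)> on_next)
        : m_schedule{std::move(schedule)}
        , m_on_next{std::move(on_next)}
    {
    }

    // Steps 2-4: push, and schedule a drain only if the queue was empty.
    void emit(T value)
    {
        bool was_empty = false;
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            was_empty = m_queue.empty();
            m_queue.push(std::move(value));
        }
        if (was_empty)
            m_schedule([this] { drain_one(); });
    }

private:
    // Step 5: process exactly one emission, then re-schedule if more remain.
    void drain_one()
    {
        // Assumption: copy the front but keep it queued during processing,
        // so concurrent emit() calls do not start a second drain.
        T value = [this] {
            std::lock_guard<std::mutex> lock{m_mutex};
            assert(!m_queue.empty());
            return m_queue.front();
        }();

        m_on_next(value);

        bool has_more = false;
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            m_queue.pop();
            has_more = !m_queue.empty();
        }
        if (has_more)
            m_schedule([this] { drain_one(); });
    }

    schedule_fn                   m_schedule;
    std::function<void(const T&)> m_on_next;
    std::mutex                    m_mutex;
    std::queue<T>                 m_queue;
};

// Demo with a manual "scheduler" that only records tasks until we run them.
inline std::vector<int> drain_demo()
{
    std::vector<task> pending;
    std::vector<int>  seen;
    serialized_emitter<int> emitter{[&](task t) { pending.push_back(std::move(t)); },
                                    [&](const int& v) { seen.push_back(v); }};
    emitter.emit(1);
    emitter.emit(2);
    emitter.emit(3);
    assert(pending.size() == 1); // three emissions, but only one drain scheduled

    while (!pending.empty())
    {
        task t = std::move(pending.back());
        pending.pop_back();
        t(); // handles one emission and may re-schedule itself
    }
    return seen;
}
```

In the demo, three emissions produce a single scheduled drain, and each run of that drain handles one value and re-schedules itself until the queue is empty, so the values arrive in their original order.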

@victimsnino victimsnino changed the title Add "default schedulers" and forbid trampoline scheduler for operators Re-implement scheduler's related code Dec 6, 2022