Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

macros: join! start by polling a different future each time poll_fn is polled #4624

Merged
merged 1 commit into from May 7, 2022

Conversation

PoorlyDefinedBehaviour
Copy link
Contributor

@PoorlyDefinedBehaviour PoorlyDefinedBehaviour commented Apr 17, 2022

Motivation

Fixes: #4612

Since tokio::join! runs the futures in the same task, the futures were are the same budget which makes it possible for one future to consume the whole budget leaving nothing for the other futures to consume.

Starvation example taken from #4612

    use std::{sync::Arc, time::Duration};
    use tokio::sync::Semaphore;

    #[tokio::main]
    async fn main() {
        let permits = Arc::new(Semaphore::new(10));

        tokio::join!(non_cooperative_task(permits), poor_little_task());
    }

    async fn non_cooperative_task(permits: Arc<Semaphore>) {
        loop {
            let permit = permits.clone().acquire_owned().await.unwrap();
            // uncommenting the following makes it work
            // tokio::time::sleep(Duration::from_millis(1)).await;
        }
    }
    
    async fn poor_little_task() {
        loop {
            tokio::time::sleep(Duration::from_secs(1)).await;
            // This println! never gets to run.
            println!("Hello!")
        }
    }

Solution

Start by polling a different future each time the future generated by poll_ fn is polled. This way, each future will get the chance to make progress even if there are futures that consume the whole budget.

Given futures A, B and C passsed to join!

tokio::join!(A, B, C);

The polling will look like this:

The first time the future generated by poll_fn is polled:

poll A
poll B
poll C

The second time:

poll B
poll C
poll A

The third time:

poll C
poll A
poll B

The fourth time (we are back at the start):

poll A 
poll B
poll C

Cargo expand example

#![feature(prelude_import)]
#[prelude_import]
use std::prelude::rust_2021::*;
#[macro_use]
extern crate std;
use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;
fn main() {
    let body = async {
        let permits = Arc::new(Semaphore::new(10));
        {
            use ::tokio::macros::support::{maybe_done, poll_fn, Future, Pin};
            use ::tokio::macros::support::Poll::{Ready, Pending};
            let mut futures = (
                maybe_done(non_cooperative_task(permits)),
                maybe_done(poor_little_task()),
            );
            let mut skip_next_time: u32 = 0;
            poll_fn(move |cx| {
                const COUNT: u32 = 0 + 1 + 1;
                let mut is_pending = false;
                let mut to_run = COUNT;
                let mut skip = skip_next_time;
                skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };
                loop {
                    if skip == 0 {
                        if to_run == 0 {
                            break;
                        }
                        to_run -= 1;
                        let (fut, ..) = &mut futures;
                        let mut fut = unsafe { Pin::new_unchecked(fut) };
                        if fut.poll(cx).is_pending() {
                            is_pending = true;
                        }
                    } else {
                        skip -= 1;
                    }
                    if skip == 0 {
                        if to_run == 0 {
                            break;
                        }
                        to_run -= 1;
                        let (_, fut, ..) = &mut futures;
                        let mut fut = unsafe { Pin::new_unchecked(fut) };
                        if fut.poll(cx).is_pending() {
                            is_pending = true;
                        }
                    } else {
                        skip -= 1;
                    }
                }
                if is_pending {
                    Pending
                } else {
                    Ready((
                        {
                            let (fut, ..) = &mut futures;
                            let mut fut = unsafe { Pin::new_unchecked(fut) };
                            fut.take_output().expect("expected completed future")
                        },
                        {
                            let (_, fut, ..) = &mut futures;
                            let mut fut = unsafe { Pin::new_unchecked(fut) };
                            fut.take_output().expect("expected completed future")
                        },
                    ))
                }
            })
            .await
        };
    };
    #[allow(clippy::expect_used)]
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed building the Runtime")
        .block_on(body);
}
async fn non_cooperative_task(permits: Arc<Semaphore>) {
    loop {
        let _permit = permits.clone().acquire_owned().await.unwrap();
    }
}
async fn poor_little_task() {
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        ::std::io::_print(::core::fmt::Arguments::new_v1(&["Hello!\n"], &[]))
    }
}

@PoorlyDefinedBehaviour PoorlyDefinedBehaviour marked this pull request as draft April 17, 2022 22:15
@PoorlyDefinedBehaviour PoorlyDefinedBehaviour changed the title macros: join! gives each task its own budget to avoid starvation. macros: join! start by polling a different future each time poll_fn is polled Apr 24, 2022
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-coop Module: tokio/coop M-macros Module: macros in the main Tokio crate labels Apr 24, 2022
// How many futures were passed to join!.
const FUTURE_COUNT: u32 = $crate::count!( $($count)* );
Copy link
Contributor

@Darksonn Darksonn Apr 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the join! macro previously worked with more than 64 branches, then this will break any such code. Please double check whether the old version had a branch limit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can pass 125 branches to the old version without changing the recursion limit 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to make the new join! version accept 125 branches too and kept hitting the macro recursion limit. I think it its because of the way the join! input is normalized.

    // ===== Normalize =====

    (@ { ( $($s:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
        $crate::join!(@{ ($($s)* _) $($t)* ($($s)*) $e, } $($r)*)
    };

    // ===== Entry point =====

    ( $($e:expr),* $(,)?) => {
        $crate::join!(@{ () } $($e,)*)
    };

I implemented join! as a proc macro, it was easier to avoid the recursion limit this way. Would a proc macro be ok? (we are accepting way more branches at the moment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to avoid a procedural macro if possible. In general, I recommend you ask about an implementation strategy first before implementing it, especially if it's a larger change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend you ask about an implementation strategy first before implementing it, especially if it's a larger change

I think i read that in the CONTRIBUTING.md file, i will make sure to ask first next time. I chose to implement a proc macro without asking because i can use the practice if it is not accepted.

I went back to the version using just a declarative macro and was playing around with it the last couple of days but i'm pretty much stuck at the moment.

I keep hitting the recursion limit when i pass 125 futures to join!(the maximum supported without changing the recursion limit at the moment) because after join! normalizes its input, it is 126 levels deep in macro recursion and i need to call a macro (count!) once again.

Any ideas i could try?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about having the normalization count the number of branches?

         // One `_` for each branch in the `join!` macro. This is not used once
         // normalization is complete.
         ( $($count:tt)* )
+        // The expression `0+1+1+ ... +1` equal to the number of branches.
+        ( $($total:tt)* )
 
         // Normalized join! branches
         $( ( $($skip:tt)* ) $e:expr, )*
     // ===== Normalize =====
 
-    (@ { ( $($s:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
-        $crate::join!(@{ ($($s)* _) $($t)* ($($s)*) $e, } $($r)*)
+    (@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
+        $crate::join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
     };
 
     // ===== Entry point =====
 
     ( $($e:expr),* $(,)?) => {
-        $crate::join!(@{ () } $($e,)*)
+        $crate::join!(@{ () (0) } $($e,)*)
     };
 }

Then you could use the count in this way:

// Safety: nothing must be moved out of `futures`. This is to satisfy
// the requirement of `Pin::new_unchecked` called below.
let mut futures = ( $( maybe_done($e), )* );

let mut skip_next_time: u32 = 0;

poll_fn(move |cx| {
    const COUNT: u32 = $($total)*;

    let mut is_pending = false;

    let mut to_run = COUNT;
    let mut skip = skip_next_time;
    skip_next_time = if skip+1 == COUNT { 0 } else { skip+1 };

    loop {
    $(
        if skip == 0 {
            if to_run == 0 {
                break;
            }
            to_run -= 1;

            // Extract the future for this branch from the tuple.
            let ( $($skip,)* fut, .. ) = &mut futures;

            // Safety: future is stored on the stack above
            // and never moved.
            let mut fut = unsafe { Pin::new_unchecked(fut) };

            // Try polling
            if fut.poll(cx).is_pending() {
                is_pending = true;
            }
        } else {
            skip -= 1;
        }
    )*
    }

    ...
}).await

Also, I think the same issue exists for the try_join! macro, so if the above works, it would be good to fix that one too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works 👍

I should have thought about that 🤦

for _ in 0..FUTURE_COUNT {
$(
{
const INDEX: u32 = $($branch_index)*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't actually poll every future every time as far as I can tell, like the loop approach I suggested does.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you could also add a test for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test, i think the futures are getting polled in the expected order. Let me know if i am missing something.

I can switch to the approach you suggested if you prefer.

I added the cargo expand output to the PR description if you want to take a look.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I misread it. Anyway, the current loop is quite wasteful because you go through every future with a smaller index every time you poll something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

let mut how_many_times_i_got_to_run = 0;

for _ in 0..5 {
tokio::time::sleep(Duration::from_millis(100)).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to use time in tests, then you need to use Tokio's pause-time feature. Having a test that takes 100ms to run is way too long. Our test suite would take forever if we had many tests like this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stopped using tokio::time

@Darksonn
Copy link
Contributor

Darksonn commented May 4, 2022

In general, I think this looks quite good! Are you able to add a comment on the skip variables that explain why they exist? (in both the join! and try_join! file)

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me.

@Darksonn Darksonn merged commit 1872a42 into tokio-rs:master May 7, 2022
36 checks passed
@PoorlyDefinedBehaviour PoorlyDefinedBehaviour deleted the issue_4612 branch May 7, 2022 14:51
crapStone pushed a commit to Calciumdibromid/CaBr2 that referenced this pull request Jun 11, 2022
This PR contains the following updates:

| Package | Type | Update | Change |
|---|---|---|---|
| [tokio](https://tokio.rs) ([source](https://github.com/tokio-rs/tokio)) | dependencies | minor | `1.18.2` -> `1.19.1` |
| [tokio](https://tokio.rs) ([source](https://github.com/tokio-rs/tokio)) | dev-dependencies | minor | `1.18.2` -> `1.19.1` |

---

### Release Notes

<details>
<summary>tokio-rs/tokio</summary>

### [`v1.19.1`](https://github.com/tokio-rs/tokio/releases/tag/tokio-1.19.1)

[Compare Source](tokio-rs/tokio@tokio-1.19.0...tokio-1.19.1)

##### 1.19.1 (June 5, 2022)

This release fixes a bug in `Notified::enable`. ([#&#8203;4747])

[#&#8203;4747]: tokio-rs/tokio#4747

### [`v1.19.0`](https://github.com/tokio-rs/tokio/releases/tag/tokio-1.19.0)

[Compare Source](tokio-rs/tokio@tokio-1.18.2...tokio-1.19.0)

##### 1.19.0 (June 3, 2022)

##### Added

-   runtime: add `is_finished` method for `JoinHandle` and `AbortHandle` ([#&#8203;4709])
-   runtime: make global queue and event polling intervals configurable ([#&#8203;4671])
-   sync: add `Notified::enable` ([#&#8203;4705])
-   sync: add `watch::Sender::send_if_modified` ([#&#8203;4591])
-   sync: add resubscribe method to broadcast::Receiver ([#&#8203;4607])
-   net: add `take_error` to `TcpSocket` and `TcpStream` ([#&#8203;4739])

##### Changed

-   io: refactor out usage of Weak in the io handle ([#&#8203;4656])

##### Fixed

-   macros: avoid starvation in `join!` and `try_join!` ([#&#8203;4624])

##### Documented

-   runtime: clarify semantics of tasks outliving `block_on` ([#&#8203;4729])
-   time: fix example for `MissedTickBehavior::Burst` ([#&#8203;4713])

##### Unstable

-   metrics: correctly update atomics in `IoDriverMetrics` ([#&#8203;4725])
-   metrics: fix compilation with unstable, process, and rt, but without net ([#&#8203;4682])
-   task: add `#[track_caller]` to `JoinSet`/`JoinMap` ([#&#8203;4697])
-   task: add `Builder::{spawn_on, spawn_local_on, spawn_blocking_on}` ([#&#8203;4683])
-   task: add `consume_budget` for cooperative scheduling ([#&#8203;4498])
-   task: add `join_set::Builder` for configuring `JoinSet` tasks ([#&#8203;4687])
-   task: update return value of `JoinSet::join_one` ([#&#8203;4726])

[#&#8203;4498]: tokio-rs/tokio#4498

[#&#8203;4591]: tokio-rs/tokio#4591

[#&#8203;4607]: tokio-rs/tokio#4607

[#&#8203;4624]: tokio-rs/tokio#4624

[#&#8203;4656]: tokio-rs/tokio#4656

[#&#8203;4671]: tokio-rs/tokio#4671

[#&#8203;4682]: tokio-rs/tokio#4682

[#&#8203;4683]: tokio-rs/tokio#4683

[#&#8203;4687]: tokio-rs/tokio#4687

[#&#8203;4697]: tokio-rs/tokio#4697

[#&#8203;4705]: tokio-rs/tokio#4705

[#&#8203;4709]: tokio-rs/tokio#4709

[#&#8203;4713]: tokio-rs/tokio#4713

[#&#8203;4725]: tokio-rs/tokio#4725

[#&#8203;4726]: tokio-rs/tokio#4726

[#&#8203;4729]: tokio-rs/tokio#4729

[#&#8203;4739]: tokio-rs/tokio#4739

</details>

---

### Configuration

📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined).

🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied.

 **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox.

🔕 **Ignore**: Close this PR and you won't be reminded about these updates again.

---

 - [x] <!-- rebase-check -->If you want to rebase/retry this PR, click this checkbox.

---

This PR has been generated by [Renovate Bot](https://github.com/renovatebot/renovate).

Co-authored-by: cabr2-bot <cabr2.help@gmail.com>
Reviewed-on: https://codeberg.org/Calciumdibromid/CaBr2/pulls/1394
Reviewed-by: crapStone <crapstone@noreply.codeberg.org>
Co-authored-by: Calciumdibromid Bot <cabr2_bot@noreply.codeberg.org>
Co-committed-by: Calciumdibromid Bot <cabr2_bot@noreply.codeberg.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-coop Module: tokio/coop M-macros Module: macros in the main Tokio crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Fairness problem with semaphore acquire
2 participants