ConcurrentStream usage with tokio leads to ACCESS_VIOLATION #182

Closed
inklesspen1rus opened this issue Apr 19, 2024 · 4 comments · Fixed by #187

Comments

@inklesspen1rus

I tried to use concurrent streams to sleep in parallel with tokio:

use core::time::Duration;
use futures_concurrency::prelude::*;
use tokio;
use futures::prelude::*;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            tokio::time::sleep(Duration::from_secs(x as _)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

But sometimes it crashes:

> cargo run
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.15s
     Running `target\debug\rstestss.exe`
error: process didn't exit successfully: `target\debug\rstestss.exe` (exit code: 0xc0000005, STATUS_ACCESS_VIOLATION)

Without the "current_thread" flavor, the program just freezes.

Other runtimes work fine:
async_std

use core::time::Duration;
use async_std;
use futures_concurrency::prelude::*;
use futures::prelude::*;

#[async_std::main]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            async_std::task::sleep(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

smol

use core::time::Duration;
use futures_concurrency::prelude::*;
use futures::prelude::*;

async fn main_async() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            smol::Timer::after(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

fn main() {
    smol::block_on(main_async());
}

The tokio runtime combined with the smol Timer also works fine:

use core::time::Duration;
use futures_concurrency::prelude::*;
use tokio;
use futures::prelude::*;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);

    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            // tokio::time::sleep(Duration::from_secs(x as _)).await;
            smol::Timer::after(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}

Is this a Tokio issue?

@matheus-consoli
Collaborator

Interesting, thanks for reporting it.
It seems you're running it on Windows, right? I only have Linux available, and I couldn't reproduce the ACCESS_VIOLATION error, but it freezes on every run (while the other runtimes work, as you pointed out). @yoshuawuyts, you run on Windows, right?
I'll investigate some more, but my initial guess is that the problem is probably coming from tokio.

@taiki-e

taiki-e commented Apr 19, 2024

From rust-lang/futures-rs#2851 (comment):

I don't intend to review everything of it, but it has bunch of very suspicious unsafe codes even at a cursory glance: This would be unsound when vec, the storage of slab, is reallocated. This probably has the same problem as tokio-rs/tokio#2612. The heavy use of ManuallyDrop around Pin API reminds me of async-rs/async-std#903. etc.

IIRC, the tokio timer is !Unpin and the others are Unpin, so it is probably only the tokio timer that is affected by the first unsoundness in them.
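
A minimal sketch of the reallocation concern, using only the standard library. `AddressSensitive` and `grow_inline` are hypothetical names made up for illustration; they are not part of futures-concurrency or tokio. The sketch models one growth step of a Vec-backed slab by moving the element into a larger buffer while the old buffer is still allocated, which is what a reallocating push does when it cannot grow in place:

```rust
use std::marker::PhantomPinned;

/// Hypothetical stand-in for a `!Unpin` future (for example a timer that
/// registers its own address somewhere on first poll).
struct AddressSensitive {
    value: u32,
    _pinned: PhantomPinned,
}

/// Models one growth step of a slab that stores its elements inline in a `Vec`.
/// Returns the address of element 0 before and after the elements are moved
/// into the larger buffer.
fn grow_inline() -> (usize, usize) {
    let mut old: Vec<AddressSensitive> = Vec::with_capacity(1);
    old.push(AddressSensitive { value: 7, _pinned: PhantomPinned });
    let before = &old[0] as *const AddressSensitive as usize;

    // Allocate the larger buffer while `old` still owns its allocation,
    // so the two buffers cannot share an address.
    let mut bigger: Vec<AddressSensitive> = Vec::with_capacity(2);
    bigger.extend(old.drain(..));
    let after = &bigger[0] as *const AddressSensitive as usize;

    // The element itself survived the move; only its address changed.
    assert_eq!(bigger[0].value, 7);
    drop(old);
    (before, after)
}
```

Calling `grow_inline()` returns two different addresses. For an `Unpin` future such a move is harmless; for a `!Unpin` future that has already been polled, it breaks the `Pin` contract, which would be consistent with only the tokio timer being affected.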

@conradludgate
Contributor

conradludgate commented Apr 20, 2024

I also noticed the unsoundness when looking through the code just now. I have put together a minimal test repro for the pin unsoundness in this ConcurrentStream vec collect impl.

use std::{future::Future, marker::PhantomPinned, pin::pin, task::Poll};

use futures_concurrency::{
    concurrent_stream::{ConcurrentStream, Consumer, ConsumerState},
    future::Race,
};
use futures_executor::block_on;
use pin_project::pin_project;

#[pin_project]
struct PinCheck {
    addr: usize,
    #[pin]
    _pinned: PhantomPinned,
}

impl PinCheck {
    fn new() -> Self {
        Self {
            addr: 0,
            _pinned: PhantomPinned,
        }
    }
}

impl Future for PinCheck {
    type Output = ();

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Self::Output> {
        let this = self.project();
        let addr = this.addr as *mut usize as usize;
        if *this.addr == 0 {
            cx.waker().wake_by_ref();
            *this.addr = addr;
            Poll::Pending
        } else {
            assert_eq!(*this.addr, addr, "pinned value was moved.");
            Poll::Ready(())
        }
    }
}

struct Tricky;

impl ConcurrentStream for Tricky {
    type Item = ();

    type Future = PinCheck;

    async fn drive<C>(self, consumer: C) -> C::Output
    where
        C: Consumer<Self::Item, Self::Future>,
    {
        let mut consumer = pin!(consumer);
        for _ in 0..64 {
            match consumer.as_mut().send(PinCheck::new()).await {
                ConsumerState::Break => return consumer.flush().await,
                ConsumerState::Continue => continue,
                ConsumerState::Empty => unreachable!(),
            }
        }

        let progress = async { Some(consumer.as_mut().progress().await) };
        let noop = async { None };

        // poll progress once.
        assert!((progress, noop).race().await.is_none());

        // push new entry, reallocs internal futures slab.
        // this moves the futures and violates pinning.
        consumer.as_mut().send(PinCheck::new()).await;

        consumer.flush().await
    }

    fn concurrency_limit(&self) -> Option<std::num::NonZeroUsize> {
        todo!()
    }
}

#[test]
fn it_works() {
    block_on(async { Tricky.collect::<Vec<()>>().await });
}

This currently outputs:

thread 'tests::it_works' panicked at src/lib.rs:49:17:
assertion `left == right` failed: pinned value was moved.
  left: 5830115848
 right: 5830117896
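
For comparison, a hedged sketch of one common way to keep addresses stable when the backing storage grows: give every element its own heap allocation with `Box::pin`, so that growing the `Vec` only moves the pointers. This is an assumption about a possible mitigation, not a description of how the issue was actually fixed. `AddressSensitive` and `grow_boxed` are hypothetical names.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

/// Hypothetical stand-in for a `!Unpin` future.
struct AddressSensitive {
    value: u32,
    _pinned: PhantomPinned,
}

/// Same growth step as a reallocating push, but every element lives in its
/// own `Pin<Box<_>>`. Returns the address of element 0 before and after.
fn grow_boxed() -> (usize, usize) {
    let mut old: Vec<Pin<Box<AddressSensitive>>> = Vec::with_capacity(1);
    old.push(Box::pin(AddressSensitive { value: 7, _pinned: PhantomPinned }));
    // `Pin<Box<T>>` derefs to `T`, so this is the address of the pinned value.
    let before = &*old[0] as *const AddressSensitive as usize;

    // Moving the boxes into a larger buffer moves only the pointers.
    let mut bigger: Vec<Pin<Box<AddressSensitive>>> = Vec::with_capacity(2);
    bigger.extend(old.drain(..));
    let after = &*bigger[0] as *const AddressSensitive as usize;

    assert_eq!(bigger[0].value, 7);
    (before, after)
}
```

Here `grow_boxed()` returns the same address twice, at the cost of one extra allocation per element.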

@yoshuawuyts
Owner

yoshuawuyts commented Apr 23, 2024

I've been out on sick leave for the past several days. Just wanted to quickly acknowledge that this is indeed an issue we should fix.

I want to thank @inklesspen1rus for reporting this, and everyone else in this thread for helping narrow the issue down.
