This repository has been archived by the owner on Jul 9, 2022. It is now read-only.

Sometimes it never finishes: not very often, but not rarely either, roughly once in ten runs #1

Open
nkbai opened this issue Feb 13, 2020 · 15 comments

Comments

@nkbai

nkbai commented Feb 13, 2020

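Below is my implementation.
I made a few changes:

  1. Every method that doesn't need &mut self now takes &self.
  2. Added a name used in debug output, purely to make debugging easier.

I'm not very familiar with tokio yet and still need to figure out why this happens. Even when I switch the code back to your original version, it still gets stuck (wg.await never returns) and the program never exits. This is what happens: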
start_wg2 wait group polled.
start_wg2 wait group polled.
start_wg2 all done
start_wg2 wait group polled.
//! # async-wg
//!
//! Async version WaitGroup for RUST.
//!
//! ## Examples
//!
//! ```rust
//! #[tokio::main]
//! async fn main() {
//!     use async_wg::WaitGroup;
//!
//!     // Create a new wait group.
//!     let wg = WaitGroup::new("main".to_string());
//!
//!     for _ in 0..10 {
//!         let wg = wg.clone();
//!         // Add 1 to the count.
//!         wg.add(1).await;
//!
//!         tokio::spawn(async move {
//!             // Do some work.
//!
//!             // Decrement the count by 1.
//!             wg.done().await;
//!         });
//!     }
//!
//!     // Wait until every add has been matched by a done.
//!     wg.await;
//! }
//! ```
//!
//! ## Benchmarks
//!
//! Simple benchmark comparison, run on GitHub Actions.
//!
//! Code: [benches/main.rs](https://github.com/jmjoy/async-wg/blob/master/benches/main.rs)
//!
//! ```text
//! test bench_join_handle ... bench:      34,485 ns/iter (+/- 18,969)
//! test bench_wait_group  ... bench:      36,916 ns/iter (+/- 7,555)
//! ```
//!
//! ## License
//!
//! The Unlicense.
//!

use futures_util::lock::Mutex;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};

#[derive(Clone)]
/// Enables multiple tasks to synchronize the beginning or end of some computation.
pub struct WaitGroup {
    inner: Arc<Inner>,
    name: String, // used in debug output only
}

struct Inner {
    started: AtomicBool,
    count: Mutex<isize>,
    waker: Mutex<Option<Waker>>,
}

impl WaitGroup {
    /// Creates a new wait group and returns the single reference to it.
    ///
    /// # Examples
    /// ```rust
    /// use async_wg::WaitGroup;
    /// let wg = WaitGroup::new("example".to_string());
    /// ```
    pub fn new(name: String) -> WaitGroup {
        WaitGroup {
            inner: Arc::new(Inner {
                started: AtomicBool::new(false),
                count: Mutex::new(0),
                waker: Mutex::new(None),
            }),
            name,
        }
    }

    /// Adds `delta` to the count.
    ///
    /// # Panics
    /// 1. The argument `delta` must be non-negative (>= 0).
    /// 2. The resulting count must be less than `isize::max_value()`.
    /// 3. `add` must not be called after the wait group has been awaited.
    pub async fn add(&self, delta: isize) {
        if delta < 0 {
            panic!("The argument `delta` of wait group `add` must be non-negative");
        }
        if self
            .inner
            .started
            .load(std::sync::atomic::Ordering::Relaxed)
        {
            panic!("{} cannot add after started.", self.name);
        }
        let mut count = self.inner.count.lock().await;
        *count += delta;

        if *count >= isize::max_value() {
            panic!("{} wait group count is too large", self.name);
        }
    }

    /// Decrements the count by 1 and, when it reaches 0, wakes the waiting task if one is registered.
    pub async fn done(&self) {
        let mut count = self.inner.count.lock().await;
        *count -= 1;
        if *count < 0 {
            panic!("{} done must equal add", self.name);
        }
        if *count == 0 {
            println!("{} all done", self.name);
            if let Some(waker) = &*self.inner.waker.lock().await {
                waker.clone().wake();
            } else {
                println!("{} done before any await", self.name);
            }
        }
    }

    /// Returns the current count of the `WaitGroup`; the initial count is `0`.
    pub async fn count(&self) -> isize {
        *self.inner.count.lock().await
    }
}

impl Future for WaitGroup {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        self.inner
            .started
            .store(true, std::sync::atomic::Ordering::SeqCst);
        println!("{} wait group polled.", self.name);
        let mut count = self.inner.count.lock();
        let pin_count = Pin::new(&mut count);
        if let Poll::Ready(count) = pin_count.poll(cx) {
            if *count <= 0 {
                println!("{} ready", self.name);
                return Poll::Ready(());
            }
        }
        drop(count);

        let mut waker = self.inner.waker.lock();
        let pin_waker = Pin::new(&mut waker);
        if let Poll::Ready(mut waker) = pin_waker.poll(cx) {
            *waker = Some(cx.waker().clone());
        }

        Poll::Pending
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn add_zero() {
        let wg = WaitGroup::new("add_zero".to_string());
        wg.add(0).await;
        // wg.done().await;
        wg.await
    }

    #[tokio::test]
    #[should_panic]
    async fn add_neg_one() {
        let wg = WaitGroup::new("add_neg_one".to_string());
        wg.add(-1).await;
    }

    #[tokio::test]
    #[should_panic]
    async fn add_very_max() {
        let wg = WaitGroup::new("add_very_max".to_string());
        wg.add(isize::max_value()).await;
    }

    #[tokio::test]
    async fn add() {
        let wg = WaitGroup::new("add".to_string());
        wg.add(1).await;
        wg.add(10).await;
        assert_eq!(*wg.inner.count.lock().await, 11);
    }

    #[tokio::test]
    #[should_panic]
    async fn done() {
        let wg = WaitGroup::new("done".to_string());
        wg.done().await;
        wg.done().await; // done() called more times than add()
        assert_eq!(*wg.inner.count.lock().await, -2);
    }

    #[tokio::test]
    async fn count() {
        let wg = WaitGroup::new("count".to_string());
        assert_eq!(wg.count().await, 0);
        wg.add(10).await;
        assert_eq!(wg.count().await, 10);
        wg.done().await;
        assert_eq!(wg.count().await, 9);
    }

    #[tokio::test]
    async fn can_quit() {
        let wg = WaitGroup::new("can_quit".to_string());
        assert_eq!(wg.count().await, 0);
        wg.add(1).await;
        let wg2 = wg.clone();
        tokio::spawn(async move {
            tokio::time::delay_for(tokio::time::Duration::from_millis(10)).await;
            wg2.done().await;
        });
        wg.await
    }
}
nkbai pushed a commit to nkbai/learnrustbynats that referenced this issue Feb 13, 2020
The original implementation has a bug that can cause it to hang.
jmjoy/async-wg#1
@jmjoy
Owner

jmjoy commented Feb 13, 2020

Is there an example that reproduces it?

@nkbai
Author

nkbai commented Feb 14, 2020

start_wg2 wait group polled.
start_wg2 wait group polled.
start_wg2 all done
start_wg2 wait group polled.
That is the trimmed-down example. This await never returns; the log above is what the wait group calls produce.

@jmjoy
Owner

jmjoy commented Feb 14, 2020

@nkbai Do you have a code example?

@nkbai
Copy link
Author

nkbai commented Feb 15, 2020

The key part is in the main function.

@jmjoy
Owner

jmjoy commented Feb 16, 2020

@nkbai What about the code currently on master?

@nkbai
Author

nkbai commented Feb 16, 2020

I re-implemented a simple wait group myself, without following your approach at all, so the problem went away.

@laizy

laizy commented Feb 22, 2020

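I happened to glance at the source while browsing, and the API isn't well designed. One problem is that it doesn't use RAII, which is quite likely also why the code above fails.
Keep in mind that an async function can be terminated early at any await point. Here is a simple example that makes the program hang: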

use async_std::prelude::*;
use async_std::task;
use async_wg::WaitGroup;
use std::time::Duration;

fn main() {
	task::block_on( async {
		let wg = WaitGroup::new();
		for _ in 0..10isize {
			let mut wg = wg.clone();
			// Add count n.
			wg.add(1isize).await;

			let fut = async move {
				println!("sleeping");
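				task::sleep(Duration::from_secs(4)).await; // the task can be cancelled at this await
				println!("never printed");      // never runs, so nothing is printed
				wg.done().await;                   // so done() is never called either
			};
			// The 1-second sleep finishes before fut, so race drops fut early. With a timeout longer than 4 seconds, the program finishes normally.
			task::spawn(fut.race(task::sleep(Duration::from_secs(1))));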
		}

		// Wait for done count is equal to add count.
		wg.await;
	});
}

Of course, since the rewrite in PR #2 keeps the same API, it naturally has the same problem.
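The claim that an async block can be stopped at any await point can be shown without async-std or tokio. The sketch below is a std-only illustration (it assumes Rust 1.51+ for std::task::Wake); NoopWake and done_reached_after_cancel are made-up names for this example, and the flag merely stands in for wg.done().await. The task is polled once, so it runs up to its first await, and is then dropped the way race drops the losing future:

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// Hypothetical helper: a waker that ignores wake-ups; enough to poll a future by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Returns whether the "done" step ran after the task was cancelled mid-await.
fn done_reached_after_cancel() -> bool {
    let reached = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&reached);

    let task = async move {
        // Never completes, like the 4-second sleep that loses the race.
        std::future::pending::<()>().await;
        // Stands in for wg.done().await; runs only if the await above finishes.
        flag.store(true, Ordering::SeqCst);
    };

    let mut task = Box::pin(task);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // The first poll runs the body up to the await and returns Pending.
    assert!(task.as_mut().poll(&mut cx).is_pending());

    // Cancel the task: dropping it discards everything after the await point.
    drop(task);

    reached.load(Ordering::SeqCst)
}

fn main() {
    let reached = done_reached_after_cancel();
    println!("done reached: {}", reached); // prints "done reached: false"
    assert!(!reached);
}
```

Once the future is dropped, nothing after its pending await ever runs, so a done call placed there is silently skipped and the counter never reaches zero.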

@jmjoy
Owner

jmjoy commented Feb 23, 2020

@laizy Thanks a lot for the pointers.

@nkbai
Author

nkbai commented Feb 24, 2020

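@laizy I think these are two completely different problems.
At least, the wait group I have in mind is like the one in Go.

If done is never called and await never returns, that is simply how the API is designed to behave.

https://gobyexample.com/waitgroups
Here is an example.

Let me know if I've misunderstood.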

@jmjoy
Owner

jmjoy commented Feb 24, 2020

@nkbai In Go you need defer to make sure Done runs; in Rust, Done would have to be guaranteed in drop, but then async/await can't be used for it. See crossbeam's WaitGroup.
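One way to read the drop-based suggestion is sketched below, assuming the decrement is made synchronous (a std::sync::Mutex held only briefly and never across an await) so that it can run inside Drop. WorkerGuard, worker() and block_on are hypothetical names for this illustration; this is neither crossbeam's API nor async-wg's. Because the guard decrements in Drop, a task that is cancelled before reaching any done call still counts as done:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

// Count and waker share one std lock; it is never held across an await.
struct State {
    count: usize,
    waker: Option<Waker>,
}

// Awaiting the WaitGroup completes once every WorkerGuard has been dropped.
struct WaitGroup(Arc<Mutex<State>>);
// Hypothetical guard: holding one means "work outstanding", dropping it means "done".
struct WorkerGuard(Arc<Mutex<State>>);

impl WaitGroup {
    fn new() -> Self {
        WaitGroup(Arc::new(Mutex::new(State { count: 0, waker: None })))
    }

    // Plays the role of add(1): bumps the count and hands out a guard.
    fn worker(&self) -> WorkerGuard {
        self.0.lock().unwrap().count += 1;
        WorkerGuard(Arc::clone(&self.0))
    }
}

impl Drop for WorkerGuard {
    // Runs on normal completion, early return, panic, or cancellation.
    fn drop(&mut self) {
        let waker = {
            let mut st = self.0.lock().unwrap();
            st.count -= 1;
            if st.count == 0 { st.waker.take() } else { None }
        }; // wake only after the lock is released
        if let Some(w) = waker { w.wake(); }
    }
}

impl Future for WaitGroup {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut st = self.0.lock().unwrap();
        if st.count == 0 { return Poll::Ready(()); }
        st.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

// Minimal single-future executor, only for this demo.
struct ThreadWaker(thread::Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) { self.0.unpark(); }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) { return out; }
        thread::park(); // spurious wake-ups simply lead to another poll
    }
}

// Returns how many worker threads had finished when the wait completed.
fn run_demo() -> usize {
    let wg = WaitGroup::new();
    let finished = Arc::new(AtomicUsize::new(0));
    for i in 0..4u64 {
        let (guard, finished) = (wg.worker(), Arc::clone(&finished));
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10 * i));
            finished.fetch_add(1, Ordering::SeqCst);
            drop(guard);
        });
    }
    // A task cancelled before it ran: dropping the future drops its guard.
    let guard = wg.worker();
    let cancelled = async move {
        let _guard = guard;
        std::future::pending::<()>().await;
    };
    drop(cancelled);
    block_on(wg);
    finished.load(Ordering::SeqCst)
}

fn main() { assert_eq!(run_demo(), 4); }
```

The trade-off is the one described in the comment: done is no longer an async method, so the lock protecting the count must be one that can be taken synchronously.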

I'm wondering whether this library should just be archived.

@nkbai
Author

nkbai commented Feb 24, 2020

@jmjoy 👍
A good interface is hard to abstract; crossbeam's WaitGroup interface is more elegant.

@laizy

laizy commented Feb 24, 2020

If done is never called and await never returns, that is simply how the API is designed to behave.

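That's wrong; you need to think through the underlying problem here. Even when done is called, it can still hang. My example above was only meant to make the problem easy to reproduce.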

let fut = async move {
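    wg.done().await;  // this await can also exit early: if other clones of wg are calling done at the same time, it may fail to get the inner lock before race drops it.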
};
task::spawn(fut.race(other_future));

@nkbai
Author

nkbai commented Feb 25, 2020

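Under what circumstances would concurrent done calls prevent it from getting the inner lock?
The current 0.1.2 version does have a flaw: the waker and the count should be managed by a single lock. If that's the problem, it is already covered by my fix:

If count and waker are not protected by the same lock, the following can happen:

  1. poll finds that count is not ready.
  2. poll is descheduled.
  3. done decrements count and finds it has reached 0.
  4. done tries to wake the waiter but finds nothing to wake, because poll is still running and has not stored its waker yet.
  5. poll is scheduled again.
  6. poll continues and stores its waker.
  7. But nobody will ever wake it.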
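A minimal sketch of the single-lock fix, assuming std::sync::Mutex instead of futures_util's async Mutex: since the lock is never held across an await, add and done become plain (non-async) methods. Checking the count and storing the waker happen under one guard, so either done sees the stored waker or poll sees a count of 0, and the interleaving in steps 1 to 7 cannot lose the wake-up. CountingWake and demo are hypothetical helpers that poll the future by hand:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Count and waker live under ONE lock.
struct State {
    count: isize,
    waker: Option<Waker>,
}

#[derive(Clone)]
struct WaitGroup {
    state: Arc<Mutex<State>>,
}

impl WaitGroup {
    fn new() -> Self {
        WaitGroup { state: Arc::new(Mutex::new(State { count: 0, waker: None })) }
    }

    fn add(&self, delta: isize) {
        assert!(delta >= 0, "delta must be non-negative");
        self.state.lock().unwrap().count += delta;
    }

    fn done(&self) {
        let waker = {
            let mut st = self.state.lock().unwrap();
            st.count -= 1;
            assert!(st.count >= 0, "done called more times than add");
            if st.count == 0 { st.waker.take() } else { None }
        }; // lock released before waking
        if let Some(w) = waker {
            w.wake();
        }
    }
}

impl Future for WaitGroup {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Check the count and store the waker under the same guard, so done()
        // cannot run between the two steps (steps 2 to 5 of the race above).
        let mut st = self.state.lock().unwrap();
        if st.count <= 0 {
            return Poll::Ready(());
        }
        st.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

// Hypothetical test waker that counts how often it is woken.
struct CountingWake(AtomicUsize);
impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// Polls by hand: Pending first, then done() wakes the stored waker, then Ready.
fn demo() -> (bool, usize, bool) {
    let wg = WaitGroup::new();
    wg.add(1);

    let wakes = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&wakes));
    let mut cx = Context::from_waker(&waker);

    let mut fut = wg.clone();
    let first_pending = Pin::new(&mut fut).poll(&mut cx).is_pending();
    wg.done();
    let woken = wakes.0.load(Ordering::SeqCst);
    let then_ready = Pin::new(&mut fut).poll(&mut cx).is_ready();
    (first_pending, woken, then_ready)
}

fn main() {
    let result = demo();
    println!("{:?}", result); // prints "(true, 1, true)"
    assert_eq!(result, (true, 1, true));
}
```

This closes the lost-wake-up race only; it does not address cancellation, since a done call placed after a cancelled await is still skipped.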

@laizy

laizy commented Feb 27, 2020

Here is a lock-free version I implemented, for your reference: https://github.com/laizy/waitgroup-rs
