Nesting mapreduce.MapReduce inside mapreduce.Finish makes Finish a non-blocking operation #4

Open
lujin123 opened this issue Nov 3, 2022 · 1 comment

lujin123 commented Nov 3, 2022

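If the reducer of a nested MapReduce does not call writer.Write, the nested call produces an ErrReduceNoOutput error. When a worker in Finish returns an error, the Finish call ends immediately. However, the MapReduceVoid that Finish calls internally swallows ErrReduceNoOutput and returns nil, so the final result looks like a successful call with no error, while the other workers are in fact still running asynchronously.

For example, a call like the following: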

func main() {
	err := mapreduce.Finish(func() error {
		return worker1()
	}, func() error {
		val, err := mapreduce.MapReduce(func(source chan<- interface{}) {
			for i := 0; i < 10; i++ {
				source <- i
			}
		}, func(item interface{}, writer mapreduce.Writer, cancel func(error)) {
			i := item.(int)
			writer.Write(i * i)
		}, func(pipe <-chan interface{}, writer mapreduce.Writer, cancel func(error)) {
			var cnt int
			for i := range pipe {
				cnt += i.(int)
			}
			// Not calling Write here makes this worker return an error.
			// writer.Write(cnt)
		})
		// err is `ErrReduceNoOutput` here.
		if err != nil {
			return err
		}
		fmt.Println("result:", val)
		return nil
	})
	// err is nil here.
	if err != nil {
		fmt.Println(err)
	}
}

kevwan (Owner) commented Jan 12, 2023

If any worker returns an error, the Finish function quits.

The workers are typically doing something like I/O requests. If any of them fails, the others should quit as well, but if they are blocked in I/O, the only way to achieve that is to return from Finish.
