
Add context support to Go method? #107

Closed
molon opened this issue Mar 22, 2023 · 5 comments

molon commented Mar 22, 2023

conc/pool/pool.go

Lines 54 to 67 in 1d4991d

select {
case p.limiter <- struct{}{}:
// If we are below our limit, spawn a new worker rather
// than waiting for one to become available.
p.handle.Go(p.worker)
// We know there is at least one worker running, so wait
// for it to become available. This ensures we never spawn
// more workers than the number of tasks.
p.tasks <- f
case p.tasks <- f:
// A worker is available and has accepted the task.
return
}

If WithMaxGoroutines is set, the caller may block until a worker is released, but in some scenarios the caller needs to be able to cancel that wait or give up after a timeout.

func (p *Pool) GoContext(ctx context.Context, f func()) error
or
func (p *Pool) TryGo(ctx context.Context, f func()) error

select {
case <-ctx.Done(): // <------------
	return ctx.Err()
case p.limiter <- struct{}{}:
	// If we are below our limit, spawn a new worker rather
	// than waiting for one to become available.
	p.handle.Go(p.worker)

	// We know there is at least one worker running, so wait
	// for it to become available. This ensures we never spawn
	// more workers than the number of tasks.
	p.tasks <- f
	return nil
case p.tasks <- f:
	// A worker is available and has accepted the task.
	return nil
}
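
For example, a caller could then bound its own wait with a timeout (hypothetical usage of the proposed TryGo; ctx, task, and the 2-second timeout are just placeholders):

// Hypothetical usage of the proposed TryGo: bound the wait for a worker with a timeout.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := p.TryGo(ctx, task); err != nil {
	// No worker became available within 2 seconds; give up without blocking forever.
	log.Printf("could not submit task: %v", err)
}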

mhmtszr commented Apr 20, 2023

We may also want to be able to cancel the scope using a context.

camdencheek (Member) commented Jun 1, 2023

Hi @molon! Thanks for the detailed request.

The reason I initially avoided adding an API like this is that I thought it made the pool easier to use incorrectly by forgetting to call pool.Wait().

Consider this toy example:

p := pool.New().WithMaxGoroutines(5)
for i := 0; i < 10; i++ {
    i := i
    err := p.TryGo(ctx, func() {
        doSomeLongTaskThatMightPanic(ctx)
    })
    if err != nil {
         p.Wait() // This is really easy to forget
         return err
    }
}
p.Wait()
return nil

In that example, failing to wait means the panic is never propagated and concurrency is not scoped because there are still background goroutines executing after return.

Instead, the contract is "calling p.Go() guarantees the task will run to completion if p.Wait() is called." Since concurrency in Go is all cooperative, "completion" here might just mean "the task reads from the outer context and exits early." So if you care about exiting early, your tasks can respect a context, and p.Go() will block for at most the time it takes for one of your tasks to finish after a context cancellation.

p := pool.New().WithMaxGoroutines(5)
for i := 0; i < 10; i++ {
    if err := ctx.Err(); err != nil {
        p.Wait()
        return err
    }
    i := i
    p.Go(func() {
        doSomeLongTaskThatMightPanic(ctx)
    })
}
p.Wait()
return nil

This example achieves almost the same thing as TryGo(), but I think it's slightly more obvious that you're breaking out of the normal control flow.
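
For illustration, here is a sketch of what a context-respecting task could look like (the body of doSomeLongTaskThatMightPanic is an assumption, and doOneStep is a hypothetical unit of work):

func doSomeLongTaskThatMightPanic(ctx context.Context) {
	for i := 0; i < 100; i++ {
		select {
		case <-ctx.Done():
			// The outer context was cancelled; exit early instead of
			// finishing all the steps, which lets p.Wait() return quickly.
			return
		default:
		}
		doOneStep(i) // hypothetical unit of work that might panic
	}
}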

So I guess where I'm at is I don't really see much additional value compared to just having cancellable tasks, and adding a TryGo() adds a pattern to the library that is easy to misuse. WDYT?

cc @bobheadxi for thoughts as well

molon (Author) commented Jun 2, 2023

Hi, @camdencheek

I understand your point of view.

But what I want is to be able to put a timeout on waiting for a worker, without needing to cancel the workers that are already running.
I think your approach is only suitable for scenarios where a single context controls everything.

Also, I suspect that the way I'm using conc.Pool is simply not recommended:
I call the Go and Wait methods from different goroutines, and it is easy to run into panic: send on closed channel.

A simulated scenario:

func TestPoolGo(t *testing.T) {
	// Like a bank with only one employee
	p := New().WithMaxGoroutines(1)

	go func() {
		// customer A is only willing to wait 2 seconds
		p.Go(func() {
			log.Println("a running")
			time.Sleep(3 * time.Second)
		})
	}()
	go func() {
		// customer B is only willing to wait 3 seconds
		p.Go(func() {
			log.Println("b running")
			time.Sleep(3 * time.Second)
		})
	}()
	time.Sleep(1 * time.Second)
	log.Println("before wait")
	p.Wait() // !!!! panic: send on closed channel !!!!
	log.Println("after wait")
}

Now what I want to know is whether I am using conc.Pool in a scenario it is not designed for. (I assume the panic happens because Wait() closes the tasks channel while a Go() call in another goroutine is still blocked trying to send to it.)

And please note:

This would be safe if the TryGo method existed, which may also be one of the reasons why TryGo should exist.

func TestPoolTryGo(t *testing.T) {
	// Like a bank with only one employee
	p := New().WithMaxGoroutines(1)

	// Like a switch that can stop the bank service
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		// customer A is only willing to wait 2 seconds
		ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
		defer cancel()
		err := p.TryGo(ctx, func() {
			log.Println("a running")
			time.Sleep(3 * time.Second)
		})
		log.Printf("a try result: %v", err)
	}()
	go func() {
		// customer B is only willing to wait 3 seconds
		ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
		defer cancel()
		err := p.TryGo(ctx, func() {
			log.Println("b running")
			time.Sleep(3 * time.Second)
		})
		log.Printf("b try result: %v", err)
	}()
	time.Sleep(1 * time.Second)
	// log.Println("before cancel all tries")
	cancel() // bank stop service
	log.Println("before wait")
	p.Wait()
	log.Println("after wait")
}

bobheadxi (Member) commented Jun 2, 2023

This simulation example sounds to me like a long-lived pool, where multiple publishers/submitters push work into a shared pool/backlog/processing queue, each publisher has requirements of its own (e.g. a deadline for submitting a job), and pool.Wait() is called in a goroutine separate from the publishers.

This idea is a valid one, but IMO it does not fit into anything conc provides today. We've discussed this in a few places:

  1. long-lived pools/background jobs #93
  2. FanOut/Until Discussion #86 (comment)

The biggest pitfalls are:

  1. not calling Wait() on error cases (as Camden mentioned)
  2. panics do not propagate until pool.Wait(), which in these use cases is often only called at service shutdown, potentially a very long time later

Does this comparison sound right?

molon (Author) commented Jun 5, 2023

> This simulation example sounds to me like a long-lived pool, where multiple publishers/submitters push work into a shared pool/backlog/processing queue, each publisher has requirements of its own (e.g. a deadline for submitting a job), and pool.Wait() is called in a goroutine separate from the publishers.
>
> This idea is a valid one, but IMO it does not fit into anything conc provides today. We've discussed this in a few places:
>
>   1. long-lived pools/background jobs #93
>   2. FanOut/Until Discussion #86 (comment)
>
> The biggest pitfalls are:
>
>   1. not calling Wait() on error cases (as Camden mentioned)
>   2. panics do not propagate until pool.Wait(), which in these use cases is often only called at service shutdown, potentially a very long time later
>
> Does this comparison sound right?

Indeed, it is like a long-lived pool. But perhaps the execution process is defined by the publisher, not the subscriber.

About pitfalls:

  1. I don't understand why Wait() would be easy to forget. By convention, Go developers should be well aware that some resources need to be closed and released.
  2. Because of the mechanism of conc.WaitGroup, this is indeed a problem. But I personally feel that this problem exists in all scenarios, not just the one we are currently discussing. Besides, just as with sync.WaitGroup + goroutines, the default behavior is to panic directly; if you want to change that default, you need to recover separately. When using conc.WaitGroup, you can also use recover to change the default behavior (see the sketch below).
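
As a minimal sketch of what I mean by recovering inside the task (this assumes the standard github.com/sourcegraph/conc/pool API; the panicking task is just a stand-in):

package main

import (
	"log"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	p := pool.New().WithMaxGoroutines(1)
	p.Go(func() {
		// Recover inside the task itself, so the panic is handled
		// right away instead of being repropagated at p.Wait().
		defer func() {
			if r := recover(); r != nil {
				log.Printf("task panicked: %v", r)
			}
		}()
		panic("boom") // stand-in for a task that might panic
	})
	p.Wait() // returns normally; the panic was already handled inside the task
}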

All in all, the key points that I personally care about are:

  1. Since a select on channels without a default case is used, a <-ctx.Done() case should also be provided; that is good practice.
  2. Since Wait() is provided, a Go() call that is still blocked in its select should not run into panic: send on closed channel; that is not robust for concurrent use.

molon closed this as completed Jan 17, 2024