Why not “p.Running()<capacity”? #142

Closed
sirodeneko opened this issue Mar 13, 2021 · 4 comments
Labels
help wanted, question, waiting for response

Comments


sirodeneko commented Mar 13, 2021

ants/pool.go

Line 238 in fd8d670

if p.Running() == 0 {

Once the number of running workers drops below the capacity, a caller no longer needs to block. So why wait until every worker has finished before calling spawnWorker()?
Why not “p.Running()<capacity”?
I am new to this and would appreciate an answer. Thanks.

@panjf2000
Owner

That is not what this check means. The condition exists because of the following:

ants/pool.go

Lines 87 to 92 in fd8d670

// There might be a situation that all workers have been cleaned up(no any worker is running)
// while some invokers still get stuck in "p.cond.Wait()",
// then it ought to wakes all those invokers.
if p.Running() == 0 {
	p.cond.Broadcast()
}

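There is a situation where callers are waiting for an available worker, but all workers have just been removed by the periodic cleanup task. Those callers would then block forever. So whenever the cleanup task has removed all workers, it wakes up the waiting callers. A woken caller then checks p.Running() == 0 and, if that holds, starts a new goroutine to do the work.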
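The wake-up path described above can be sketched in isolation. This is a minimal illustration, not the ants implementation: miniPool, acquire, purgeAll, and demoPurgeWakeup are hypothetical names, and the pool is reduced to two counters guarded by a sync.Cond. A caller blocks because the single worker is busy, the cleanup drops every worker and broadcasts, and the woken caller sees running == 0 and spawns a fresh worker.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// miniPool is a hypothetical, stripped-down stand-in for the pool in this thread.
type miniPool struct {
	lock    sync.Mutex
	cond    *sync.Cond
	running int // live workers
	idle    int // idle workers available for reuse
}

func newMiniPool(running int) *miniPool {
	p := &miniPool{running: running}
	p.cond = sync.NewCond(&p.lock)
	return p
}

// acquire blocks until an idle worker can be reused or no worker is left at all.
// It reports which of the two paths was taken.
func (p *miniPool) acquire() string {
	p.lock.Lock()
	defer p.lock.Unlock()
	for {
		if p.idle > 0 {
			p.idle--
			return "reused"
		}
		if p.running == 0 {
			// Every worker is gone, so nobody will ever signal us: spawn one.
			p.running++
			return "spawned"
		}
		p.cond.Wait() // releases the lock while waiting, reacquires on wake-up
	}
}

// purgeAll simulates a cleanup pass that removes every worker,
// then wakes all blocked callers so they can re-check the state.
func (p *miniPool) purgeAll() {
	p.lock.Lock()
	p.running, p.idle = 0, 0
	p.lock.Unlock()
	p.cond.Broadcast()
}

func demoPurgeWakeup() string {
	p := newMiniPool(1) // one busy worker, none idle
	result := make(chan string, 1)
	go func() { result <- p.acquire() }()
	// The sleep only makes it likely that the caller is already waiting;
	// the outcome is the same if purgeAll happens to run first.
	time.Sleep(10 * time.Millisecond)
	p.purgeAll()
	return <-result
}

func main() {
	fmt.Println(demoPurgeWakeup()) // prints "spawned"
}
```

Without the Broadcast in purgeAll, the caller in this sketch would stay in Wait forever, which is the hang the running == 0 check is meant to avoid.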

panjf2000 added the help wanted, question, and waiting for response labels on Mar 16, 2021
@sirodeneko
Author

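I see what you mean now: it wakes up the callers that are waiting for a worker. At first I thought the function kept spinning through the goto loop. Looking more carefully, I see that it blocks in p.cond.Wait(), and every caller there is waiting to be woken up. The design principle is to use an idle worker whenever one exists, and to fall back to p.workerCache.Get().(*goWorker) only when none is left, right? Using p.Running()<capacity there would be wasteful.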

@panjf2000
Owner

p.Running()<capacity has already been checked earlier in this function. The p.Running() == 0 check exists only for the purpose I just described.


math345 commented Mar 18, 2021

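I have thought about this question, and I don't think the current approach is very elegant. I worked through this code a while ago; my understanding is summarized below.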

Why add this code at all? Moreover, spawnWorker() is called after p.lock.Unlock(), so the running count is no longer protected by the lock and could exceed the configured maximum, capacity.

		if p.Running() == 0 {
			p.lock.Unlock()
			spawnWorker()
			return
		}

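Fine, let's set that small issue aside and look at the code below. Why not simply call spawnWorker() to create a new worker when w == nil? Because the wake-up may have come from a broadcast, in which case there is no guarantee that p.Running() <= capacity.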

		w = p.workers.detach()
		if w == nil {
			goto Reentry
		}

However, I think this can be solved by changing the code to the following:

		w = p.workers.detach()
		if w == nil {
			if p.Running() < capacity {
				spawnWorker()
				p.lock.Unlock()
				return
			}
			goto Reentry
		}

Because the author did not write it this way, the extra if p.Running() == 0 block is needed to handle the broadcast.

Now consider the broadcast itself. It is triggered during the periodic cleanup when the following condition holds.

		// There might be a situation that all workers have been cleaned up(no any worker is running)
		// while some invokers still get stuck in "p.cond.Wait()",
		// then it ought to wakes all those invokers.
		if p.Running() == 0 {
			p.cond.Broadcast()
		}

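Why is this code needed? Consider this case: retrieveWorker() finds that the Running() count exceeds the capacity, so it blocks in p.cond.Wait(). Then f() panics in every running worker. With the current logic, those workers are never returned to the pool: revertWorker is not called, so it never sends its Signal. Therefore, if we detect that not a single worker is running, we Broadcast.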

for f := range w.task {
	
	if f == nil {
		return
	}
	
	// Consider the case where f() panics: the worker is then not returned to the pool
	f()
	
	if ok := w.pool.revertWorker(w); !ok {
		return
	}
}

So adding the broadcast does increase complexity. Could the worker's run() wrap the call to f() and handle the panic there?
