Shutdown risk #3

Open
buguang01 opened this issue Nov 1, 2019 · 8 comments

Comments

@buguang01

func newWorker() (w *worker) {
	w = &worker{
		params: make(chan interface{}),
	}
	go func(w *worker) {
		for {
			if w.process() {
				break
			}
			atomic.StoreInt32(&w.isBusy, 0)
		}
		// mark the worker as busy
		atomic.StoreInt32(&w.isBusy, 1)
		// a task may still be pending
		select {
		case params := <-w.params:
			w.action(params)
		default:
		}
		// close the task channel
		close(w.params)
	}(w)
	return
}

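On exit, you considered that some task might be in the middle of writing new params. But when your code reaches the select, the writer may not have started sending yet. In that case you fall through to default and close the channel, and the writer then panics when it sends on the closed channel.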
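
A minimal Go sketch of that interleaving, assuming an unbuffered params channel like the one in newWorker: the exit path sees no pending send, takes default, and closes the channel; a writer that only sends afterwards panics. trySend is a hypothetical helper that uses recover to report the panic instead of crashing the program.

```go
package main

import "fmt"

// trySend sends v on ch and reports whether the send panicked.
// Sending on a closed channel always panics in Go.
func trySend(ch chan interface{}, v interface{}) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch <- v
	return false
}

func main() {
	params := make(chan interface{})

	// Exit path: no writer is blocked on the channel yet, so select takes default.
	select {
	case p := <-params:
		fmt.Println("drained", p)
	default:
	}
	close(params)

	// Late writer: it already passed the busy check and only now sends.
	fmt.Println(trySend(params, 42)) // true
}
```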

@buguang01
Author

Also, even if you do read a task's params here and start running it, the w.action(params) call inside the select has no panic protection, which can also cause problems.
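
A minimal sketch of panic protection around the drained action, assuming a panic should be turned into an error rather than crash the goroutine that is shutting the worker down; safeRun is a hypothetical helper, not part of the project.

```go
package main

import "fmt"

// safeRun calls action(params) and converts a panic into an error, so a
// panicking task cannot take down the goroutine that drains the channel.
func safeRun(action func(interface{}), params interface{}) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("action panicked: %v", r)
		}
	}()
	action(params)
	return nil
}

func main() {
	err := safeRun(func(p interface{}) { panic(p) }, "boom")
	fmt.Println(err)                                // action panicked: boom
	fmt.Println(safeRun(func(interface{}) {}, nil)) // <nil>
}
```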

@letsfire
Owner

letsfire commented Nov 1, 2019

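Regarding the first issue: I already mark the worker as busy first, so no new writes can get in after that, right?
I'll fix the second issue this weekend. Thanks for the feedback.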

@buguang01
Author

buguang01 commented Nov 1, 2019

What if, right before
// mark the worker as busy
atomic.StoreInt32(&w.isBusy, 1)
runs,
if atomic.CompareAndSwapInt32(&w.isBusy, 0, 1) {
has just been executed?
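
A deterministic replay of that interleaving, assuming the 0 = idle / 1 = busy encoding used in this thread; interleave is a hypothetical helper. With an unconditional StoreInt32 on the exit path, both sides believe they own the worker. A CompareAndSwapInt32 there would at least report the conflict.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// interleave runs assign's CAS first, then the exit path's attempt to mark
// the worker busy, and reports whether each side believes it owns the worker.
func interleave(exitUsesCAS bool) (assignOwns, exitOwns bool) {
	var isBusy int32 // 0 = idle, 1 = busy
	assignOwns = atomic.CompareAndSwapInt32(&isBusy, 0, 1)
	if exitUsesCAS {
		exitOwns = atomic.CompareAndSwapInt32(&isBusy, 0, 1)
	} else {
		// An unconditional store never fails, so the exit path cannot
		// tell that assign already claimed the worker.
		atomic.StoreInt32(&isBusy, 1)
		exitOwns = true
	}
	return assignOwns, exitOwns
}

func main() {
	fmt.Println(interleave(false)) // true true
	fmt.Println(interleave(true))  // true false
}
```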

letsfire added a commit that referenced this issue Nov 18, 2019
letsfire added a commit that referenced this issue Nov 18, 2019
@letsfire
Owner

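I added a timer to address the issue you raised. As for errors in action, my intent is that action handles them itself. line.go also provides SetPanicHandler, and process in worker.go is only there to keep the worker from dying. When something fails, the component has no way of knowing how to recover the task, so that should be up to action itself, don't you think?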

@buguang01
Author

func (w *worker) assign(action func(interface{}), params interface{}) bool {
	if atomic.CompareAndSwapInt32(&w.isBusy, 0, 1) {
		time.Sleep(time.Second)
		w.action = action
		w.params <- params
		return true
	}
	return false
}

If you add a sleep here, the bug I described becomes easy to reproduce.
This is a real risk.

@buguang01
Author

func TestMaster(t *testing.T) {
	wg := sync.WaitGroup{}
	wg2 := sync.WaitGroup{}
	w := factory.NewMaster(100, 2)
	ctx, cel := context.WithCancel(context.Background())
	li := w.AddLine("testli", func(e interface{}) {
		defer wg.Done()
		time.Sleep(time.Second * 1)
	})
	for i := 0; i < 100; i++ {
		wg2.Add(1) // register before starting the goroutine so wg2.Wait cannot miss it
		go func() {
			defer wg2.Done()
			for {
				select {
				case <-ctx.Done():
					return
				default:
					wg.Add(1)
					li.Submit(0)
				}
			}
		}()
	}
	time.Sleep(time.Second * 5)
	fmt.Println("adjustsize")
	for i := 0; i < 100; i++ {
		w.AdjustSize(100)
		time.Sleep(time.Second)
		w.AdjustSize(1)
		time.Sleep(time.Second)
	}
	fmt.Println("for end")
	cel()      // stop the sender goroutines
	wg2.Wait() // check that the senders can exit; this will most likely hang here
	fmt.Println("down")
	w.Shutdown()
	fmt.Println("Wait")
	wg.Wait()
	fmt.Println("End")
}

Test

letsfire added a commit that referenced this issue Nov 19, 2019
@buguang01
Author

buguang01 commented Nov 19, 2019

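You use 0 and 1 in isBusy to indicate whether the worker is occupied, and a shut-down worker is effectively occupied forever. So when you send the shutdown signal, you could set isBusy to 2 and then send the exit value into the channel.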

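func (w *worker) shutdown() {
	// Spin until the worker is idle (0), then claim it with the closing state (2).
	for !atomic.CompareAndSwapInt32(&w.isBusy, 0, 2) {
		runtime.Gosched() // yield instead of burning the CPU
	}
	w.params <- exitSignal{}
	close(w.params)
}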

Then, in the worker loop:

for {
	if w.process() {
		break
	}
	atomic.CompareAndSwapInt32(&w.isBusy, 1, 0) // only busy (1) returns to idle; closing (2) stays
}

Adjust the other places accordingly.

@buguang01
Author

For a better version, you could also force the state change directly in the shutdown method, or give the channel a buffer of 2, and so on.
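
One possible reading of the buffer-of-2 idea, sketched under that assumption: with a buffered channel, the shutdown side can queue a pending value and the exit signal without waiting for a receiver, and the worker still receives buffered values after the channel is closed. drain is a hypothetical helper. A buffer alone does not stop a late sender from hitting the closed channel, so the state check is still needed.

```go
package main

import "fmt"

type exitSignal struct{}

// drain receives what is left in ch, stopping at exitSignal, and returns
// how many ordinary task values it saw.
func drain(ch chan interface{}) (tasks int) {
	for v := range ch {
		if _, ok := v.(exitSignal); ok {
			break
		}
		tasks++
	}
	return tasks
}

func main() {
	// With a buffer of 2, both sends complete without a ready receiver.
	ch := make(chan interface{}, 2)
	ch <- "last task"
	ch <- exitSignal{}
	close(ch)
	// Buffered values are still delivered after close.
	fmt.Println(drain(ch)) // 1
}
```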
