Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

数组越界BUG #4

Open
buguang01 opened this issue Nov 1, 2019 · 14 comments
Open

数组越界BUG #4

buguang01 opened this issue Nov 1, 2019 · 14 comments

Comments

@buguang01
Copy link

func (m *Master) AdjustSize(newSize int) {
	if int64(newSize) > m.maxNum {
		newSize = int(m.maxNum)
	}

	m.Lock()
	defer m.Unlock()

	if diff := newSize - int(m.ingNum); diff > 0 {
		for i := 0; i < diff; i++ {
			m.workers[m.ingNum] = newWorker()
			atomic.AddInt64(&m.ingNum, 1)
		}
	} else if diff < 0 {
		atomic.StoreInt64(&m.ingNum, int64(newSize))
		if cursor := atomic.LoadInt64(&m.cursor); cursor > int64(newSize) {
			atomic.StoreInt64(&m.cursor, int64(newSize))
		}
		for _, w := range m.workers[newSize:] {
			if w == nil {
				break
			}
			w.shutdown()
		}
		m.workers = m.workers[0:newSize]
	}
}

func (m *Master) Running() int64 {
	return atomic.LoadInt64(&m.ingNum)
}

func (m *Master) Shutdown() {
	m.AdjustSize(0) // 关闭所有worker
}

func (m *Master) getWorker() *worker {
	atomic.CompareAndSwapInt64(&m.cursor, m.ingNum, 0)
	idx := atomic.AddInt64(&m.cursor, 1)
	w := m.workers[idx-1]
	return w
}

当cursor与原ingNum相等时,你要进行缩容
这个时候,如果是先把ingNum改成了新的值,再
atomic.CompareAndSwapInt64(&m.cursor, m.ingNum, 0)
运行这行时,就不会修改成功,然后你对cursor,进行了累加,导致越界,或有可能拿到一个nil对象。
还有就是你的代码里,对workers初始化的时候,用的是maxNum
但后面修改的时候,又用了ingNum做为他的长度。
我原以为你是要一开始声明一个确定长度的数组。但后面又换了,如果按你后面的逻辑
你一开始应该是
workers: make([]*worker, initNum,maxNum)
才对。

letsfire added a commit that referenced this issue Nov 5, 2019
@letsfire letsfire mentioned this issue Nov 5, 2019
@letsfire
Copy link
Owner

letsfire commented Nov 5, 2019

大佬,帮忙Review

@buguang01
Copy link
Author

buguang01 commented Nov 6, 2019

你写了一个比之前复杂的保证安全的逻辑,但仔细看,还是会有问题

if atomic.CompareAndSwapInt32(&m.tryFlag, 0, 1) {
		atomic.StoreInt64(&m.cursor, 0)
		atomic.StoreInt32(&m.tryFlag, 0)
		m.synCond.Broadcast()
	} else {
		m.synCond.L.Lock()
		m.synCond.Wait()
		m.synCond.L.Unlock()
	}

你不能保证你在m.synCond.Broadcast()时,所有其他协程都在m.synCond.Wait()
虽然你的逻辑也不能保证m.synCond.Broadcast()之前,只有一个协程进入if的then里,但如果只有一个进入了then而有2个进入了else其中一个到了Wait,还有一个在Lock()等待进入中
这个时候你的m.synCond.Broadcast()只能让之前在wait的出来,然后Lock()就进入了wait。
这个时候,如果没有新的协程去m.synCond.Broadcast(),那这个Wait就会一直wiat下去。

虽然Wait方法里面会在添加监听后,释放锁,让别的协程可以进来,但是依然存在着我上面说的那种情况发生的概率;

@buguang01
Copy link
Author

buguang01 commented Nov 6, 2019

你把workers从数组改成了双map进行切换,但你写的并没有达到目标,只是map在获取的时候,可以检查而已,并不需要双map
如果使用双map那每次就应该只更新其中一个map,但那样的话就会导致资源的不释放问题,和你写这个东西的初衷就违背了。

@buguang01
Copy link
Author

还有一个问题,那就是你的双map是同一个map,导致一修改数量,就会出现
fatal error: concurrent map read and map write

@buguang01
Copy link
Author

buguang01 commented Nov 6, 2019

func TestMaster(t *testing.T) {
	wg := sync.WaitGroup{}
	wg2 := sync.WaitGroup{}
	w := NewMaster(10, 2)
	ctx, cel := context.WithCancel(context.Background())
	li := w.AddLine("testli", func(e interface{}) {
		defer wg.Done()
	})
	for i := 0; i < 100; i++ {
		go func() {
			wg2.Add(1)
			defer wg2.Done()
			for {
				select {
				case <-ctx.Done():
					return
				default:
					wg.Add(1)
					li.Submit(0)
				}
			}
		}()
	}
	time.Sleep(time.Second * 5)
	fmt.Println("adjustsize")
	w.AdjustSize(6)
	time.Sleep(time.Second * 2)
	w.AdjustSize(1)
	time.Sleep(time.Second * 2)
	cel() //关闭发协程
	w.AdjustSize(2)
	time.Sleep(time.Second * 2)
	wg2.Wait() //确认发协程是否能关闭,多半会卡在这里
	fmt.Println("down")
	w.Shutdown()
	fmt.Println("Wait")
	wg.Wait()
	fmt.Println("End")
}

给你一段测试代码。
改到你能看到输出end就行

@buguang01
Copy link
Author

记的多跑几次,几个问题都会暴露出来的。

letsfire added a commit that referenced this issue Nov 18, 2019
@letsfire
Copy link
Owner

感谢,这段工作较忙,问题已经修复,使用了sync.Map,您的测试用例已通过。再次感谢

@buguang01
Copy link
Author

你的代码需要优化。用了好多互斥锁。单线程,原子锁。

@letsfire
Copy link
Owner

还请指点,resetGuard里面是多,但是这段代码执行少,复杂度可以忽略吧

@buguang01
Copy link
Author

Master.resetGuard
这个是用来干什么用的?

@letsfire
Copy link
Owner

确保只有一个协程能重置m.cursor

@buguang01
Copy link
Author

确保只有一个协程能重置m.cursor

原因呢?
你里面本来就是一个原子操作了

@buguang01
Copy link
Author

还有就是getWorker为什么是递归?不应该是循环吗?

@letsfire
Copy link
Owner

m.workers.Load(idx) 同一时间会有多个协程拿不到worker进而去下面重置cursor,这时候下面的guard就是仅让其中一个协程进行重置,其他等待重置完毕继续获取从头获取

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants