Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

使用sync.Cond实现一个有限容量的队列 #22

Open
kevinyan815 opened this issue Nov 1, 2020 · 0 comments
Open

使用sync.Cond实现一个有限容量的队列 #22

kevinyan815 opened this issue Nov 1, 2020 · 0 comments

Comments

@kevinyan815
Copy link
Owner

kevinyan815 commented Nov 1, 2020

Cond 是为等待 / 通知场景下的并发问题提供支持的。它提供了条件变量的三个基本方法 Signal、Broadcast 和 Wait,为并发的 goroutine 提供等待 / 通知机制。在实践中,处理等待 / 通知的场景时,我们常常会使用 Channel 替换 Cond,因为 Channel 类型使用起来更简洁,而且不容易出错。但是对于需要重复调用 Broadcast 的场景,比如 Kubernetes 的调度队列ScheduleQueue就是依赖sync.Cond实现的,具体怎么实现的可以看看这篇文章:Kubernetes调度队列对sync.Cond的使用。每次往队列中成功增加了元素后就需要调用 Broadcast 通知所有的等待者,使用 Cond 就再合适不过了。使用 Cond 之所以容易出错,就是 Wait 调用需要加锁,以及被唤醒后一定要检查条件是否真的已经满足。

sync包里的WaitGroup和Cond两个原语有本质上的区别,WaitGroup 是主 goroutine 等待确定数量的子 goroutine 完成任务;而 Cond 是等待某个条件满足,这个条件的修改可以被任意多的 goroutine 更新,而且 Cond 的 Wait 不关心也不知道其他 goroutine 的数量,只关心等待条件。而且 Cond 还有单个通知的机制,也就是 Signal 方法。

下面是用sync.Cond实现的一个类似Kubernetes调度队列的有限队列。

sync.Cond的使用方法和实现原理详见:https://time.geekbang.org/column/article/299312

package main

import (
	"fmt"
	"math/rand"
	"strings"
	"sync"
)

type Queue struct {
	cond *sync.Cond
	data []interface{}
	capc int
	logs []string
}

func NewQueue(capacity int) *Queue {
	return &Queue{cond: &sync.Cond{L: &sync.Mutex{}}, data: make([]interface{}, 0), capc: capacity, logs: make([]string, 0)}
}

func (q *Queue) Enqueue(d interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	for len(q.data) == q.capc {
		q.cond.Wait()
	}
	// FIFO入队
	q.data = append(q.data, d)
	// 记录操作日志
	q.logs = append(q.logs, fmt.Sprintf("En %v\n", d))
	// 通知其他waiter进行Dequeue或Enqueue操作
	q.cond.Broadcast()

}

func (q *Queue) Dequeue() (d interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	for len(q.data) == 0 {
		q.cond.Wait()
	}
	// FIFO出队
	d = q.data[0]
	q.data = q.data[1:]
	// 记录操作日志
	q.logs = append(q.logs, fmt.Sprintf("De %v\n", d))
	// 通知其他waiter进行Dequeue或Enqueue操作
	q.cond.Broadcast()
	return
}

func (q *Queue) Len() int {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	return len(q.data)
}

func (q *Queue) String() string {
	var b strings.Builder
	for _, log := range q.logs {
		//fmt.Fprint(&b, log)
		b.WriteString(log)
	}
	return b.String()
}

func main() {
	Example()
}

func Example() {
	var wg sync.WaitGroup
	//容量为5的阻塞队列
	que := NewQueue(3)

	// 生成随机命令
	for i, cmd := range Commands(20, true) {
		wg.Add(1)

		// 0表示入队,1表示出队
		if cmd == 0 {
			go func(id int) {
				defer wg.Done()
				que.Enqueue(id)
			}(i)
		} else {
			go func(id int) {
				defer wg.Done()
				que.Dequeue()
			}(i)
		}
	}

	/*
		// 当执行出队、入队命令的worker数『不相等』时
		// 最后会有worker阻塞在出队或入队方法上
		// 同时主goroutine会阻塞在wg.Wait()上
		// 此时所有goroutine都阻塞了
		// 下面的goroutine会避免该问题
		// 但仍需新worker唤醒阻塞在队列上的worker
		go func() {
			for {
				select{
				case <-time.After(time.Second):
					runtime.Gosched()
				}
			}
		}()
	*/

	wg.Wait()

	// 输出操作日志
	fmt.Println(que)
}

// Commands 用于产生出队、入队命令
func Commands(N int, random bool) []int {
	if N%2 != 0 {
		panic("will deadlock!")
	}
	// 0表示入队,1表示出队
	commands := make([]int, N)
	for i := 0; i < N; i++ {
		if i%2 == 0 {
			commands[i] = 1
		}
	}

	if random {
		// shuffle algorithms
		for i := len(commands) - 1; i > 0; i-- {
			j := rand.Intn(i + 1)
			commands[i], commands[j] = commands[j], commands[i]
		}
	}

	return commands
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant