Skip to content

Latest commit

 

History

History
456 lines (359 loc) · 13.2 KB

golang_lockfree_disruptor.md

File metadata and controls

456 lines (359 loc) · 13.2 KB

golang disruptor lockfree 高性能无锁队列的设计实现原理

腾讯的老哥在社区中开源了 golang lockfree 的库,本人看到该库的设计很有意思,就参与该项目的开发改进,已经是该项目的 contributor 了. 该库使用 golang 开发,相比社区中其他 golang lockfree 来说,api 更友好,性能更好,其提供了各种的休眠阻塞策略,避免过度的 spin 引发 cpu 开销升高。

项目地址: https://github.com/bruceshao/lockfree

lockfree queue 的基本原理还是好理解的,设计上借鉴参考了无锁队列的标杆 java disruptor,另外在代码中很多性能的优化点也是参照了 java disruptor。

lockfree queue 无锁的设计大概流程是这样,首先需要一个原子递增的发号器。生产者写数据时,先拿到一个 seq 序号,通过位运算找到 ringbuffer 的位置,如何 ringbuffer 还有空余空间,只需写到 ringbuffer 对应位置即可,如果空间已满,则需要等待。而读取数据只需判断对应结构的 seq 跟 consumer seq 是否一致即可。

什么场景下需要使用 lockfree 无锁队列?

就 go lockfree 和 disruptor 的设计来说,首先对性能很敏感,另外 consumer 为计算密集型或非阻塞操作,比如你使用 disruptor 队列做通知队列,consumer 收到事件后,进行 http 请求操作,这显然是无法体现 lockfree 队列的性能。反而因为 consumer 消费过慢,引发 producer 和 consumer 的 spin 自旋开销。

我的理解,性能敏感且 consumer 处理的足够快的场景可以使用 lockfree 无锁队列。

lockfree vs channel 的性能表现

如下所述.

  1. 在goroutine数量比较小时,lockfree 和 channel 性能差别不明显;
  2. 当goroutine打到一定数量(大于1000)后,lockfree无论从时间还是 QR 都远远超过chan;

一句话,大多数场景不需要 lockfree 无锁队列,除非追求机制的性能体验。平时用 golang channel 足矣了,如果 golang channel 出现并发的性能瓶颈,其实也可以变通下,切分多个 channel 来分担 mutex 锁竞争冲突,以提高 channel 的读写并行吞吐。

lockfree 如何使用 ?

该库的使用很简单。先实例化 lockfree 队列对象,然后就可并发写,读则是注册一个回调方法,lockfree consumer 消费数后会回调注册的方法。

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/bruceshao/lockfree"
)

var (
	goSize    = 10000
	sizePerGo = 10000

	total = goSize * sizePerGo
)

func main() {
	// lockfree计时
	now := time.Now()

	// 创建事件处理器
	handler := &eventHandler[uint64]{
		signal: make(chan struct{}, 0),
		now:    now,
	}

	// 创建消费端串行处理的Lockfree
	lf := lockfree.NewLockfree[uint64](
		1024*1024,
		handler,
		lockfree.NewSleepBlockStrategy(time.Millisecond),
	)

	// 启动Lockfree
	if err := lf.Start(); err != nil {
		panic(err)
	}

	// 获取生产者对象
	producer := lf.Producer()

	// 并发写入
	var wg sync.WaitGroup
	wg.Add(goSize)
	for i := 0; i < goSize; i++ {
		go func(start int) {
			for j := 0; j < sizePerGo; j++ {
				err := producer.Write(uint64(start*sizePerGo + j + 1))
				if err != nil {
					panic(err)
				}
			}
			wg.Done()
		}(i)
	}

	// wait for producer
	wg.Wait()

	fmt.Printf("producer has been writed, write count: %v, time cost: %v \n", total, time.Since(now).String())

	// wait for consumer
	handler.wait()

	// 关闭Lockfree
	lf.Close()
}

type eventHandler[T uint64] struct {
	signal   chan struct{}
	gcounter uint64
	now      time.Time
}

func (h *eventHandler[T]) OnEvent(v uint64) {
	cur := atomic.AddUint64(&h.gcounter, 1)
	if cur == uint64(total) {
		fmt.Printf("eventHandler has been consumed already, read count: %v, time cose: %v\n", total, time.Since(h.now))
		close(h.signal)
		return
	}

	if cur%10000000 == 0 {
		fmt.Printf("eventHandler consume %v\n", cur)
	}
}

func (h *eventHandler[T]) wait() {
	<-h.signal
}

go lockfree 源码分析

实例化 lockfree 对象

type Lockfree[T any] struct {
	writer   *Producer[T]
	consumer *consumer[T]
	status   int32
}

func NewLockfree[T any](capacity int, handler EventHandler[T], blocks blockStrategy) *Lockfree[T] {
	// 重新计算正确的容量
	capacity = minSuitableCap(capacity)

	// 发号器
	seqer := newSequencer(capacity)

	// 实例化 ringbuffer,队列是环形数组
	rbuf := newRingBuffer[T](capacity)

	// 实例化定义消费者
	cmer := newConsumer[T](rbuf, handler, seqer, blocks)

	// 实例化生产者
	writer := newProducer[T](seqer, rbuf, blocks)
	return &Lockfree[T]{
		writer:   writer,
		consumer: cmer,
		status:   READY,
	}
}

启动 lockfree

Start() 用来启动 lockfree,对于 consumer 来说会启动一个消费协程消费 ringbuffer 队列中的数据,而对于 producer 来说,只是把 status 只为 running 即可。

func (d *Lockfree[T]) Start() error {
	// 保证并发安全
	if atomic.CompareAndSwapInt32(&d.status, READY, RUNNING) {
		// 启动消费者
		if err := d.consumer.start(); err != nil {
			// 启动失败,重置 status
			atomic.CompareAndSwapInt32(&d.status, RUNNING, READY)
			return err
		}

		// 启动生产者
		if err := d.writer.start(); err != nil {
			// 恢复现场
			atomic.CompareAndSwapInt32(&d.status, RUNNING, READY)
			return err
		}
		return nil
	}
	return fmt.Errorf(StartErrorFormat, "Disruptor")
}

发号器

lockfree queue 通常都需要一个原子发号器的,其实就是递增的 uint64。lockfree 库里为了避免并发下带来的伪共享问题,使用填充的方式让 uint64 独占缓存行 cache line。

type sequencer struct {
	wc       *cursor
	rc       uint64 // 读取游标,因为该值仅会被一个g修改,所以不需要使用cursor
	capacity uint64
}

type cursor struct {
	// cache line 填充,这里适配了 64 和 128 字节的缓存行。
	p1, p2, p3, p4, p5, p6, p7       uint64
	v                                uint64
	p9, p10, p11, p12, p13, p14, p15 uint64
}

func newSequencer(capacity int) *sequencer {
	return &sequencer{
		wc:       newCursor(),
		rc:       1,
		capacity: uint64(capacity),
	}
}

// nextRead 获取下个要读取的位置
// 使用原子操作解决data race问题
func (s *sequencer) nextRead() uint64 {
	return atomic.LoadUint64(&s.rc)
}

func (s *sequencer) readIncrement() uint64 {
	return atomic.AddUint64(&s.rc, 1)
}

生产者

初始化 producer 对象

type Producer[T any] struct {
	seqer    *sequencer
	rbuf     *ringBuffer[T]
	blocks   blockStrategy
	capacity uint64
	status   int32
}

func newProducer[T any](seqer *sequencer, rbuf *ringBuffer[T], blocks blockStrategy) *Producer[T] {
	return &Producer[T]{
		seqer:    seqer,
		rbuf:     rbuf,
		blocks:   blocks,
		capacity: rbuf.cap(),
		status:   READY,
	}
}

func (q *Producer[T]) start() error {
	if atomic.CompareAndSwapInt32(&q.status, READY, RUNNING) {
		return nil
	}
	return fmt.Errorf(StartErrorFormat, "Producer")
}

写数据

先获取写的 seq 序号,判断 ringbuffer 是否可写,如可写,直接写入到对应的 ringbuffer 位置即可,不可写则需要自旋判断,直到可写为止。

func (q *Producer[T]) Write(v T) error {
	if q.closed() {
		return ClosedError
	}

	// 获取写的位置
	next := q.seqer.wc.increment()

	for {
		r := atomic.LoadUint64(&q.seqer.rc) - 1

		// 判断 ringbuffer 是否已满,consumer 的 seq + ringbuffer 长度为判断条件.
		if next <= r+q.capacity {
			// 可以写入数据,将数据写入到指定位置
			q.rbuf.write(next-1, v)
			// 释放,防止消费端阻塞
			q.blocks.release()
			return nil
		}

		// 队列已满,则暂时让出 cpu 资源,等待 runtime 下次调度,如果 consumer 消费不及时,那么这里会引发自旋问题。
		runtime.Gosched()

		// 再次判断是否已关闭
		if q.closed() {
			return ClosedError
		}
	}
}

如果消费者处理速度慢一些,那么队列会无空闲位置,也就是已满。那么生产者无法 write,内部会触发 runtime.Gosched 退让资源和切出调度,如果一段时间内未消费,生产者必然会因为这个 spin 操作引发 cpu 开销飙高。

所以,上面一直有说 lockfree / disruptor 的使用场景,消费者要足够的快。

runtime.Gosched 是暂停当前的 G,然后把该 G 放到队列尾部,重新切 g0 选择新的 G 进行调度。这个设计很好,社区中一些开源库有使用 atomic cas 实现的锁,当无法 cas 时,会执行 gosched 切换调度。

但就 lockfree 场景来说,如果 consumer 消费的不够快,那么 producer write 失败后会一直频繁的 gosched。这时候 runq 只有 producer,runtime goready 调度起来后又无法写,这个操作一直循环下来会使 cpu 飙高满载。

所以我这边又做了进一步的调整,写入时可以选择是否 sleep 休眠一下。经过压测得出 sleep 开销如下。

  • 10us 的空闲时间,cpu开销在 3% 左右;
  • 5us 的空间实现,cpu 开销在 10% 左右。
  • < 5us 的空间实现,cpu 开销接近 100%。
// WriteSleep 当 ringbuffer 已满时,先 cpu pause 则 sched 让出资源,最后进行 sleep 休眠,以此避免频繁 spin 引发的 cpu 开销。
func (q *Producer[T]) WriteSleep(v T, dur time.Duration) error {
	return q.write(v, dur)
}

func (q *Producer[T]) write(v T, dur time.Duration) error {
	if q.closed() {
		return ClosedError
	}

	next := q.seqer.wc.increment()
	var i = 0
	for {
		// 判断是否可以写入
		r := atomic.LoadUint64(&q.seqer.rc) - 1
		if next <= r+q.capacity {
			// 可以写入数据,将数据写入到指定位置
			q.rbuf.write(next-1, v)
			// 释放,防止消费端阻塞
			q.blocks.release()
			return nil
		}

		if i < spin {
			procyield(30)
		} else if i < spin+passiveSpin {
			runtime.Gosched()
		} else {
			time.Sleep(dur)
			i = 0
		}
		i++

		// 再次判断是否已关闭
		if q.closed() {
			return ClosedError
		}
	}
}

😁 关于 lockfree write 写失败 spin 引发的 cpu 过高问题,已经给作者提价 pull request (pr),但作者还没给合并,说要考虑用户的选择再考虑下。我表示不理解。

bruceshao/lockfree#15

consumer 消费者

初始化 consumer

type consumer[T any] struct {
	status int32 // 运行状态
	rbuf   *ringBuffer[T]
	seqer  *sequencer
	blocks blockStrategy
	hdl    EventHandler[T]
}

func newConsumer[T any](rbuf *ringBuffer[T], hdl EventHandler[T], sequer *sequencer, blocks blockStrategy) *consumer[T] {
	return &consumer[T]{
		rbuf:   rbuf,
		seqer:  sequer,
		hdl:    hdl,
		blocks: blocks,
		status: READY,
	}
}

func (c *consumer[T]) start() error {
	// 保证单例
	if atomic.CompareAndSwapInt32(&c.status, READY, RUNNING) {
		go c.handle()
		return nil
	}
	return fmt.Errorf(StartErrorFormat, "Consumer")
}

consumer 消费数据

func (c *consumer[T]) handle() {
	// 获取 read 的 位置
	rc := c.seqer.nextRead()
	for {
		if c.closed() {
			return
		}
		var i = 0
		for {
			if c.closed() {
				return
			}

			// 位置的对象是否可读,可读获取对象的数据.
			if v, exist := c.rbuf.contains(rc - 1); exist {
				// 既然可读了,则递增 read 位置,下次轮询新位置.
				rc = c.seqer.readIncrement()

				// 调用注册的回调方法,把数据传递过去.
				c.hdl.OnEvent(v)
				i = 0
				break
			}

			// 当无数据可读时, 可按照策略进行阻塞等待.
			// 当轮询小于 4 次时, 每次执行 30 次 cpu pause 
			if i < spin {
				procyield(30)
			} else if i < spin+passiveSpin {
				// 空闲阻塞策略,当轮询在 4-6 次时, 每次执行 runtime sched 让出调度资源
				runtime.Gosched()
			} else {
				// 自定义阻塞策略
				c.blocks.block()

				// 重置为 0
				i = 0
			}
			i++
		}
	}
}

判断 ringbuffer 某个 seq 是否可读 ?

lockfree 开始的设计使用 bitmap 判断 ringbuffer 各个位置的数据情况,后来改成在每个 entry 对象里存放了 producer 写入的 seq。这样判断位置是否可读只需判断 entry.seq 是否一致即可。

func (r *ringBuffer[T]) contains(c uint64) (T, bool) {
	x := &r.buf[c&r.capMask]
	if atomic.LoadUint64(&x.c) == c+1 {
		v := x.val
		return v, true
	}
	return r.tDefault, false
}

总结

lockfree queue 的设计还是很简单,首先需要一个原子递增的发号器,生产者并发写下,先拿到一个 seq 序号,然后 ringbuffer 不满下,只需写到 ringbuffer 对应位置即可。而读取数据只需判断对应结构的 seq 跟 consumer seq 是否一致即可。

至于 lockfree 的性能表现,如下所述.

  1. 在goroutine数量比较小时,lockfree和chan性能差别不明显;
  2. 当goroutine打到一定数量(大于1000)后,lockfree无论从时间还是QR都远远超过chan;

一句话,大多数场景不需要 lockfree 无锁队列,除非追求机制的性能体验。平时用 golang channel 足矣了,如果 golang channel 出现并发的性能瓶颈,其实也可以变通下,切分多个 channel 来分担 mutex 锁竞争冲突,以提高 channel 的读写并行吞吐。