Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd入口分析 #17

Open
loadlj opened this issue Jul 19, 2018 · 0 comments
Open

nsqd入口分析 #17

loadlj opened this issue Jul 19, 2018 · 0 comments

Comments

@loadlj
Copy link
Owner

loadlj commented Jul 19, 2018

Main入口分析

nsqd的入口调用了go-svc的Run方法,这里的go-svc有点类似于watchdog监测用户signal的行为,可以很好的兼容Linux和Windows。svc_other里面声明的signalNotify是从signal.Notify这里拿到的,这个函数的主要作用是监听用户输入的signal。
Main还声明了一个tcpListener和一个httpListener,tcpListener是用来处理跟client的连接的。

	tcpServer := &tcpServer{ctx: ctx}
	n.waitGroup.Wrap(func() {
		protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
	})

tcpServer有一个handle方法,用来处理tcp的连接的。里面只会接受" V2"的protocol,然后对每个连接进行IOLoop。
IOLoop会对每个client先进行messagePump

messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan

messagePump主要做的事:发送心跳包给客户端,获取channel中的消息,发送给客户端。

case <-heartbeatChan:
			err = p.Send(client, frameTypeResponse, heartbeatBytes)
			if err != nil {
				goto exit
			}
case b := <-backendMsgChan:
case msg := <-memoryMsgChan:

然后在IOLoop中,有新的for循环去监听客户端的消息并进行处理。这里大概理一下,后面详细分析

回到Main中,后面新开了两个goroutine,一个queueScanLoop处理消息,一个lookupLoop处理和nsqlookupd的连接的。

n.waitGroup.Wrap(func() { n.queueScanLoop() })
n.waitGroup.Wrap(func() { n.lookupLoop() })

queueScanLoop

  • workTicker
    workerTicker是一个主动去刷新queue的ticker
  • refreshTicker
    refreshTicker是一个被动的ticker,每间隔5s去刷新一次,参照了redis的过期刷新算法。

随机挑选若干个key,删除所有过期的,如果过期的key的占比大于25%,则继续循环删除
在nsqd里面的定义是如果一个channel还有work在处理则认为是"dirty",判断dirty的占比如果大于QueueScanDirtyPercent就会继续循环给workCh发送channel。

resizePool

func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	idealPoolSize := int(float64(num) * 0.25)
	if idealPoolSize < 1 {
		idealPoolSize = 1
	} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
		idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
	}
	for {
		if idealPoolSize == n.poolSize {
			break
		} else if idealPoolSize < n.poolSize {
			// contract
			closeCh <- 1
			n.poolSize--
		} else {
			// expand
			n.waitGroup.Wrap(func() {
				n.queueScanWorker(workCh, responseCh, closeCh)
			})
			n.poolSize++
		}
	}
}

resizePool是用来动态调整queueScanWorker的,idealPoolSize是当前的num(channels) / 4,会跟n.poolSize去做比较,这里表示的是queueScanWorker的数量。如果idealPoolSize要比当前的workpool小,再去新起一个queueScanWorker

queueScanWorker

在nsq中inFlight指的是正在投递但还没确认投递成功的消息,defferred指的是投递失败,等待重新投递的消息。 initPQ创建的字典和队列主要用于索引和存放这两类消息。其中两个字典使用消息ID作索引。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant