forked from nsqio/nsq
/
client_v2.go
142 lines (123 loc) · 3.27 KB
/
client_v2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package main
import (
"../nsq"
"bufio"
"bytes"
"log"
"net"
"sync"
"sync/atomic"
"time"
)
// ClientV2 is the per-connection state of a client using the V2 protocol. It
// embeds the client's net.Conn and a sync.Mutex, and carries the counters,
// state word and channels that the methods of this type operate on.
//
// ReadyCount, LastReadyCount, InFlightCount, MessageCount, FinishCount,
// RequeueCount and State are read and written through sync/atomic by the
// methods of this type.
type ClientV2 struct {
	// The 64-bit counters are deliberately the first fields. On 32-bit
	// platforms (386, ARM, 32-bit MIPS) sync/atomic requires its 64-bit
	// operands to be 64-bit aligned, and only the first word of an allocated
	// struct is guaranteed to be; keeping the counters contiguous at offset 0
	// keeps every one of them aligned. Do not move other fields above them.

	// ReadyCount is set by SetReadyCount and decremented by SendingMessage.
	ReadyCount int64

	// LastReadyCount is the value most recently passed to SetReadyCount.
	LastReadyCount int64

	// InFlightCount is incremented by SendingMessage and decremented by
	// FinishedMessage, TimedOutMessage and RequeuedMessage.
	InFlightCount int64

	// MessageCount is incremented by SendingMessage.
	MessageCount uint64

	// FinishCount is incremented by FinishedMessage.
	FinishCount uint64

	// RequeueCount is incremented by RequeuedMessage.
	RequeueCount uint64

	// Conn is the client's network connection; it may be nil (NewClientV2
	// accepts a nil conn).
	net.Conn

	// NOTE(review): the mutex, frameBuf and Reader are not used by the
	// methods in this part of the file; presumably they serve the protocol
	// read/write path - confirm against their users.
	sync.Mutex

	frameBuf bytes.Buffer

	Reader *bufio.Reader

	// State is stored atomically; StartClose sets it to nsq.StateClosing.
	State int32

	// ConnectTime is set to time.Now() by NewClientV2.
	ConnectTime time.Time

	// Channel is consulted by IsReadyForMessages (via IsPaused). It is not
	// set by NewClientV2.
	Channel *Channel

	// ReadyStateChan is created with a buffer of one by NewClientV2 and
	// receives non-blocking sends from tryUpdateReadyState.
	ReadyStateChan chan int

	// ExitChan is created unbuffered by NewClientV2.
	ExitChan chan int

	// ShortIdentifier and LongIdentifier are both initialised by NewClientV2
	// to the host part of the remote address (empty for a nil conn).
	// ShortIdentifier is reported as the client name by Stats.
	ShortIdentifier string
	LongIdentifier  string
}
// NewClientV2 returns a ClientV2 wrapping conn, with ConnectTime set to the
// current time, a ReadyStateChan buffered to hold one signal and an
// unbuffered ExitChan.
//
// Both identifiers start out as the host part of conn's remote address. conn
// may be nil, in which case the identifiers are empty.
func NewClientV2(conn net.Conn) *ClientV2 {
	var identifier string
	if conn != nil {
		// Best effort: when the remote address is not in host:port form
		// SplitHostPort returns an empty host, so the identifier simply
		// stays empty and the error is deliberately ignored.
		identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String())
	}

	return &ClientV2{
		// The field name of an embedded type is its unqualified type name,
		// so the key is Conn rather than net.Conn.
		Conn:            conn,
		ReadyStateChan:  make(chan int, 1),
		ExitChan:        make(chan int),
		ConnectTime:     time.Now(),
		ShortIdentifier: identifier,
		LongIdentifier:  identifier,
	}
}
// String returns the remote address of the client's connection, which lets a
// *ClientV2 be passed directly to %s in log messages.
//
// A client created without a connection (NewClientV2 accepts nil) yields the
// empty string rather than panicking on the nil embedded Conn.
func (c *ClientV2) String() string {
	if c.Conn == nil {
		return ""
	}
	return c.RemoteAddr().String()
}
// Stats returns a snapshot of the client's counters and identity as a
// ClientStats with version "V2".
//
// State and every counter are loaded individually with sync/atomic, so the
// snapshot is not taken at a single instant: fields may reflect slightly
// different moments when other goroutines are updating the client.
//
// A client created without a connection (NewClientV2 accepts nil) reports an
// empty address rather than panicking on the nil embedded Conn.
func (c *ClientV2) Stats() ClientStats {
	var address string
	if c.Conn != nil {
		address = c.RemoteAddr().String()
	}

	return ClientStats{
		version:       "V2",
		address:       address,
		name:          c.ShortIdentifier,
		state:         atomic.LoadInt32(&c.State),
		readyCount:    atomic.LoadInt64(&c.ReadyCount),
		inFlightCount: atomic.LoadInt64(&c.InFlightCount),
		messageCount:  atomic.LoadUint64(&c.MessageCount),
		finishCount:   atomic.LoadUint64(&c.FinishCount),
		requeueCount:  atomic.LoadUint64(&c.RequeueCount),
		connectTime:   c.ConnectTime,
	}
}
// IsReadyForMessages reports whether another message may be sent to the
// client right now. The answer is false when the client's Channel is paused,
// when ReadyCount is zero or negative, or when InFlightCount has reached
// LastReadyCount.
//
// The three counters are loaded one after another with sync/atomic, so they
// are individually consistent but not a single atomic snapshot. With the
// package-level verbose flag set, the loaded values are logged.
func (c *ClientV2) IsReadyForMessages() bool {
	paused := c.Channel.IsPaused()
	if paused {
		return false
	}

	rdy := atomic.LoadInt64(&c.ReadyCount)
	lastRdy := atomic.LoadInt64(&c.LastReadyCount)
	inFlight := atomic.LoadInt64(&c.InFlightCount)

	if *verbose {
		log.Printf("[%s] state rdy: %4d lastrdy: %4d inflt: %4d", c,
			rdy, lastRdy, inFlight)
	}

	// Ready only while the client still has ready credit and fewer messages
	// are in flight than it last asked for.
	return rdy > 0 && inFlight < lastRdy
}
// SetReadyCount stores count as both ReadyCount and LastReadyCount and then
// makes a non-blocking attempt to signal ReadyStateChan.
//
// The two stores are separate atomic operations, so a concurrent reader can
// briefly observe the new ReadyCount alongside the previous LastReadyCount.
func (c *ClientV2) SetReadyCount(count int64) {
atomic.StoreInt64(&c.ReadyCount, count)
atomic.StoreInt64(&c.LastReadyCount, count)
c.tryUpdateReadyState()
}
// tryUpdateReadyState attempts to send a signal on ReadyStateChan without
// blocking; if the send cannot proceed immediately the signal is dropped.
func (c *ClientV2) tryUpdateReadyState() {
// You can always *try* to write to ReadyStateChan because, in the cases
// where you cannot, the message pump loop would have iterated anyway.
// The atomic integer operations guarantee correctness of the value.
select {
case c.ReadyStateChan <- 1:
default:
// The send would block (NewClientV2 gives the channel room for a single
// pending signal), so this signal is skipped.
}
}
// FinishedMessage atomically increments FinishCount and decrements
// InFlightCount, then makes a non-blocking attempt to signal ReadyStateChan.
func (c *ClientV2) FinishedMessage() {
atomic.AddUint64(&c.FinishCount, 1)
atomic.AddInt64(&c.InFlightCount, -1)
c.tryUpdateReadyState()
}
// SendingMessage atomically decrements ReadyCount and increments both
// InFlightCount and MessageCount. Unlike the other counter-updating methods
// it does not signal ReadyStateChan.
func (c *ClientV2) SendingMessage() {
atomic.AddInt64(&c.ReadyCount, -1)
atomic.AddInt64(&c.InFlightCount, 1)
atomic.AddUint64(&c.MessageCount, 1)
}
// TimedOutMessage atomically decrements InFlightCount and then makes a
// non-blocking attempt to signal ReadyStateChan. No other counter is changed.
func (c *ClientV2) TimedOutMessage() {
atomic.AddInt64(&c.InFlightCount, -1)
c.tryUpdateReadyState()
}
// RequeuedMessage atomically increments RequeueCount and decrements
// InFlightCount, then makes a non-blocking attempt to signal ReadyStateChan.
func (c *ClientV2) RequeuedMessage() {
atomic.AddUint64(&c.RequeueCount, 1)
atomic.AddInt64(&c.InFlightCount, -1)
c.tryUpdateReadyState()
}
// StartClose begins shutting the client down: it sets the ready count to 0
// (which also signals ReadyStateChan) and then atomically sets State to
// nsq.StateClosing. It does not close the connection or any channel itself.
func (c *ClientV2) StartClose() {
// Force the client into ready 0
c.SetReadyCount(0)
// mark this client as closing
atomic.StoreInt32(&c.State, nsq.StateClosing)
// TODO: start a timer to actually close the channel (in case the client doesn't do it first)
}
// Pause makes a non-blocking attempt to signal ReadyStateChan. It changes no
// state on the client itself; IsReadyForMessages reads the paused status from
// c.Channel.IsPaused().
func (c *ClientV2) Pause() {
c.tryUpdateReadyState()
}
// UnPause makes a non-blocking attempt to signal ReadyStateChan. It changes
// no state on the client itself; IsReadyForMessages reads the paused status
// from c.Channel.IsPaused().
func (c *ClientV2) UnPause() {
c.tryUpdateReadyState()
}