forked from couchbase/gocbcore
/
memdclientmux.go
160 lines (129 loc) · 3.45 KB
/
memdclientmux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package gocbcore
import (
"container/list"
"fmt"
)
type memdGetClientFunc func(hostPort string) (*memdClient, error)
type memdClientMux struct {
pipelines []*memdPipeline
deadPipe *memdPipeline
}
func newMemdClientMux(hostPorts []string, poolSize int, queueSize int, getClientFn memdGetClientFunc) *memdClientMux {
mux := &memdClientMux{}
for _, hostPort := range hostPorts {
hostPort := hostPort
getCurClientFn := func() (*memdClient, error) {
return getClientFn(hostPort)
}
pipeline := newPipeline(hostPort, poolSize, queueSize, getCurClientFn)
mux.pipelines = append(mux.pipelines, pipeline)
}
mux.deadPipe = newDeadPipeline(queueSize)
return mux
}
func (mux *memdClientMux) NumPipelines() int {
return len(mux.pipelines)
}
func (mux *memdClientMux) GetPipeline(index int) *memdPipeline {
if index < 0 || index >= len(mux.pipelines) {
return mux.deadPipe
}
return mux.pipelines[index]
}
func (mux *memdClientMux) Start() {
// Initialize new pipelines
for _, pipeline := range mux.pipelines {
pipeline.StartClients()
}
}
func (mux *memdClientMux) Takeover(oldMux *memdClientMux) {
oldPipelines := list.New()
// Gather all our old pipelines up for takeover and what not
if oldMux != nil {
for _, pipeline := range oldMux.pipelines {
oldPipelines.PushBack(pipeline)
}
}
// Build a function to find an existing pipeline
stealPipeline := func(address string) *memdPipeline {
for e := oldPipelines.Front(); e != nil; e = e.Next() {
pipeline, ok := e.Value.(*memdPipeline)
if !ok {
logErrorf("Failed to cast old pipeline")
continue
}
if pipeline.Address() == address {
oldPipelines.Remove(e)
return pipeline
}
}
return nil
}
// Initialize new pipelines (possibly with a takeover)
for _, pipeline := range mux.pipelines {
oldPipeline := stealPipeline(pipeline.Address())
if oldPipeline != nil {
pipeline.Takeover(oldPipeline)
}
pipeline.StartClients()
}
// Shut down any pipelines that were not taken over
for e := oldPipelines.Front(); e != nil; e = e.Next() {
pipeline, ok := e.Value.(*memdPipeline)
if !ok {
logErrorf("Failed to cast old pipeline")
continue
}
err := pipeline.Close()
if err != nil {
logErrorf("Failed to properly close abandoned pipeline (%s)", err)
}
}
if oldMux.deadPipe != nil {
err := oldMux.deadPipe.Close()
if err != nil {
logErrorf("Failed to properly close abandoned dead pipe (%s)", err)
}
}
}
func (mux *memdClientMux) Close() error {
var errs MultiError
for _, pipeline := range mux.pipelines {
err := pipeline.Close()
if err != nil {
errs.add(err)
}
}
if mux.deadPipe != nil {
err := mux.deadPipe.Close()
if err != nil {
errs.add(err)
}
}
return errs.get()
}
// Drain will drain all requests from this muxers pipelines. You must have
// called Takeover against this or Close on this muxer before invoking this...
func (mux *memdClientMux) Drain(cb func(*memdQRequest)) {
for _, pipeline := range mux.pipelines {
logDebugf("Draining queue %+v", pipeline)
pipeline.Drain(cb)
}
if mux.deadPipe != nil {
mux.deadPipe.Drain(cb)
}
}
func (mux *memdClientMux) debugString() string {
var outStr string
for i, n := range mux.pipelines {
outStr += fmt.Sprintf("Pipeline %d:\n", i)
outStr += reindentLog(" ", n.debugString()) + "\n"
}
outStr += "Dead Pipeline:\n"
if mux.deadPipe != nil {
outStr += reindentLog(" ", mux.deadPipe.debugString()) + "\n"
} else {
outStr += " Disabled\n"
}
return outStr
}