/
remoteserver.go
403 lines (364 loc) · 11.1 KB
/
remoteserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
// Copyright 2016 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package pubsub
import (
"fmt"
"sync"
"time"
"github.com/juju/clock"
"github.com/juju/collections/deque"
"github.com/juju/errors"
"github.com/juju/pubsub/v2"
"github.com/juju/retry"
"github.com/juju/worker/v3"
"gopkg.in/tomb.v2"
"github.com/juju/juju/api"
"github.com/juju/juju/pubsub/forwarder"
"github.com/juju/juju/rpc/params"
)
// RemoteServer represents the public interface of the worker
// responsible for forwarding messages to a single other API server.
type RemoteServer interface {
	worker.Worker
	Reporter

	// UpdateAddresses replaces the addresses used to reach the target
	// API server, interrupting any in-progress connection attempt so
	// it can retry with the new addresses.
	UpdateAddresses(addresses []string)

	// Publish queues the message for forwarding to the target API
	// server. Messages are dropped while there is no connection.
	Publish(message *params.PubSubMessage)
}
// remoteServer is responsible for taking messages and sending them to the
// pubsub endpoint on the remote server. If the connection is dropped, the
// remoteServer will try to reconnect. Messages are not sent until the
// connection either succeeds the first time, or fails to connect. Once there
// is a failure, incoming messages are dropped until reconnection is complete,
// then messages will flow again.
type remoteServer struct {
	// origin identifies this apiserver; target identifies the remote
	// apiserver being forwarded to.
	origin string
	target string
	info   *api.Info
	logger Logger

	// newWriter opens a MessageWriter to the remote apiserver;
	// connection is the currently active writer (nil when disconnected).
	newWriter  func(*api.Info) (MessageWriter, error)
	connection MessageWriter

	hub   *pubsub.StructuredHub
	tomb  tomb.Tomb
	clock clock.Clock

	// mutex guards connection, info, pending, sent, stopConnecting and
	// connectionReset, which are accessed from multiple goroutines.
	mutex   sync.Mutex
	pending *deque.Deque

	// data is signalled when the pending queue transitions from empty
	// to non-empty, waking the main loop.
	data chan struct{}

	// stopConnecting, when non-nil, is closed to abort an in-progress
	// connect retry loop.
	stopConnecting chan struct{}

	// connectionReset is closed when the connection is torn down,
	// unblocking a Publish waiting to signal on data.
	connectionReset chan struct{}

	// sent counts messages handed off for forwarding.
	sent uint64

	// unsubscribe removes the forwarder.ConnectedTopic subscription.
	unsubscribe func()
}
// RemoteServerConfig defines all the attributes that are needed for a RemoteServer.
type RemoteServerConfig struct {
	// Hub is used to publish connection messages
	Hub *pubsub.StructuredHub

	// Origin identifies this apiserver; Target identifies the remote
	// apiserver to forward messages to.
	Origin string
	Target string
	Clock  clock.Clock
	Logger Logger

	// APIInfo is initially populated with the addresses of the target machine.
	APIInfo *api.Info

	// NewWriter opens a MessageWriter to the remote apiserver.
	NewWriter func(*api.Info) (MessageWriter, error)
}
// NewRemoteServer creates a new RemoteServer that will connect to the remote
// apiserver and pass on messages to the pubsub endpoint of that apiserver.
func NewRemoteServer(config RemoteServerConfig) (RemoteServer, error) {
	server := &remoteServer{
		origin:    config.Origin,
		target:    config.Target,
		info:      config.APIInfo,
		logger:    config.Logger,
		newWriter: config.NewWriter,
		hub:       config.Hub,
		clock:     config.Clock,
		pending:   deque.New(),
		data:      make(chan struct{}),
	}
	// Listen for forwarder connection events so a long reconnect
	// backoff can be cut short when the target connects to us.
	unsubscribe, err := server.hub.Subscribe(forwarder.ConnectedTopic, server.onForwarderConnection)
	if err != nil {
		return nil, errors.Trace(err)
	}
	server.unsubscribe = unsubscribe
	server.tomb.Go(server.loop)
	return server, nil
}
// Report provides information to the engine report.
// It should be fast and minimally blocking.
func (r *remoteServer) Report() map[string]interface{} {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	status := "disconnected"
	if r.connection != nil {
		status = "connected"
	}
	report := map[string]interface{}{
		"status": status,
		"sent":   r.sent,
	}
	// Addresses and queue length are optional: guard against nil.
	if r.info != nil {
		report["addresses"] = r.info.Addrs
	}
	if r.pending != nil {
		report["queue-len"] = r.pending.Len()
	}
	return report
}
// IntrospectionReport is the method called by the subscriber to get
// information about this server.
//
// Unlike the original implementation, this guards against nil info and
// pending for consistency with Report, which treats both as optional;
// previously a nil info or pending would panic here.
func (r *remoteServer) IntrospectionReport() string {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	var status string
	if r.connection == nil {
		status = "disconnected"
	} else {
		status = "connected"
	}
	var addrs []string
	if r.info != nil {
		addrs = r.info.Addrs
	}
	queueLen := 0
	if r.pending != nil {
		queueLen = r.pending.Len()
	}
	return fmt.Sprintf(""+
		" Status: %s\n"+
		" Addresses: %v\n"+
		" Queue length: %d\n"+
		" Sent count: %d\n",
		status, addrs, queueLen, r.sent)
}
// onForwarderConnection is the hub callback for forwarder.ConnectedTopic
// events. When the remote target has connected back to our origin, any
// pending connect backoff is interrupted so we retry immediately.
func (r *remoteServer) onForwarderConnection(topic string, details forwarder.OriginTarget, err error) {
	if err != nil {
		// This should never happen.
		r.logger.Errorf("subscriber callback error: %v", err)
		return
	}
	// A connection whose origin is our target and whose target is our
	// origin means the apiserver we are trying to reach is up; cut any
	// long wait short and try again now.
	if details.Origin == r.target && details.Target == r.origin {
		r.interruptConnecting()
	}
}
// UpdateAddresses will update the addresses held for the target API server.
// If we are currently trying to connect to the target, interrupt it so we
// can try again with the new addresses.
func (r *remoteServer) UpdateAddresses(addresses []string) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	reconnecting := r.connection == nil && r.stopConnecting != nil
	if reconnecting {
		// A connect attempt is in flight; stop it so there is no race
		// between storing the new addresses and the connect loop reading
		// them. interruptConnecting is not called here because it would
		// try to acquire the mutex we already hold.
		r.logger.Debugf("interrupting connecting due to new addresses: %v", addresses)
		close(r.stopConnecting)
		r.stopConnecting = nil
	}
	r.info.Addrs = addresses
}
// Publish queues up the message if and only if we have an active connection to
// the target apiserver.
func (r *remoteServer) Publish(message *params.PubSubMessage) {
	select {
	case <-r.tomb.Dying():
		r.logger.Tracef("dying, don't send %q", message.Topic)
	default:
		r.mutex.Lock()
		// Only queue the message up if we are currently connected.
		notifyData := false
		if r.connection != nil {
			r.logger.Tracef("queue up topic %q", message.Topic)
			r.pending.PushBack(message)
			// Only wake the main loop when the queue transitions from
			// empty to non-empty; subsequent messages are drained in
			// the same pass, so one signal is enough.
			notifyData = r.pending.Len() == 1
		} else {
			r.logger.Tracef("skipping %q for %s as not connected", message.Topic, r.target)
		}
		r.mutex.Unlock()
		if notifyData {
			// The send on r.data can block if the main loop is busy.
			// connectionReset (closed by resetConnection) lets us bail
			// out instead of blocking forever on a dead connection.
			select {
			case r.data <- struct{}{}:
			case <-r.connectionReset:
				r.logger.Debugf("connection reset while notifying %q for %s", message.Topic, r.target)
			}
		}
	}
}
// nextMessage pops and returns the next queued message, or nil when the
// queue is empty. (The original comment mentioned a flag; emptiness is
// actually signalled by the nil return.)
func (r *remoteServer) nextMessage() *params.PubSubMessage {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	head, ok := r.pending.PopFront()
	if !ok {
		// Queue empty: nothing to do.
		return nil
	}
	// Count the message as sent while still holding the lock; it will be
	// written to the connection almost immediately.
	r.sent++
	return head.(*params.PubSubMessage)
}
// connect blocks until a connection to the target apiserver is made or
// the attempt is interrupted (via the stopConnecting channel). It returns
// true when connected, in which case it also publishes a
// forwarder.ConnectedTopic message on the hub.
func (r *remoteServer) connect() bool {
	// Expose the stop channel so UpdateAddresses and interruptConnecting
	// can abort the retry loop below.
	stop := make(chan struct{})
	r.mutex.Lock()
	r.stopConnecting = stop
	r.mutex.Unlock()

	var connection MessageWriter
	r.logger.Debugf("connecting to %s", r.target)
	// Retry forever with exponential backoff (1s doubling up to 5m)
	// until a writer is obtained or stop is closed. The error from
	// retry.Call is deliberately ignored: success is determined by
	// whether connection was set.
	_ = retry.Call(retry.CallArgs{
		Func: func() error {
			r.logger.Debugf("open api to %s: %v", r.target, r.info.Addrs)
			conn, err := r.newWriter(r.info)
			if err != nil {
				r.logger.Tracef("unable to get message writer for %s, reconnecting... : %v\n%s", r.target, err, errors.ErrorStack(err))
				return errors.Trace(err)
			}
			connection = conn
			return nil
		},
		Attempts:    retry.UnlimitedAttempts,
		Delay:       time.Second,
		MaxDelay:    5 * time.Minute,
		BackoffFunc: retry.DoubleDelay,
		Stop:        stop,
		Clock:       r.clock,
	})

	r.mutex.Lock()
	r.stopConnecting = nil
	defer r.mutex.Unlock()
	if connection != nil {
		r.connection = connection
		// A fresh reset channel lets Publish detect a later disconnect.
		r.connectionReset = make(chan struct{})
		r.logger.Infof("forwarding connected %s -> %s", r.origin, r.target)
		_, err := r.hub.Publish(
			forwarder.ConnectedTopic,
			// NOTE: origin is filled in by the central hub annotations.
			forwarder.OriginTarget{Target: r.target})
		if err != nil {
			r.logger.Errorf("%v", err)
		}
		return true
	}
	return false
}
// loop is the main goroutine of the worker. It repeatedly ensures there
// is a connection to the target and then drains pending messages, which
// are forwarded via a separate goroutine (forwardMessages) so a blocking
// write cannot prevent a clean shutdown.
func (r *remoteServer) loop() error {
	defer r.unsubscribe()

	var delay <-chan time.Time
	messageToSend := make(chan *params.PubSubMessage)
	messageSent := make(chan *params.PubSubMessage)
	go r.forwardMessages(messageToSend, messageSent)

	for {
		if r.connection == nil {
			// If we don't have a current connection, try to get one.
			if r.connect() {
				delay = nil
			} else {
				// Skip through the select to try to reconnect.
				delay = r.clock.After(time.Second)
			}
		}
		select {
		case <-r.tomb.Dying():
			r.logger.Debugf("worker shutting down")
			r.resetConnection()
			return tomb.ErrDying
		case <-r.data:
			// Has new data been pushed on?
			r.logger.Tracef("new messages")
		case <-delay:
			// If we failed to connect for whatever reason, this means we don't cycle
			// immediately.
			r.logger.Tracef("connect delay")
		}
		r.logger.Tracef("send pending messages")
		r.sendPendingMessages(messageToSend, messageSent)
	}
}
// sendPendingMessages drains the pending queue, handing each message to
// the forwardMessages goroutine and waiting for its acknowledgement on
// messageSent before taking the next. It returns early when the worker
// is dying.
func (r *remoteServer) sendPendingMessages(messageToSend chan<- *params.PubSubMessage, messageSent <-chan *params.PubSubMessage) {
	for message := r.nextMessage(); message != nil; message = r.nextMessage() {
		select {
		case <-r.tomb.Dying():
			return
		case messageToSend <- message:
			// Just in case the worker dies while we are trying to send.
		}
		select {
		case <-r.tomb.Dying():
			// This will cause the main loop to iterate around, and close
			// the connection before returning.
			return
		case <-messageSent:
			// continue on to next
		}
	}
}
// resetConnection closes and clears the current connection, discards any
// pending messages, and publishes forwarder.DisconnectedTopic on the hub.
// It is a no-op if there is no active connection.
func (r *remoteServer) resetConnection() {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	// If we have already been reset, just return.
	if r.connection == nil {
		return
	}
	r.logger.Debugf("closing connection and clearing pending")
	r.connection.Close()
	r.connection = nil
	// Closing connectionReset unblocks any Publish call waiting to
	// notify the main loop about data for a now-dead connection.
	close(r.connectionReset)
	// Discard all pending messages.
	r.pending = deque.New()
	// Tell everyone that we have been disconnected.
	_, err := r.hub.Publish(
		forwarder.DisconnectedTopic,
		// NOTE: origin is filled in by the central hub annotations.
		forwarder.OriginTarget{Target: r.target})
	if err != nil {
		r.logger.Errorf("%v", err)
	}
}
// forwardMessages is a goroutine whose sole purpose is to get messages off
// the messageToSend channel, try to send them over the API, and say when they
// are done with this message. This allows for the potential blocking call of
// `ForwardMessage`. If this does block for whatever reason and the worker is
// asked to shutdown, the main loop method is able to do so. That would cause
// the API connection to be closed, which would cause the `ForwardMessage` to
// be unblocked due to the error of the socket closing.
func (r *remoteServer) forwardMessages(messageToSend <-chan *params.PubSubMessage, messageSent chan<- *params.PubSubMessage) {
	var message *params.PubSubMessage
	for {
		select {
		case <-r.tomb.Dying():
			return
		case message = <-messageToSend:
		}
		// Snapshot the connection under the lock; it may be reset
		// concurrently by resetConnection.
		r.mutex.Lock()
		conn := r.connection
		r.mutex.Unlock()
		r.logger.Tracef("forwarding %q to %s, data %v", message.Topic, r.target, message.Data)
		if conn != nil {
			err := conn.ForwardMessage(message)
			if err != nil {
				// Some problem sending, so log, close the connection, and try to reconnect.
				r.logger.Infof("unable to forward message, reconnecting... : %v", err)
				r.resetConnection()
			}
		}
		// Acknowledge the message so sendPendingMessages can move on,
		// unless the worker is dying.
		select {
		case <-r.tomb.Dying():
			return
		case messageSent <- message:
		}
	}
}
// interruptConnecting aborts a pending connect retry loop, if there is
// one, by closing its stop channel. Safe to call when not connecting.
func (r *remoteServer) interruptConnecting() {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	if r.stopConnecting == nil {
		// No connect attempt in flight: nothing to interrupt.
		return
	}
	r.logger.Debugf("interrupting the pending connect loop")
	close(r.stopConnecting)
	r.stopConnecting = nil
}
// Kill is part of the worker.Worker interface.
func (r *remoteServer) Kill() {
	r.tomb.Kill(nil)
	// Unblock any in-progress connect retry so the main loop can observe
	// the dying tomb promptly.
	r.interruptConnecting()
}
// Wait is part of the worker.Worker interface.
// It blocks until the worker's loop goroutine has finished.
func (r *remoteServer) Wait() error {
	return r.tomb.Wait()
}