package lb

import (
	"sync"
	"time"

	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/common/op"
	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/libbeat/outputs"
	"github.com/elastic/beats/libbeat/outputs/mode"
)
// LB balances the sending of events between multiple connections.
//
// The balancing algorithm is mostly pull-based, with multiple workers trying to pull
// some amount of work from a shared queue. Workers will try to get a new work item
// only if they have a working/active connection. Workers without an active connection
// do not participate until a connection has been re-established.
// Due to the pull-based nature, the algorithm load-balances events somewhat randomly,
// with workers having lower latencies/turn-around times potentially getting more
// work items than workers with higher latencies. Thus the algorithm
// dynamically adapts to the resource availability of the servers events are forwarded to.
//
// Workers not participating in the load-balancing will continuously try to reconnect
// to their configured endpoints. Once a new connection has been established,
// these workers will participate in load-balancing again.
//
// If a connection becomes unavailable, the events are rescheduled for another
// connection to pick up. Rescheduling events is limited to a maximum number of
// send attempts. If events have not been sent after the maximum number of allowed
// attempts has been passed, they will be dropped.
//
// Like network connections, distributing events to workers is subject to
// timeout. If no worker is available to pick up a message for sending, the message
// will be dropped internally after max_retries. If the mode or message requires
// a guaranteed send, the message is retried indefinitely.
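//
// A minimal usage sketch (illustrative only, not taken from this package's
// tests; it assumes the output plugin has already built a slice of connected
// clients named "clients" and has an event of type common.MapStr at hand):
//
//	balancer, err := lb.NewSync(clients, 3, time.Second, 30*time.Second, time.Minute)
//	if err != nil {
//		return err
//	}
//	defer balancer.Close()
//	err = balancer.PublishEvent(nil, outputs.Options{}, event)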
type LB struct {
	ctx context

	// waitGroup + signaling channel for handling shutdown
	wg sync.WaitGroup
}
var (
	debugf = logp.MakeDebug("output")
)
// NewSync creates a load balancer that distributes events across synchronous
// protocol clients.
func NewSync(
	clients []mode.ProtocolClient,
	maxAttempts int,
	waitRetry, timeout, maxWaitRetry time.Duration,
) (*LB, error) {
	return New(SyncClients(clients, waitRetry, maxWaitRetry),
		maxAttempts, timeout)
}
// NewAsync creates a load balancer that distributes events across asynchronous
// protocol clients.
func NewAsync(
	clients []mode.AsyncProtocolClient,
	maxAttempts int,
	waitRetry, timeout, maxWaitRetry time.Duration,
) (*LB, error) {
	return New(AsyncClients(clients, waitRetry, maxWaitRetry),
		maxAttempts, timeout)
}
// New creates a new load balancer connection mode. A maxAttempts of 0 selects
// infinite retry.
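//
// An illustrative call (the values here are hypothetical; the WorkerFactory is
// typically produced by SyncClients or AsyncClients):
//
//	balancer, err := lb.New(lb.SyncClients(clients, time.Second, time.Minute), 3, 30*time.Second)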
func New(
	makeWorkers WorkerFactory,
	maxAttempts int,
	timeout time.Duration,
) (*LB, error) {
	debugf("configure maxattempts: %v", maxAttempts)

	// maxAttempts == 0 signals infinite retry. Convert to -1, so attempts left
	// and infinite retry can be more easily distinguished by the load balancer.
	if maxAttempts == 0 {
		maxAttempts = -1
	}

	m := &LB{
		ctx: makeContext(makeWorkers.count(), maxAttempts, timeout),
	}
	if err := m.start(makeWorkers); err != nil {
		return nil, err
	}
	return m, nil
}
// Close stops all workers and closes all open connections. In-flight events
// are signaled as failed.
func (m *LB) Close() error {
	m.ctx.Close()
	m.wg.Wait()
	return nil
}
func (m *LB) start(makeWorkers WorkerFactory) error {
	// waitStart ensures all worker goroutines are running before start returns.
	var waitStart sync.WaitGroup
	run := func(w worker) {
		defer m.wg.Done()
		waitStart.Done()
		w.run()
	}

	workers, err := makeWorkers.mk(m.ctx)
	if err != nil {
		return err
	}
	for _, w := range workers {
		m.wg.Add(1)
		waitStart.Add(1)
		go run(w)
	}
	waitStart.Wait()
	return nil
}
// PublishEvent forwards the event to some load balancing worker.
func (m *LB) PublishEvent(
	signaler op.Signaler,
	opts outputs.Options,
	event common.MapStr,
) error {
	return m.publishEventsMessage(opts, eventsMessage{
		worker:   -1,
		signaler: signaler,
		event:    event,
	})
}
// PublishEvents forwards events to some load balancing worker.
func (m *LB) PublishEvents(
	signaler op.Signaler,
	opts outputs.Options,
	events []common.MapStr,
) error {
	return m.publishEventsMessage(opts, eventsMessage{
		worker:   -1,
		signaler: signaler,
		events:   events,
	})
}
func (m *LB) publishEventsMessage(opts outputs.Options, msg eventsMessage) error {
	m.ctx.pushEvents(msg, opts.Guaranteed)
	return nil
}