/
firehoseclient.go
199 lines (179 loc) · 5.49 KB
/
firehoseclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package firehoseclient
import (
"context"
"crypto/tls"
"sync"
"time"
"github.com/cloudfoundry-community/firehose-to-syslog/stats"
gendiodes "code.cloudfoundry.org/go-diodes"
"github.com/cloudfoundry-community/firehose-to-syslog/diodes"
"github.com/cloudfoundry-community/firehose-to-syslog/eventRouting"
"github.com/cloudfoundry-community/firehose-to-syslog/logging"
"github.com/cloudfoundry-community/firehose-to-syslog/uaatokenrefresher"
"github.com/cloudfoundry/noaa/consumer"
noaerrors "github.com/cloudfoundry/noaa/errors"
"github.com/cloudfoundry/sonde-go/events"
"github.com/gorilla/websocket"
)
// FirehoseNozzle reads envelopes from the firehose through a noaa consumer,
// keeps the selected event types in a diode buffer, and hands buffered
// envelopes to the configured event router.
type FirehoseNozzle struct {
// errs is the error channel returned by consumer.Firehose (assigned in consumeFirehose).
errs <-chan error
// Readerrs is allocated by NewFirehoseNozzle; nothing in this file sends to or receives from it.
Readerrs chan error
// messages is the envelope channel returned by consumer.Firehose (assigned in consumeFirehose).
messages <-chan *events.Envelope
// consumer is created in consumeFirehose and closed by handleError on a non-retry error.
consumer *consumer.Consumer
// eventRouting supplies the selected-event map and receives every buffered envelope.
eventRouting eventRouting.EventRouting
// config holds the connection, retry and buffer settings.
config *FirehoseConfig
// uaaRefresher is registered on the consumer with RefreshTokenFrom.
uaaRefresher consumer.TokenRefresher
// envelopeBuffer sits between routeEvent (Set) and ReadLogsBuffer/Draining (TryNext).
envelopeBuffer *diodes.OneToOneEnvelope
// stopReading is closed by StopReading and watched by routeEvent.
stopReading chan struct{}
// stopRouting is closed by StopReading and watched by ReadLogsBuffer.
stopRouting chan struct{}
// Stats carries the SubInputBuffer counter, incremented when an envelope is
// buffered and decremented when one is routed.
Stats *stats.Stats
}
// FirehoseConfig groups the settings used to build the firehose consumer and
// to size the nozzle's envelope buffer.
type FirehoseConfig struct {
// MinRetryDelay is passed to the consumer's SetMinRetryDelay.
MinRetryDelay time.Duration
// MaxRetryDelay is passed to the consumer's SetMaxRetryDelay.
MaxRetryDelay time.Duration
// MaxRetryCount is passed to the consumer's SetMaxRetryCount.
MaxRetryCount int
// TrafficControllerURL is the first argument given to consumer.New.
TrafficControllerURL string
// InsecureSSLSkipVerify becomes tls.Config.InsecureSkipVerify for the consumer.
InsecureSSLSkipVerify bool
// IdleTimeoutSeconds is a time.Duration despite its name and is handed
// unchanged to the consumer's SetIdleTimeout.
IdleTimeoutSeconds time.Duration
// FirehoseSubscriptionID is the subscription ID given to consumer.Firehose.
FirehoseSubscriptionID string
// BufferSize is the size given to diodes.NewOneToOneEnvelope.
BufferSize int
}
// wg counts the routeEvent and ReadLogsBuffer goroutines launched by Start so
// that StopReading can block until both have returned. It is package-level,
// so every FirehoseNozzle in the process shares it.
var wg sync.WaitGroup
// NewFirehoseNozzle builds a FirehoseNozzle from its collaborators.
//
// uaaR is stored as the consumer's token refresher, eventRouting is the
// destination for buffered envelopes, firehoseconfig supplies the connection
// settings and the envelope buffer size, and stats is the counter store
// updated as envelopes move through the buffer. The envelope buffer is given
// an alert callback that logs an error with the reported missed count.
// The noaa consumer itself is not created here; Start does that.
func NewFirehoseNozzle(uaaR *uaatokenrefresher.UAATokenRefresher,
	eventRouting eventRouting.EventRouting,
	firehoseconfig *FirehoseConfig,
	stats *stats.Stats) *FirehoseNozzle {
	onMissed := gendiodes.AlertFunc(func(missed int) {
		logging.LogError("Missed Logs ", missed)
	})

	nozzle := new(FirehoseNozzle)

	// Placeholder channels; consumeFirehose replaces errs and messages.
	nozzle.errs = make(<-chan error)
	nozzle.Readerrs = make(chan error)
	nozzle.messages = make(<-chan *events.Envelope)

	nozzle.eventRouting = eventRouting
	nozzle.config = firehoseconfig
	nozzle.uaaRefresher = uaaR
	nozzle.envelopeBuffer = diodes.NewOneToOneEnvelope(firehoseconfig.BufferSize, onMissed)

	nozzle.stopReading = make(chan struct{})
	nozzle.stopRouting = make(chan struct{})
	nozzle.Stats = stats

	return nozzle
}
// Start creates the firehose consumer and launches the two worker goroutines:
// routeEvent, which moves selected envelopes from the consumer into the
// envelope buffer, and ReadLogsBuffer, which takes envelopes out of the buffer
// and routes them. Both goroutines receive ctx and stop when it is cancelled.
func (f *FirehoseNozzle) Start(ctx context.Context) {
f.consumeFirehose()
// Register both goroutines before launching them so that StopReading's
// wg.Wait cannot return before they have finished.
wg.Add(2)
go f.routeEvent(ctx)
go f.ReadLogsBuffer(ctx)
}
// StopReading signals both worker goroutines to exit by closing the
// stopRouting and stopReading channels, then blocks until the package-level
// WaitGroup reaches zero. The channels are closed rather than re-created, so
// a second call would panic on the repeated close.
func (f *FirehoseNozzle) StopReading() {
close(f.stopRouting)
close(f.stopReading)
// Make sure both goroutines have stopped before returning.
wg.Wait()
}
// consumeFirehose creates the noaa consumer for the configured traffic
// controller URL, applies the token refresher, idle timeout and retry
// settings from f.config, and opens the firehose subscription. The envelope
// and error channels it gets back are stored on the nozzle for routeEvent.
func (f *FirehoseNozzle) consumeFirehose() {
	cfg := f.config
	tlsConfig := &tls.Config{InsecureSkipVerify: cfg.InsecureSSLSkipVerify}

	c := consumer.New(cfg.TrafficControllerURL, tlsConfig, nil)
	f.consumer = c

	c.RefreshTokenFrom(f.uaaRefresher)
	c.SetIdleTimeout(cfg.IdleTimeoutSeconds)
	c.SetMinRetryDelay(cfg.MinRetryDelay)
	c.SetMaxRetryDelay(cfg.MaxRetryDelay)
	c.SetMaxRetryCount(cfg.MaxRetryCount)

	// NOTE(review): the second argument is passed as an empty string;
	// presumably it is the auth token, supplied by the refresher instead —
	// confirm against the noaa consumer API.
	f.messages, f.errs = c.Firehose(cfg.FirehoseSubscriptionID, "")
}
// ReadLogsBuffer polls the envelope buffer until ctx is cancelled or the
// stopRouting channel is closed. Every envelope taken from the buffer is
// inspected by handleMessage, passed to the event router, and accounted for
// by decrementing the SubInputBuffer counter. It marks the package-level
// WaitGroup done when it returns.
func (f *FirehoseNozzle) ReadLogsBuffer(ctx context.Context) {
	defer wg.Done()

	for {
		// Non-blocking check for a shutdown signal before each poll.
		select {
		case <-ctx.Done():
			logging.LogStd("Cancel ReadLogsBuffer Goroutine", true)
			return
		case <-f.stopRouting:
			logging.LogStd("Stopping Routing Loop", true)
			return
		default:
		}

		// A nil envelope together with a false flag is treated as
		// "nothing to read right now".
		next, ok := f.envelopeBuffer.TryNext()
		if !ok && next == nil {
			// Brief sleep to avoid hammering the CPU while idle.
			time.Sleep(time.Millisecond)
			continue
		}

		f.handleMessage(next)
		f.eventRouting.RouteEvent(next)
		f.Stats.Dec(stats.SubInputBuffer)
	}
}
// Draining routes whatever is still held in the envelope buffer and returns
// as soon as the buffer reports nothing left, or earlier if ctx is cancelled.
// Each envelope goes through the same steps as in ReadLogsBuffer:
// handleMessage, RouteEvent, then a decrement of the SubInputBuffer counter.
// Unlike ReadLogsBuffer it does not touch the package-level WaitGroup.
func (f *FirehoseNozzle) Draining(ctx context.Context) {
	logging.LogStd("Starting Draining", true)

	for {
		// Non-blocking cancellation check before each poll.
		select {
		case <-ctx.Done():
			logging.LogStd("Stopping ReadLogsBuffer Goroutine", true)
			return
		default:
		}

		// A nil envelope together with a false flag means the buffer has
		// nothing more to hand out, which ends the drain.
		next, ok := f.envelopeBuffer.TryNext()
		if !ok && next == nil {
			logging.LogStd("Finishing Draining", true)
			return
		}

		f.handleMessage(next)
		f.eventRouting.RouteEvent(next)
		f.Stats.Dec(stats.SubInputBuffer)
	}
}
// routeEvent is the ingest loop. It receives envelopes from the consumer's
// message channel and stores those whose event type is enabled in the
// selected-events map into the envelope buffer, incrementing the
// SubInputBuffer counter for each one stored.
//
// The loop ends when ctx is cancelled, when the stopReading channel is
// closed, or when handleError reports that an error from the consumer is not
// retryable. It marks the package-level WaitGroup done when it returns.
func (f *FirehoseNozzle) routeEvent(ctx context.Context) {
	defer wg.Done()

	eventsSelected := f.eventRouting.GetSelectedEvents()
	for {
		select {
		case envelope := <-f.messages:
			// Only take what we need.
			if eventsSelected[envelope.GetEventType().String()] {
				f.envelopeBuffer.Set(envelope)
				f.Stats.Inc(stats.SubInputBuffer)
			}
		case err := <-f.errs:
			// handleError must run exactly once per error: it logs the
			// error and, when the error is not retryable, closes the
			// consumer. Calling it twice duplicated both effects.
			if retry := f.handleError(err); !retry {
				logging.LogError("RouteEvent Loop Error ", err)
				return
			}
		case <-ctx.Done():
			logging.LogStd("Closing routing event routine", true)
			return
		case <-f.stopReading:
			logging.LogStd("Stopping Reading from Firehose", true)
			return
		}
	}
}
// handleError logs an error received from the firehose and decides whether
// reading should continue.
//
// It returns true when err is a noaa RetryError. If that error wraps a
// websocket close error with the policy-violation code, an additional hint
// about scaling up the nozzle is logged. For any other error it closes the
// consumer and returns false.
func (f *FirehoseNozzle) handleError(err error) bool {
	logging.LogError("Error while reading from the Firehose: ", err)

	if retryErr, isRetry := err.(noaerrors.RetryError); isRetry {
		closeErr, isClose := retryErr.Err.(*websocket.CloseError)
		if isClose && closeErr.Code == websocket.ClosePolicyViolation {
			logging.LogError("Nozzle couldn't keep up. Please try scaling up the Nozzle.", err)
		}
		return true
	}

	logging.LogStd("Closing connection with Firehose...", true)
	f.consumer.Close()
	return false
}
// handleMessage logs a warning when the envelope is a CounterEvent named
// "TruncatingBuffer.DroppedMessages" whose origin is "doppler". Every other
// envelope is ignored; the envelope itself is never modified.
func (f *FirehoseNozzle) handleMessage(envelope *events.Envelope) {
	if envelope.GetEventType() != events.Envelope_CounterEvent {
		return
	}
	if envelope.CounterEvent.GetName() != "TruncatingBuffer.DroppedMessages" {
		return
	}
	if envelope.GetOrigin() != "doppler" {
		return
	}
	logging.LogStd("We've intercepted an upstream message which indicates that the nozzle or the TrafficController is not keeping up. Please try scaling up the nozzle.", true)
}