-
-
Notifications
You must be signed in to change notification settings - Fork 157
/
hooks.go
358 lines (322 loc) · 13.5 KB
/
hooks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
package kgo
import (
"net"
"time"
)
// Hook is a hook to be called when something happens in kgo.
//
// The base Hook interface is useless, but wherever a hook can occur in kgo,
// the client checks if your hook implements an appropriate interface. If so,
// your hook is called.
//
// This allows you to only hook in to behavior you care about, and it allows
// the client to add more hooks in the future.
//
// All hook interfaces in this package have Hook in the name. Hooks must be
// safe for concurrent use. It is expected that hooks are fast; if a hook needs
// to take time, then copy what you need and ensure the hook is async.
type Hook interface{}

// hooks is the collection of user-provided hooks held by a client; the each
// helper below is used to dispatch to every hook that implements a relevant
// hook interface.
type hooks []Hook
// each invokes fn once for every stored hook, in order.
func (hs hooks) each(fn func(Hook)) {
	for i := range hs {
		fn(hs[i])
	}
}
// HookNewClient is called in NewClient after a client is initialized. This
// hook can be used to perform final setup work in your hooks, such as storing
// the client for later use.
type HookNewClient interface {
	// OnNewClient is passed the newly initialized client, before any
	// client goroutines are started.
	OnNewClient(*Client)
}
//////////////////
// BROKER HOOKS //
//////////////////
// HookBrokerConnect is called after a connection to a broker is opened.
type HookBrokerConnect interface {
	// OnBrokerConnect is passed the broker metadata, how long it took to
	// dial, and either the dial's resulting net.Conn or error (one of the
	// two is set, depending on whether the dial succeeded).
	OnBrokerConnect(meta BrokerMetadata, dialDur time.Duration, conn net.Conn, err error)
}
// HookBrokerDisconnect is called when a connection to a broker is closed.
type HookBrokerDisconnect interface {
	// OnBrokerDisconnect is passed the broker metadata and the connection
	// that is being closed.
	OnBrokerDisconnect(meta BrokerMetadata, conn net.Conn)
}
// HookBrokerWrite is called after a write to a broker.
//
// Kerberos SASL does not cause write hooks, since it directly writes to the
// connection.
type HookBrokerWrite interface {
	// OnBrokerWrite is passed the broker metadata, the key for the request
	// that was written, the number of bytes that were written (may not be
	// the whole request if there was an error), how long the request
	// waited before being written (including throttling waiting), how long
	// it took to write the request, and any error.
	//
	// The bytes written does not count any TLS overhead.
	OnBrokerWrite(meta BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error)
}
// HookBrokerRead is called after a read from a broker.
//
// Kerberos SASL does not cause read hooks, since it directly reads from the
// connection.
type HookBrokerRead interface {
	// OnBrokerRead is passed the broker metadata, the key for the response
	// that was read, the number of bytes read (may not be the whole read
	// if there was an error), how long the client waited before reading
	// the response, how long it took to read the response, and any error.
	//
	// The bytes read does not count any TLS overhead.
	OnBrokerRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error)
}
// BrokerE2E tracks complete information for a write of a request followed by a
// read of that request's response.
//
// Note that if this is for a produce request with no acks, there will be no
// read wait / time to read.
type BrokerE2E struct {
	// BytesWritten is the number of bytes written for this request.
	//
	// This may not be the whole request if there was an error while writing.
	BytesWritten int

	// BytesRead is the number of bytes read for this request's response.
	//
	// This may not be the whole response if there was an error while
	// reading, and this will be zero if there was a write error.
	BytesRead int

	// WriteWait is the time spent waiting from when this request was
	// generated internally in the client to just before the request is
	// written to the connection. This number is not included in the
	// DurationE2E method.
	WriteWait time.Duration

	// TimeToWrite is how long a request took to be written on the wire.
	// This specifically tracks only how long conn.Write takes.
	TimeToWrite time.Duration

	// ReadWait tracks the span of time immediately following conn.Write
	// until conn.Read begins.
	ReadWait time.Duration

	// TimeToRead tracks how long conn.Read takes for this request to be
	// entirely read. This includes the time it takes to allocate a buffer
	// for the response after the initial four size bytes are read.
	TimeToRead time.Duration

	// WriteErr is any error encountered during writing. If a write error is
	// encountered, no read will be attempted.
	WriteErr error

	// ReadErr is any error encountered during reading.
	ReadErr error
}
// DurationE2E returns the e2e time from the start of when a request is written
// to the end of when the response for that request was fully read: the sum of
// TimeToWrite, ReadWait, and TimeToRead. WriteWait, the time the request spent
// buffered in the client before writing, is deliberately excluded.
//
// If there was a write error, no read is attempted, so only the write time
// contributes to the total.
func (e *BrokerE2E) DurationE2E() time.Duration {
	total := e.TimeToWrite
	total += e.ReadWait
	total += e.TimeToRead
	return total
}
// Err returns the first error for this request/response pair, preferring the
// write error over the read error. If this return is non-nil, the
// request/response had an error.
func (e *BrokerE2E) Err() error {
	if err := e.WriteErr; err != nil {
		return err
	}
	return e.ReadErr
}
// HookBrokerE2E is called after a write to a broker that errors, or after a
// read to a broker.
//
// This differs from HookBrokerRead and HookBrokerWrite by tracking all E2E
// info for a write and a read, which allows for easier e2e metrics. This hook
// can replace both the read and write hook.
//
// If a write or read error occurs, this hook is called with all information
// possible at the time (e.g., if a write error occurs, all write info is
// specified).
type HookBrokerE2E interface {
	// OnBrokerE2E is passed the broker metadata, the key for the
	// request/response that was written/read, and the e2e info for the
	// request and response.
	OnBrokerE2E(meta BrokerMetadata, key int16, e2e BrokerE2E)
}
// HookBrokerThrottle is called after a response to a request is read
// from a broker, and the response identifies throttling in effect.
type HookBrokerThrottle interface {
	// OnBrokerThrottle is passed the broker metadata, the imposed
	// throttling interval, and whether the throttle was applied before
	// Kafka responded to the request or after.
	//
	// For Kafka < 2.0, the throttle is applied before issuing a response.
	// For Kafka >= 2.0, the throttle is applied after issuing a response.
	//
	// If throttledAfterResponse is false, then Kafka already applied the
	// throttle. If it is true, the client internally will not send another
	// request until the throttle deadline has passed.
	OnBrokerThrottle(meta BrokerMetadata, throttleInterval time.Duration, throttledAfterResponse bool)
}
//////////
// MISC //
//////////
// HookGroupManageError is called after every error that causes the client,
// operating as a group member, to break out of the group managing loop and
// backoff temporarily.
//
// Specifically, any error that would result in OnPartitionsLost being called
// will also result in this hook being called.
type HookGroupManageError interface {
	// OnGroupManageError is passed the error that killed a group session.
	// This can be used to detect potentially fatal errors and act on them
	// at runtime to recover (such as group auth errors, or group max size
	// reached).
	OnGroupManageError(error)
}
///////////////////////////////
// PRODUCE & CONSUME BATCHES //
///////////////////////////////
// ProduceBatchMetrics tracks information about successful produces to
// partitions.
type ProduceBatchMetrics struct {
	// NumRecords is the number of records that were produced in this
	// batch.
	NumRecords int

	// UncompressedBytes is the number of bytes the records serialized as
	// before compression.
	//
	// For record batches (Kafka v0.11.0+), this is the size of the records
	// in a batch, and does not include record batch overhead.
	//
	// For message sets, this size includes message set overhead.
	UncompressedBytes int

	// CompressedBytes is the number of bytes actually written for this
	// batch, after compression. If compression is not used, this will be
	// equal to UncompressedBytes.
	//
	// For record batches, this is the size of the compressed records, and
	// does not include record batch overhead.
	//
	// For message sets, this is the size of the compressed message set.
	CompressedBytes int

	// CompressionType signifies which algorithm the batch was compressed
	// with.
	//
	// 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is
	// zstd.
	CompressionType uint8
}
// HookProduceBatchWritten is called whenever a batch is known to be
// successfully produced.
type HookProduceBatchWritten interface {
	// OnProduceBatchWritten is called per successful batch written to a
	// topic partition.
	OnProduceBatchWritten(meta BrokerMetadata, topic string, partition int32, metrics ProduceBatchMetrics)
}
// FetchBatchMetrics tracks information about fetches of batches.
type FetchBatchMetrics struct {
	// NumRecords is the number of records that were fetched in this batch.
	//
	// Note that this number includes transaction markers, which are not
	// actually returned to the user.
	//
	// If the batch has an encoding error, this will be 0.
	NumRecords int

	// UncompressedBytes is the number of bytes the records deserialized
	// into after decompression.
	//
	// For record batches (Kafka v0.11.0+), this is the size of the records
	// in a batch, and does not include record batch overhead.
	//
	// For message sets, this size includes message set overhead.
	//
	// Note that this number may be higher than the corresponding number
	// when producing, because as an "optimization", Kafka can return
	// partial batches when fetching.
	UncompressedBytes int

	// CompressedBytes is the number of bytes actually read for this batch,
	// before decompression. If the batch was not compressed, this will be
	// equal to UncompressedBytes.
	//
	// For record batches, this is the size of the compressed records, and
	// does not include record batch overhead.
	//
	// For message sets, this is the size of the compressed message set.
	CompressedBytes int

	// CompressionType signifies which algorithm the batch was compressed
	// with.
	//
	// 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is
	// zstd.
	CompressionType uint8
}
// HookFetchBatchRead is called whenever a batch is read within the client.
//
// Note that this hook is called when processing, but a batch may be internally
// discarded after processing in some uncommon specific circumstances.
//
// If the client reads v0 or v1 message sets, and they are not compressed, then
// this hook will be called per record.
type HookFetchBatchRead interface {
	// OnFetchBatchRead is called per batch read from a topic partition.
	OnFetchBatchRead(meta BrokerMetadata, topic string, partition int32, metrics FetchBatchMetrics)
}
///////////////////////////////
// PRODUCE & CONSUME RECORDS //
///////////////////////////////
// HookProduceRecordBuffered is called when a record is buffered internally in
// the client from a call to Produce.
//
// This hook can be used to write metrics that gather the number of records or
// bytes buffered, or the hook can be used to write interceptors that modify a
// record's key / value / headers before being produced. If you just want a
// metric for the number of records buffered, use the client's
// BufferedProduceRecords method, as it is faster.
//
// Note that this hook may slow down high-volume producing a bit.
type HookProduceRecordBuffered interface {
	// OnProduceRecordBuffered is passed a record that is buffered.
	//
	// This hook is called immediately after Produce is called, after the
	// function potentially sets the default topic.
	OnProduceRecordBuffered(*Record)
}
// HookProduceRecordUnbuffered is called just before a record's promise is
// finished; this is effectively a mirror of a record promise.
//
// As an example, if using HookProduceRecordBuffered for a gauge of how many
// record bytes are buffered, this hook can be used to decrement the gauge.
//
// Note that this hook may slow down high-volume producing a bit.
type HookProduceRecordUnbuffered interface {
	// OnProduceRecordUnbuffered is passed a record that is just about to
	// have its produce promise called, as well as the error that the
	// promise will be called with.
	OnProduceRecordUnbuffered(*Record, error)
}
// HookFetchRecordBuffered is called when a record is internally buffered after
// fetching, ready to be polled.
//
// This hook can be used to write gauge metrics regarding the number of records
// or bytes buffered, or to write interceptors that modify a record before
// being returned from polling. If you just want a metric for the number of
// records buffered, use the client's BufferedFetchRecords method, as it is
// faster.
//
// Note that this hook may slow down high-volume consuming a bit.
type HookFetchRecordBuffered interface {
	// OnFetchRecordBuffered is passed a record that is now buffered, ready
	// to be polled.
	OnFetchRecordBuffered(*Record)
}
// HookFetchRecordUnbuffered is called when a fetched record is unbuffered.
//
// A record can be internally discarded in some scenarios without ever being
// polled, such as when the internal assignment changes.
//
// As an example, if using HookFetchRecordBuffered for a gauge of how many
// record bytes are buffered ready to be polled, this hook can be used to
// decrement the gauge.
//
// Note that this hook may slow down high-volume consuming a bit.
type HookFetchRecordUnbuffered interface {
	// OnFetchRecordUnbuffered is passed a record that is being
	// "unbuffered" within the client, and whether the record is being
	// returned from polling.
	OnFetchRecordUnbuffered(r *Record, polled bool)
}