/
request_response.go
89 lines (73 loc) · 2.67 KB
/
request_response.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package client
import "sync"
var sizingEncoderPool = sync.Pool{
New: func() interface{} {
return NewSizingEncoder()
},
}
// RequestHeader is used to decouple the message header/metadata writing from the actual message.
// It is able to accept a request and encode/write it according to Kafka Wire Protocol format
// adding the correlation id and client id to the request.
type RequestHeader struct {
correlationID int32
clientID string
request Request
}
// NewRequestHeader creates a new RequestHeader holding the correlation id, client id and the actual request.
func NewRequestHeader(correlationID int32, clientID string, request Request) *RequestHeader {
return &RequestHeader{
correlationID: correlationID,
clientID: clientID,
request: request,
}
}
// Size returns the size in bytes needed to write this request, including the length field. This value will be used when allocating memory for a byte array.
func (rw *RequestHeader) Size() int32 {
encoder := sizingEncoderPool.Get().(*SizingEncoder)
rw.Write(encoder)
size := encoder.Size()
encoder.Reset()
sizingEncoderPool.Put(encoder)
return size
}
// Write writes this RequestHeader into a given Encoder.
func (rw *RequestHeader) Write(encoder Encoder) {
// write the size of request excluding the length field with length 4
encoder.WriteInt32(encoder.Size() - 4)
encoder.WriteInt16(rw.request.Key())
encoder.WriteInt16(rw.request.Version())
encoder.WriteInt32(rw.correlationID)
encoder.WriteString(rw.clientID)
rw.request.Write(encoder)
}
// Request is a generic interface for any request issued to Kafka. Must be able to identify and write itself.
type Request interface {
// Writes the Request to the given Encoder.
Write(Encoder)
// Returns the Kafka API key for this Request.
Key() int16
// Returns the Kafka request version for backwards compatibility.
Version() int16
}
// Response is a generic interface for any response received from Kafka. Must be able to read itself.
type Response interface {
// Read the Response from the given Decoder. May return a DecodingError if the response is invalid.
Read(Decoder) *DecodingError
}
// DecodingError is an error that also holds the information about why it happened.
type DecodingError struct {
err error
reason string
}
// NewDecodingError creates a new DecodingError with a given error message and reason.
func NewDecodingError(err error, reason string) *DecodingError {
return &DecodingError{err, reason}
}
// Error returns the error message for this DecodingError.
func (de *DecodingError) Error() error {
return de.err
}
// Reason returns the reason for this DecodingError.
func (de *DecodingError) Reason() string {
return de.reason
}