Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ type CompressionCodec interface {
Decode(src []byte) ([]byte, error)
}

const compressionCodecMask int8 = 0x03
const compressionCodecMask int8 = 0x07
const DefaultCompressionLevel int = -1
const CompressionNoneCode = 0
4 changes: 2 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
adjustedDeadline = deadline
return writeFetchRequestV2(
return writeFetchRequestV10(
&c.wbuf,
id,
c.clientID,
Expand All @@ -619,7 +619,7 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
return &Batch{err: dontExpectEOF(err)}
}

throttle, highWaterMark, remain, err := readFetchResponseHeader(&c.rbuf, size)
throttle, highWaterMark, remain, err := readFetchResponseHeaderV10(&c.rbuf, size)
return &Batch{
conn: c,
msgs: newMessageSetReader(&c.rbuf, remain),
Expand Down
105 changes: 105 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,27 @@ const (
TransactionalIDAuthorizationFailed Error = 53
SecurityDisabled Error = 54
BrokerAuthorizationFailed Error = 55
KafkaStorageError Error = 56
LogDirNotFound Error = 57
SASLAuthenticationFailed Error = 58
UnknownProducerId Error = 59
ReassignmentInProgress Error = 60
DelegationTokenAuthDisabled Error = 61
DelegationTokenNotFound Error = 62
DelegationTokenOwnerMismatch Error = 63
DelegationTokenRequestNotAllowed Error = 64
DelegationTokenAuthorizationFailed Error = 65
DelegationTokenExpired Error = 66
InvalidPrincipalType Error = 67
NonEmptyGroup Error = 68
GroupIdNotFound Error = 69
FetchSessionIDNotFound Error = 70
InvalidFetchSessionEpoch Error = 71
ListenerNotFound Error = 72
TopicDeletionDisabled Error = 73
FencedLeaderEpoch Error = 74
UnknownLeaderEpoch Error = 75
UnsupportedCompressionType Error = 76
)

// Error satisfies the error interface.
Expand Down Expand Up @@ -201,6 +222,48 @@ func (e Error) Title() string {
return "Security Disabled"
case BrokerAuthorizationFailed:
return "Broker Authorization Failed"
case KafkaStorageError:
return "Kafka Storage Error"
case LogDirNotFound:
return "Log Dir Not Found"
case SASLAuthenticationFailed:
return "SASL Authentication Failed"
case UnknownProducerId:
return "Unknown Producer ID"
case ReassignmentInProgress:
return "Reassignment In Progress"
case DelegationTokenAuthDisabled:
return "Delegation Token Auth Disabled"
case DelegationTokenNotFound:
return "Delegation Token Not Found"
case DelegationTokenOwnerMismatch:
return "Delegation Token Owner Mismatch"
case DelegationTokenRequestNotAllowed:
return "Delegation Token Request Not Allowed"
case DelegationTokenAuthorizationFailed:
return "Delegation Token Authorization Failed"
case DelegationTokenExpired:
return "Delegation Token Expired"
case InvalidPrincipalType:
return "Invalid Principal Type"
case NonEmptyGroup:
return "Non Empty Group"
case GroupIdNotFound:
return "Group ID Not Found"
case FetchSessionIDNotFound:
return "Fetch Session ID Not Found"
case InvalidFetchSessionEpoch:
return "Invalid Fetch Session Epoch"
case ListenerNotFound:
return "Listener Not Found"
case TopicDeletionDisabled:
return "Topic Deletion Disabled"
case FencedLeaderEpoch:
return "Fenced Leader Epoch"
case UnknownLeaderEpoch:
return "Unknown Leader Epoch"
case UnsupportedCompressionType:
return "Unsupported Compression Type"
}
return ""
}
Expand Down Expand Up @@ -318,6 +381,48 @@ func (e Error) Description() string {
return "the security features are disabled"
case BrokerAuthorizationFailed:
return "the broker authorization failed"
case KafkaStorageError:
return "disk error when trying to access log file on the disk"
case LogDirNotFound:
return "the user-specified log directory is not found in the broker config"
case SASLAuthenticationFailed:
return "SASL Authentication failed"
case UnknownProducerId:
return "the broker could not locate the producer metadata associated with the producer ID"
case ReassignmentInProgress:
return "a partition reassignment is in progress"
case DelegationTokenAuthDisabled:
return "delegation token feature is not enabled"
case DelegationTokenNotFound:
return "delegation token is not found on server"
case DelegationTokenOwnerMismatch:
return "specified principal is not valid owner/renewer"
case DelegationTokenRequestNotAllowed:
return "delegation token requests are not allowed on plaintext/1-way ssl channels and on delegation token authenticated channels"
case DelegationTokenAuthorizationFailed:
return "delegation token authorization failed"
case DelegationTokenExpired:
return "delegation token is expired"
case InvalidPrincipalType:
return "supplied principaltype is not supported"
case NonEmptyGroup:
return "the group is not empty"
case GroupIdNotFound:
return "the group ID does not exist"
case FetchSessionIDNotFound:
return "the fetch session ID was not found"
case InvalidFetchSessionEpoch:
return "the fetch session epoch is invalid"
case ListenerNotFound:
return "there is no listener on the leader broker that matches the listener on which metadata request was processed"
case TopicDeletionDisabled:
return "topic deletion is disabled"
case FencedLeaderEpoch:
return "the leader epoch in the request is older than the epoch on the broker"
case UnknownLeaderEpoch:
return "the leader epoch in the request is newer than the epoch on the broker"
case UnsupportedCompressionType:
return "the requesting client does not support the compression type of given partition"
}
return ""
}
Expand Down
21 changes: 21 additions & 0 deletions error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,27 @@ func TestError(t *testing.T) {
TransactionalIDAuthorizationFailed,
SecurityDisabled,
BrokerAuthorizationFailed,
KafkaStorageError,
LogDirNotFound,
SASLAuthenticationFailed,
UnknownProducerID,
ReassignmentInProgress,
DelegationTokenAuthDisabled,
DelegationTokenNotFound,
DelegationTokenOwnerMismatch,
DelegationTokenRequestNotAllowed,
DelegationTokenAuthorizationFailed,
DelegationTokenExpired,
InvalidPrincipalType,
NonEmptyGroup,
GroupIDNotFound,
FetchSessionIDNotFound,
InvalidFetchSessionEpoch,
ListenerNotFound,
TopicDeletionDisabled,
FencedLeaderEpoch,
UnknownLeaderEpoch,
UnsupportedCompressionType,
}

for _, err := range errorCodes {
Expand Down
9 changes: 5 additions & 4 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ const (
type apiVersion int16

const (
v0 apiVersion = 0
v1 apiVersion = 1
v2 apiVersion = 2
v3 apiVersion = 3
v0 apiVersion = 0
v1 apiVersion = 1
v2 apiVersion = 2
v3 apiVersion = 3
v10 apiVersion = 10
)

type requestHeader struct {
Expand Down
78 changes: 78 additions & 0 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,84 @@ func readFetchResponseHeader(r *bufio.Reader, size int) (throttle int32, waterma
return
}

func readFetchResponseHeaderV10(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) {
var (
n int32
errorCode int16
fetchSessionID int32
p struct {
Partition int32
ErrorCode int16
HighwaterMarkOffset int64
LastStableOffset int64
LogStartOffset int64
}
)

if remain, err = readInt32(r, size, &throttle); err != nil {
return
}

// Global error code
if remain, err = readInt16(r, remain, &errorCode); err != nil {
return
}
if errorCode != 0 {
err = Error(errorCode)
return
}

if remain, err = readInt32(r, remain, &fetchSessionID); err != nil {
return
}

if remain, err = readInt32(r, remain, &n); err != nil {
return
}

// This error should never trigger, unless there's a bug in the kafka client
// or server.
if n != 1 {
err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n)
return
}

// We ignore the topic name because we've requests messages for a single
// topic, unless there's a bug in the kafka server we will have received
// the name of the topic that we requested.
if remain, err = discardString(r, remain); err != nil {
return
}

if remain, err = readInt32(r, remain, &n); err != nil {
return
}

// This error should never trigger, unless there's a bug in the kafka client
// or server.
if n != 1 {
err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n)
return
}

if remain, err = read(r, remain, &p); err != nil {
return
}

if p.ErrorCode != 0 {
err = Error(p.ErrorCode)
return
}

// Ignore the aborted transactions since we don't support it
if remain, err = discardInt32(r, remain); err != nil {
return
}

watermark = p.HighwaterMarkOffset
return
}

func readMessageHeader(r *bufio.Reader, sz int) (offset int64, attributes int8, timestamp int64, remain int, err error) {
var version int8

Expand Down
55 changes: 55 additions & 0 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,61 @@ func writeFetchRequestV2(w *bufio.Writer, correlationID int32, clientID, topic s
return w.Flush()
}

// writeFetchRequestV10 implements the v10 of the FetchRequest.
// v10 is required to use zstd compression.
// Defined in http://kafka.apache.org/protocol.html#The_Messages_Fetch
func writeFetchRequestV10(w *bufio.Writer, correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration) error {
h := requestHeader{
ApiKey: int16(fetchRequest),
ApiVersion: int16(v10),
CorrelationID: correlationID,
ClientID: clientID,
}
h.Size = (h.size() - 4) +
4 + // replica ID
4 + // max wait time
4 + // min bytes
4 + // max bytes
1 + // isolation level
4 + // fetch session ID
4 + // fetch session epoch
4 + // topic array length
sizeofString(topic) +
4 + // partition array length
4 + // partition
4 + // current leader epoch
8 + // offset
8 + // log start offset
4 + // partition max bytes
4 // forgotten topics array length

h.writeTo(w)
writeInt32(w, -1) // replica ID
writeInt32(w, milliseconds(maxWait))
writeInt32(w, int32(minBytes))
writeInt32(w, int32(maxBytes))
writeInt8(w, 0) // no support for isolation level yet
writeInt32(w, 0) // no support for fetch session id yet
writeInt32(w, -1) // no support for fetch session epoch yet

// topic array
writeArrayLen(w, 1)
writeString(w, topic)

// partition array
writeArrayLen(w, 1)
writeInt32(w, partition)
writeInt32(w, -1) // no partition leader epoch
writeInt64(w, offset)
writeInt64(w, -1) // log start offset
writeInt32(w, int32(maxBytes))

// forgotten topics array
writeArrayLen(w, 0) // forgotten topics not supported yet

return w.Flush()
}

func writeListOffsetRequestV1(w *bufio.Writer, correlationID int32, clientID, topic string, partition int32, time int64) error {
h := requestHeader{
ApiKey: int16(listOffsetRequest),
Expand Down