diff --git a/compression.go b/compression.go index 386138b68..a54b6ac34 100644 --- a/compression.go +++ b/compression.go @@ -47,6 +47,6 @@ type CompressionCodec interface { Decode(src []byte) ([]byte, error) } -const compressionCodecMask int8 = 0x03 +const compressionCodecMask int8 = 0x07 const DefaultCompressionLevel int = -1 const CompressionNoneCode = 0 diff --git a/conn.go b/conn.go index 853ea6945..3dcb30194 100644 --- a/conn.go +++ b/conn.go @@ -598,7 +598,7 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch { now := time.Now() deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) adjustedDeadline = deadline - return writeFetchRequestV2( + return writeFetchRequestV10( &c.wbuf, id, c.clientID, @@ -619,7 +619,7 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch { return &Batch{err: dontExpectEOF(err)} } - throttle, highWaterMark, remain, err := readFetchResponseHeader(&c.rbuf, size) + throttle, highWaterMark, remain, err := readFetchResponseHeaderV10(&c.rbuf, size) return &Batch{ conn: c, msgs: newMessageSetReader(&c.rbuf, remain), diff --git a/error.go b/error.go index 0559af628..8034308c8 100644 --- a/error.go +++ b/error.go @@ -64,6 +64,27 @@ const ( TransactionalIDAuthorizationFailed Error = 53 SecurityDisabled Error = 54 BrokerAuthorizationFailed Error = 55 + KafkaStorageError Error = 56 + LogDirNotFound Error = 57 + SASLAuthenticationFailed Error = 58 + UnknownProducerId Error = 59 + ReassignmentInProgress Error = 60 + DelegationTokenAuthDisabled Error = 61 + DelegationTokenNotFound Error = 62 + DelegationTokenOwnerMismatch Error = 63 + DelegationTokenRequestNotAllowed Error = 64 + DelegationTokenAuthorizationFailed Error = 65 + DelegationTokenExpired Error = 66 + InvalidPrincipalType Error = 67 + NonEmptyGroup Error = 68 + GroupIdNotFound Error = 69 + FetchSessionIDNotFound Error = 70 + InvalidFetchSessionEpoch Error = 71 + ListenerNotFound Error = 72 + TopicDeletionDisabled Error = 73 + FencedLeaderEpoch Error = 74 + UnknownLeaderEpoch Error = 75 + UnsupportedCompressionType Error = 76 ) // Error satisfies the error interface. @@ -201,6 +222,48 @@ func (e Error) Title() string { return "Security Disabled" case BrokerAuthorizationFailed: return "Broker Authorization Failed" + case KafkaStorageError: + return "Kafka Storage Error" + case LogDirNotFound: + return "Log Dir Not Found" + case SASLAuthenticationFailed: + return "SASL Authentication Failed" + case UnknownProducerId: + return "Unknown Producer ID" + case ReassignmentInProgress: + return "Reassignment In Progress" + case DelegationTokenAuthDisabled: + return "Delegation Token Auth Disabled" + case DelegationTokenNotFound: + return "Delegation Token Not Found" + case DelegationTokenOwnerMismatch: + return "Delegation Token Owner Mismatch" + case DelegationTokenRequestNotAllowed: + return "Delegation Token Request Not Allowed" + case DelegationTokenAuthorizationFailed: + return "Delegation Token Authorization Failed" + case DelegationTokenExpired: + return "Delegation Token Expired" + case InvalidPrincipalType: + return "Invalid Principal Type" + case NonEmptyGroup: + return "Non Empty Group" + case GroupIdNotFound: + return "Group ID Not Found" + case FetchSessionIDNotFound: + return "Fetch Session ID Not Found" + case InvalidFetchSessionEpoch: + return "Invalid Fetch Session Epoch" + case ListenerNotFound: + return "Listener Not Found" + case TopicDeletionDisabled: + return "Topic Deletion Disabled" + case FencedLeaderEpoch: + return "Fenced Leader Epoch" + case UnknownLeaderEpoch: + return "Unknown Leader Epoch" + case UnsupportedCompressionType: + return "Unsupported Compression Type" } return "" } @@ -318,6 +381,48 @@ func (e Error) Description() string { return "the security features are disabled" case BrokerAuthorizationFailed: return "the broker authorization failed" + case KafkaStorageError: + return "disk error when trying to access log file on the disk" + case LogDirNotFound: + return "the user-specified log directory is not found in the broker config" + case SASLAuthenticationFailed: + return "SASL Authentication failed" + case UnknownProducerId: + return "the broker could not locate the producer metadata associated with the producer ID" + case ReassignmentInProgress: + return "a partition reassignment is in progress" + case DelegationTokenAuthDisabled: + return "delegation token feature is not enabled" + case DelegationTokenNotFound: + return "delegation token is not found on server" + case DelegationTokenOwnerMismatch: + return "specified principal is not valid owner/renewer" + case DelegationTokenRequestNotAllowed: + return "delegation token requests are not allowed on plaintext/1-way ssl channels and on delegation token authenticated channels" + case DelegationTokenAuthorizationFailed: + return "delegation token authorization failed" + case DelegationTokenExpired: + return "delegation token is expired" + case InvalidPrincipalType: + return "supplied principaltype is not supported" + case NonEmptyGroup: + return "the group is not empty" + case GroupIdNotFound: + return "the group ID does not exist" + case FetchSessionIDNotFound: + return "the fetch session ID was not found" + case InvalidFetchSessionEpoch: + return "the fetch session epoch is invalid" + case ListenerNotFound: + return "there is no listener on the leader broker that matches the listener on which metadata request was processed" + case TopicDeletionDisabled: + return "topic deletion is disabled" + case FencedLeaderEpoch: + return "the leader epoch in the request is older than the epoch on the broker" + case UnknownLeaderEpoch: + return "the leader epoch in the request is newer than the epoch on the broker" + case UnsupportedCompressionType: + return "the requesting client does not support the compression type of given partition" } return "" } diff --git a/error_test.go b/error_test.go index 145590848..c56143e23 100644 --- a/error_test.go +++ b/error_test.go @@ -64,6 +64,27 @@ func TestError(t *testing.T) { TransactionalIDAuthorizationFailed, SecurityDisabled, BrokerAuthorizationFailed, + KafkaStorageError, + LogDirNotFound, + SASLAuthenticationFailed, + UnknownProducerID, + ReassignmentInProgress, + DelegationTokenAuthDisabled, + DelegationTokenNotFound, + DelegationTokenOwnerMismatch, + DelegationTokenRequestNotAllowed, + DelegationTokenAuthorizationFailed, + DelegationTokenExpired, + InvalidPrincipalType, + NonEmptyGroup, + GroupIDNotFound, + FetchSessionIDNotFound, + InvalidFetchSessionEpoch, + ListenerNotFound, + TopicDeletionDisabled, + FencedLeaderEpoch, + UnknownLeaderEpoch, + UnsupportedCompressionType, } for _, err := range errorCodes { diff --git a/protocol.go b/protocol.go index 30d987aff..5a6034726 100644 --- a/protocol.go +++ b/protocol.go @@ -29,10 +29,11 @@ const ( type apiVersion int16 const ( - v0 apiVersion = 0 - v1 apiVersion = 1 - v2 apiVersion = 2 - v3 apiVersion = 3 + v0 apiVersion = 0 + v1 apiVersion = 1 + v2 apiVersion = 2 + v3 apiVersion = 3 + v10 apiVersion = 10 ) type requestHeader struct { diff --git a/read.go b/read.go index fee5e5b24..70d355d9f 100644 --- a/read.go +++ b/read.go @@ -335,6 +335,84 @@ func readFetchResponseHeader(r *bufio.Reader, size int) (throttle int32, waterma return } +func readFetchResponseHeaderV10(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) { + var ( + n int32 + errorCode int16 + fetchSessionID int32 + p struct { + Partition int32 + ErrorCode int16 + HighwaterMarkOffset int64 + LastStableOffset int64 + LogStartOffset int64 + } + ) + + if remain, err = readInt32(r, size, &throttle); err != nil { + return + } + + // Global error code + if remain, err = readInt16(r, remain, &errorCode); err != nil { + return + } + if errorCode != 0 { + err = Error(errorCode) + return + } + + if remain, err = readInt32(r, remain, &fetchSessionID); err != nil { + return + } + + if remain, err = readInt32(r, remain, &n); err != nil { + return + } + + // This error should never trigger, unless there's a bug in the kafka client + // or server. + if n != 1 { + err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n) + return + } + + // We ignore the topic name because we've requests messages for a single + // topic, unless there's a bug in the kafka server we will have received + // the name of the topic that we requested. + if remain, err = discardString(r, remain); err != nil { + return + } + + if remain, err = readInt32(r, remain, &n); err != nil { + return + } + + // This error should never trigger, unless there's a bug in the kafka client + // or server. + if n != 1 { + err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n) + return + } + + if remain, err = read(r, remain, &p); err != nil { + return + } + + if p.ErrorCode != 0 { + err = Error(p.ErrorCode) + return + } + + // Ignore the aborted transactions since we don't support it + if remain, err = discardInt32(r, remain); err != nil { + return + } + + watermark = p.HighwaterMarkOffset + return +} + func readMessageHeader(r *bufio.Reader, sz int) (offset int64, attributes int8, timestamp int64, remain int, err error) { var version int8 diff --git a/write.go b/write.go index 216da873f..ec3bdb615 100644 --- a/write.go +++ b/write.go @@ -149,6 +149,61 @@ func writeFetchRequestV2(w *bufio.Writer, correlationID int32, clientID, topic s return w.Flush() } +// writeFetchRequestV10 implements the v10 of the FetchRequest. +// v10 is required to use zstd compression. +// Defined in http://kafka.apache.org/protocol.html#The_Messages_Fetch +func writeFetchRequestV10(w *bufio.Writer, correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration) error { + h := requestHeader{ + ApiKey: int16(fetchRequest), + ApiVersion: int16(v10), + CorrelationID: correlationID, + ClientID: clientID, + } + h.Size = (h.size() - 4) + + 4 + // replica ID + 4 + // max wait time + 4 + // min bytes + 4 + // max bytes + 1 + // isolation level + 4 + // fetch session ID + 4 + // fetch session epoch + 4 + // topic array length + sizeofString(topic) + + 4 + // partition array length + 4 + // partition + 4 + // current leader epoch + 8 + // offset + 8 + // log start offset + 4 + // partition max bytes + 4 // forgotten topics array length + + h.writeTo(w) + writeInt32(w, -1) // replica ID + writeInt32(w, milliseconds(maxWait)) + writeInt32(w, int32(minBytes)) + writeInt32(w, int32(maxBytes)) + writeInt8(w, 0) // no support for isolation level yet + writeInt32(w, 0) // no support for fetch session id yet + writeInt32(w, -1) // no support for fetch session epoch yet + + // topic array + writeArrayLen(w, 1) + writeString(w, topic) + + // partition array + writeArrayLen(w, 1) + writeInt32(w, partition) + writeInt32(w, -1) // no partition leader epoch + writeInt64(w, offset) + writeInt64(w, -1) // log start offset + writeInt32(w, int32(maxBytes)) + + // forgotten topics array + writeArrayLen(w, 0) // forgotten topics not supported yet + + return w.Flush() +} + func writeListOffsetRequestV1(w *bufio.Writer, correlationID int32, clientID, topic string, partition int32, time int64) error { h := requestHeader{ ApiKey: int16(listOffsetRequest),