Skip to content

Commit

Permalink
Merge pull request redpanda-data#6 from twmb/bifurcate
Browse files Browse the repository at this point in the history
Bifurcate
  • Loading branch information
twmb committed Feb 2, 2020
2 parents 0bbbfb8 + 04f9b84 commit 2be016c
Show file tree
Hide file tree
Showing 31 changed files with 2,743 additions and 1,510 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ KIPS

TODO
----
- KIP-360 (safe epoch bumping for `UNKNOWN_PRODUCER_ID`; 2.5.0)
- KIP-447 (transaction changes to better support group changes)

NOT YET (KIP under discussion / unmerged PR)
-------
- KIP-359 (verify leader epoch in produce requests)
- KIP-392 (fetch request from closest replica w/ rack; 2.2.0)
- KIP-423 (no rebalance on JoinGroup from leader in certain cases; under discussion)
- KIP-447 (transaction changes to better support group changes)
- KIP-497 (new inter broker admin command "alter isr")
- KIP-516 (topic.id field in some commands, including fetch)
- KIP-518 (list groups by state command change)
Expand Down Expand Up @@ -60,6 +59,7 @@ DONE
- KIP-341 (sticky group bug fix)
- KIP-342 (oauth extensions; 2.1.0)
- KIP-345 (static group membership, see KAFKA-8224)
- KIP-360 (safe epoch bumping for `UNKNOWN_PRODUCER_ID`; 2.5.0)
- KIP-368 (periodically reauth sasl; 2.2.0)
- KIP-369 (always round robin produce partitioner; 2.4.0)
- KIP-380 (inter-broker command changes; 2.2.0)
Expand Down
36 changes: 27 additions & 9 deletions generate/DEFINITIONS
Original file line number Diff line number Diff line change
Expand Up @@ -1012,8 +1012,12 @@ TxnMetadataValue => not top level, with version field
Version: int16
// ProducerID is the ID in use by the transactional ID.
ProducerID: int64
// LastProducerID is the last ID in use for a producer; see KIP-360.
LastProducerID: int64 // v1+
// ProducerEpoch is the epoch associated with the producer ID.
ProducerEpoch: int16
// LastProducerEpoch is the last epoch in use for a producer; see KIP-360.
LastProducerEpoch: int16 // v1+
// TimeoutMillis is the timeout of this transaction in milliseconds.
TimeoutMillis: int32
// State is the state this transaction is in,
Expand Down Expand Up @@ -1136,7 +1140,7 @@ OffsetCommitResponse =>

// OffsetFetchRequest requests the most recent committed offsets for topic
// partitions in a group.
OffsetFetchRequest => key 9, max version 6, flexible v6+, group coordinator
OffsetFetchRequest => key 9, max version 7, flexible v6+, group coordinator
// Group is the group to fetch offsets for.
Group: string
// Topics contains topics to fetch offets for. Version 2+ allows this to be
Expand All @@ -1146,6 +1150,11 @@ OffsetFetchRequest => key 9, max version 6, flexible v6+, group coordinator
Topic: string
// Partitions in a list of partitions in a group to fetch offsets for.
Partitions: [int32]
// RequireStable signifies whether the broker should wait on returning
// unstable offsets, instead setting a retriable error on the relevant
// unstable partitions (UNSTABLE_OFFSET_COMMIT). See KIP-447 for more
// details.
RequireStable: bool // v7+

// OffsetFetchResponse is returned from an OffsetFetchRequest.
OffsetFetchResponse =>
Expand Down Expand Up @@ -1191,6 +1200,9 @@ OffsetFetchResponse =>
//
// UNKNOWN_TOPIC_OR_PARTITION is returned if the requested topic or partition
// is unknown.
//
// UNSTABLE_OFFSET_COMMIT is returned for v7+ if the request set RequireStable.
// See KIP-447 for more details.
ErrorCode: int16
// ErrorCode is a top level error code that applies to all topic/partitions.
// This will be any group error.
Expand Down Expand Up @@ -1661,7 +1673,7 @@ ListGroupsResponse =>

// SASLHandshakeRequest begins the sasl authentication flow. Note that Kerberos
// GSSAPI authentication has its own unique flow.
SASLHandshakeRequest => key 17, max version 1
SASLHandshakeRequest => key 17, max version 2, flexible v2+
// Mechanism is the mechanism to use for the sasl handshake (e.g., "PLAIN").
//
// For version 0, if this mechanism is supported, it is expected that the
Expand Down Expand Up @@ -2266,7 +2278,7 @@ WriteTxnMarkersResponse =>
// TxnOffsetCommitRequest sends offsets that are a part of this transaction
// to be committed once the transaction itself finishes. This effectively
// replaces OffsetCommitRequest for when using transactions.
TxnOffsetCommitRequest => key 28, max version 2, group coordinator
TxnOffsetCommitRequest => key 28, max version 3, flexible v3+, group coordinator
// TransactionalID is the transactional ID to use for this request.
TransactionalID: string
// Group is the group consumed in this transaction and to be used for
Expand All @@ -2278,6 +2290,12 @@ TxnOffsetCommitRequest => key 28, max version 2, group coordinator
// ProducerEpoch is the producer epoch of the client for this transactional ID
// as received from InitProducerID.
ProducerEpoch: int16
// Generation is the group generation this heartbeat is for.
Generation: int32 // v3+
// MemberID is the member ID this member is for.
MemberID: string // v3+
// InstanceID is the instance ID of this member in the group (KIP-345, KIP-447).
InstanceID: nullable-string // v3+
// Topics are topics to add for pending commits.
Topics: [=>]
// Topic is a topic to add for a pending commit.
Expand Down Expand Up @@ -2361,7 +2379,7 @@ TxnOffsetCommitResponse =>
// "types" of filters in this request: the resource filter and the entry
// filter, with entries corresponding to users. The first three fields form the
// resource filter, the last four the entry filter.
DescribeACLsRequest => key 29, max version 1, admin
DescribeACLsRequest => key 29, max version 2, flexible v2+, admin
// ResourceType is the type of resource to describe.
//
// UNKNOWN, 0, is unknown; you do not describe unknown types. Kafka replies
Expand Down Expand Up @@ -2806,7 +2824,7 @@ DescribeLogDirsResponse =>
// this wrapping is that Kafka can indicate errors in the response, rather than
// just closing the connection. Additionally, the response allows for further
// extension fields.
SASLAuthenticateRequest => key 36, max version 1
SASLAuthenticateRequest => key 36, max version 2, flexible v2+
// SASLAuthBytes contains bytes for a SASL client request.
SASLAuthBytes: bytes

Expand All @@ -2825,7 +2843,7 @@ SASLAuthenticateResponse =>
SessionLifetimeMillis: int64 // v1+

// CreatePartitionsRequest creates additional partitions for topics.
CreatePartitionsRequest => key 37, max version 1, admin
CreatePartitionsRequest => key 37, max version 2, flexible v2+, admin
// Topics contains topics to create partitions for.
Topics: [=>]
// Topic is a topic for which to create additional partitions for.
Expand Down Expand Up @@ -2951,7 +2969,7 @@ CreateDelegationTokenResponse =>
// RenewDelegationTokenRequest is a request to renew a delegation token that
// has not yet hit its max timestamp. Note that a client using a token cannot
// renew its own token.
RenewDelegationTokenRequest => key 39, max version 1, admin
RenewDelegationTokenRequest => key 39, max version 2, flexible v2+, admin
// HMAC is the HMAC of the token to be renewed.
HMAC: bytes
// RenewTimeMillis is how long to renew the token for. If -1, Kafka uses its
Expand All @@ -2975,7 +2993,7 @@ RenewDelegationTokenResponse =>
// ExpireDelegationTokenRequest is a request to change the expiry timestamp
// of a delegation token. Note that a client using a token cannot expire its
// own token.
ExpireDelegationTokenRequest => key 40, max version 1, admin
ExpireDelegationTokenRequest => key 40, max version 2, flexible v2+, admin
// HMAC is the HMAC of the token to change the expiry timestamp of.
HMAC: bytes
// ExpiryPeriodMillis changes the delegation token's expiry timestamp to
Expand All @@ -3002,7 +3020,7 @@ ExpireDelegationTokenResponse =>
ThrottleMillis: int32

// DescribeDelegationTokenRequest is a request to describe delegation tokens.
DescribeDelegationTokenRequest => key 41, max version 1, admin
DescribeDelegationTokenRequest => key 41, max version 2, flexible v2+, admin
// Owners contains owners to describe delegation tokens for, or null for all.
// If non-null, only tokens created from a matching principal type, name
// combination are printed.
Expand Down
15 changes: 10 additions & 5 deletions generate/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ func (Int8) TypeName() string { return "int8" }
func (Int16) TypeName() string { return "int16" }
func (Int32) TypeName() string { return "int32" }
func (Int64) TypeName() string { return "int64" }
func (Float64) TypeName() string { return "float64" }
func (Uint32) TypeName() string { return "uint32" }
func (Varint) TypeName() string { return "int32" }
func (Varlong) TypeName() string { return "int64" }
Expand Down Expand Up @@ -41,16 +42,19 @@ func (Int8) WriteAppend(l *LineWriter) { primAppend("Int8", l) }
func (Int16) WriteAppend(l *LineWriter) { primAppend("Int16", l) }
func (Int32) WriteAppend(l *LineWriter) { primAppend("Int32", l) }
func (Int64) WriteAppend(l *LineWriter) { primAppend("Int64", l) }
func (Float64) WriteAppend(l *LineWriter) { primAppend("Float64", l) }
func (Uint32) WriteAppend(l *LineWriter) { primAppend("Uint32", l) }
func (Varint) WriteAppend(l *LineWriter) { primAppend("Varint", l) }
func (Varlong) WriteAppend(l *LineWriter) { primAppend("Varlong", l) }
func (VarintString) WriteAppend(l *LineWriter) { primAppend("VarintString", l) }
func (VarintBytes) WriteAppend(l *LineWriter) { primAppend("VarintBytes", l) }

func (v String) WriteAppend(l *LineWriter) { compactAppend(v.FromFlexible, "String", l) }
func (v NullableString) WriteAppend(l *LineWriter) { compactAppend(v.FromFlexible, "NullableString", l) }
func (v Bytes) WriteAppend(l *LineWriter) { compactAppend(v.FromFlexible, "Bytes", l) }
func (v NullableBytes) WriteAppend(l *LineWriter) { compactAppend(v.FromFlexible, "NullableBytes", l) }
func (v String) WriteAppend(l *LineWriter) { compactAppend(v.FromFlexible, "String", l) }
func (v NullableString) WriteAppend(l *LineWriter) {
compactAppend(v.FromFlexible, "NullableString", l)
}
func (v Bytes) WriteAppend(l *LineWriter) { compactAppend(v.FromFlexible, "Bytes", l) }
func (v NullableBytes) WriteAppend(l *LineWriter) { compactAppend(v.FromFlexible, "NullableBytes", l) }

func (FieldLengthMinusBytes) WriteAppend(l *LineWriter) {
l.Write("dst = append(dst, v...)")
Expand Down Expand Up @@ -189,6 +193,7 @@ func (Int8) WriteDecode(l *LineWriter) { primDecode("Int8", l) }
func (Int16) WriteDecode(l *LineWriter) { primDecode("Int16", l) }
func (Int32) WriteDecode(l *LineWriter) { primDecode("Int32", l) }
func (Int64) WriteDecode(l *LineWriter) { primDecode("Int64", l) }
func (Float64) WriteDecode(l *LineWriter) { primDecode("Float64", l) }
func (Uint32) WriteDecode(l *LineWriter) { primDecode("Uint32", l) }
func (Varint) WriteDecode(l *LineWriter) { primDecode("Varint", l) }
func (Varlong) WriteDecode(l *LineWriter) { primDecode("Varlong", l) }
Expand Down Expand Up @@ -350,7 +355,7 @@ func (s Struct) WriteDecode(l *LineWriter) {
}

switch f.Type.(type) {
case Bool, Int8, Int16, Int32, Int64, Uint32, Varint:
case Bool, Int8, Int16, Int32, Int64, Float64, Uint32, Varint:
default:
die("type %v unsupported in decode! fix this!", f.Type)
}
Expand Down
1 change: 1 addition & 0 deletions generate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type (
Int16 struct{}
Int32 struct{}
Int64 struct{}
Float64 struct{}
Uint32 struct{}
Varint struct{}
Varlong struct{}
Expand Down
1 change: 1 addition & 0 deletions generate/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var types = map[string]Type{
"int16": Int16{},
"int32": Int32{},
"int64": Int64{},
"float64": Float64{},
"uint32": Uint32{},
"varint": Varint{},
"varlong": Varlong{},
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ go 1.13

require (
github.com/davecgh/go-spew v1.1.1
github.com/frankban/quicktest v1.7.2 // indirect
github.com/golang/snappy v0.0.1
github.com/google/go-cmp v0.3.1
github.com/klauspost/compress v1.8.2
github.com/klauspost/cpuid v1.2.1 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/klauspost/compress v1.9.8
github.com/pierrec/lz4 v2.4.1+incompatible
github.com/twmb/go-rbtree v1.0.0
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
)
17 changes: 11 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.7.2 h1:2QxQoC1TS09S7fhCPsrvqYdvP1H5M1P1ih5ABm3BTYk=
github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs=
github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg=
github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
22 changes: 20 additions & 2 deletions pkg/kbin/primitives.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package kbin
import (
"encoding/binary"
"errors"
"math"
"math/bits"
)

Expand Down Expand Up @@ -43,7 +44,15 @@ func AppendInt32(dst []byte, i int32) []byte {

// AppendInt64 appends a big endian int64 to dst.
func AppendInt64(dst []byte, i int64) []byte {
u := uint64(i)
return appendUint64(dst, uint64(i))
}

// AppendFloat64 appends a big endian float64 to dst.
func AppendFloat64(dst []byte, f float64) []byte {
return appendUint64(dst, math.Float64bits(f))
}

func appendUint64(dst []byte, u uint64) []byte {
return append(dst, byte(u>>56), byte(u>>48), byte(u>>40), byte(u>>32),
byte(u>>24), byte(u>>16), byte(u>>8), byte(u))
}
Expand Down Expand Up @@ -478,12 +487,21 @@ func (b *Reader) Int32() int32 {

// Int64 returns an int64 from the reader.
func (b *Reader) Int64() int64 {
return int64(b.readUint64())
}

// Float64 returns a float64 from the reader.
func (b *Reader) Float64() float64 {
return math.Float64frombits(b.readUint64())
}

func (b *Reader) readUint64() uint64 {
if len(b.Src) < 8 {
b.bad = true
b.Src = nil
return 0
}
r := int64(binary.BigEndian.Uint64(b.Src))
r := binary.BigEndian.Uint64(b.Src)
b.Src = b.Src[8:]
return r
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kerr/kerr.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ var (
NoReassignmentInProgress = &Error{"NO_REASSIGNMENT_IN_PROGRESS", 85, false, "No partition reassignment is in progress."}
GroupSubscribedToTopic = &Error{"GROUP_SUBSCRIBED_TO_TOPIC", 86, false, "Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it."}
InvalidRecord = &Error{"INVALID_RECORD", 87, false, "This record has failed the validation on broker and hence be rejected."}
UnstableOffsetCommit = &Error{"UNSTABLE_OFFSET_COMMIT", 88, true, "There are unstable offsets that need to be cleared."}
)

var code2err = map[int16]error{
Expand Down Expand Up @@ -243,4 +244,5 @@ var code2err = map[int16]error{
85: NoReassignmentInProgress,
86: GroupSubscribedToTopic,
87: InvalidRecord,
88: UnstableOffsetCommit,
}
39 changes: 26 additions & 13 deletions pkg/kgo/atomic_maybe_work.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@ const (
stateContinueWorking
)

// maybeBeginWork changes workState to some form of working and returns whether
// the caller needs to begin work.
func maybeBeginWork(workState *uint32) bool {
type workLoop struct{ state uint32 }

// maybeBegin returns whether a work loop should begin.
func (l *workLoop) maybeBegin() bool {
var state uint32
var done bool
for !done {
switch state = atomic.LoadUint32(workState); state {
switch state = atomic.LoadUint32(&l.state); state {
case stateUnstarted:
done = atomic.CompareAndSwapUint32(workState, state, stateWorking)
done = atomic.CompareAndSwapUint32(&l.state, state, stateWorking)
state = stateWorking
case stateWorking:
done = atomic.CompareAndSwapUint32(workState, state, stateContinueWorking)
done = atomic.CompareAndSwapUint32(&l.state, state, stateContinueWorking)
state = stateContinueWorking
case stateContinueWorking:
done = true
Expand All @@ -29,25 +30,37 @@ func maybeBeginWork(workState *uint32) bool {
return state == stateWorking
}

// maybeTryFinishWork demotes workState and returns whether work should
// continue.
// maybeFinish demotes loop's internal state and returns whether work should
// keep going. This function should be called before looping to continue
// work.
//
// If again is true, this will avoid demoting from working to not
// working. Again would be true if the loop knows it should continue working;
// calling this function is necessary even in this case to update loop's
// internal state.
//
// If again is true, this will avoid demoting from working to not working.
func maybeTryFinishWork(workState *uint32, again bool) bool {
switch state := atomic.LoadUint32(workState); state {
// This function is a no-op if the loop is already finished, but generally,
// since the loop itself calls MaybeFinish after it has been started, this
// should never be called if the loop is unstarted.
func (l *workLoop) maybeFinish(again bool) bool {
switch state := atomic.LoadUint32(&l.state); state {
// Working:
// If again, we know we should continue; keep our state.
// If not again, we try to downgrade state and stop.
// If we cannot, then something slipped in to say keep going.
case stateWorking:
if !again {
again = !atomic.CompareAndSwapUint32(workState, state, stateUnstarted)
again = !atomic.CompareAndSwapUint32(&l.state, state, stateUnstarted)
}
// Continue: demote ourself and run again no matter what.
case stateContinueWorking:
atomic.StoreUint32(workState, stateWorking)
atomic.StoreUint32(&l.state, stateWorking)
again = true
}

return again
}

func (l *workLoop) hardFinish() {
atomic.StoreUint32(&l.state, stateUnstarted)
}
Loading

0 comments on commit 2be016c

Please sign in to comment.