Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for consumer group #1022

Merged
merged 10 commits into from
Nov 8, 2022
5 changes: 4 additions & 1 deletion consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,14 +470,17 @@ func (g *Generation) heartbeatLoop(interval time.Duration) {
case <-ctx.Done():
return
case <-ticker.C:
_, err := g.coord.heartbeat(ctx, &HeartbeatRequest{
resp, err := g.coord.heartbeat(ctx, &HeartbeatRequest{
GroupID: g.GroupID,
GenerationID: g.ID,
MemberID: g.MemberID,
})
if err != nil {
return
}
if resp.Error != nil {
return
}
}
}
})
Expand Down
64 changes: 64 additions & 0 deletions consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,3 +606,67 @@ func TestGenerationStartsFunctionAfterClosed(t *testing.T) {
}
}
}

func TestGenerationEndsOnHeartbeatError(t *testing.T) {
gen := Generation{
coord: &mockCoordinator{
heartbeatFunc: func(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) {
return nil, errors.New("some error")
},
},
done: make(chan struct{}),
joined: make(chan struct{}),
log: func(func(Logger)) {},
logError: func(func(Logger)) {},
}

ch := make(chan error)
gen.Start(func(ctx context.Context) {
<-ctx.Done()
ch <- ctx.Err()
})

gen.heartbeatLoop(time.Millisecond)

select {
case <-time.After(time.Second):
t.Fatal("timed out waiting for func to run")
case err := <-ch:
if !errors.Is(err, ErrGenerationEnded) {
t.Fatalf("expected %v but got %v", ErrGenerationEnded, err)
}
}
}

func TestGenerationEndsOnHeartbeatRebalaceInProgress(t *testing.T) {
gen := Generation{
coord: &mockCoordinator{
heartbeatFunc: func(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) {
return &HeartbeatResponse{
Error: makeError(int16(RebalanceInProgress), ""),
}, nil
},
},
done: make(chan struct{}),
joined: make(chan struct{}),
log: func(func(Logger)) {},
logError: func(func(Logger)) {},
}

ch := make(chan error)
gen.Start(func(ctx context.Context) {
<-ctx.Done()
ch <- ctx.Err()
})

gen.heartbeatLoop(time.Millisecond)

select {
case <-time.After(time.Second):
t.Fatal("timed out waiting for func to run")
case err := <-ch:
if !errors.Is(err, ErrGenerationEnded) {
t.Fatalf("expected %v but got %v", ErrGenerationEnded, err)
}
}
}
16 changes: 15 additions & 1 deletion joingroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package kafka
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net"
"time"

Expand Down Expand Up @@ -163,7 +165,9 @@ func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGro

for _, member := range r.Members {
var meta consumer.Subscription
err = protocol.Unmarshal(member.Metadata, consumer.MaxVersionSupported, &meta)
metaVersion := makeInt16(member.Metadata[0:2])
err = protocol.Unmarshal(member.Metadata, metaVersion, &meta)
err = joinGroupSubscriptionMetaError(err, metaVersion)
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
}
Expand All @@ -188,6 +192,16 @@ func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGro
return res, nil
}

// sarama indicates there are some misbehaving clients out there that
// set the version as 1 but don't include the OwnedPartitions section
// https://github.com/Shopify/sarama/blob/610514edec1825240d59b62e4d7f1aba4b1fa000/consumer_group_members.go#L43
func joinGroupSubscriptionMetaError(err error, version int16) error {
if version >= 1 && errors.Is(err, io.ErrUnexpectedEOF) {
return nil
}
return err
}

type groupMetadata struct {
Version int16
Topics []string
Expand Down
2 changes: 1 addition & 1 deletion protocol/heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type Response struct {
// type.
_ struct{} `kafka:"min=v4,max=v4,tag"`

ErrorCode int16 `kafka:"min=v0,max=v4"`
ThrottleTimeMs int32 `kafka:"min=v1,max=v4"`
ErrorCode int16 `kafka:"min=v0,max=v4"`
}

func (r *Response) ApiKey() protocol.ApiKey {
Expand Down
10 changes: 7 additions & 3 deletions syncgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,13 @@ func (c *Client) SyncGroup(ctx context.Context, req *SyncGroupRequest) (*SyncGro
r := m.(*syncgroup.Response)

var assignment consumer.Assignment
err = protocol.Unmarshal(r.Assignments, consumer.MaxVersionSupported, &assignment)
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err)
var metaVersion int16
if len(r.Assignments) > 2 {
metaVersion = makeInt16(r.Assignments[0:2])
err = protocol.Unmarshal(r.Assignments, metaVersion, &assignment)
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).SyncGroup: %w", err)
}
}

res := &SyncGroupResponse{
Expand Down