Add test for group coordinator failover
tylertreat committed Feb 13, 2022
1 parent 7d24823 commit c56afd8
Showing 1 changed file with 129 additions and 0 deletions.
129 changes: 129 additions & 0 deletions server/groups_test.go
@@ -9,6 +9,7 @@ import (

lift "github.com/liftbridge-io/go-liftbridge/v2"
proto "github.com/liftbridge-io/liftbridge/server/protocol"
natsdTest "github.com/nats-io/nats-server/v2/test"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -96,6 +97,43 @@ LOOP:
		group, cons, stream, partition)
}

// getGroupCoordinator polls the given servers until one of them reports a
// coordinator for the named consumer group and returns that server along with
// the coordinator epoch. It fails the test if no coordinator is found before
// the timeout expires.
func getGroupCoordinator(t *testing.T, timeout time.Duration, groupName string, servers ...*Server) (
	*Server, uint64) {
	var (
		epoch       uint64
		coordinator string
		deadline    = time.Now().Add(timeout)
	)
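	// Track the coordinator reported with the highest epoch; a failover bumps
	// the epoch, so the most recent coordinator wins.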
LOOP:
	for time.Now().Before(deadline) {
		for _, s := range servers {
			group := s.metadata.GetConsumerGroup(groupName)
			if group == nil {
				time.Sleep(15 * time.Millisecond)
				continue LOOP
			}
			coor, ep := group.GetCoordinator()
			if ep >= epoch {
				coordinator = coor
				epoch = ep
			}
		}
		if coordinator == "" {
			// No server has reported a coordinator yet.
			time.Sleep(15 * time.Millisecond)
			continue LOOP
		}

		// Map the coordinator ID back to one of the given servers.
		for _, s := range servers {
			if s.config.Clustering.ServerID == coordinator {
				return s, epoch
			}
		}
	}

	stackFatalf(t, "No coordinator found for group [id=%s]", groupName)
	return nil, 0
}

// Ensure assignPartition and removeStreamAssignments update the consumer's
// assignments and assignedCount appropriately.
func TestConsumerGroupAssignments(t *testing.T) {
@@ -503,6 +541,97 @@ func TestConsumerGroupStreamDeleted(t *testing.T) {
	require.Equal(t, "bar", streams[0])
}

// Ensure that when a group coordinator becomes unavailable, the consumer
// reports it and coordination fails over to another server.
func TestConsumerGroupCoordinatorFailover(t *testing.T) {
	defer cleanupStorage(t)

	// Use an external NATS server so the cluster can keep communicating after
	// a Liftbridge server is stopped.
	ns := natsdTest.RunDefaultServer()
	defer ns.Shutdown()

	// Configure first server. The short coordinator timeout lets consumers
	// report an unresponsive coordinator quickly.
	s1Config := getTestConfig("a", true, 5050)
	s1Config.EmbeddedNATS = false
	s1Config.CursorsStream.Partitions = 1
	s1Config.Groups.CoordinatorTimeout = 50 * time.Millisecond
	s1 := runServerWithConfig(t, s1Config)
	defer s1.Stop()

	// Configure second server.
	s2Config := getTestConfig("b", false, 5051)
	s2Config.CursorsStream.Partitions = 1
	s2Config.Groups.CoordinatorTimeout = 50 * time.Millisecond
	s2 := runServerWithConfig(t, s2Config)
	defer s2.Stop()

	// Configure third server.
	s3Config := getTestConfig("c", false, 5052)
	s3Config.CursorsStream.Partitions = 1
	s3Config.Groups.CoordinatorTimeout = 50 * time.Millisecond
	s3 := runServerWithConfig(t, s3Config)
	defer s3.Stop()

	// Wait for a metadata leader to be elected before connecting a client.
	getMetadataLeader(t, 10*time.Second, s1, s2, s3)

	client, err := lift.Connect([]string{"localhost:5050", "localhost:5051", "localhost:5052"})
	require.NoError(t, err)
	defer client.Close()

	// Create stream.
	err = client.CreateStream(context.Background(), "foo", "foo")
	require.NoError(t, err)

	var (
		group  = "my-group"
		consID = "cons"
	)

	// Create consumer. Override the assignment-fetch interval so the consumer
	// polls the coordinator every 10ms and notices a coordinator change
	// quickly.
	cons, err := client.CreateConsumer(group, lift.ConsumerID(consID),
		lift.FetchAssignmentsInterval(func(timeout time.Duration) time.Duration {
			return 10 * time.Millisecond
		}))
	require.NoError(t, err)

	// The no-op handler suffices since the test only exercises coordination,
	// not message delivery.
	err = cons.Subscribe(context.Background(), []string{"foo"},
		func(msg *lift.Message, err error) {
		})
	require.NoError(t, err)

	waitForConsumer(t, 5*time.Second, group, consID, s1, s2, s3)
	coordinator, epoch := getGroupCoordinator(t, 5*time.Second, group, s1, s2, s3)

	// Stop the coordinator to force a failover and collect the remaining
	// servers, one of which should take over coordination.
	require.NoError(t, coordinator.Stop())
	var followers []*Server
	if coordinator == s1 {
		followers = []*Server{s2, s3}
	} else if coordinator == s2 {
		followers = []*Server{s1, s3}
	} else {
		followers = []*Server{s1, s2}
	}

	// Wait for new coordinator.
	var (
		deadline       = time.Now().Add(5 * time.Second)
		newCoordinator *Server
		newEpoch       uint64
	)
	for time.Now().Before(deadline) {
		newCoordinator, newEpoch = getGroupCoordinator(t, 10*time.Second, group, followers...)
		if newCoordinator == coordinator {
			time.Sleep(15 * time.Millisecond)
			continue
		}
		break
	}

	require.NotEqual(t, newCoordinator, coordinator)
	require.Greater(t, newEpoch, epoch)
}
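To run just this test in isolation (assuming a standard Go toolchain invoked from the repository root), something like `go test -run TestConsumerGroupCoordinatorFailover ./server` should work. Note the three servers listen on ports 5050-5052, so those ports need to be free.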

// Ensure when a consumer's timeout is reached the group invokes the expired
// handler.
func TestConsumerGroupConsumerTimeout(t *testing.T) {