Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove all in-code use of leadership "instance name" + constrain column to single value #325

Merged
merged 1 commit into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 4 additions & 18 deletions internal/leadership/elector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,11 @@ const (
electIntervalDefault = 5 * time.Second
electIntervalJitterDefault = 1 * time.Second
electIntervalTTLPaddingDefault = 10 * time.Second

// TODO: for now we only support a single instance per database/schema.
// If we want to provide isolation within a single database/schema,
// we'll need to add a client config for this.
instanceNameDefault = "default"
)

type dbLeadershipNotification struct {
Name string `json:"name"`
LeaderID string `json:"leader_id"`
Action string `json:"action"`
LeaderID string `json:"leader_id"`
}

type Notification struct {
Expand Down Expand Up @@ -78,7 +72,6 @@ type Config struct {
ClientID string
ElectInterval time.Duration // period on which each elector attempts elect even without having received a resignation notification
ElectIntervalJitter time.Duration
InstanceName string
}

func (c *Config) mustValidate() *Config {
Expand All @@ -88,9 +81,6 @@ func (c *Config) mustValidate() *Config {
if c.ElectInterval <= 0 {
panic("Config.ElectInterval must be above zero")
}
if c.InstanceName == "" {
panic("Config.InstanceName must be non-empty")
}

return c
}
Expand Down Expand Up @@ -119,7 +109,6 @@ func NewElector(archetype *baseservice.Archetype, exec riverdriver.Executor, not
ClientID: config.ClientID,
ElectInterval: valutil.ValOrDefault(config.ElectInterval, electIntervalDefault),
ElectIntervalJitter: valutil.ValOrDefault(config.ElectIntervalJitter, electIntervalJitterDefault),
InstanceName: valutil.ValOrDefault(config.InstanceName, instanceNameDefault),
}).mustValidate(),
exec: exec,
notifier: notifier,
Expand Down Expand Up @@ -204,7 +193,6 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error {

elected, err := attemptElectOrReelect(ctx, e.exec, false, &riverdriver.LeaderElectParams{
LeaderID: e.config.ClientID,
Name: e.config.InstanceName,
TTL: e.leaderTTL(),
})
if err != nil {
Expand Down Expand Up @@ -261,9 +249,9 @@ func (e *Elector) handleLeadershipNotification(ctx context.Context, topic notifi

e.Logger.InfoContext(ctx, e.Name+": Received notification from notifier", "action", notification.Action, "client_id", e.config.ClientID)

if notification.Action != "resigned" || notification.Name != e.config.InstanceName {
if notification.Action != "resigned" {
// We only care about resignations because we use them to preempt the
// election attempt backoff. And we only care about our own key name.
// election attempt backoff.
return
}

Expand Down Expand Up @@ -341,7 +329,6 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error {

reelected, err := attemptElectOrReelect(ctx, e.exec, true, &riverdriver.LeaderElectParams{
LeaderID: e.config.ClientID,
Name: e.config.InstanceName,
TTL: e.leaderTTL(),
})
if err != nil {
Expand Down Expand Up @@ -418,7 +405,6 @@ func (e *Elector) attemptResign(ctx context.Context, attempt int) error {
resigned, err := e.exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{
LeaderID: e.config.ClientID,
LeadershipTopic: string(notifier.NotificationTopicLeadership),
Name: e.config.InstanceName,
})
if err != nil {
return err
Expand Down Expand Up @@ -512,7 +498,7 @@ func attemptElectOrReelect(ctx context.Context, exec riverdriver.Executor, alrea
defer cancel()

return dbutil.WithTxV(ctx, exec, func(ctx context.Context, exec riverdriver.ExecutorTx) (bool, error) {
if _, err := exec.LeaderDeleteExpired(ctx, params.Name); err != nil {
if _, err := exec.LeaderDeleteExpired(ctx); err != nil {
return false, err
}

Expand Down
36 changes: 7 additions & 29 deletions internal/leadership/elector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"github.com/riverqueue/river/rivertype"
)

const defaultInstanceName = "default"

func TestElector_PollOnly(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -154,15 +152,15 @@ func testElector[TElectorBundle any](

elector.testSignals.GainedLeadership.WaitOrTimeout()

leader, err := bundle.exec.LeaderGetElectedLeader(ctx, defaultInstanceName)
leader, err := bundle.exec.LeaderGetElectedLeader(ctx)
require.NoError(t, err)
require.Equal(t, elector.config.ClientID, leader.LeaderID)

elector.Stop()

elector.testSignals.ResignedLeadership.WaitOrTimeout()

_, err = bundle.exec.LeaderGetElectedLeader(ctx, defaultInstanceName)
_, err = bundle.exec.LeaderGetElectedLeader(ctx)
require.ErrorIs(t, err, rivertype.ErrNotFound)
})

Expand Down Expand Up @@ -235,7 +233,6 @@ func testElector[TElectorBundle any](
_, err := bundle.exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{
LeaderID: elector.config.ClientID,
LeadershipTopic: string(notifier.NotificationTopicLeadership),
Name: defaultInstanceName,
})
require.NoError(t, err)

Expand Down Expand Up @@ -266,7 +263,7 @@ func testElector[TElectorBundle any](
t.Logf("Waiting for %s to gain leadership", elector1.config.ClientID)
elector1.testSignals.GainedLeadership.WaitOrTimeout()

leader, err := bundle.exec.LeaderGetElectedLeader(ctx, defaultInstanceName)
leader, err := bundle.exec.LeaderGetElectedLeader(ctx)
require.NoError(t, err)
require.Equal(t, elector1.config.ClientID, leader.LeaderID)
}
Expand Down Expand Up @@ -300,7 +297,7 @@ func testElector[TElectorBundle any](
elector2.testSignals.ResignedLeadership.WaitOrTimeout()
}

_, err := bundle.exec.LeaderGetElectedLeader(ctx, defaultInstanceName)
_, err := bundle.exec.LeaderGetElectedLeader(ctx)
require.ErrorIs(t, err, rivertype.ErrNotFound)
})

Expand Down Expand Up @@ -349,13 +346,12 @@ func TestAttemptElectOrReelect(t *testing.T) {

elected, err := attemptElectOrReelect(ctx, bundle.exec, false, &riverdriver.LeaderElectParams{
LeaderID: clientID,
Name: leaderInstanceName,
TTL: leaderTTL,
})
require.NoError(t, err)
require.True(t, elected) // won election

leader, err := bundle.exec.LeaderGetElectedLeader(ctx, leaderInstanceName)
leader, err := bundle.exec.LeaderGetElectedLeader(ctx)
require.NoError(t, err)
require.WithinDuration(t, time.Now(), leader.ElectedAt, 100*time.Millisecond)
require.WithinDuration(t, time.Now().Add(leaderTTL), leader.ExpiresAt, 100*time.Millisecond)
Expand All @@ -368,23 +364,21 @@ func TestAttemptElectOrReelect(t *testing.T) {

leader := testfactory.Leader(ctx, t, bundle.exec, &testfactory.LeaderOpts{
LeaderID: ptrutil.Ptr(clientID),
Name: ptrutil.Ptr(leaderInstanceName),
})

// Re-elect the same leader. Use a larger TTL to see if time is updated,
// because we are in a test transaction and the time is frozen at the start of
// the transaction.
elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{
LeaderID: clientID,
Name: leaderInstanceName,
TTL: 30 * time.Second,
})
require.NoError(t, err)
require.True(t, elected) // won re-election

// expires_at should be incremented because this is the same leader that won
// previously and we specified that we're already elected:
updatedLeader, err := bundle.exec.LeaderGetElectedLeader(ctx, leaderInstanceName)
updatedLeader, err := bundle.exec.LeaderGetElectedLeader(ctx)
require.NoError(t, err)
require.Greater(t, updatedLeader.ExpiresAt, leader.ExpiresAt)
})
Expand All @@ -396,12 +390,10 @@ func TestAttemptElectOrReelect(t *testing.T) {

leader := testfactory.Leader(ctx, t, bundle.exec, &testfactory.LeaderOpts{
LeaderID: ptrutil.Ptr(clientID),
Name: ptrutil.Ptr(leaderInstanceName),
})

elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{
LeaderID: "different-client-id",
Name: leaderInstanceName,
TTL: leaderTTL,
})
require.NoError(t, err)
Expand All @@ -410,7 +402,7 @@ func TestAttemptElectOrReelect(t *testing.T) {
// The time should not have changed because we specified that we were not
// already elected, and the elect query is a no-op if there's already a
// updatedLeader:
updatedLeader, err := bundle.exec.LeaderGetElectedLeader(ctx, leaderInstanceName)
updatedLeader, err := bundle.exec.LeaderGetElectedLeader(ctx)
require.NoError(t, err)
require.Equal(t, leader.ExpiresAt, updatedLeader.ExpiresAt)
})
Expand Down Expand Up @@ -458,7 +450,6 @@ func TestElectorHandleLeadershipNotification(t *testing.T) {

return &dbLeadershipNotification{
Action: "resigned",
Name: defaultInstanceName,
LeaderID: "other-client-id",
}
}
Expand Down Expand Up @@ -499,19 +490,6 @@ func TestElectorHandleLeadershipNotification(t *testing.T) {
require.Empty(t, elector.leadershipNotificationChan)
})

t.Run("IgnoresAlternateInstanceName", func(t *testing.T) {
t.Parallel()

elector, _ := setup(t)

change := validLeadershipChange()
change.Name = "alternate_instance_name"

elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, change)))

require.Empty(t, elector.leadershipNotificationChan)
})

t.Run("IgnoresSameClientID", func(t *testing.T) {
t.Parallel()

Expand Down
Loading
Loading