Skip to content

Commit

Permalink
emit subscription events on pause + resume
Browse files Browse the repository at this point in the history
  • Loading branch information
bgentry committed May 2, 2024
1 parent 92c27c8 commit e638231
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 71 deletions.
50 changes: 33 additions & 17 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
MaxWorkers: queueConfig.MaxWorkers,
Notifier: client.notifier,
Queue: queue,
QueueEventCallback: func(queue string, pausedAt *time.Time) {
client.distributeEvent(nil, nil, &rivertype.Queue{Name: queue, PausedAt: pausedAt})
},
RetryPolicy: config.RetryPolicy,
SchedulerInterval: config.schedulerInterval,
StatusFunc: client.monitor.SetProducerStatus,
Expand Down Expand Up @@ -909,8 +912,11 @@ func (c *Client[TTx]) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, f
return subChan, cancel
}

// Distribute a single job into any listening subscriber channels.
func (c *Client[TTx]) distributeJob(job *rivertype.JobRow, stats *JobStatistics) {
// Distribute a single event into any listening subscriber channels.
//
// Exactly one of job or queue is expected to be non-nil: job events pass the
// job and its stats with a nil queue, while queue events pass only the queue
// (with nil job and stats).
func (c *Client[TTx]) distributeEvent(job *rivertype.JobRow, stats *JobStatistics, queue *rivertype.Queue) {
c.subscriptionsMu.Lock()
defer c.subscriptionsMu.Unlock()

Expand All @@ -920,20 +926,30 @@ func (c *Client[TTx]) distributeJob(job *rivertype.JobRow, stats *JobStatistics)
}

var event *Event
switch job.State {
case rivertype.JobStateCancelled:
event = &Event{Kind: EventKindJobCancelled, Job: job, JobStats: stats}
case rivertype.JobStateCompleted:
event = &Event{Kind: EventKindJobCompleted, Job: job, JobStats: stats}
case rivertype.JobStateScheduled:
event = &Event{Kind: EventKindJobSnoozed, Job: job, JobStats: stats}
case rivertype.JobStateAvailable, rivertype.JobStateDiscarded, rivertype.JobStateRetryable, rivertype.JobStateRunning:
event = &Event{Kind: EventKindJobFailed, Job: job, JobStats: stats}
case rivertype.JobStatePending:
panic("completion subscriber unexpectedly received job in pending state, river bug")
default:
// linter exhaustive rule prevents this from being reached
panic("unreachable state to distribute, river bug")

if job != nil {
switch job.State {
case rivertype.JobStateCancelled:
event = &Event{Kind: EventKindJobCancelled, Job: job, JobStats: stats}
case rivertype.JobStateCompleted:
event = &Event{Kind: EventKindJobCompleted, Job: job, JobStats: stats}
case rivertype.JobStateScheduled:
event = &Event{Kind: EventKindJobSnoozed, Job: job, JobStats: stats}
case rivertype.JobStateAvailable, rivertype.JobStateDiscarded, rivertype.JobStateRetryable, rivertype.JobStateRunning:
event = &Event{Kind: EventKindJobFailed, Job: job, JobStats: stats}
case rivertype.JobStatePending:
panic("completion subscriber unexpectedly received job in pending state, river bug")
default:
// linter exhaustive rule prevents this from being reached
panic("unreachable state to distribute, river bug")
}
} else {
switch queue.PausedAt {
case nil:
event = &Event{Kind: EventKindQueueResumed, Queue: queue}
default:
event = &Event{Kind: EventKindQueuePaused, Queue: queue}
}
}

// All subscription channels are non-blocking so this is always fast and
Expand Down Expand Up @@ -963,7 +979,7 @@ func (c *Client[TTx]) distributeJobCompleterCallback(update jobcompleter.Complet
c.statsNumJobs++
}()

c.distributeJob(update.Job, jobStatisticsFromInternal(update.JobStats))
c.distributeEvent(update.Job, jobStatisticsFromInternal(update.JobStats), nil)
}

// Dump aggregate stats from job completions to logs periodically. These
Expand Down
105 changes: 51 additions & 54 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ func Test_Client(t *testing.T) {
EventKindJobCompleted,
EventKindJobFailed,
EventKindJobSnoozed,
EventKindQueuePaused,
EventKindQueueResumed,
)
t.Cleanup(cancel)
return subscribeChan
Expand Down Expand Up @@ -464,44 +466,40 @@ func Test_Client(t *testing.T) {
config, bundle := setupConfig(t)
client := newTestClient(t, bundle.dbPool, config)

jobStartedChan := make(chan int64)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
jobStartedChan <- job.ID
return nil
}))

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

client.producersByQueueName[QueueDefault].testSignals.Init()

insertRes1, err := client.Insert(ctx, &JobArgs{}, nil)
insertRes1, err := client.Insert(ctx, &noOpArgs{}, nil)
require.NoError(t, err)

startedJobID := riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes1.Job.ID, startedJobID)
event := riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes1.Job.ID, event.Job.ID)

require.NoError(t, client.QueuePause(ctx, QueueDefault, nil))
client.producersByQueueName[QueueDefault].testSignals.Paused.WaitOrTimeout()
event = riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindQueuePaused, event.Kind)
require.Equal(t, QueueDefault, event.Queue.Name)
require.WithinDuration(t, time.Now(), *event.Queue.PausedAt, 2*time.Second)

insertRes2, err := client.Insert(ctx, &JobArgs{}, nil)
insertRes2, err := client.Insert(ctx, &noOpArgs{}, nil)
require.NoError(t, err)

select {
case <-jobStartedChan:
case <-subscribeChan:
t.Fatal("expected job 2 to not start on paused queue")
case <-time.After(500 * time.Millisecond):
}

require.NoError(t, client.QueueResume(ctx, QueueDefault, nil))
client.producersByQueueName[QueueDefault].testSignals.Resumed.WaitOrTimeout()
event = riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindQueueResumed, event.Kind)
require.Equal(t, QueueDefault, event.Queue.Name)
require.Nil(t, event.Queue.PausedAt)

startedJobID = riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes2.Job.ID, startedJobID)
event = riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes2.Job.ID, event.Job.ID)
})

t.Run("PauseAndResumeMultipleQueues", func(t *testing.T) {
Expand All @@ -511,74 +509,73 @@ func Test_Client(t *testing.T) {
config.Queues["alternate"] = QueueConfig{MaxWorkers: 10}
client := newTestClient(t, bundle.dbPool, config)

jobStartedChan := make(chan int64)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
jobStartedChan <- job.ID
return nil
}))

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

client.producersByQueueName[QueueDefault].testSignals.Init()
client.producersByQueueName["alternate"].testSignals.Init()

insertRes1, err := client.Insert(ctx, &JobArgs{}, nil)
insertRes1, err := client.Insert(ctx, &noOpArgs{}, nil)
require.NoError(t, err)

startedJobID := riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes1.Job.ID, startedJobID)
event := riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes1.Job.ID, event.Job.ID)

// Pause only the default queue:
require.NoError(t, client.QueuePause(ctx, QueueDefault, nil))
client.producersByQueueName[QueueDefault].testSignals.Paused.WaitOrTimeout()
event = riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindQueuePaused, event.Kind)
require.Equal(t, QueueDefault, event.Queue.Name)

insertRes2, err := client.Insert(ctx, &JobArgs{}, nil)
insertRes2, err := client.Insert(ctx, &noOpArgs{}, nil)
require.NoError(t, err)

select {
case <-jobStartedChan:
case <-subscribeChan:
t.Fatal("expected job 2 to not start on paused queue")
case <-time.After(500 * time.Millisecond):
}

// alternate queue should still be running:
insertResAlternate1, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{Queue: "alternate"})
insertResAlternate1, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "alternate"})
require.NoError(t, err)

startedJobID = riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertResAlternate1.Job.ID, startedJobID)
event = riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertResAlternate1.Job.ID, event.Job.ID)

// Pause all queues:
require.NoError(t, client.QueuePause(ctx, rivercommon.AllQueuesString, nil))
client.producersByQueueName["alternate"].testSignals.Paused.WaitOrTimeout()
event = riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindQueuePaused, event.Kind)
require.Equal(t, "alternate", event.Queue.Name)

insertResAlternate2, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{Queue: "alternate"})
insertResAlternate2, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "alternate"})
require.NoError(t, err)

select {
case <-jobStartedChan:
case <-subscribeChan:
t.Fatal("expected alternate job 2 to not start on paused queue")
case <-time.After(500 * time.Millisecond):
}

// Resume only the alternate queue:
require.NoError(t, client.QueueResume(ctx, "alternate", nil))
client.producersByQueueName["alternate"].testSignals.Resumed.WaitOrTimeout()
event = riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindQueueResumed, event.Kind)
require.Equal(t, "alternate", event.Queue.Name)

startedJobID = riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertResAlternate2.Job.ID, startedJobID)
event = riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertResAlternate2.Job.ID, event.Job.ID)

// Resume all queues:
require.NoError(t, client.QueueResume(ctx, rivercommon.AllQueuesString, nil))
client.producersByQueueName[QueueDefault].testSignals.Resumed.WaitOrTimeout()
event = riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindQueueResumed, event.Kind)
require.Equal(t, QueueDefault, event.Queue.Name)

startedJobID = riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes2.Job.ID, startedJobID)
event = riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes2.Job.ID, event.Job.ID)
})

t.Run("PausedBeforeStart", func(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ const (

// EventKindJobSnoozed occurs when a job is snoozed.
EventKindJobSnoozed EventKind = "job_snoozed"

// EventKindQueuePaused occurs when a queue is paused.
EventKindQueuePaused EventKind = "queue_paused"

// EventKindQueueResumed occurs when a queue is resumed.
EventKindQueueResumed EventKind = "queue_resumed"
)

// All known event kinds, used to validate incoming kinds. This is purposely not
Expand All @@ -35,6 +41,8 @@ var allKinds = map[EventKind]struct{}{ //nolint:gochecknoglobals
EventKindJobCompleted: {},
EventKindJobFailed: {},
EventKindJobSnoozed: {},
EventKindQueuePaused: {},
EventKindQueueResumed: {},
}

// Event wraps an event that occurred within a River client, like a job being
Expand All @@ -50,6 +58,9 @@ type Event struct {

// JobStats are statistics about the run of a job.
JobStats *JobStatistics

// Queue contains queue-related information. It is set on queue events
// (EventKindQueuePaused, EventKindQueueResumed).
Queue *rivertype.Queue
}

// JobStatistics contains information about a single execution of a job.
Expand Down
10 changes: 10 additions & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/riverqueue/river/internal/notifier"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/internal/util/chanutil"
"github.com/riverqueue/river/internal/util/ptrutil"
"github.com/riverqueue/river/internal/workunit"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivertype"
Expand Down Expand Up @@ -69,6 +70,9 @@ type producerConfig struct {
Notifier *notifier.Notifier

Queue string
// QueueEventCallback is called when the queue is paused or resumed so that
// events can be emitted to subscriptions. pausedAt is non-nil when the queue
// was paused and nil when it was resumed.
QueueEventCallback func(queue string, pausedAt *time.Time)

// QueuePollInterval is the amount of time between periodic checks for
// queue setting changes. This is only used in poll-only mode (when no
Expand Down Expand Up @@ -427,13 +431,19 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
p.paused = true
p.Logger.DebugContext(workCtx, p.Name+": Paused", slog.String("queue", p.config.Queue), slog.String("queue_in_message", msg.Queue))
p.testSignals.Paused.Signal(struct{}{})
if p.config.QueueEventCallback != nil {
p.config.QueueEventCallback(p.config.Queue, ptrutil.Ptr(time.Now().UTC()))
}
case controlActionResume:
if !p.paused {
continue
}
p.paused = false
p.Logger.DebugContext(workCtx, p.Name+": Resumed", slog.String("queue", p.config.Queue), slog.String("queue_in_message", msg.Queue))
p.testSignals.Resumed.Signal(struct{}{})
if p.config.QueueEventCallback != nil {
p.config.QueueEventCallback(p.config.Queue, nil)
}
case controlActionCancel:
// Separate this case to make linter happy:
p.Logger.DebugContext(workCtx, p.Name+": Unhandled queue control action", "action", msg.Action)
Expand Down

0 comments on commit e638231

Please sign in to comment.