Skip to content

Commit

Permalink
jobloop: pass context.Context around everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
majewsky committed May 24, 2023
1 parent 44f0894 commit d3df298
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 29 deletions.
8 changes: 4 additions & 4 deletions jobloop/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type CronJob struct {
// Metadata.CounterLabels and all label values set to "early-db-access".
// The implementation is expected to substitute the actual label values as
// soon as they become known.
Task func(prometheus.Labels) error
Task func(context.Context, prometheus.Labels) error
}

// Setup builds the Job interface for this job and registers the counter
Expand All @@ -60,11 +60,11 @@ type cronJobImpl struct {
}

// ProcessOne implements the Job interface.
func (i cronJobImpl) ProcessOne() error {
func (i cronJobImpl) ProcessOne(ctx context.Context) error {
j := i.j

labels := j.Metadata.makeLabels()
err := j.Task(labels)
err := j.Task(ctx, labels)
j.Metadata.countTask(labels, err)
return err
}
Expand All @@ -79,7 +79,7 @@ func (i cronJobImpl) Run(ctx context.Context, opts ...Option) {
case <-ctx.Done():
return
case <-ticker.C:
err := i.ProcessOne()
err := i.ProcessOne(ctx)
if err != nil {
logg.Error("could not run task for job %q: %s", i.j.Metadata.ReadableName, err.Error())
}
Expand Down
11 changes: 6 additions & 5 deletions jobloop/jobloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (

// Job describes a loop that executes instances of a specific type of task.
type Job interface {
// ProcessOne finds and executes exactly one task. If no task is available to
// be executed, `sql.ErrNoRows` is returned.
ProcessOne() error
// ProcessOne finds and executes exactly one task, aborting early if `ctx`
// expires. If no task is available to be executed, `sql.ErrNoRows` is
// returned.
ProcessOne(ctx context.Context) error
// Run blocks the current goroutine and executes tasks until `ctx` expires.
// The runtime behavior of the job can be configured through Option arguments.
Run(ctx context.Context, opts ...Option)
Expand All @@ -42,9 +43,9 @@ type Job interface {
// be executed, `sql.ErrNoRows` is returned. If any error is encountered, processing stops early.
//
// If only go would support member functions on interfaces...
func ProcessMany(j Job, count int) error {
func ProcessMany(j Job, ctx context.Context, count int) error {
for i := 1; i <= count; i++ {
err := j.ProcessOne()
err := j.ProcessOne(ctx)
if err != nil {
return fmt.Errorf("failed in iteration %d: %w", i, err)
}
Expand Down
24 changes: 12 additions & 12 deletions jobloop/producer_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ type ProducerConsumerJob[T any] struct {
// Metadata.CounterLabels and all label values set to "early-db-access". The
// implementation is expected to substitute the actual label values as soon
// as they become known.
DiscoverTask func(prometheus.Labels) (T, error)
DiscoverTask func(context.Context, prometheus.Labels) (T, error)
// A function that will be used to process a task that has been discovered
// within this job.
//
// The provided label set will have been prefilled with the labels from
// Metadata.CounterLabels and all label values set to "early-db-access". The
// implementation is expected to substitute the actual label values as soon
// as they become known.
ProcessTask func(T, prometheus.Labels) error
ProcessTask func(context.Context, T, prometheus.Labels) error
}

// Setup builds the Job interface for this job and registers the counter
Expand All @@ -105,9 +105,9 @@ type producerConsumerJobImpl[T any] struct {

// Core producer-side behavior. This is used by ProcessOne in unit tests, as
// well as by runSingleThreaded and runMultiThreaded in production.
func (j *ProducerConsumerJob[T]) produceOne() (T, prometheus.Labels, error) {
func (j *ProducerConsumerJob[T]) produceOne(ctx context.Context) (T, prometheus.Labels, error) {
labels := j.Metadata.makeLabels()
task, err := j.DiscoverTask(labels)
task, err := j.DiscoverTask(ctx, labels)
if err != nil && err != sql.ErrNoRows {
err = fmt.Errorf("could not select task for job %q: %w", j.Metadata.ReadableName, err)
j.Metadata.countTask(labels, err)
Expand All @@ -117,8 +117,8 @@ func (j *ProducerConsumerJob[T]) produceOne() (T, prometheus.Labels, error) {

// Core consumer-side behavior. This is used by ProcessOne in unit tests, as
// well as by runSingleThreaded and runMultiThreaded in production.
func (j *ProducerConsumerJob[T]) consumeOne(task T, labels prometheus.Labels) error {
err := j.ProcessTask(task, labels)
func (j *ProducerConsumerJob[T]) consumeOne(ctx context.Context, task T, labels prometheus.Labels) error {
err := j.ProcessTask(ctx, task, labels)
if err != nil {
err = fmt.Errorf("could not process task for job %q: %w", j.Metadata.ReadableName, err)
}
Expand All @@ -127,14 +127,14 @@ func (j *ProducerConsumerJob[T]) consumeOne(task T, labels prometheus.Labels) er
}

// ProcessOne implements the jobloop.Job interface.
func (i producerConsumerJobImpl[T]) ProcessOne() error {
func (i producerConsumerJobImpl[T]) ProcessOne(ctx context.Context) error {
j := i.j

task, labels, err := j.produceOne()
task, labels, err := j.produceOne(ctx)
if err != nil {
return err
}
return j.consumeOne(task, labels)
return j.consumeOne(ctx, task, labels)
}

// Run implements the jobloop.Job interface.
Expand All @@ -157,7 +157,7 @@ func (i producerConsumerJobImpl[T]) Run(ctx context.Context, opts ...Option) {
// Implementation of Run() for `cfg.NumGoroutines == 1`.
func (i producerConsumerJobImpl[T]) runSingleThreaded(ctx context.Context) {
for ctx.Err() == nil { //while ctx has not expired
err := i.ProcessOne()
err := i.ProcessOne(ctx)
logAndSlowDownOnError(err)
}
}
Expand All @@ -178,7 +178,7 @@ func (i producerConsumerJobImpl[T]) runMultiThreaded(ctx context.Context, numGor
go func(ch chan<- taskWithLabels[T]) {
defer wg.Done()
for ctx.Err() == nil { //while ctx has not expired
task, labels, err := j.produceOne()
task, labels, err := j.produceOne(ctx)
if err == nil {
ch <- taskWithLabels[T]{task, labels}
} else {
Expand All @@ -199,7 +199,7 @@ func (i producerConsumerJobImpl[T]) runMultiThreaded(ctx context.Context, numGor
go func(ch <-chan taskWithLabels[T]) {
defer wg.Done()
for item := range ch {
err := j.consumeOne(item.Task, item.Labels)
err := j.consumeOne(ctx, item.Task, item.Labels)
if err != nil {
logg.Error(err.Error())
}
Expand Down
4 changes: 2 additions & 2 deletions jobloop/producer_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (e *producerConsumerEngine) Job(registerer prometheus.Registerer) Job {
}).Setup(registerer)
}

func (e *producerConsumerEngine) DiscoverTask(labels prometheus.Labels) (int, error) {
func (e *producerConsumerEngine) DiscoverTask(ctx context.Context, labels prometheus.Labels) (int, error) {
e.mutex.Lock()
defer e.mutex.Unlock()

Expand All @@ -73,7 +73,7 @@ func (e *producerConsumerEngine) DiscoverTask(labels prometheus.Labels) (int, er
return e.discovered, nil
}

func (e *producerConsumerEngine) ProcessTask(value int, labels prometheus.Labels) error {
func (e *producerConsumerEngine) ProcessTask(ctx context.Context, value int, labels prometheus.Labels) error {
//signal to the test that ProcessTask has been started (the test uses this to
//wait until the expected number of tasks were scheduled)
e.wgProcessorsReady.Done()
Expand Down
13 changes: 7 additions & 6 deletions jobloop/tx_guarded.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package jobloop

import (
"context"
"database/sql"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -70,14 +71,14 @@ type TxGuardedJob[Tx sqlext.Rollbacker, P any] struct {
// Metadata.CounterLabels and all label values set to "early-db-access". The
// implementation is expected to substitute the actual label values as soon
// as they become known.
DiscoverRow func(Tx, prometheus.Labels) (P, error)
DiscoverRow func(context.Context, Tx, prometheus.Labels) (P, error)
// A function that will be called once for each discovered row to process it.
//
// The provided label set will have been prefilled with the labels from
// Metadata.CounterLabels and all label values set to "early-db-access". The
// implementation is expected to substitute the actual label values as soon
// as they become known.
ProcessRow func(Tx, P, prometheus.Labels) error
ProcessRow func(context.Context, Tx, P, prometheus.Labels) error
}

// Setup builds the Job interface for this job and registers the counter
Expand Down Expand Up @@ -108,7 +109,7 @@ type txGuardedTask[Tx sqlext.Rollbacker, P any] struct {

// Core producer-side behavior. This is used by ProcessOne in unit tests, as
// well as by runSingleThreaded and runMultiThreaded in production.
func (j *TxGuardedJob[Tx, P]) discoverTask(labels prometheus.Labels) (task *txGuardedTask[Tx, P], returnedError error) {
func (j *TxGuardedJob[Tx, P]) discoverTask(ctx context.Context, labels prometheus.Labels) (task *txGuardedTask[Tx, P], returnedError error) {
tx, err := j.BeginTx()
if err != nil {
return nil, err
Expand All @@ -119,7 +120,7 @@ func (j *TxGuardedJob[Tx, P]) discoverTask(labels prometheus.Labels) (task *txGu
}
}()

payload, err := j.DiscoverRow(tx, labels)
payload, err := j.DiscoverRow(ctx, tx, labels)
if err != nil {
if err == sql.ErrNoRows {
//nolint:errcheck
Expand All @@ -134,7 +135,7 @@ func (j *TxGuardedJob[Tx, P]) discoverTask(labels prometheus.Labels) (task *txGu
}, nil
}

func (j *TxGuardedJob[Tx, P]) processTask(task *txGuardedTask[Tx, P], labels prometheus.Labels) error {
func (j *TxGuardedJob[Tx, P]) processTask(ctx context.Context, task *txGuardedTask[Tx, P], labels prometheus.Labels) error {
defer sqlext.RollbackUnlessCommitted(task.Transaction)
return j.ProcessRow(task.Transaction, task.Payload, labels)
return j.ProcessRow(ctx, task.Transaction, task.Payload, labels)
}

0 comments on commit d3df298

Please sign in to comment.