Skip to content

Commit

Permalink
jobloop: add WithLabel option
Browse files Browse the repository at this point in the history
This allows running multiple instances of the same Job in parallel while
keeping the individual instances separated. I need this in Limes where
we run one scrape job per service type. I cannot just use a
ProducerConsumerJob because the overall scrape job is not
ConcurrencySafe. Only the individual job instances are safe to run
concurrently with each other because they are all confined to their own
service type and thus cannot interfere with each other.

This interface is my second attempt at covering this use case, and is
much less invasive than the first attempt. The major change is that
pretty much everything needs to have a jobConfig now, which also means
that the public API needs to change to be able to pass the WithLabel
option into ProcessOne. Thankfully, that's a backwards-compatible change
because only a variadic argument is added.
  • Loading branch information
majewsky committed Aug 17, 2023
1 parent 7ed3dcb commit d6d2e72
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 37 deletions.
31 changes: 19 additions & 12 deletions jobloop/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,35 @@ type cronJobImpl struct {
j *CronJob
}

// ProcessOne implements the Job interface.
func (i cronJobImpl) ProcessOne(ctx context.Context) error {
// Core behavior of ProcessOne(). This is a separate function because it is reused in runOnce().
//
// It builds a label set from the job metadata and cfg, invokes the job's Task
// callback with that label set, hands the labels and the callback's result to
// countTask, and returns the callback's error unchanged.
func (i cronJobImpl) processOne(ctx context.Context, cfg jobConfig) error {
j := i.j

labels := j.Metadata.makeLabels()
//Labels prefilled through cfg are part of the set that the Task callback receives.
labels := j.Metadata.makeLabels(cfg)
err := j.Task(ctx, labels)
//countTask is called unconditionally, so err may be nil at this point.
j.Metadata.countTask(labels, err)
return err
}

// ProcessOne implements the Job interface. The given options are resolved
// into a job configuration, which is then handed to processOne() to execute
// the task.
func (i cronJobImpl) ProcessOne(ctx context.Context, opts ...Option) error {
	cfg := newJobConfig(opts)
	return i.processOne(ctx, cfg)
}

// Run implements the Job interface.
func (i cronJobImpl) Run(ctx context.Context, opts ...Option) {
cfg := newJobConfig(opts)
runOnce := func() {
err := i.processOne(ctx, cfg)
if err != nil {
logg.Error("could not run task%s for job %q: %s",
cfg.PrefilledLabelsAsString(), i.j.Metadata.ReadableName, err.Error())
}
}

if i.j.InitialDelay != 0 {
time.Sleep(i.j.InitialDelay)
i.runOnce(ctx)
runOnce()
}

ticker := time.NewTicker(i.j.Interval)
Expand All @@ -88,14 +102,7 @@ func (i cronJobImpl) Run(ctx context.Context, opts ...Option) {
case <-ctx.Done():
return
case <-ticker.C:
i.runOnce(ctx)
runOnce()
}
}
}

// runOnce executes one task through ProcessOne(). A failure is reported on the
// error log instead of being returned to the caller.
func (i cronJobImpl) runOnce(ctx context.Context) {
	if err := i.ProcessOne(ctx); err != nil {
		logg.Error("could not run task for job %q: %s", i.j.Metadata.ReadableName, err.Error())
	}
}
7 changes: 4 additions & 3 deletions jobloop/jobloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
type Job interface {
// ProcessOne finds and executes exactly one task, aborting early if `ctx` expires.
// If no task is available to be executed, `sql.ErrNoRows` is returned.
ProcessOne(ctx context.Context) error
// The runtime behavior of the job can be configured through Option arguments.
ProcessOne(ctx context.Context, opts ...Option) error
// Run blocks the current goroutine and executes tasks until `ctx` expires.
// The runtime behavior of the job can be configured through Option arguments.
Run(ctx context.Context, opts ...Option)
Expand All @@ -42,9 +43,9 @@ type Job interface {
// be executed, `sql.ErrNoRows` is returned. If any error is encountered, processing stops early.
//
// If only go would support member functions on interfaces...
func ProcessMany(j Job, ctx context.Context, count int) error {
func ProcessMany(j Job, ctx context.Context, count int, opts ...Option) error {
for i := 1; i <= count; i++ {
err := j.ProcessOne(ctx)
err := j.ProcessOne(ctx, opts...)
if err != nil {
return fmt.Errorf("failed in iteration %d: %w", i, err)
}
Expand Down
5 changes: 4 additions & 1 deletion jobloop/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,14 @@ func (m *JobMetadata) setup(registerer prometheus.Registerer) {

// Internal API for job implementations: Fills a fresh label set with default
// values for all labels defined for this job's CounterVec.
//
// Labels from cfg.PrefilledLabels are applied after the defaults, so a
// prefilled value replaces the default value of the same label. A new map is
// allocated on every call.
func (m *JobMetadata) makeLabels() prometheus.Labels {
func (m *JobMetadata) makeLabels(cfg jobConfig) prometheus.Labels {
//NOTE(review): the extra capacity slot is presumably reserved for a label
//that is added later on (e.g. when the task is counted) - confirm.
labels := make(prometheus.Labels, len(m.CounterLabels)+1)
for _, label := range m.CounterLabels {
labels[label] = "early-db-access"
}
//Prefilled labels are copied as-is; this loop does not check whether the
//label name appears in m.CounterLabels.
for label, value := range cfg.PrefilledLabels {
labels[label] = value
}
return labels
}

Expand Down
43 changes: 42 additions & 1 deletion jobloop/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@

package jobloop

import (
"fmt"
"sort"
"strings"

"github.com/prometheus/client_golang/prometheus"
)

// Option is a configuration option for a Job, such as NumGoroutines or
// WithLabel. More options could be added in the future.
//
Expand All @@ -27,7 +35,8 @@ package jobloop
type Option func(*jobConfig)

// jobConfig collects the settings that were chosen through Option arguments.
type jobConfig struct {
// Number of goroutines that Run() may use; set through the NumGoroutines option.
NumGoroutines uint
NumGoroutines uint
// Label values that are copied into the label set of each task; filled
// through the WithLabel option.
PrefilledLabels prometheus.Labels
}

func newJobConfig(opts []Option) jobConfig {
Expand All @@ -43,11 +52,43 @@ func newJobConfig(opts []Option) jobConfig {
return cfg
}

// PrefilledLabelsAsString renders cfg.PrefilledLabels for use in log messages.
//
// If no labels are prefilled, the empty string is returned. Otherwise the
// result looks like ` (label1="value1", label2="value2")`, including the
// leading space, so that it can be placed directly behind a word in a format
// string. The rendered pairs are sorted, which makes the output independent of
// map iteration order.
func (cfg jobConfig) PrefilledLabelsAsString() string {
	count := len(cfg.PrefilledLabels)
	if count == 0 {
		return ""
	}

	pairs := make([]string, count)
	idx := 0
	for name, val := range cfg.PrefilledLabels {
		pairs[idx] = fmt.Sprintf("%s=%q", name, val)
		idx++
	}
	sort.Strings(pairs)
	return " (" + strings.Join(pairs, ", ") + ")"
}

// NumGoroutines is an option for a Job that sets the maximum number of
// goroutines which the Job may use. The default value is 1, meaning that no
// concurrency will be employed.
//
// ProcessOne() always ignores this option: executing a single task does not
// require any concurrency on the level of the job runtime.
func NumGoroutines(n uint) Option {
	apply := func(cfg *jobConfig) {
		cfg.NumGoroutines = n
	}
	return apply
}

// WithLabel is an option for a Job that prefills one of the CounterLabels
// declared in the job's metadata before each task. Use this to run several
// instances of a job in parallel while sharing the JobMetadata, the task
// callbacks and the Prometheus metrics between them. Task callbacks can read
// the prefilled label value to find out which instance of the job they are
// running for.
//
// The option can be given multiple times to prefill several labels. If the
// same label is given more than once, the value applied last wins.
func WithLabel(label, value string) Option {
	return func(cfg *jobConfig) {
		//the label map is only allocated once the first label is set
		if cfg.PrefilledLabels == nil {
			cfg.PrefilledLabels = prometheus.Labels{label: value}
			return
		}
		cfg.PrefilledLabels[label] = value
	}
}
47 changes: 27 additions & 20 deletions jobloop/producer_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,36 +106,43 @@ type producerConsumerJobImpl[T any] struct {

// Core producer-side behavior. This is used by ProcessOne in unit tests, as
// well as by runSingleThreaded and runMultiThreaded in production.
//
// Builds the label set for one task (including the labels prefilled through
// cfg), asks DiscoverTask for a task, and returns the task together with the
// label set. An error matching sql.ErrNoRows is always returned as-is. If
// annotateErrors is set, any other error is wrapped with the job's readable
// name and the prefilled labels.
func (j *ProducerConsumerJob[T]) produceOne(ctx context.Context) (T, prometheus.Labels, error) {
labels := j.Metadata.makeLabels()
func (j *ProducerConsumerJob[T]) produceOne(ctx context.Context, cfg jobConfig, annotateErrors bool) (T, prometheus.Labels, error) {
labels := j.Metadata.makeLabels(cfg)
task, err := j.DiscoverTask(ctx, labels)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
err = fmt.Errorf("could not select task for job %q: %w", j.Metadata.ReadableName, err)
//NOTE(review): in the condition below, annotateErrors guards the whole
//branch, so countTask is skipped for discovery errors whenever
//annotateErrors is false (as passed by processOne). The variant of the
//condition without annotateErrors counted every non-ErrNoRows error.
//Confirm whether only the fmt.Errorf wrapping should depend on annotateErrors.
if err != nil && !errors.Is(err, sql.ErrNoRows) && annotateErrors {
err = fmt.Errorf("could not select task%s for job %q: %w",
cfg.PrefilledLabelsAsString(), j.Metadata.ReadableName, err)
j.Metadata.countTask(labels, err)
}
return task, labels, err
}

// Core consumer-side behavior. This is used by ProcessOne in unit tests, as
// well as by runSingleThreaded and runMultiThreaded in production.
//
// Runs ProcessTask for the given task and label set, then passes the labels and
// the resulting error (which may be nil) to countTask. If annotateErrors is
// set, a non-nil error is wrapped with the job's readable name and the
// prefilled labels from cfg before it is counted and returned.
func (j *ProducerConsumerJob[T]) consumeOne(ctx context.Context, task T, labels prometheus.Labels) error {
func (j *ProducerConsumerJob[T]) consumeOne(ctx context.Context, cfg jobConfig, task T, labels prometheus.Labels, annotateErrors bool) error {
err := j.ProcessTask(ctx, task, labels)
if err != nil {
err = fmt.Errorf("could not process task for job %q: %w", j.Metadata.ReadableName, err)
if err != nil && annotateErrors {
err = fmt.Errorf("could not process task%s for job %q: %w",
cfg.PrefilledLabelsAsString(), j.Metadata.ReadableName, err)
}
//countTask runs on every call, regardless of whether the task succeeded.
j.Metadata.countTask(labels, err)
return err
}

// ProcessOne implements the jobloop.Job interface.
func (i producerConsumerJobImpl[T]) ProcessOne(ctx context.Context) error {
// Core behavior of ProcessOne(). This is a separate function because it is reused in runSingleThreaded().
//
// Discovers one task through produceOne and, if that succeeds, processes it
// through consumeOne. Any error from produceOne (including sql.ErrNoRows) is
// returned immediately without calling consumeOne.
func (i producerConsumerJobImpl[T]) processOne(ctx context.Context, cfg jobConfig) error {
j := i.j

task, labels, err := j.produceOne(ctx)
//annotateErrors is false in both calls, so errors are returned without the
//"could not select/process task" wrapping.
task, labels, err := j.produceOne(ctx, cfg, false)
if err != nil {
return err
}
return j.consumeOne(ctx, task, labels)
return j.consumeOne(ctx, cfg, task, labels, false)
}

// ProcessOne implements the jobloop.Job interface. The given options are
// resolved into a job configuration, which is then handed to processOne() to
// discover and execute the task.
func (i producerConsumerJobImpl[T]) ProcessOne(ctx context.Context, opts ...Option) error {
	cfg := newJobConfig(opts)
	return i.processOne(ctx, cfg)
}

// Run implements the jobloop.Job interface.
Expand All @@ -146,19 +153,19 @@ func (i producerConsumerJobImpl[T]) Run(ctx context.Context, opts ...Option) {
case 0:
panic("ProducerConsumerJob.Run() called with numGoroutines == 0")
case 1:
i.runSingleThreaded(ctx)
i.runSingleThreaded(ctx, cfg)
default:
if !i.j.Metadata.ConcurrencySafe {
panic("ProducerConsumerJob.Run() called with numGoroutines > 1, but job is not ConcurrencySafe")
}
i.runMultiThreaded(ctx, cfg.NumGoroutines)
i.runMultiThreaded(ctx, cfg)
}
}

// Implementation of Run() for `cfg.NumGoroutines == 1`.
//
// Executes tasks one after the other on the calling goroutine and only returns
// once ctx has expired.
func (i producerConsumerJobImpl[T]) runSingleThreaded(ctx context.Context) {
func (i producerConsumerJobImpl[T]) runSingleThreaded(ctx context.Context, cfg jobConfig) {
for ctx.Err() == nil { //while ctx has not expired
err := i.ProcessOne(ctx)
err := i.processOne(ctx, cfg)
//The result is passed on even if it is nil.
//NOTE(review): judging by its name, logAndSlowDownOnError logs the error and
//delays the next iteration - confirm against its definition.
logAndSlowDownOnError(err)
}
}
Expand All @@ -169,7 +176,7 @@ type taskWithLabels[T any] struct {
}

// Implementation of Run() for `cfg.NumGoroutines > 1`.
func (i producerConsumerJobImpl[T]) runMultiThreaded(ctx context.Context, numGoroutines uint) {
func (i producerConsumerJobImpl[T]) runMultiThreaded(ctx context.Context, cfg jobConfig) {
j := i.j
ch := make(chan taskWithLabels[T]) //unbuffered!
var wg sync.WaitGroup
Expand All @@ -179,7 +186,7 @@ func (i producerConsumerJobImpl[T]) runMultiThreaded(ctx context.Context, numGor
go func(ch chan<- taskWithLabels[T]) {
defer wg.Done()
for ctx.Err() == nil { //while ctx has not expired
task, labels, err := j.produceOne(ctx)
task, labels, err := j.produceOne(ctx, cfg, true)
if err == nil {
ch <- taskWithLabels[T]{task, labels}
} else {
Expand All @@ -195,12 +202,12 @@ func (i producerConsumerJobImpl[T]) runMultiThreaded(ctx context.Context, numGor
//
//We use `cfg.NumGoroutines-1` here since we already have spawned one goroutine
//for the polling above.
wg.Add(int(numGoroutines - 1))
for i := uint(0); i < numGoroutines-1; i++ {
wg.Add(int(cfg.NumGoroutines - 1))
for i := uint(0); i < cfg.NumGoroutines-1; i++ {
go func(ch <-chan taskWithLabels[T]) {
defer wg.Done()
for item := range ch {
err := j.consumeOne(ctx, item.Task, item.Labels)
err := j.consumeOne(ctx, cfg, item.Task, item.Labels, true)
if err != nil {
logg.Error(err.Error())
}
Expand Down

0 comments on commit d6d2e72

Please sign in to comment.