diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..239168c --- /dev/null +++ b/doc.go @@ -0,0 +1,27 @@ +// Package workerpoolxt wraps gammazero/workerpool +/* + * ------------------------------------------------------------------------------------------ + * Many thanks to + * github.com/gammazero/workerpool & github.com/gammazero/deque + * github.com/cenkalti/backoff + * + * Please give them a star on GitHub! + * + * They're really doing all the heavy lifting for us + * ------------------------------------------------------------------------------------------ + */ +// +// Response Channel +// +// If our timeout has passed, return an error object, disregarding any response from +// job which timed out. Same reason we check for context error before placing results +// on the response chan. We have no idea what Job.Task will be doing..lets say sending +// a GET request. If the http request takes 11 seconds, but the timeout is 10 seconds, +// our hands are tied. +// +// * We cannot cancel in flight requests, whether long running http or simply `time.Sleep`. +// So we just ignore the response when it eventually comes on the 12th second. +// +// TODO: * +// ... +package workerpoolxt diff --git a/workerpoolxt.go b/workerpoolxt.go index 4daef18..375e40e 100644 --- a/workerpoolxt.go +++ b/workerpoolxt.go @@ -33,37 +33,60 @@ type WorkerPoolXT struct { responses []Response } -// SubmitXT submits a job. Allows you to not only submit a job, but get the response from it +// SubmitXT submits a job. Allows you to not +// only submit a job, but get the response from it func (wp *WorkerPoolXT) SubmitXT(job Job) { wp.Submit(wp.wrap(job)) } -// StopWaitXT gets results then kills the worker pool. You cannot add jobs after calling `StopWaitXT()` +// StopWaitXT gets results then kills the worker pool. +// You cannot add jobs after calling `StopWaitXT()` func (wp *WorkerPoolXT) StopWaitXT() (rs []Response) { wp.stop(false) return wp.responses } -// WithOptions sets default options for each job You can also supply options on a per job basis, +// WithOptions sets default options for each job +// You can also supply options on a per job basis, // which will override the default options. func (wp *WorkerPoolXT) WithOptions(o Options) { wp.options = o } -// stop either stops the worker pool now or later -func (wp *WorkerPoolXT) stop(now bool) { - wp.once.Do(func() { - if now { - wp.Stop() - } else { - wp.StopWait() - } - close(wp.responsesChan) - wp.killswitch <- true - }) +// getBackoff determines if a job is using Retry - if it is +// we configure the backoff +// +// TODO... I would love a 'native' way to retry jobs, though! +// +func (wp *WorkerPoolXT) getBackoff(job Job) backoff.BackOff { + var rbo backoff.BackOff + if job.Retry > 0 { + rbo = backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(job.Retry)) + } + return rbo +} + +// getOptions decides which options to use : default or job +func (wp *WorkerPoolXT) getOptions(job Job) Options { + if job.Options != nil { + return job.Options + } + if wp.options != nil { + return wp.options + } + return make(Options) } -// processResponses listens for anything on the responses chan and aggregates results +// getTimeout decides which timeout to use : default or job +func (wp *WorkerPoolXT) getTimeout(job Job) time.Duration { + if job.Timeout != 0 { + return job.Timeout + } + return wp.defaultTimeout +} + +// processResponses listens for anything on +// the responses chan and aggregates results func (wp *WorkerPoolXT) processResponses() { for { select { @@ -78,59 +101,81 @@ Done: <-wp.killswitch } -// work should be ran on it's own goroutine -// Sorts out job metadata and options -// Runs user provided Job.Task -func (wp *WorkerPoolXT) work(ctx context.Context, done context.CancelFunc, j Job, ts time.Time, bkoff backoff.BackOff) { +// stop either stops the worker pool now or later +func (wp *WorkerPoolXT) stop(now bool) { + wp.once.Do(func() { + if now { + wp.Stop() + } else { + wp.StopWait() + } + close(wp.responsesChan) + wp.killswitch <- true + }) +} + +// work should be ran on it's own goroutine. Sorts out job metadata and options and runs user provided Job.Task +func (wp *WorkerPoolXT) work(ctx context.Context, done context.CancelFunc, j Job, ts time.Time, bo backoff.BackOff) { jobtask := func() error { o := wp.getOptions(j) - // Run user provided task & set job specific metadata + // Run job provided by caller f := j.Task(o) - if f.Error != nil && bkoff != nil { + // We only want to return an error (or nil if no error) + // if the job is using Retry (which ultimately calls `backoff`). + // This is how `backoff` knows the job failed, but if we are + // not using Retry it will cause errors without this check. + if f.Error != nil && bo != nil { return f.Error } + f.runtimeDuration = time.Since(ts) f.name = j.Name - // This check is important as it keeps from sending duplicate responses on our responses chan + + // This check is important as it keeps + // from sending duplicate responses on our responses chan + // ** See `./doc.go`, section `Response Channel` for more info ** if ctx.Err() == nil { wp.responsesChan <- f } return nil } - if bkoff != nil { - if e := backoff.Retry(jobtask, bkoff); e != nil { - wp.responsesChan <- Response{name: j.Name, Error: e} + if bo != nil { + // If the job is using Retry + jobErr := backoff.Retry(jobtask, bo) + if jobErr != nil { + // Send a Response, with our error, to our response chan. + wp.responsesChan <- Response{name: j.Name, Error: jobErr} } } else { + // If job is not using Retry, + // simply call the job we were given jobtask() } + // context.CancelFunc done() } // wrap generates the func that we pass to Submit. -// - If a timeout is not supplied with the job, we use the global default supplied when `New()` is called -// - If options are not supplied with the job, we use the global default supplied when `New()` is called -// - Responsible for injecting timeout and runtime duration func (wp *WorkerPoolXT) wrap(job Job) func() { timeout := wp.getTimeout(job) + // This is the func() that ultimately + // gets passed to `workerpool.Submit(f)` return func() { ctx, cancel := context.WithTimeout(context.Background(), timeout) start := time.Now() + retryBackoff := wp.getBackoff(job) - if job.Retry == 0 { - go wp.work(ctx, cancel, job, start, nil) - } else { - boff := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(job.Retry)) - go wp.work(ctx, cancel, job, start, boff) - } + go wp.work(ctx, cancel, job, start, retryBackoff) select { - // If our timeout has passed, return an error object, disregarding any response from job which timed out case <-ctx.Done(): switch ctx.Err() { case context.DeadlineExceeded: + // If our timeout has passed, return an error object, + // disregarding any response from job which timed out. + // ** See `./doc.go`, section `Response Channel` for more info ** wp.responsesChan <- Response{ Error: context.DeadlineExceeded, name: job.Name, @@ -141,25 +186,6 @@ func (wp *WorkerPoolXT) wrap(job Job) func() { } } -// getTimeout decides which timeout to use : default or job -func (wp *WorkerPoolXT) getTimeout(job Job) time.Duration { - if job.Timeout != 0 { - return job.Timeout - } - return wp.defaultTimeout -} - -// getOptions decides which options to use : default or job -func (wp *WorkerPoolXT) getOptions(job Job) Options { - if job.Options != nil { - return job.Options - } - if wp.options != nil { - return wp.options - } - return make(Options) -} - // Options hold misc options type Options map[string]interface{}