Skip to content

Commit

Permalink
clean up code; add comments
Browse files Browse the repository at this point in the history
Clean up the "notes-style" code I had for POC purposes
  • Loading branch information
oze4 committed Oct 13, 2020
1 parent 497f46a commit 31ceb3e
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 54 deletions.
27 changes: 27 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Package workerpoolxt wraps gammazero/workerpool
/*
* ------------------------------------------------------------------------------------------
* Many thanks to
* github.com/gammazero/workerpool & github.com/gammazero/deque
* github.com/cenkalti/backoff
*
* Please give them a star on GitHub!
*
* They're really doing all the heavy lifting for us
* ------------------------------------------------------------------------------------------
*/
//
// Response Channel
//
// If our timeout has passed, return an error object, disregarding any response from
// job which timed out. Same reason we check for context error before placing results
// on the response chan. We have no idea what Job.Task will be doing..lets say sending
// a GET request. If the http request takes 11 seconds, but the timeout is 10 seconds,
// our hands are tied.
//
// * We cannot cancel in flight requests, whether long running http or simply `time.Sleep`.
// So we just ignore the response when it eventually comes on the 12th second.
//
// TODO: *
// ...
package workerpoolxt
134 changes: 80 additions & 54 deletions workerpoolxt.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,37 +33,60 @@ type WorkerPoolXT struct {
responses []Response
}

// SubmitXT submits a job. Allows you to not only submit a job, but get the response from it
// SubmitXT submits a job. Allows you to not
// only submit a job, but get the response from it
func (wp *WorkerPoolXT) SubmitXT(job Job) {
wp.Submit(wp.wrap(job))
}

// StopWaitXT gets results then kills the worker pool. You cannot add jobs after calling `StopWaitXT()`
// StopWaitXT gets results then kills the worker pool.
// You cannot add jobs after calling `StopWaitXT()`
func (wp *WorkerPoolXT) StopWaitXT() (rs []Response) {
wp.stop(false)
return wp.responses
}

// WithOptions sets default options for each job You can also supply options on a per job basis,
// WithOptions sets default options for each job
// You can also supply options on a per job basis,
// which will override the default options.
func (wp *WorkerPoolXT) WithOptions(o Options) {
wp.options = o
}

// stop either stops the worker pool now or later
func (wp *WorkerPoolXT) stop(now bool) {
wp.once.Do(func() {
if now {
wp.Stop()
} else {
wp.StopWait()
}
close(wp.responsesChan)
wp.killswitch <- true
})
// getBackoff determines if a job is using Retry - if it is
// we configure the backoff
//
// TODO... I would love a 'native' way to retry jobs, though!
//
func (wp *WorkerPoolXT) getBackoff(job Job) backoff.BackOff {
var rbo backoff.BackOff
if job.Retry > 0 {
rbo = backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(job.Retry))
}
return rbo
}

// getOptions decides which options to use : default or job
func (wp *WorkerPoolXT) getOptions(job Job) Options {
if job.Options != nil {
return job.Options
}
if wp.options != nil {
return wp.options
}
return make(Options)
}

// processResponses listens for anything on the responses chan and aggregates results
// getTimeout decides which timeout to use : default or job
func (wp *WorkerPoolXT) getTimeout(job Job) time.Duration {
if job.Timeout != 0 {
return job.Timeout
}
return wp.defaultTimeout
}

// processResponses listens for anything on
// the responses chan and aggregates results
func (wp *WorkerPoolXT) processResponses() {
for {
select {
Expand All @@ -78,59 +101,81 @@ Done:
<-wp.killswitch
}

// work should be ran on it's own goroutine
// Sorts out job metadata and options
// Runs user provided Job.Task
func (wp *WorkerPoolXT) work(ctx context.Context, done context.CancelFunc, j Job, ts time.Time, bkoff backoff.BackOff) {
// stop either stops the worker pool now or later
func (wp *WorkerPoolXT) stop(now bool) {
wp.once.Do(func() {
if now {
wp.Stop()
} else {
wp.StopWait()
}
close(wp.responsesChan)
wp.killswitch <- true
})
}

// work should be ran on it's own goroutine. Sorts out job metadata and options and runs user provided Job.Task
func (wp *WorkerPoolXT) work(ctx context.Context, done context.CancelFunc, j Job, ts time.Time, bo backoff.BackOff) {
jobtask := func() error {
o := wp.getOptions(j)
// Run user provided task & set job specific metadata
// Run job provided by caller
f := j.Task(o)
if f.Error != nil && bkoff != nil {
// We only want to return an error (or nil if no error)
// if the job is using Retry (which ultimately calls `backoff`).
// This is how `backoff` knows the job failed, but if we are
// not using Retry it will cause errors without this check.
if f.Error != nil && bo != nil {
return f.Error
}

f.runtimeDuration = time.Since(ts)
f.name = j.Name
// This check is important as it keeps from sending duplicate responses on our responses chan

// This check is important as it keeps
// from sending duplicate responses on our responses chan
// ** See `./doc.go`, section `Response Channel` for more info **
if ctx.Err() == nil {
wp.responsesChan <- f
}
return nil
}

if bkoff != nil {
if e := backoff.Retry(jobtask, bkoff); e != nil {
wp.responsesChan <- Response{name: j.Name, Error: e}
if bo != nil {
// If the job is using Retry
jobErr := backoff.Retry(jobtask, bo)
if jobErr != nil {
// Send a Response, with our error, to our response chan.
wp.responsesChan <- Response{name: j.Name, Error: jobErr}
}
} else {
// If job is not using Retry,
// simply call the job we were given
jobtask()
}
// context.CancelFunc
done()
}

// wrap generates the func that we pass to Submit.
// - If a timeout is not supplied with the job, we use the global default supplied when `New()` is called
// - If options are not supplied with the job, we use the global default supplied when `New()` is called
// - Responsible for injecting timeout and runtime duration
func (wp *WorkerPoolXT) wrap(job Job) func() {
timeout := wp.getTimeout(job)

// This is the func() that ultimately
// gets passed to `workerpool.Submit(f)`
return func() {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
start := time.Now()
retryBackoff := wp.getBackoff(job)

if job.Retry == 0 {
go wp.work(ctx, cancel, job, start, nil)
} else {
boff := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(job.Retry))
go wp.work(ctx, cancel, job, start, boff)
}
go wp.work(ctx, cancel, job, start, retryBackoff)

select {
// If our timeout has passed, return an error object, disregarding any response from job which timed out
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
// If our timeout has passed, return an error object,
// disregarding any response from job which timed out.
// ** See `./doc.go`, section `Response Channel` for more info **
wp.responsesChan <- Response{
Error: context.DeadlineExceeded,
name: job.Name,
Expand All @@ -141,25 +186,6 @@ func (wp *WorkerPoolXT) wrap(job Job) func() {
}
}

// getTimeout decides which timeout to use : default or job
func (wp *WorkerPoolXT) getTimeout(job Job) time.Duration {
if job.Timeout != 0 {
return job.Timeout
}
return wp.defaultTimeout
}

// getOptions decides which options to use : default or job
func (wp *WorkerPoolXT) getOptions(job Job) Options {
if job.Options != nil {
return job.Options
}
if wp.options != nil {
return wp.options
}
return make(Options)
}

// Options hold misc options
type Options map[string]interface{}

Expand Down

0 comments on commit 31ceb3e

Please sign in to comment.