Breaking: Revise scheduler to correctly propagate error behavior
This change is effectively a V2 of the scheduler package. Although the
previous implementation worked well enough, it had serious limitations
in how it managed errors (for example, a Parent would exit as soon as
any Segment exited, regardless of the parent's desired error behavior).

Additionally, the logic in the Segment implementation was intermixed
with the base functionality of managing Goroutines. As part of providing
the same error-handling support to both Parent and Segment, it became
increasingly evident that the two lifecycles share a common base: the
ability to wait for a Goroutine to complete and to collect errors from
it as it runs. This functionality is now exposed as the Scheduler type.

Work in another project revealed an opportunity to provide an additional
Descriptor implementation that allows external parties to submit new
work to the scheduler, now provided as AdhocDescriptor and
AdhocSubmitter. This gives us rough parity with Java's ExecutorService.

We add documentation for most types and a few additional tests.
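
As a minimal usage sketch (based on the new tests in this commit; the
process name "greet" and the printed output are illustrative), adhoc
submission looks roughly like this:

	package main

	import (
		"context"
		"fmt"
		"time"

		"github.com/puppetlabs/horsehead/scheduler"
	)

	func main() {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()

		// Bind a descriptor/submitter pair and run the descriptor on a
		// single-worker Segment.
		ad, as := scheduler.NewAdhocDescriptor()
		slc := scheduler.
			NewSegment(1, []scheduler.Descriptor{ad}).
			WithErrorBehavior(scheduler.ErrorBehaviorDrop).
			Start(scheduler.LifecycleStartOptions{})

		// Submit a work item; the returned channel receives that process's
		// result once it has run. ("greet" is an arbitrary description.)
		errCh := as.Submit(scheduler.DescribeProcessFunc("greet", func(ctx context.Context) error {
			fmt.Println("Mmm... pie.")
			return nil
		}))
		if err := <-errCh; err != nil {
			fmt.Println("process failed:", err)
		}

		// Tell the Segment to close and wait for its workers to finish.
		if err := scheduler.CloseWaitContext(ctx, slc); err != nil {
			fmt.Println("close failed:", err)
		}
	}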
impl committed Sep 10, 2019
1 parent b271dd0 commit db6c5f1
Showing 22 changed files with 1,211 additions and 583 deletions.
140 changes: 140 additions & 0 deletions scheduler/adhoc.go
@@ -0,0 +1,140 @@
package scheduler

import (
	"context"
	"sync"
)

// adhocProcess wraps a process with a channel that can be used for notifying a
// caller of the process result.
type adhocProcess struct {
	ch       chan<- error
	delegate Process
}

func (ap *adhocProcess) Description() string {
	return ap.delegate.Description()
}

func (ap *adhocProcess) Run(ctx context.Context) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = coerceError(r)

			// Re-panic after we capture the error.
			defer panic(r)
		}

		ap.ch <- err
	}()

	return ap.delegate.Run(ctx)
}

// AdhocDescriptor is a descriptor that allows external access to submit work to
// be scheduled. It is paired with an AdhocSubmitter, which should be provided
// to external clients to receive the work.
//
// This descriptor is non-blocking; it will indefinitely queue work, consuming a
// proportional amount of memory per pending process if the scheduler does not
// have availability. You may want to rate limit submissions.
type AdhocDescriptor struct {
	queue []*adhocProcess
	cond  *sync.Cond
}

var _ Descriptor = &AdhocDescriptor{}

func (ad *AdhocDescriptor) runOnce(ctx context.Context) (*adhocProcess, bool) {
	ad.cond.L.Lock()
	defer ad.cond.L.Unlock()

	for len(ad.queue) == 0 {
		select {
		case <-ctx.Done():
			return nil, false
		default:
		}

		ad.cond.Wait()
	}

	// Pluck the first item. We zero it out in the queue to make sure we can
	// garbage collect the struct when it's done processing.
	next := ad.queue[0]

	ad.queue[0] = nil
	ad.queue = ad.queue[1:]

	return next, true
}

// Run executes this descriptor with the given process channel.
func (ad *AdhocDescriptor) Run(ctx context.Context, pc chan<- Process) error {
	doneCh := make(chan struct{})
	defer close(doneCh)

	go func() {
		select {
		case <-doneCh:
		case <-ctx.Done():
			// There is a slight inefficiency here because we need to make sure
			// we only wake up the descriptor waiting in the current context,
			// but we don't know which one that is, so we have to broadcast.
			ad.cond.L.Lock()
			defer ad.cond.L.Unlock()

			ad.cond.Broadcast()
		}
	}()

	for {
		p, ok := ad.runOnce(ctx)
		if !ok {
			break
		}

		pc <- p
	}

	return nil
}

// AdhocSubmitter is used to submit work to an adhoc descriptor.
//
// Work is always immediately enqueued.
type AdhocSubmitter struct {
	target *AdhocDescriptor
}

// QueueLen returns the number of work items in the descriptor's queue. These
// items have not yet been submitted to the scheduler for processing.
func (as *AdhocSubmitter) QueueLen() int {
	as.target.cond.L.Lock()
	defer as.target.cond.L.Unlock()

	return len(as.target.queue)
}

// Submit adds a new work item to the descriptor's queue.
func (as *AdhocSubmitter) Submit(p Process) <-chan error {
	as.target.cond.L.Lock()
	defer as.target.cond.L.Unlock()

	ch := make(chan error, 1)

	as.target.queue = append(as.target.queue, &adhocProcess{delegate: p, ch: ch})
	as.target.cond.Signal()

	return ch
}

// NewAdhocDescriptor returns a bound pair of adhoc descriptor and submitter.
// Submitting work items through the returned submitter will enqueue them to the
// returned descriptor.
func NewAdhocDescriptor() (*AdhocDescriptor, *AdhocSubmitter) {
	ad := &AdhocDescriptor{cond: sync.NewCond(&sync.Mutex{})}
	as := &AdhocSubmitter{target: ad}

	return ad, as
}
158 changes: 158 additions & 0 deletions scheduler/adhoc_test.go
@@ -0,0 +1,158 @@
package scheduler_test

import (
	"context"
	"sync/atomic"
	"testing"
	"time"

	"github.com/puppetlabs/errawr-go/v2/pkg/errawr"
	"github.com/puppetlabs/errawr-go/v2/pkg/testutil"
	"github.com/puppetlabs/horsehead/scheduler"
	"github.com/stretchr/testify/assert"
)

func TestAdhocQueue(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	ad, as := scheduler.NewAdhocDescriptor()
	lc := scheduler.
		NewSegment(1, []scheduler.Descriptor{ad}).
		WithErrorBehavior(scheduler.ErrorBehaviorDrop)

	p1 := as.Submit(scheduler.DescribeProcessFunc("p1", func(ctx context.Context) error {
		return nil
	}))
	p2 := as.Submit(scheduler.DescribeProcessFunc("p2", func(ctx context.Context) error {
		return nil
	}))

	slc := lc.Start(scheduler.LifecycleStartOptions{})
	defer func() {
		assert.NoError(t, scheduler.CloseWaitContext(ctx, slc))
		assert.Empty(t, slc.Errs())
	}()

	select {
	case err := <-p1:
		assert.NoError(t, err, "p1 returned error")
	case <-ctx.Done():
		assert.Fail(t, "p1 context expired")
	}

	select {
	case err := <-p2:
		assert.NoError(t, err, "p2 returned error")
	case <-ctx.Done():
		assert.Fail(t, "p2 context expired")
	}
}

func TestAdhocErrors(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	ad, as := scheduler.NewAdhocDescriptor()
	lc := scheduler.
		NewSegment(1, []scheduler.Descriptor{ad}).
		WithErrorBehavior(scheduler.ErrorBehaviorDrop)

	p1 := as.Submit(scheduler.DescribeProcessFunc("p1", func(ctx context.Context) error {
		return testutil.NewStubError("p1")
	}))
	p2 := as.Submit(scheduler.DescribeProcessFunc("p2", func(ctx context.Context) error {
		panic(testutil.NewStubError("p2"))
	}))

	slc := lc.Start(scheduler.LifecycleStartOptions{})
	defer func() {
		assert.NoError(t, scheduler.CloseWaitContext(ctx, slc))
		assert.Empty(t, slc.Errs())
	}()

	select {
	case err := <-p1:
		switch rerr := err.(type) {
		case errawr.Error:
			assert.Equal(t, "p1", rerr.Code())
		default:
			assert.Fail(t, "p1 did not return an error")
		}
	case <-ctx.Done():
		assert.Fail(t, "p1 context expired")
	}

	select {
	case err := <-p2:
		switch rerr := err.(type) {
		case errawr.Error:
			assert.Equal(t, "p2", rerr.Code())
		default:
			assert.Fail(t, "p2 did not return an error")
		}
	case <-ctx.Done():
		assert.Fail(t, "p2 context expired")
	}
}

func TestAdhocSubmissionMultipleLifecycles(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	ad, as := scheduler.NewAdhocDescriptor()

	slc1 := scheduler.
		NewSegment(1, []scheduler.Descriptor{ad}).
		WithErrorBehavior(scheduler.ErrorBehaviorDrop).
		Start(scheduler.LifecycleStartOptions{})
	defer func() {
		assert.NoError(t, scheduler.CloseWaitContext(ctx, slc1))
		assert.Empty(t, slc1.Errs())
	}()

	slc2 := scheduler.
		NewSegment(1, []scheduler.Descriptor{ad}).
		WithErrorBehavior(scheduler.ErrorBehaviorDrop).
		Start(scheduler.LifecycleStartOptions{})
	defer func() {
		assert.NoError(t, scheduler.CloseWaitContext(ctx, slc2))
		assert.Empty(t, slc2.Errs())
	}()

	var rc int32
	p1 := as.Submit(scheduler.DescribeProcessFunc("p1", func(ctx context.Context) error {
		// Should run exactly once, even if there are two active schedulers.
		atomic.AddInt32(&rc, 1)
		return nil
	}))

	assert.NoError(t, <-p1)
	assert.Equal(t, int32(1), rc)

	// Close one of the segments; we should still be able to run jobs on the
	// second one.
	assert.NoError(t, scheduler.CloseWaitContext(ctx, slc1))
	assert.Empty(t, slc1.Errs())

	p2 := as.Submit(scheduler.DescribeProcessFunc("p2", func(ctx context.Context) error {
		atomic.AddInt32(&rc, 1)
		return nil
	}))

	assert.NoError(t, <-p2)
	assert.Equal(t, int32(2), rc)

	// Close the second segment. Now a process should just go into the queue as
	// there is nothing to run it.
	assert.NoError(t, scheduler.CloseWaitContext(ctx, slc2))
	assert.Empty(t, slc2.Errs())

	as.Submit(scheduler.DescribeProcessFunc("p3", func(ctx context.Context) error {
		atomic.AddInt32(&rc, 1)
		return nil
	}))

	assert.Equal(t, 1, as.QueueLen())
	assert.Equal(t, int32(2), rc)
}
16 changes: 16 additions & 0 deletions scheduler/capturer.go
@@ -0,0 +1,16 @@
package scheduler

import (
	"github.com/puppetlabs/horsehead/instrumentation/alerts"
	"github.com/puppetlabs/horsehead/instrumentation/alerts/trackers"
)

var defaultCapturer = alerts.NewAlerts(alerts.NoDelegate, alerts.Options{}).NewCapturer()

func coalesceCapturer(candidate trackers.Capturer) trackers.Capturer {
	if candidate == nil {
		return defaultCapturer
	}

	return candidate
}
63 changes: 63 additions & 0 deletions scheduler/doc.go
@@ -0,0 +1,63 @@
/*
Package scheduler provides a managed API to Goroutines using Lifecycles.

The most basic type of management is using the Schedulable interface with a
Scheduler:

	worker := scheduler.SchedulableFunc(func(ctx context.Context, er scheduler.ErrorReporter) {
		for {
			select {
			case <-ctx.Done():
				return
			case <-time.After(100 * time.Millisecond):
				fmt.Println("Mmm... pie.")
			}
		}
	})

	l := scheduler.NewScheduler(scheduler.OneSchedulable(worker))
	sl := l.Start(scheduler.LifecycleStartOptions{})

	time.Sleep(1 * time.Second)

	// Tell the scheduler to start closing.
	sl.Close()

	// Wait for all managed routines to finish.
	<-sl.Done()

Schedulers terminate when all of their children exit.

You can choose from three canned error behaviors for most lifecycles:
ErrorBehaviorDrop, ErrorBehaviorCollect, and ErrorBehaviorTerminate.
ErrorBehaviorDrop ignores errors, allowing the lifecycle to continue executing
normally. ErrorBehaviorCollect stores all errors returned (potentially allowing
for unbounded memory growth, so use with discretion) and provides them when the
lifecycle completes. ErrorBehaviorTerminate causes the lifecycle to close as
soon as it receives an error. You may implement your own error behaviors by
conforming to the ErrorBehavior interface.

If you have a few lifecycles that are parameterized differently and you want to
manage them together, the Parent lifecycle aggregates them and runs them in
parallel.

This package also provides a more sophisticated lifecycle, Segment. A Segment
provides a worker pool and a mechanism for dispatching work. Dispatchers
implement the Descriptor interface and work items implement the Process
interface. The example above could equivalently be written as follows:

	proc := scheduler.ProcessFunc(func(ctx context.Context) error {
		fmt.Println("Mmm... pie.")
		return nil
	})

	l := scheduler.NewSegment(1, []scheduler.Descriptor{
		scheduler.NewIntervalDescriptor(100*time.Millisecond, proc),
	})

	// Start, close, and wait on the lifecycle as before.

Descriptors are particularly useful when asynchronously waiting on events from
external APIs for processing.
*/
package scheduler
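
To make the error-behavior selection above concrete, here is a minimal sketch of
a Segment configured with ErrorBehaviorCollect, in the style of the examples in
doc.go. It assumes, as the tests in this commit do, that the started lifecycle
exposes its collected errors as a slice through Errs() once it closes; the
"flaky" process name and "boom" error are illustrative:

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	ad, as := scheduler.NewAdhocDescriptor()
	slc := scheduler.
		NewSegment(1, []scheduler.Descriptor{ad}).
		WithErrorBehavior(scheduler.ErrorBehaviorCollect).
		Start(scheduler.LifecycleStartOptions{})

	// The process error is delivered both to the submitter's channel and to
	// the lifecycle's error behavior.
	<-as.Submit(scheduler.DescribeProcessFunc("flaky", func(ctx context.Context) error {
		return errors.New("boom")
	}))

	if err := scheduler.CloseWaitContext(ctx, slc); err != nil {
		fmt.Println("close failed:", err)
	}
	for _, err := range slc.Errs() {
		fmt.Println("collected:", err)
	}

With ErrorBehaviorTerminate instead, the lifecycle would begin closing as soon
as the first error arrived; with ErrorBehaviorDrop, Errs() would stay empty, as
the tests above assert.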