Define error aware interface #1275

Merged 37 commits on Sep 29, 2021
Commits
3f27b74
Define error aware interface
synzhu Sep 10, 2021
a2971d1
define new error base component
synzhu Sep 10, 2021
dbeb249
rename to error manager
synzhu Sep 13, 2021
dcee55a
Merge branch 'master' into smnzhu/error-handling
peterargue Sep 13, 2021
a643f03
implement error handling helper func
synzhu Sep 14, 2021
ef50fb6
Update common.go
synzhu Sep 14, 2021
73e8bdd
Update common.go
synzhu Sep 15, 2021
1bccc1e
Merge branch 'master' into smnzhu/error-handling
synzhu Sep 16, 2021
afb4e6d
[module] Adds basic IrrecoverableHandler, integration into Context and
huitseeker Sep 17, 2021
4c947a1
[module] Apply review comments
huitseeker Sep 20, 2021
e653e6f
tweak handler trigger logic
huitseeker Sep 21, 2021
7f91ef2
rename error handler -> orrecoverable handler
huitseeker Sep 21, 2021
2e8bca3
Merge pull request #1308 from huitseeker/smnzhu/error-handling
huitseeker Sep 21, 2021
46baf6c
Renamings and refactor RunComponent
synzhu Sep 22, 2021
30c652a
undo consensus follower changes
synzhu Sep 22, 2021
bc36dc8
Merge branch 'master' into smnzhu/error-handling
synzhu Sep 22, 2021
248d373
add more comments and rename ThrowError
synzhu Sep 22, 2021
6406d3f
Example for using irrecoverable and RunComponent (#1327)
peterargue Sep 22, 2021
9b8e1d3
Merge branch 'master' into smnzhu/error-handling
peterargue Sep 22, 2021
ad0d7e6
updates from code review
peterargue Sep 24, 2021
011bac6
address comments
synzhu Sep 24, 2021
94c43b3
Merge branch 'master' into smnzhu/error-handling
synzhu Sep 24, 2021
da298e2
Renaming example file to irrecoverable_example_test.go
peterargue Sep 24, 2021
50e92c7
add comments and update semantics of Start
synzhu Sep 24, 2021
5930a6e
handle edge cases
synzhu Sep 24, 2021
b51ed23
update example
synzhu Sep 24, 2021
0e7708f
added empty tests
synzhu Sep 24, 2021
8ffda55
fix lint error
synzhu Sep 24, 2021
6f0b657
add run_component_test
synzhu Sep 25, 2021
2de33dd
Update irrecoverable_example_test.go
synzhu Sep 25, 2021
d81f072
Merge branch 'master' into smnzhu/error-handling
synzhu Sep 26, 2021
c0552e2
Update run_component_test.go
synzhu Sep 27, 2021
96dd381
Merge branch 'master' into smnzhu/error-handling
synzhu Sep 27, 2021
43f9d6a
generate mocks
synzhu Sep 27, 2021
d2ad181
update logger
synzhu Sep 27, 2021
38dfa8a
add comment explaining recover
synzhu Sep 27, 2021
0bcea39
added new synchronous startup error test
synzhu Sep 28, 2021
113 changes: 110 additions & 3 deletions module/common.go
@@ -1,18 +1,30 @@
package module

import (
"context"

"github.com/onflow/flow-go/module/irrecoverable"
)

// WARNING: The semantics of this interface will be changing in the near future, with
// startup / shutdown capabilities being delegated to the Startable interface instead.
// For more details, see [FLIP 1167](https://github.com/onflow/flow-go/pull/1167)
//
// ReadyDoneAware provides an easy interface to wait for module startup and shutdown.
// Modules that implement this interface only support a single start-stop cycle, and
// will not restart if Ready() is called again after shutdown has already commenced.
type ReadyDoneAware interface {
// Ready commences startup of the module, and returns a ready channel that is closed once
// startup has completed. Note that the ready channel may never close if errors are
// encountered during startup.
// If shutdown has already commenced before this method is called for the first time,
// startup will not be performed and the returned channel will also never close.
// This should be an idempotent method.
Ready() <-chan struct{}

// Done commences shutdown of the module, and returns a done channel that is closed once
// shutdown has completed. Note that the done channel should be closed even if errors are
// encountered during shutdown.
// This should be an idempotent method.
Done() <-chan struct{}
}
@@ -30,3 +42,98 @@ func (n *NoopReadDoneAware) Done() <-chan struct{} {
defer close(done)
return done
}

// Startable provides an interface to start a component. Once started, the component
// can be stopped by cancelling the given context.
type Startable interface {
Start(irrecoverable.SignalerContext)
}

type Component interface {
Startable
ReadyDoneAware
}

type ComponentFactory func() (Component, error)

// OnError reacts to an irrecoverable error
// It is meant to inspect the error, determining its type and seeing if e.g. a restart or some other measure is suitable,
// and optionally trigger the continuation provided by the caller (RunComponent), which defines what "a restart" means.
// Instead of restarting the component, it could also:
// - panic (in canary / benchmark)
// - log in various Error channels and / or send telemetry ...
type OnError = func(err error, triggerRestart func())

func RunComponent(ctx context.Context, componentFactory ComponentFactory, handler OnError) error {
// reference to per-run signals for the component
var component Component
var cancel context.CancelFunc
var done <-chan struct{}
var irrecoverables chan error

start := func() (err error) {
component, err = componentFactory()
if err != nil {
return // failure to generate the component, should be handled out-of-band because a restart won't help
}

// context used to run the component
var runCtx context.Context
runCtx, cancel = context.WithCancel(ctx)

// signaler used for irrecoverables
var signalingCtx irrecoverable.SignalerContext
irrecoverables = make(chan error)
signalingCtx = irrecoverable.WithSignaler(runCtx, irrecoverable.NewSignaler(irrecoverables))

// the component must be started in a separate goroutine in case an irrecoverable error
// is thrown during the call to Start, which terminates the calling goroutine
go component.Start(signalingCtx)

done = component.Done()
return
}

shutdownAndWaitForRestart := func(err error) error {
// shutdown the component
cancel()

// wait until it's done
// note that irrecoverables which are encountered during shutdown are ignored
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
}

// send error to the handler programmed with a restart continuation
restartChan := make(chan struct{})
go handler(err, func() {
close(restartChan)
})

// wait for handler to trigger restart or abort
select {
case <-ctx.Done():
return ctx.Err()
case <-restartChan:
}

return nil
}

for {
if err := start(); err != nil {
return err // failure to start
}

select {
case <-ctx.Done():
return ctx.Err()
case err := <-irrecoverables:
if canceled := shutdownAndWaitForRestart(err); canceled != nil {
return canceled
}
}
}
}
77 changes: 77 additions & 0 deletions module/irrecoverable/irrecoverable.go
@@ -0,0 +1,77 @@
package irrecoverable

import (
"context"
"fmt"
"log"
"os"
"runtime"
)

// Signaler sends the error out
type Signaler struct {
errors chan<- error
}

func NewSignaler(errors chan<- error) *Signaler {
return &Signaler{errors}
}

// Throw is a narrow drop-in replacement for panic, log.Fatal, log.Panic, etc
// anywhere there's something connected to the error channel
func (e *Signaler) Throw(err error) {
defer func() {
// If the error channel was already closed by a concurrent call to Throw, the send
// (or the close) below will panic. We simply log the unhandled irrecoverable for now.
if r := recover(); r != nil {
log.New(os.Stderr, "", log.LstdFlags).Println(fmt.Errorf("unhandled irrecoverable: %w", err))
}
runtime.Goexit()
}()
e.errors <- err
close(e.errors)
}

// We define a constrained interface to provide a drop-in replacement for context.Context
// including in interfaces that compose it.
type SignalerContext interface {
context.Context
Throw(err error) // delegates to the signaler
sealed() // private, to constrain builder to using WithSignaler
}

// private, to force context derivation / WithSignaler
type signalerCtxt struct {
context.Context
signaler *Signaler
}

func (sc signalerCtxt) sealed() {}

// Drop-in replacement for panic, log.Fatal, log.Panic, etc
// to use when we are able to get a SignalerContext and thread it down in the component
func (sc signalerCtxt) Throw(err error) {
sc.signaler.Throw(err)
}

// the One True Way of getting a SignalerContext
func WithSignaler(ctx context.Context, sig *Signaler) SignalerContext {
return signalerCtxt{ctx, sig}
}

// If we have a SignalerContext, we can call ctx.Throw directly.
//
// However, many library methods expect a context.Context, and we want to pass the same
// value without boilerplate. Throw type-asserts the context back to a SignalerContext.
// Note that the assertion only succeeds if ctx itself is a SignalerContext: a context
// derived from one, e.g. context.WithCancel(irrecoverable.WithSignaler(ctx, sig)), is a
// different concrete type that does not expose Throw, and falls through to log.Fatalf.
//
// Throw can be a drop-in replacement anywhere we have a context.Context likely
// to support irrecoverables. Note: this is a package-level function, not a method.
func Throw(ctx context.Context, err error) {
signalerAbleContext, ok := ctx.(SignalerContext)
if ok {
signalerAbleContext.Throw(err)
}
// Only reached when ctx is not a SignalerContext, since SignalerContext.Throw never returns.
// Fail loudly, because this path does not (but should) handle irrecoverables:
log.Fatalf("irrecoverable error signaler not found for context, please implement! Unhandled irrecoverable error %v", err)
}
146 changes: 146 additions & 0 deletions module/irrecoverable/irrecoverable_example_test.go
@@ -0,0 +1,146 @@
package irrecoverable_test

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/irrecoverable"
)

var ErrTriggerRestart = errors.New("restart me")
var ErrNoRestart = errors.New("fatal, no restarts")

func Example() {
// a context is mandatory in order to call RunComponent
ctx, cancel := context.WithCancel(context.Background())

// module.ComponentFactory encapsulates all of the component building logic
// required before running Start()
starts := 0
componentFactory := func() (module.Component, error) {
starts++
return NewExampleComponent(starts), nil
}

// this is the place to inspect the encountered error and implement the appropriate error
// handling behaviors, e.g. restarting the component, firing an alert to pagerduty, etc ...
// the shutdown of the component is handled for you by RunComponent, but you may consider
// performing additional cleanup here
onError := func(err error, triggerRestart func()) {
// check the error type to decide whether to restart or shutdown
if errors.Is(err, ErrTriggerRestart) {
fmt.Printf("Restarting component after fatal error: %v\n", err)
triggerRestart()
return
} else {
fmt.Printf("An unrecoverable error occurred: %v\n", err)
// shutdown other components. it might also make sense to just panic here
// depending on the circumstances
cancel()
}

}

// run the component. this is a blocking call, and will return with an error if the
// first startup or any subsequent restart attempt fails or the context is canceled
err := module.RunComponent(ctx, componentFactory, onError)
if err != nil {
fmt.Printf("Error returned from RunComponent: %v\n", err)
}

// Output:
// [Component 1] Starting up
// [Component 1] Shutting down
// Restarting component after fatal error: restart me
// [Component 2] Starting up
// [Component 2] Shutting down
// An unrecoverable error occurred: fatal, no restarts
// Error returned from RunComponent: context canceled
}

// ExampleComponent is an example of a typical component
type ExampleComponent struct {
id int
started chan struct{}
ready sync.WaitGroup
done sync.WaitGroup
}

func NewExampleComponent(id int) *ExampleComponent {
return &ExampleComponent{
id: id,
started: make(chan struct{}),
}
}

// start the component and register its shutdown handler
// this component will throw an error after 20ms to demonstrate the error handling
func (c *ExampleComponent) Start(ctx irrecoverable.SignalerContext) {
c.printMsg("Starting up")

// do some setup...

c.ready.Add(2)
c.done.Add(2)

go func() {
c.ready.Done()
defer c.done.Done()

<-ctx.Done()

c.printMsg("Shutting down")
// do some cleanup...
}()

go func() {
c.ready.Done()
defer c.done.Done()

select {
case <-time.After(20 * time.Millisecond):
// encounter irrecoverable error
if c.id > 1 {
ctx.Throw(ErrNoRestart)
} else {
ctx.Throw(ErrTriggerRestart)
}
case <-ctx.Done():
c.printMsg("Cancelled by parent")
}
}()

close(c.started)
}

// Ready returns a channel that is closed once Start() has been called and both
// worker goroutines are running. all startup processing is done in Start()
func (c *ExampleComponent) Ready() <-chan struct{} {
ready := make(chan struct{})
go func() {
<-c.started
c.ready.Wait()
close(ready)
}()
return ready
}

// Done returns a channel that is closed once both worker goroutines have exited.
// all shutdown processing is done in the goroutines launched by Start()
func (c *ExampleComponent) Done() <-chan struct{} {
done := make(chan struct{})
go func() {
<-c.started
c.done.Wait()
close(done)
}()
return done
}

func (c *ExampleComponent) printMsg(msg string) {
fmt.Printf("[Component %d] %s\n", c.id, msg)
}