Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/onflow/flow-go into yurii…
Browse files Browse the repository at this point in the history
…/5891-update-finalize
  • Loading branch information
durkmurder committed Sep 29, 2021
2 parents f7cf0b8 + c40cfbc commit 2476f96
Show file tree
Hide file tree
Showing 9 changed files with 677 additions and 6 deletions.
3 changes: 2 additions & 1 deletion cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit(ctx context.Context) {
p2p.DefaultMaxPubSubMsgSize,
fnb.Metrics.Network,
pingProvider,
fnb.BaseConfig.DNSCacheTTL)
fnb.BaseConfig.DNSCacheTTL,
fnb.BaseConfig.NodeRole)

if err != nil {
return nil, fmt.Errorf("could not generate libp2p node factory: %w", err)
Expand Down
113 changes: 110 additions & 3 deletions module/common.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,30 @@
package module

import (
"context"

"github.com/onflow/flow-go/module/irrecoverable"
)

// WARNING: The semantics of this interface will be changing in the near future, with
// startup / shutdown capabilities being delegated to the Startable interface instead.
// For more details, see [FLIP 1167](https://github.com/onflow/flow-go/pull/1167)
//
// ReadyDoneAware provides an easy interface to wait for module startup and shutdown.
// Modules that implement this interface only support a single start-stop cycle, and
// will not restart if Ready() is called again after shutdown has already commenced.
type ReadyDoneAware interface {
// Ready commences startup of the module, and returns a ready channel that is closed once
// startup has completed.
// startup has completed. Note that the ready channel may never close if errors are
// encountered during startup.
// If shutdown has already commenced before this method is called for the first time,
// startup will not be performed and the returned channel will never close.
// startup will not be performed and the returned channel will also never close.
// This should be an idempotent method.
Ready() <-chan struct{}

// Done commences shutdown of the module, and returns a done channel that is closed once
// shutdown has completed.
// shutdown has completed. Note that the done channel should be closed even if errors are
// encountered during shutdown.
// This should be an idempotent method.
Done() <-chan struct{}
}
Expand All @@ -30,3 +42,98 @@ func (n *NoopReadDoneAware) Done() <-chan struct{} {
defer close(done)
return done
}

// Startable provides an interface to start a component. Once started, the component
// can be stopped by cancelling the given context.
type Startable interface {
Start(irrecoverable.SignalerContext)
}

type Component interface {
Startable
ReadyDoneAware
}

type ComponentFactory func() (Component, error)

// OnError reacts to an irrecoverable error
// It is meant to inspect the error, determining its type and seeing if e.g. a restart or some other measure is suitable,
// and optionally trigger the continuation provided by the caller (RunComponent), which defines what "a restart" means.
// Instead of restarting the component, it could also:
// - panic (in canary / benchmark)
// - log in various Error channels and / or send telemetry ...
type OnError = func(err error, triggerRestart func())

func RunComponent(ctx context.Context, componentFactory ComponentFactory, handler OnError) error {
// reference to per-run signals for the component
var component Component
var cancel context.CancelFunc
var done <-chan struct{}
var irrecoverables chan error

start := func() (err error) {
component, err = componentFactory()
if err != nil {
return // failure to generate the component, should be handled out-of-band because a restart won't help
}

// context used to run the component
var runCtx context.Context
runCtx, cancel = context.WithCancel(ctx)

// signaler used for irrecoverables
var signalingCtx irrecoverable.SignalerContext
irrecoverables = make(chan error)
signalingCtx = irrecoverable.WithSignaler(runCtx, irrecoverable.NewSignaler(irrecoverables))

// the component must be started in a separate goroutine in case an irrecoverable error
// is thrown during the call to Start, which terminates the calling goroutine
go component.Start(signalingCtx)

done = component.Done()
return
}

shutdownAndWaitForRestart := func(err error) error {
// shutdown the component
cancel()

// wait until it's done
// note that irrecoverables which are encountered during shutdown are ignored
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
}

// send error to the handler programmed with a restart continuation
restartChan := make(chan struct{})
go handler(err, func() {
close(restartChan)
})

// wait for handler to trigger restart or abort
select {
case <-ctx.Done():
return ctx.Err()
case <-restartChan:
}

return nil
}

for {
if err := start(); err != nil {
return err // failure to start
}

select {
case <-ctx.Done():
return ctx.Err()
case err := <-irrecoverables:
if canceled := shutdownAndWaitForRestart(err); canceled != nil {
return canceled
}
}
}
}
77 changes: 77 additions & 0 deletions module/irrecoverable/irrecoverable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package irrecoverable

import (
"context"
"fmt"
"log"
"os"
"runtime"
)

// Signaler sends the error out
type Signaler struct {
errors chan<- error
}

func NewSignaler(errors chan<- error) *Signaler {
return &Signaler{errors}
}

// Throw is a narrow drop-in replacement for panic, log.Fatal, log.Panic, etc
// anywhere there's something connected to the error channel
func (e *Signaler) Throw(err error) {
defer func() {
// If the error channel was already closed by a concurrent call to Throw, the call
// to close below will panic. We simply log the unhandled irrecoverable for now.
if r := recover(); r != nil {
log.New(os.Stderr, "", log.LstdFlags).Println(fmt.Errorf("unhandled irrecoverable: %w", err))
}
runtime.Goexit()
}()
e.errors <- err
close(e.errors)
}

// We define a constrained interface to provide a drop-in replacement for context.Context
// including in interfaces that compose it.
type SignalerContext interface {
context.Context
Throw(err error) // delegates to the signaler
sealed() // private, to constrain builder to using WithSignaler
}

// private, to force context derivation / WithSignaler
type signalerCtxt struct {
context.Context
signaler *Signaler
}

func (sc signalerCtxt) sealed() {}

// Drop-in replacement for panic, log.Fatal, log.Panic, etc
// to use when we are able to get an SignalerContext and thread it down in the component
func (sc signalerCtxt) Throw(err error) {
sc.signaler.Throw(err)
}

// the One True Way of getting a SignalerContext
func WithSignaler(ctx context.Context, sig *Signaler) SignalerContext {
return signalerCtxt{ctx, sig}
}

// If we have an SignalerContext, we can directly ctx.Throw.
//
// But a lot of library methods expect context.Context, & we want to pass the same w/o boilerplate
// Moreover, we could have built with: context.WithCancel(irrecoverable.WithSignaler(ctx, sig)),
// "downcasting" to context.Context. Yet, we can still type-assert and recover.
//
// Throw can be a drop-in replacement anywhere we have a context.Context likely
// to support Irrecoverables. Note: this is not a method
func Throw(ctx context.Context, err error) {
signalerAbleContext, ok := ctx.(SignalerContext)
if ok {
signalerAbleContext.Throw(err)
}
// Be spectacular on how this does not -but should- handle irrecoverables:
log.Fatalf("irrecoverable error signaler not found for context, please implement! Unhandled irrecoverable error %v", err)
}
146 changes: 146 additions & 0 deletions module/irrecoverable/irrecoverable_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package irrecoverable_test

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/irrecoverable"
)

var ErrTriggerRestart = errors.New("restart me")
var ErrNoRestart = errors.New("fatal, no restarts")

func Example() {
// a context is mandatory in order to call RunComponent
ctx, cancel := context.WithCancel(context.Background())

// module.ComponentFactory encapsulates all of the component building logic
// required before running Start()
starts := 0
componentFactory := func() (module.Component, error) {
starts++
return NewExampleComponent(starts), nil
}

// this is the place to inspect the encountered error and implement the appropriate error
// handling behaviors, e.g. restarting the component, firing an alert to pagerduty, etc ...
// the shutdown of the component is handled for you by RunComponent, but you may consider
// performing additional cleanup here
onError := func(err error, triggerRestart func()) {
// check the error type to decide whether to restart or shutdown
if errors.Is(err, ErrTriggerRestart) {
fmt.Printf("Restarting component after fatal error: %v\n", err)
triggerRestart()
return
} else {
fmt.Printf("An unrecoverable error occurred: %v\n", err)
// shutdown other components. it might also make sense to just panic here
// depending on the circumstances
cancel()
}

}

// run the component. this is a blocking call, and will return with an error if the
// first startup or any subsequent restart attempts fails or the context is canceled
err := module.RunComponent(ctx, componentFactory, onError)
if err != nil {
fmt.Printf("Error returned from RunComponent: %v\n", err)
}

// Output:
// [Component 1] Starting up
// [Component 1] Shutting down
// Restarting component after fatal error: restart me
// [Component 2] Starting up
// [Component 2] Shutting down
// An unrecoverable error occurred: fatal, no restarts
// Error returned from RunComponent: context canceled
}

// ExampleComponent is an example of a typical component
type ExampleComponent struct {
id int
started chan struct{}
ready sync.WaitGroup
done sync.WaitGroup
}

func NewExampleComponent(id int) *ExampleComponent {
return &ExampleComponent{
id: id,
started: make(chan struct{}),
}
}

// start the component and register its shutdown handler
// this component will throw an error after 20ms to demonstrate the error handling
func (c *ExampleComponent) Start(ctx irrecoverable.SignalerContext) {
c.printMsg("Starting up")

// do some setup...

c.ready.Add(2)
c.done.Add(2)

go func() {
c.ready.Done()
defer c.done.Done()

<-ctx.Done()

c.printMsg("Shutting down")
// do some cleanup...
}()

go func() {
c.ready.Done()
defer c.done.Done()

select {
case <-time.After(20 * time.Millisecond):
// encounter irrecoverable error
if c.id > 1 {
ctx.Throw(ErrNoRestart)
} else {
ctx.Throw(ErrTriggerRestart)
}
case <-ctx.Done():
c.printMsg("Cancelled by parent")
}
}()

close(c.started)
}

// simply return the Started channel
// all startup processing is done in Start()
func (c *ExampleComponent) Ready() <-chan struct{} {
ready := make(chan struct{})
go func() {
<-c.started
c.ready.Wait()
close(ready)
}()
return ready
}

// simply return the Stopped channel
// all shutdown processing is done in shutdownOnCancel()
func (c *ExampleComponent) Done() <-chan struct{} {
done := make(chan struct{})
go func() {
<-c.started
c.done.Wait()
close(done)
}()
return done
}

func (c *ExampleComponent) printMsg(msg string) {
fmt.Printf("[Component %d] %s\n", c.id, msg)
}

0 comments on commit 2476f96

Please sign in to comment.