-
Notifications
You must be signed in to change notification settings - Fork 166
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into m4ksio/fix-encoidng-user-input-in-failed-t…
…ransaction-message
- Loading branch information
Showing
9 changed files
with
677 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package irrecoverable | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"log" | ||
"os" | ||
"runtime" | ||
) | ||
|
||
// Signaler sends the error out | ||
type Signaler struct { | ||
errors chan<- error | ||
} | ||
|
||
func NewSignaler(errors chan<- error) *Signaler { | ||
return &Signaler{errors} | ||
} | ||
|
||
// Throw is a narrow drop-in replacement for panic, log.Fatal, log.Panic, etc | ||
// anywhere there's something connected to the error channel | ||
func (e *Signaler) Throw(err error) { | ||
defer func() { | ||
// If the error channel was already closed by a concurrent call to Throw, the call | ||
// to close below will panic. We simply log the unhandled irrecoverable for now. | ||
if r := recover(); r != nil { | ||
log.New(os.Stderr, "", log.LstdFlags).Println(fmt.Errorf("unhandled irrecoverable: %w", err)) | ||
} | ||
runtime.Goexit() | ||
}() | ||
e.errors <- err | ||
close(e.errors) | ||
} | ||
|
||
// We define a constrained interface to provide a drop-in replacement for context.Context | ||
// including in interfaces that compose it. | ||
type SignalerContext interface { | ||
context.Context | ||
Throw(err error) // delegates to the signaler | ||
sealed() // private, to constrain builder to using WithSignaler | ||
} | ||
|
||
// private, to force context derivation / WithSignaler | ||
type signalerCtxt struct { | ||
context.Context | ||
signaler *Signaler | ||
} | ||
|
||
func (sc signalerCtxt) sealed() {} | ||
|
||
// Drop-in replacement for panic, log.Fatal, log.Panic, etc | ||
// to use when we are able to get an SignalerContext and thread it down in the component | ||
func (sc signalerCtxt) Throw(err error) { | ||
sc.signaler.Throw(err) | ||
} | ||
|
||
// the One True Way of getting a SignalerContext | ||
func WithSignaler(ctx context.Context, sig *Signaler) SignalerContext { | ||
return signalerCtxt{ctx, sig} | ||
} | ||
|
||
// If we have an SignalerContext, we can directly ctx.Throw. | ||
// | ||
// But a lot of library methods expect context.Context, & we want to pass the same w/o boilerplate | ||
// Moreover, we could have built with: context.WithCancel(irrecoverable.WithSignaler(ctx, sig)), | ||
// "downcasting" to context.Context. Yet, we can still type-assert and recover. | ||
// | ||
// Throw can be a drop-in replacement anywhere we have a context.Context likely | ||
// to support Irrecoverables. Note: this is not a method | ||
func Throw(ctx context.Context, err error) { | ||
signalerAbleContext, ok := ctx.(SignalerContext) | ||
if ok { | ||
signalerAbleContext.Throw(err) | ||
} | ||
// Be spectacular on how this does not -but should- handle irrecoverables: | ||
log.Fatalf("irrecoverable error signaler not found for context, please implement! Unhandled irrecoverable error %v", err) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
package irrecoverable_test | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/onflow/flow-go/module" | ||
"github.com/onflow/flow-go/module/irrecoverable" | ||
) | ||
|
||
var ErrTriggerRestart = errors.New("restart me") | ||
var ErrNoRestart = errors.New("fatal, no restarts") | ||
|
||
func Example() { | ||
// a context is mandatory in order to call RunComponent | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
|
||
// module.ComponentFactory encapsulates all of the component building logic | ||
// required before running Start() | ||
starts := 0 | ||
componentFactory := func() (module.Component, error) { | ||
starts++ | ||
return NewExampleComponent(starts), nil | ||
} | ||
|
||
// this is the place to inspect the encountered error and implement the appropriate error | ||
// handling behaviors, e.g. restarting the component, firing an alert to pagerduty, etc ... | ||
// the shutdown of the component is handled for you by RunComponent, but you may consider | ||
// performing additional cleanup here | ||
onError := func(err error, triggerRestart func()) { | ||
// check the error type to decide whether to restart or shutdown | ||
if errors.Is(err, ErrTriggerRestart) { | ||
fmt.Printf("Restarting component after fatal error: %v\n", err) | ||
triggerRestart() | ||
return | ||
} else { | ||
fmt.Printf("An unrecoverable error occurred: %v\n", err) | ||
// shutdown other components. it might also make sense to just panic here | ||
// depending on the circumstances | ||
cancel() | ||
} | ||
|
||
} | ||
|
||
// run the component. this is a blocking call, and will return with an error if the | ||
// first startup or any subsequent restart attempts fails or the context is canceled | ||
err := module.RunComponent(ctx, componentFactory, onError) | ||
if err != nil { | ||
fmt.Printf("Error returned from RunComponent: %v\n", err) | ||
} | ||
|
||
// Output: | ||
// [Component 1] Starting up | ||
// [Component 1] Shutting down | ||
// Restarting component after fatal error: restart me | ||
// [Component 2] Starting up | ||
// [Component 2] Shutting down | ||
// An unrecoverable error occurred: fatal, no restarts | ||
// Error returned from RunComponent: context canceled | ||
} | ||
|
||
// ExampleComponent is an example of a typical component | ||
type ExampleComponent struct { | ||
id int | ||
started chan struct{} | ||
ready sync.WaitGroup | ||
done sync.WaitGroup | ||
} | ||
|
||
func NewExampleComponent(id int) *ExampleComponent { | ||
return &ExampleComponent{ | ||
id: id, | ||
started: make(chan struct{}), | ||
} | ||
} | ||
|
||
// start the component and register its shutdown handler | ||
// this component will throw an error after 20ms to demonstrate the error handling | ||
func (c *ExampleComponent) Start(ctx irrecoverable.SignalerContext) { | ||
c.printMsg("Starting up") | ||
|
||
// do some setup... | ||
|
||
c.ready.Add(2) | ||
c.done.Add(2) | ||
|
||
go func() { | ||
c.ready.Done() | ||
defer c.done.Done() | ||
|
||
<-ctx.Done() | ||
|
||
c.printMsg("Shutting down") | ||
// do some cleanup... | ||
}() | ||
|
||
go func() { | ||
c.ready.Done() | ||
defer c.done.Done() | ||
|
||
select { | ||
case <-time.After(20 * time.Millisecond): | ||
// encounter irrecoverable error | ||
if c.id > 1 { | ||
ctx.Throw(ErrNoRestart) | ||
} else { | ||
ctx.Throw(ErrTriggerRestart) | ||
} | ||
case <-ctx.Done(): | ||
c.printMsg("Cancelled by parent") | ||
} | ||
}() | ||
|
||
close(c.started) | ||
} | ||
|
||
// simply return the Started channel | ||
// all startup processing is done in Start() | ||
func (c *ExampleComponent) Ready() <-chan struct{} { | ||
ready := make(chan struct{}) | ||
go func() { | ||
<-c.started | ||
c.ready.Wait() | ||
close(ready) | ||
}() | ||
return ready | ||
} | ||
|
||
// simply return the Stopped channel | ||
// all shutdown processing is done in shutdownOnCancel() | ||
func (c *ExampleComponent) Done() <-chan struct{} { | ||
done := make(chan struct{}) | ||
go func() { | ||
<-c.started | ||
c.done.Wait() | ||
close(done) | ||
}() | ||
return done | ||
} | ||
|
||
func (c *ExampleComponent) printMsg(msg string) { | ||
fmt.Printf("[Component %d] %s\n", c.id, msg) | ||
} |
Oops, something went wrong.