diff --git a/.changeset/fast-students-accept.md b/.changeset/fast-students-accept.md
new file mode 100644
index 00000000000..8813f3a7812
--- /dev/null
+++ b/.changeset/fast-students-accept.md
@@ -0,0 +1,5 @@
+---
+"chainlink": patch
+---
+
+#internal Optimize workflow engine tests
diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go
index 0ecc311acac..040e123567e 100644
--- a/core/services/workflows/engine.go
+++ b/core/services/workflows/engine.go
@@ -45,6 +45,14 @@ type Engine struct {
 	// Used for testing to wait for an execution to complete
 	xxxExecutionFinished chan string
+
+	// testing lifecycle hook to signal initialization status
+	afterInit func(success bool)
+	// Used for testing to control the number of retries
+	// we'll do when initializing the engine.
+	maxRetries int
+	// Used for testing to control the retry interval
+	// when initializing the engine.
+	retryMs int
 }
 
 func (e *Engine) Start(ctx context.Context) error {
@@ -60,94 +68,67 @@ func (e *Engine) Start(ctx context.Context) error {
 	})
 }
 
-// init does the following:
-//
-// 1. Resolves the underlying capability for each trigger
-// 2. Registers each step's capability to this workflow
-// 3. Registers for trigger events now that all capabilities are resolved
+// resolveWorkflowCapabilities does the following:
 //
-// Steps 1 and 2 are retried every 5 seconds until successful.
-func (e *Engine) init(ctx context.Context) {
-	defer e.wg.Done()
-
-	retrySec := 5
-	ticker := time.NewTicker(time.Duration(retrySec) * time.Second)
-	defer ticker.Stop()
-
-LOOP:
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-ticker.C:
-			initSuccessful := true
-			// Resolve the underlying capability for each trigger
-			for _, t := range e.workflow.triggers {
-				tg, err := e.registry.GetTrigger(ctx, t.Type)
-				if err != nil {
-					initSuccessful = false
-					e.logger.Errorf("failed to get trigger capability: %s, retrying in %d seconds", err, retrySec)
-					continue
-				}
-				t.trigger = tg
-			}
-			if !initSuccessful {
-				continue
-			}
-
-			// Walk the graph and initialize each step.
-			// This means:
-			// - fetching the capability
-			// - register the capability to this workflow
-			// - initializing the step's executionStrategy
-			err := e.workflow.walkDo(keywordTrigger, func(s *step) error {
-				// The graph contains a dummy step for triggers, but
-				// we handle triggers separately since there might be more than one.
-				if s.Ref == keywordTrigger {
-					return nil
-				}
-
-				err := e.initializeCapability(ctx, s, retrySec)
-				if err != nil {
-					return err
-				}
-
-				return e.initializeExecutionStrategy(s)
-			})
-			if err != nil {
-				initSuccessful = false
-				e.logger.Error(err)
-			}
-
-			if initSuccessful {
-				break LOOP
-			}
+// 1. Resolves the underlying capability for each trigger
+// 2. Registers each step's capability to this workflow
+func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
+	//
+	// Step 1. Resolve the underlying capability for each trigger
+	//
+	triggersInitialized := true
+	for _, t := range e.workflow.triggers {
+		tg, err := e.registry.GetTrigger(ctx, t.Type)
+		if err != nil {
+			e.logger.Errorf("failed to get trigger capability: %s", err)
+			// we don't return immediately here, since we want to attempt to
+			// resolve all triggers so we can surface every error at once.
+			triggersInitialized = false
+		} else {
+			t.trigger = tg
 		}
 	}
+	if !triggersInitialized {
+		return fmt.Errorf("failed to resolve triggers")
+	}
+
+	// Step 2.
+	// Walk the graph and register each step's capability to this workflow
+	//
+	// This means:
+	// - fetching the capability
+	// - registering the capability to this workflow
+	// - initializing the step's executionStrategy
+	capabilityRegistrationErr := e.workflow.walkDo(keywordTrigger, func(s *step) error {
+		// The graph contains a dummy step for triggers, but
+		// we handle triggers separately since there might be more than one
+		// trigger registered to a workflow.
+		if s.Ref == keywordTrigger {
+			return nil
+		}
 
-	// We have all needed capabilities, now we can register for trigger events
-	for _, t := range e.workflow.triggers {
-		err := e.registerTrigger(ctx, t)
+		err := e.initializeCapability(ctx, s)
 		if err != nil {
-			e.logger.Errorf("failed to register trigger: %s", err)
+			return err
 		}
-	}
 
-	e.logger.Info("engine initialized")
+		return e.initializeExecutionStrategy(s)
+	})
+
+	return capabilityRegistrationErr
 }
 
-func (e *Engine) initializeCapability(ctx context.Context, s *step, retrySec int) error {
+func (e *Engine) initializeCapability(ctx context.Context, s *step) error {
 	// If the capability already exists, that means we've already registered it
 	if s.capability != nil {
 		return nil
 	}
 
-	cp, innerErr := e.registry.Get(ctx, s.Type)
-	if innerErr != nil {
-		return fmt.Errorf("failed to get capability with ref %s: %s, retrying in %d seconds", s.Type, innerErr, retrySec)
+	cp, err := e.registry.Get(ctx, s.Type)
+	if err != nil {
+		return fmt.Errorf("failed to get capability with type %s: %s", s.Type, err)
 	}
 
-	// We only need to configure actions, consensus and targets here, and
+	// We configure actions, consensus and targets here, and
 	// they all satisfy the `CallbackCapability` interface
 	cc, ok := cp.(capabilities.CallbackCapability)
 	if !ok {
@@ -155,29 +136,65 @@ func (e *Engine) initializeCapability(ctx context.Context, s *step, retrySec int
 	}
 
 	if s.config == nil {
-		configMap, ierr := values.NewMap(s.Config)
-		if ierr != nil {
-			return fmt.Errorf("failed to convert config to values.Map: %s", ierr)
+		configMap, newMapErr := values.NewMap(s.Config)
+		if newMapErr != nil {
+			return fmt.Errorf("failed to convert config to values.Map: %s", newMapErr)
 		}
 		s.config = configMap
 	}
 
-	reg := capabilities.RegisterToWorkflowRequest{
+	registrationRequest := capabilities.RegisterToWorkflowRequest{
 		Metadata: capabilities.RegistrationMetadata{
 			WorkflowID: e.workflow.id,
 		},
 		Config: s.config,
 	}
 
-	innerErr = cc.RegisterToWorkflow(ctx, reg)
-	if innerErr != nil {
-		return fmt.Errorf("failed to register to workflow (%+v): %w", reg, innerErr)
+	err = cc.RegisterToWorkflow(ctx, registrationRequest)
+	if err != nil {
+		return fmt.Errorf("failed to register to workflow (%+v): %w", registrationRequest, err)
 	}
 
 	s.capability = cc
 	return nil
 }
 
+// init does the following:
+//
+// 1. Resolves the underlying capability for each trigger
+// 2. Registers each step's capability to this workflow
+// 3. Registers for trigger events now that all capabilities are resolved
+//
+// Steps 1 and 2 are retried every retryMs milliseconds (5 seconds by default) until successful, up to maxRetries times (0, the default, retries indefinitely).
+func (e *Engine) init(ctx context.Context) {
+	defer e.wg.Done()
+
+	retryErr := retryable(ctx, e.logger, e.retryMs, e.maxRetries, func() error {
+		err := e.resolveWorkflowCapabilities(ctx)
+		if err != nil {
+			return fmt.Errorf("failed to resolve workflow: %s", err)
+		}
+		return nil
+	})
+
+	if retryErr != nil {
+		e.logger.Errorf("initialization failed: %s", retryErr)
+		e.afterInit(false)
+		return
+	}
+
+	e.logger.Debug("capabilities resolved, registering triggers")
+	for _, t := range e.workflow.triggers {
+		err := e.registerTrigger(ctx, t)
+		if err != nil {
+			e.logger.Errorf("failed to register trigger: %s", err)
+		}
+	}
+
+	e.logger.Info("engine initialized")
+	e.afterInit(true)
+}
+
 // initializeExecutionStrategy for `step`.
 // Broadly speaking, we'll use `immediateExecution` for non-target steps
 // and `scheduledExecution` for targets. If we don't have the necessary
@@ -609,6 +626,7 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability) er
 
 func (e *Engine) Close() error {
 	return e.StopOnce("Engine", func() error {
+		e.logger.Info("shutting down engine")
 		ctx := context.Background()
 		// To shut down the engine, we'll start by deregistering
 		// any triggers to ensure no new executions are triggered,
@@ -668,6 +686,11 @@ type Config struct {
 	NewWorkerTimeout time.Duration
 	DONInfo          *capabilities.DON
 	PeerID           func() *p2ptypes.PeerID
+
+	// For testing purposes only
+	maxRetries int
+	retryMs    int
+	afterInit  func(success bool)
 }
 
 const (
@@ -689,6 +712,14 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
 		cfg.NewWorkerTimeout = defaultNewWorkerTimeout
 	}
 
+	if cfg.retryMs == 0 {
+		cfg.retryMs = 5000
+	}
+
+	if cfg.afterInit == nil {
+		cfg.afterInit = func(success bool) {}
+	}
+
 	// TODO: validation of the workflow spec
 	// We'll need to check, among other things:
 	// - that there are no step `ref` called `trigger` as this is reserved for any triggers
@@ -718,14 +749,18 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
 			DON:    cfg.DONInfo,
 			PeerID: cfg.PeerID,
 		},
-		executionStates:     newInMemoryStore(),
-		pendingStepRequests: make(chan stepRequest, cfg.QueueSize),
-		newWorkerCh:         newWorkerCh,
-		stepUpdateCh:        make(chan stepState),
-		triggerEvents:       make(chan capabilities.CapabilityResponse),
-		stopCh:              make(chan struct{}),
-		newWorkerTimeout:    cfg.NewWorkerTimeout,
+		executionStates:      newInMemoryStore(),
+		pendingStepRequests:  make(chan stepRequest, cfg.QueueSize),
+		newWorkerCh:          newWorkerCh,
+		stepUpdateCh:         make(chan stepState),
+		triggerEvents:        make(chan capabilities.CapabilityResponse),
+		stopCh:               make(chan struct{}),
+		newWorkerTimeout:     cfg.NewWorkerTimeout,
+		// For testing purposes only
 		xxxExecutionFinished: make(chan string),
+		afterInit:            cfg.afterInit,
+		maxRetries:           cfg.maxRetries,
+		retryMs:              cfg.retryMs,
 	}
 	return engine, nil
 }
diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go
index d82c9d4b7d2..4821b5800c7 100644
--- a/core/services/workflows/engine_test.go
+++ b/core/services/workflows/engine_test.go
@@ -65,6 +65,44 @@ targets:
       abi: "receive(report bytes)"
 `
 
+// newTestEngine creates a new engine with some test defaults.
+func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string) (eng *Engine, initFailed chan struct{}) {
+	peerID := p2ptypes.PeerID{}
+	initFailed = make(chan struct{})
+	cfg := Config{
+		Lggr:       logger.TestLogger(t),
+		Registry:   reg,
+		Spec:       spec,
+		DONInfo:    nil,
+		PeerID:     func() *p2ptypes.PeerID { return &peerID },
+		maxRetries: 1,
+		retryMs:    100,
+		afterInit: func(success bool) {
+			if !success {
+				close(initFailed)
+			}
+		},
+	}
+	eng, err := NewEngine(cfg)
+	require.NoError(t, err)
+	return eng, initFailed
+}
+
+// getExecutionId blocks until the engine finishes an execution of the
+// workflow, then returns that execution's ID.
+//
+// If the engine fails to initialize, the test will fail rather
+// than blocking indefinitely.
+func getExecutionId(t *testing.T, eng *Engine, initFailed <-chan struct{}) string {
+	var eid string
+	select {
+	case <-initFailed:
+		t.FailNow()
+	case eid = <-eng.xxxExecutionFinished:
+	}
+	return eid
+}
+
 type mockCapability struct {
 	capabilities.CapabilityInfo
 	capabilities.CallbackExecutable
@@ -148,23 +186,13 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) {
 	)
 	require.NoError(t, reg.Add(ctx, target2))
 
-	lggr := logger.TestLogger(t)
-	peerID := p2ptypes.PeerID{}
-	cfg := Config{
-		Lggr:     lggr,
-		Registry: reg,
-		Spec:     hardcodedWorkflow,
-		DONInfo:  nil,
-		PeerID:   func() *p2ptypes.PeerID { return &peerID },
-	}
-	eng, err := NewEngine(cfg)
-	require.NoError(t, err)
+	eng, initFailed := newTestEngine(t, reg, hardcodedWorkflow)
 
-	err = eng.Start(ctx)
+	err := eng.Start(ctx)
 	require.NoError(t, err)
 	defer eng.Close()
 
-	eid := <-eng.xxxExecutionFinished
+	eid := getExecutionId(t, eng, initFailed)
 
 	assert.Equal(t, cr, <-target1.response)
 	assert.Equal(t, cr, <-target2.response)
@@ -312,22 +340,13 @@ func TestEngine_ErrorsTheWorkflowIfAStepErrors(t *testing.T) {
 	require.NoError(t, reg.Add(ctx, mockFailingConsensus()))
 	require.NoError(t, reg.Add(ctx, mockTarget()))
 
-	peerID := p2ptypes.PeerID{}
-	cfg := Config{
-		Lggr:     logger.TestLogger(t),
-		Registry: reg,
-		Spec:     simpleWorkflow,
-		DONInfo:  nil,
-		PeerID:   func() *p2ptypes.PeerID { return &peerID },
-	}
-	eng, err := NewEngine(cfg)
-	require.NoError(t, err)
+	eng, initFailed := newTestEngine(t, reg, simpleWorkflow)
 
-	err = eng.Start(ctx)
+	err := eng.Start(ctx)
 	require.NoError(t, err)
 	defer eng.Close()
 
-	eid := <-eng.xxxExecutionFinished
+	eid := getExecutionId(t, eng, initFailed)
 
 	state, err := eng.executionStates.get(ctx, eid)
 	require.NoError(t, err)
@@ -420,22 +439,12 @@ func TestEngine_MultiStepDependencies(t *testing.T) {
 	action, out := mockAction()
 	require.NoError(t, reg.Add(ctx, action))
 
-	peerID := p2ptypes.PeerID{}
-	cfg := Config{
-		Lggr:     logger.TestLogger(t),
-		Registry: reg,
-		Spec:     multiStepWorkflow,
-		DONInfo:  nil,
-		PeerID:   func() *p2ptypes.PeerID { return &peerID },
-	}
-	eng, err := NewEngine(cfg)
-	require.NoError(t, err)
-
-	err = eng.Start(ctx)
+	eng, initFailed := newTestEngine(t, reg, multiStepWorkflow)
+	err := eng.Start(ctx)
 	require.NoError(t, err)
 	defer eng.Close()
 
-	eid := <-eng.xxxExecutionFinished
+	eid := getExecutionId(t, eng, initFailed)
 
 	state, err := eng.executionStates.get(ctx, eid)
 	require.NoError(t, err)
diff --git a/core/services/workflows/retry.go b/core/services/workflows/retry.go
new file mode 100644
index 00000000000..e3f04353e7f
--- /dev/null
+++ b/core/services/workflows/retry.go
@@ -0,0 +1,53 @@
+package workflows
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/smartcontractkit/chainlink/v2/core/logger"
+)
+
+// retryable is a helper function that retries a function until it succeeds.
+//
+// It will retry every `retryMs` milliseconds, up to `maxRetries` times.
+//
+// If `maxRetries` is 0, it will retry indefinitely.
+//
+// retryable will return an error in the following conditions:
+// - the context is cancelled: the error returned is the context error
+// - the retry limit has been hit: a "max retries reached" error is returned (the last error from `fn` is logged, not wrapped)
+func retryable(ctx context.Context, lggr logger.Logger, retryMs int, maxRetries int, fn func() error) error {
+	ticker := time.NewTicker(time.Duration(retryMs) * time.Millisecond)
+	defer ticker.Stop()
+
+	// immediately try once
+	err := fn()
+	if err == nil {
+		return nil
+	}
+	retries := 0
+
+	for {
+		// if maxRetries is 0, we'll retry indefinitely
+		if maxRetries > 0 {
+			if retries >= maxRetries {
+				lggr.Errorf("%s", err)
+				return fmt.Errorf("max retries reached, aborting")
+			}
+		}
+		lggr.Errorf("%s, retrying in %.2fs", err, float64(retryMs)/1000)
+
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+			err = fn()
+			if err == nil {
+				return nil
+			}
+		}
+
+		retries++
+	}
+}
diff --git a/core/services/workflows/retryable_test.go b/core/services/workflows/retryable_test.go
new file mode 100644
index 00000000000..1a17ac55fae
--- /dev/null
+++ b/core/services/workflows/retryable_test.go
@@ -0,0 +1,113 @@
+package workflows
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/smartcontractkit/chainlink/v2/core/logger"
+)
+
+func TestRetryableZeroMaxRetries(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer cancel()
+
+	fn := func() error {
+		return errors.New("test error")
+	}
+
+	err := retryable(ctx, logger.NullLogger, 100, 0, fn)
+	assert.ErrorIs(t, err, context.DeadlineExceeded, "Expected context deadline exceeded error")
+}
+
+func TestRetryableSuccessOnFirstAttempt(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	fn := func() error {
+		return nil
+	}
+
+	err := retryable(ctx, logger.NullLogger, 100, 3, fn)
+	require.NoError(t, err, "Expected no error as function succeeds on first attempt")
+}
+
+func TestRetryableSuccessAfterRetries(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	retries := 0
+	fn := func() error {
+		if retries < 2 {
+			retries++
+			return errors.New("test error")
+		}
+		return nil
+	}
+
+	err := retryable(ctx, logger.NullLogger, 100, 5, fn)
+	assert.NoError(t, err, "Expected no error after successful retry")
+	assert.Equal(t, 2, retries, "Expected two retries before success")
+}
+
+func TestRetryableErrorAfterSingleRetry(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	fn := func() error {
+		return errors.New("immediate failure")
+	}
+
+	err := retryable(ctx, logger.NullLogger, 100, 1, fn)
+	require.Error(t, err, "Expected an error after the single allowed retry also failed")
+	assert.Equal(t, "max retries reached, aborting", err.Error(), "Expected function to abort once the retry limit was reached")
+}
+
+func TestRetryableErrorAfterMultipleRetries(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	attempts := 0
+	fn := func() error {
+		attempts++
+		return errors.New("persistent error")
+	}
+
+	maxRetries := 3
+	err := retryable(ctx, logger.NullLogger, 100, maxRetries, fn)
+	require.Error(t, err, "Expected an error after multiple retries")
+	assert.Equal(t, "max retries reached, aborting", err.Error(), "Expected the max retries reached error message")
+	assert.Equal(t, maxRetries+1, attempts, "Expected the function to be executed maxRetries + 1 times")
+}
+
+func TestRetryableCancellationHandling(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	fn := func() error {
+		return errors.New("test error")
+	}
+
+	go func() {
+		time.Sleep(150 * time.Millisecond)
+		cancel()
+	}()
+
+	err := retryable(ctx, logger.NullLogger, 100, 5, fn)
+	assert.ErrorIs(t, err, context.Canceled, "Expected context cancellation error")
+}
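
Reviewer note, not part of the patch: a minimal sketch of how a caller inside the workflows package might use the new retryable helper. Only the retryable signature and the "max retries reached, aborting" message come from this diff; the attempt counter and the "dependency not ready" failure are illustrative assumptions.

// exampleRetryableUsage is hypothetical: it retries a flaky setup step
// every 500ms, allowing 3 retries after the initial attempt, so fn runs
// at most 4 times before retryable gives up with
// "max retries reached, aborting".
func exampleRetryableUsage(ctx context.Context, lggr logger.Logger) error {
	attempts := 0
	return retryable(ctx, lggr, 500, 3, func() error {
		attempts++
		if attempts < 3 {
			return errors.New("dependency not ready")
		}
		return nil // succeeds on the third attempt; retryable returns nil
	})
}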
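For context on the defaults wired up in NewEngine: with an unset config, maxRetries stays at its zero value and retryMs is set to 5000, so a production engine keeps retrying initialization every 5 seconds until it succeeds or the context is cancelled; the bounded, fast retries are opt-in for tests only. Restating the patch (not new behavior), the effective production call inside Engine.init reduces to:

	// retryMs == 5000 (5s between attempts), maxRetries == 0 (retry until
	// resolution succeeds or ctx is cancelled).
	retryErr := retryable(ctx, e.logger, 5000, 0, func() error {
		return e.resolveWorkflowCapabilities(ctx)
	})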