Skip to content

Commit

Permalink
[Keystone] Improve workflow engine test times (#12959)
Browse files Browse the repository at this point in the history
* Improve workflow engine test times

* Extract graph traversal initalization logic
  • Loading branch information
HenryNguyen5 committed Apr 29, 2024
1 parent 8cb83a4 commit e482c79
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 124 deletions.
5 changes: 5 additions & 0 deletions .changeset/fast-students-accept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal Optimize workflow engine tests
207 changes: 121 additions & 86 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ type Engine struct {

// Used for testing to wait for an execution to complete
xxxExecutionFinished chan string
// testing lifecycle hook to signal initialization status
afterInit func(success bool)
// Used for testing to control the number of retries
// we'll do when initializing the engine.
maxRetries int
// Used for testing to control the retry interval
// when initializing the engine.
retryMs int
}

func (e *Engine) Start(ctx context.Context) error {
Expand All @@ -60,124 +68,133 @@ func (e *Engine) Start(ctx context.Context) error {
})
}

// init does the following:
//
// 1. Resolves the underlying capability for each trigger
// 2. Registers each step's capability to this workflow
// 3. Registers for trigger events now that all capabilities are resolved
// resolveWorkflowCapabilities does the following:
//
// Steps 1 and 2 are retried every 5 seconds until successful.
func (e *Engine) init(ctx context.Context) {
defer e.wg.Done()

retrySec := 5
ticker := time.NewTicker(time.Duration(retrySec) * time.Second)
defer ticker.Stop()

LOOP:
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
initSuccessful := true
// Resolve the underlying capability for each trigger
for _, t := range e.workflow.triggers {
tg, err := e.registry.GetTrigger(ctx, t.Type)
if err != nil {
initSuccessful = false
e.logger.Errorf("failed to get trigger capability: %s, retrying in %d seconds", err, retrySec)
continue
}
t.trigger = tg
}
if !initSuccessful {
continue
}

// Walk the graph and initialize each step.
// This means:
// - fetching the capability
// - register the capability to this workflow
// - initializing the step's executionStrategy
err := e.workflow.walkDo(keywordTrigger, func(s *step) error {
// The graph contains a dummy step for triggers, but
// we handle triggers separately since there might be more than one.
if s.Ref == keywordTrigger {
return nil
}

err := e.initializeCapability(ctx, s, retrySec)
if err != nil {
return err
}

return e.initializeExecutionStrategy(s)
})
if err != nil {
initSuccessful = false
e.logger.Error(err)
}

if initSuccessful {
break LOOP
}
// 1. Resolves the underlying capability for each trigger
// 2. Registers each step's capability to this workflow
func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
//
// Step 1. Resolve the underlying capability for each trigger
//
triggersInitialized := true
for _, t := range e.workflow.triggers {
tg, err := e.registry.GetTrigger(ctx, t.Type)
if err != nil {
e.logger.Errorf("failed to get trigger capability: %s", err)
// we don't immediately return here, since we want to retry all triggers
// to notify the user of all errors at once.
triggersInitialized = false
} else {
t.trigger = tg
}
}
if !triggersInitialized {
return fmt.Errorf("failed to resolve triggers")
}

// Step 2. Walk the graph and register each step's capability to this workflow
//
// This means:
// - fetching the capability
// - register the capability to this workflow
// - initializing the step's executionStrategy
capabilityRegistrationErr := e.workflow.walkDo(keywordTrigger, func(s *step) error {
// The graph contains a dummy step for triggers, but
// we handle triggers separately since there might be more than one
// trigger registered to a workflow.
if s.Ref == keywordTrigger {
return nil
}

// We have all needed capabilities, now we can register for trigger events
for _, t := range e.workflow.triggers {
err := e.registerTrigger(ctx, t)
err := e.initializeCapability(ctx, s)
if err != nil {
e.logger.Errorf("failed to register trigger: %s", err)
return err
}
}

e.logger.Info("engine initialized")
return e.initializeExecutionStrategy(s)
})

return capabilityRegistrationErr
}

func (e *Engine) initializeCapability(ctx context.Context, s *step, retrySec int) error {
func (e *Engine) initializeCapability(ctx context.Context, s *step) error {
// If the capability already exists, that means we've already registered it
if s.capability != nil {
return nil
}

cp, innerErr := e.registry.Get(ctx, s.Type)
if innerErr != nil {
return fmt.Errorf("failed to get capability with ref %s: %s, retrying in %d seconds", s.Type, innerErr, retrySec)
cp, err := e.registry.Get(ctx, s.Type)
if err != nil {
return fmt.Errorf("failed to get capability with ref %s: %s", s.Type, err)
}

// We only need to configure actions, consensus and targets here, and
// We configure actions, consensus and targets here, and
// they all satisfy the `CallbackCapability` interface
cc, ok := cp.(capabilities.CallbackCapability)
if !ok {
return fmt.Errorf("could not coerce capability %s to CallbackCapability", s.Type)
}

if s.config == nil {
configMap, ierr := values.NewMap(s.Config)
if ierr != nil {
return fmt.Errorf("failed to convert config to values.Map: %s", ierr)
configMap, newMapErr := values.NewMap(s.Config)
if newMapErr != nil {
return fmt.Errorf("failed to convert config to values.Map: %s", newMapErr)
}
s.config = configMap
}

reg := capabilities.RegisterToWorkflowRequest{
registrationRequest := capabilities.RegisterToWorkflowRequest{
Metadata: capabilities.RegistrationMetadata{
WorkflowID: e.workflow.id,
},
Config: s.config,
}

innerErr = cc.RegisterToWorkflow(ctx, reg)
if innerErr != nil {
return fmt.Errorf("failed to register to workflow (%+v): %w", reg, innerErr)
err = cc.RegisterToWorkflow(ctx, registrationRequest)
if err != nil {
return fmt.Errorf("failed to register to workflow (%+v): %w", registrationRequest, err)
}

s.capability = cc
return nil
}

// init does the following:
//
// 1. Resolves the underlying capability for each trigger
// 2. Registers each step's capability to this workflow
// 3. Registers for trigger events now that all capabilities are resolved
//
// Steps 1 and 2 are retried every 5 seconds until successful.
func (e *Engine) init(ctx context.Context) {
defer e.wg.Done()

retryErr := retryable(ctx, e.logger, e.retryMs, e.maxRetries, func() error {
err := e.resolveWorkflowCapabilities(ctx)
if err != nil {
return fmt.Errorf("failed to resolve workflow: %s", err)
}
return nil
})

if retryErr != nil {
e.logger.Errorf("initialization failed: %s", retryErr)
e.afterInit(false)
return
}

e.logger.Debug("capabilities resolved, registering triggers")
for _, t := range e.workflow.triggers {
err := e.registerTrigger(ctx, t)
if err != nil {
e.logger.Errorf("failed to register trigger: %s", err)
}
}

e.logger.Info("engine initialized")
e.afterInit(true)
}

// initializeExecutionStrategy for `step`.
// Broadly speaking, we'll use `immediateExecution` for non-target steps
// and `scheduledExecution` for targets. If we don't have the necessary
Expand Down Expand Up @@ -609,6 +626,7 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability) er

func (e *Engine) Close() error {
return e.StopOnce("Engine", func() error {
e.logger.Info("shutting down engine")
ctx := context.Background()
// To shut down the engine, we'll start by deregistering
// any triggers to ensure no new executions are triggered,
Expand Down Expand Up @@ -668,6 +686,11 @@ type Config struct {
NewWorkerTimeout time.Duration
DONInfo *capabilities.DON
PeerID func() *p2ptypes.PeerID

// For testing purposes only
maxRetries int
retryMs int
afterInit func(success bool)
}

const (
Expand All @@ -689,6 +712,14 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
cfg.NewWorkerTimeout = defaultNewWorkerTimeout
}

if cfg.retryMs == 0 {
cfg.retryMs = 5000
}

if cfg.afterInit == nil {
cfg.afterInit = func(success bool) {}
}

// TODO: validation of the workflow spec
// We'll need to check, among other things:
// - that there are no step `ref` called `trigger` as this is reserved for any triggers
Expand Down Expand Up @@ -718,14 +749,18 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
DON: cfg.DONInfo,
PeerID: cfg.PeerID,
},
executionStates: newInMemoryStore(),
pendingStepRequests: make(chan stepRequest, cfg.QueueSize),
newWorkerCh: newWorkerCh,
stepUpdateCh: make(chan stepState),
triggerEvents: make(chan capabilities.CapabilityResponse),
stopCh: make(chan struct{}),
newWorkerTimeout: cfg.NewWorkerTimeout,
executionStates: newInMemoryStore(),
pendingStepRequests: make(chan stepRequest, cfg.QueueSize),
newWorkerCh: newWorkerCh,
stepUpdateCh: make(chan stepState),
triggerEvents: make(chan capabilities.CapabilityResponse),
stopCh: make(chan struct{}),
newWorkerTimeout: cfg.NewWorkerTimeout,
// For testing purposes only
xxxExecutionFinished: make(chan string),
afterInit: cfg.afterInit,
maxRetries: cfg.maxRetries,
retryMs: cfg.retryMs,
}
return engine, nil
}

0 comments on commit e482c79

Please sign in to comment.