diff --git a/pipeline.go b/pipeline.go index 51eb149..1a7489f 100644 --- a/pipeline.go +++ b/pipeline.go @@ -88,6 +88,19 @@ func (p *pipeline) Run() { worker := <-p.components[rep.Component].pool worker <- m } + continue + + default: + } + + select { + case m, ok := <-p.retry: + if ok { + rep := m.Reports.Head().(Report) + worker := <-p.components[rep.Component].pool + worker <- m + } + continue case m, ok := <-p.input: // If input channel has been closed, close output channel