Skip to content

Commit

Permalink
Fixed a case where there might be a deadlock on an error if there is …
Browse files Browse the repository at this point in the history
…a high back-pressure. (#51)
  • Loading branch information
michalkurzeja authored and nrwiersma committed Jul 19, 2019
1 parent 93c06e5 commit cc1f2fe
Showing 1 changed file with 4 additions and 1 deletion.
5 changes: 4 additions & 1 deletion task.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,15 @@ func (t *streamTask) closeTopology() error {

func (t *streamTask) handleError(err error) {
t.running = false
t.srcPumps.StopAll()

t.errorFn(err)
}

// OnError sets the error handler.
//
// When an error occurrs on the stream, it is safe to assume
// there is deadlock in the system. It is not safe to Close
// the task at this point as it will either hang or panic.
func (t *streamTask) OnError(fn ErrorFunc) {
t.errorFn = fn
}
Expand Down

0 comments on commit cc1f2fe

Please sign in to comment.