Close the test dispatcher when completing (#1117)
Currently, a test workflow like this never runs the code after the goroutine's sleep (correct),
but it also never runs the code in the defer (incorrect; production would run it):
```
func(ctx workflow.Context) error {
  workflow.Go(ctx, func(ctx workflow.Context) {
    defer func() { fmt.Println("in defer") }()
    workflow.Sleep(ctx, time.Hour) // wait longer than the workflow lives
    fmt.Println("after sleep")
  })
  workflow.Sleep(ctx, time.Minute) // just to make sure the goroutine starts
  return nil
}
```
The workflow ends correctly, but since the dispatcher is never closed, any not-yet-complete
goroutines never exit, and we leak goroutines.
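To illustrate why closing matters, here is a minimal standalone sketch of a blocked goroutine being told to exit. It is not the real dispatcher: the `coroutine` type, `startCoroutine`, `yield`, and `runAndClose` are hypothetical names made up for this example, and it assumes the exit is done with `runtime.Goexit`, which runs deferred calls before terminating the goroutine.

```go
package main

import (
	"fmt"
	"runtime"
)

// coroutine is a hypothetical, heavily simplified stand-in for a
// dispatcher-managed goroutine. It is NOT the real dispatcher type.
type coroutine struct {
	unblock chan bool     // true = resume, false = exit
	done    chan struct{} // closed once the goroutine has terminated
}

// startCoroutine runs body in a goroutine. body calls yield to block
// until the owner either resumes it or closes it.
func startCoroutine(body func(yield func())) *coroutine {
	c := &coroutine{unblock: make(chan bool), done: make(chan struct{})}
	go func() {
		defer close(c.done)
		yield := func() {
			if resume := <-c.unblock; !resume {
				// Goexit runs all deferred calls, then ends the goroutine.
				runtime.Goexit()
			}
		}
		body(yield)
	}()
	return c
}

// Close tells a blocked coroutine to exit and waits until it is gone.
func (c *coroutine) Close() {
	select {
	case c.unblock <- false:
	case <-c.done: // body already returned on its own
	}
	<-c.done
}

// runAndClose mirrors the workflow above: the body blocks "forever",
// and the owner closes it instead of resuming it.
func runAndClose() (ranDefer, ranAfter bool) {
	c := startCoroutine(func(yield func()) {
		defer func() { ranDefer = true }()
		yield() // plays the role of a Sleep that outlives the workflow
		ranAfter = true
	})
	c.Close()
	return ranDefer, ranAfter
}

func main() {
	ranDefer, ranAfter := runAndClose()
	fmt.Printf("defer ran: %v, code after sleep ran: %v\n", ranDefer, ranAfter)
}
```

Without the `Close` call the goroutine would stay parked on `unblock` forever, which is the leak described above.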

Semantically this should be a safe change:
- Any post-complete decisions would not be executed or recorded, and this retains that.
- When panicking, anything that would be *recorded* in a defer will not be recorded, so no
  replay-state-aware user code should be affected.  And any code that ignores replay state will
  now execute like it should, where before it would not.

So safe / correct code should be unaffected, leaks should be reduced, and latent mistakes should
now cause errors.  AFAICT - I'm not sure how complete our tests are here :)

There's some room for in-defer code to be semantically incorrect in tests without this fix (e.g. testing
custom logger/metric impls in defers), though I expect those cases to be very rare, bordering on nonexistent.
But for the most part I expect that people will not notice this change; they'll just have fewer goroutine
leaks during tests (so e.g. https://github.com/uber-go/goleak users will be happy).

---

Prior to this fix, the added test fails with:
```
=== RUN   TestWorkflowUnitTest/Test_StaleGoroutinesAreShutDown
    internal_workflow_test.go:1210: 
        	Error Trace:	internal_workflow_test.go:1210
        	Error:      	deferred func should have been called within 1 second
        	Test:       	TestWorkflowUnitTest/Test_StaleGoroutinesAreShutDown
    internal_workflow_test.go:1216: code after sleep correctly not executed
```
Now it passes with this, which also shows it's not slowing tests down in any meaningful way:
```
=== RUN   TestWorkflowUnitTest/Test_StaleGoroutinesAreShutDown
    internal_workflow_test.go:1210: deferred callback executed after 9.177µs
    internal_workflow_test.go:1217: code after sleep correctly not executed
```
Groxx committed Aug 26, 2021
1 parent 9f2fda0 commit 84c60d8
Showing 2 changed files with 38 additions and 0 deletions.
37 changes: 37 additions & 0 deletions internal/internal_workflow_test.go
@@ -1181,6 +1181,43 @@ func (s *WorkflowUnitTest) Test_WaitGroupWorkflowTest() {
	s.Equal(n, total)
}

func (s *WorkflowUnitTest) Test_StaleGoroutinesAreShutDown() {
	env := s.NewTestWorkflowEnvironment()
	deferred := make(chan struct{})
	after := make(chan struct{})
	wf := func(ctx Context) error {
		Go(ctx, func(ctx Context) {
			defer func() { close(deferred) }()
			_ = Sleep(ctx, time.Hour) // outlive the workflow
			close(after)
		})
		_ = Sleep(ctx, time.Minute)
		return nil
	}
	env.RegisterWorkflow(wf)

	env.ExecuteWorkflow(wf)
	s.True(env.IsWorkflowCompleted())
	s.NoError(env.GetWorkflowError())

	// goroutines are shut down async at the moment, so wait with a timeout.
	// give it up to 1s total.

	started := time.Now()
	maxWait := time.NewTimer(time.Second)
	defer maxWait.Stop()
	select {
	case <-deferred: s.T().Logf("deferred callback executed after %v", time.Now().Sub(started))
	case <-maxWait.C: s.Fail("deferred func should have been called within 1 second")
	}
	// if deferred code has run, this has already occurred-or-not.
	// if it timed out waiting for the deferred code, it has waited long enough, and this is mostly a curiosity.
	select {
	case <-after: s.Fail("code after sleep should not have run")
	default: s.T().Log("code after sleep correctly not executed")
	}
}

var _ WorkflowInterceptorFactory = (*tracingInterceptorFactory)(nil)

type tracingInterceptorFactory struct {
1 change: 1 addition & 0 deletions internal/internal_workflow_testsuite.go
@@ -818,6 +818,7 @@ func (env *testWorkflowEnvironmentImpl) Complete(result []byte, err error) {
		env.logger.Debug("Workflow already completed.")
		return
	}
	env.workflowDef.Close()
	if _, ok := err.(*CanceledError); ok && env.workflowCancelHandler != nil {
		env.workflowCancelHandler()
	}