Skip to content

Commit

Permalink
fix broken context cancelation
Browse files Browse the repository at this point in the history
  • Loading branch information
richardartoul committed May 29, 2023
1 parent d01dce2 commit d2520bc
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
32 changes: 31 additions & 1 deletion virtual/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,14 @@ func (r *environment) invokeActorStreamHelper(
invokeCtx, cc = context.WithTimeout(ctx, retryPolicy.PerAttemptTimeout)
}
resp, selectedReferences, err := r.invokeReferences(invokeCtx, vs, references, operation, payload, create)
cc()
if err != nil {
// If there was an error we can cancel the per-attempt context immediately.
cc()
} else {
// However, if there was no error then we need to ensure that the cc() function
// will eventually be called, but not until the caller is done reading the stream.
resp = newCtxReaderCloser(cc, resp)
}

// If there is no error or the error is due to server blacklisting, consider the invocation successful.
// Return the response and error to exit the retry loop.
Expand Down Expand Up @@ -1150,3 +1157,26 @@ func filterReferences(

return filtered
}

// ctxReaderCloser wraps a io.ReaderCloser and a context such that they're both
// finalized together to avoid leaking any resources.
type ctxReaderCloser struct {
ctxCC func()
r io.ReadCloser
}

func newCtxReaderCloser(ctxCC func(), r io.ReadCloser) io.ReadCloser {
return &ctxReaderCloser{
ctxCC: ctxCC,
r: r,
}
}

func (c *ctxReaderCloser) Read(p []byte) (int, error) {
return c.r.Read(p)
}

func (c *ctxReaderCloser) Close() error {
defer c.ctxCC()
return c.r.Close()
}
4 changes: 4 additions & 0 deletions virtual/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,10 @@ func (ta *testActor) Invoke(
operation string,
payload []byte,
) ([]byte, error) {
if err := ctx.Err(); err != nil {
return nil, ctx.Err()
}

defer func() { ta.numInvocations++ }()

switch operation {
Expand Down

0 comments on commit d2520bc

Please sign in to comment.