From aa7e1281bfdc74a56fb52f140d3f00862b6176e6 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 29 Feb 2024 21:16:28 -0600 Subject: [PATCH 1/2] fix memory leak of job cancellation contexts When remote job cancellation was added, a new cancellable context was allocated within the producer before the executor is spawned. The cancel func here was only called if the job was actually cancelled remotely or via a parent context cancellation, meaning we would slowly leak memory for every job worked that wasn't cancelled. Thank you @brandur for pinpointing the issue. Fixes #239. Co-Authored-By: Brandur Leach --- CHANGELOG.md | 4 ++++ job_executor.go | 3 +++ job_executor_test.go | 6 ++++++ producer.go | 1 + 4 files changed, 14 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb5ad957..9701e3eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fixed a memory leak caused by not always cancelling the context used to enable jobs to be cancelled remotely. [PR #243](https://github.com/riverqueue/river/pull/243). 
+ ## [0.0.23] - 2024-02-29 ### Added diff --git a/job_executor.go b/job_executor.go index cbd3524d..d8be5213 100644 --- a/job_executor.go +++ b/job_executor.go @@ -138,6 +138,9 @@ func (e *jobExecutor) Cancel() { } func (e *jobExecutor) Execute(ctx context.Context) { + // Ensure that the context is cancelled no matter what, or it will leak: + defer e.CancelFunc(nil) + e.start = e.TimeNowUTC() e.stats = &jobstats.JobStatistics{ QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt), diff --git a/job_executor_test.go b/job_executor_test.go index 5cf892ae..cdf19eac 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -174,7 +174,12 @@ func TestJobExecutor_Execute(t *testing.T) { jobRow: job, } + // allocate this context just so we can set the CancelFunc: + _, cancel := context.WithCancelCause(ctx) + t.Cleanup(func() { cancel(nil) }) + executor := baseservice.Init(archetype, &jobExecutor{ + CancelFunc: cancel, ClientRetryPolicy: &retryPolicyNoJitter{}, Completer: bundle.completer, ErrorHandler: bundle.errorHandler, @@ -640,6 +645,7 @@ func TestJobExecutor_Execute(t *testing.T) { workCtx, cancelFunc := context.WithCancelCause(ctx) executor.CancelFunc = cancelFunc + t.Cleanup(func() { cancelFunc(nil) }) executor.Execute(workCtx) executor.Completer.Wait() diff --git a/producer.go b/producer.go index a73b557e..a5e4db46 100644 --- a/producer.go +++ b/producer.go @@ -358,6 +358,7 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype. workUnit = workInfo.workUnitFactory.MakeUnit(job) } + // jobCancel will always be called by the executor to prevent leaks. 
jobCtx, jobCancel := context.WithCancelCause(workCtx) executor := baseservice.Init(&p.Archetype, &jobExecutor{ From 609ff4b3c7df50dff76b4403705705afec5e245b Mon Sep 17 00:00:00 2001 From: Brandur Date: Thu, 29 Feb 2024 20:16:37 -0800 Subject: [PATCH 2/2] A few amendments to default executor context cancel Builds on the rest of #243 with a few things I'd done in my own version of the fix: * Add a test case that verifies that the executor's cancel function was called even in the event of no explicit job cancellation. * Add a default context cancellation error that's never user visible, but which can easily be recognized for testing purposes. --- job_executor.go | 6 +++++- job_executor_test.go | 16 ++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/job_executor.go b/job_executor.go index d8be5213..35aebb5c 100644 --- a/job_executor.go +++ b/job_executor.go @@ -17,6 +17,10 @@ import ( "github.com/riverqueue/river/rivertype" ) +// Error used in CancelFunc in cases where the job was not cancelled for +// purposes of resource cleanup. Should never be user visible. +var errExecutorDefaultCancel = errors.New("context cancelled as executor finished") + // UnknownJobKindError is returned when a Client fetches and attempts to // work a job that has not been registered on the Client's Workers bundle (using // AddWorker). 
@@ -139,7 +143,7 @@ func (e *jobExecutor) Cancel() { func (e *jobExecutor) Execute(ctx context.Context) { // Ensure that the context is cancelled no matter what, or it will leak: - defer e.CancelFunc(nil) + defer e.CancelFunc(errExecutorDefaultCancel) e.start = e.TimeNowUTC() e.stats = &jobstats.JobStatistics{ diff --git a/job_executor_test.go b/job_executor_test.go index cdf19eac..3f4a4cf3 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -623,6 +623,22 @@ func TestJobExecutor_Execute(t *testing.T) { require.True(t, bundle.errorHandler.HandlePanicCalled) }) + t.Run("CancelFuncCleanedUpEvenWithoutCancel", func(t *testing.T) { + t.Parallel() + + executor, bundle := setup(t) + + executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return nil }, nil).MakeUnit(bundle.jobRow) + + workCtx, cancelFunc := context.WithCancelCause(ctx) + executor.CancelFunc = cancelFunc + + executor.Execute(workCtx) + executor.Completer.Wait() + + require.ErrorIs(t, context.Cause(workCtx), errExecutorDefaultCancel) + }) + runCancelTest := func(t *testing.T, returnErr error) *rivertype.JobRow { //nolint:thelper executor, bundle := setup(t)