From 36b2cb69ec7da77bbc1271a0ddc09a64d50eb5dd Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 29 Feb 2024 21:16:28 -0600 Subject: [PATCH] fix memory leak of job cancellation contexts When remote job cancellation was added, a new cancellable context was allocated within the producer before the executor is spawned. The cancel func here was only called if the job was actually cancelled remotely or via a parent context cancellation, meaning we would slowly leak memory for every job worked that wasn't cancelled. Thank you @brandur for pinpointing the issue. Fixes #239. Co-Authored-By: Brandur Leach --- CHANGELOG.md | 4 ++++ job_executor.go | 3 +++ job_executor_test.go | 1 + producer.go | 1 + 4 files changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb5ad957..6f795d5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fixed a memory leak caused by not always cancelling the context used to enable jobs to be cancelled remotely. [PR foo](https://github.com/riverqueue/river/pull/TODO). + ## [0.0.23] - 2024-02-29 ### Added diff --git a/job_executor.go b/job_executor.go index cbd3524d..d8be5213 100644 --- a/job_executor.go +++ b/job_executor.go @@ -138,6 +138,9 @@ func (e *jobExecutor) Cancel() { } func (e *jobExecutor) Execute(ctx context.Context) { + // Ensure that the context is cancelled no matter what, or it will leak: + defer e.CancelFunc(nil) + e.start = e.TimeNowUTC() e.stats = &jobstats.JobStatistics{ QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt), diff --git a/job_executor_test.go b/job_executor_test.go index 5cf892ae..5d46c734 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -640,6 +640,7 @@ func TestJobExecutor_Execute(t *testing.T) { workCtx, cancelFunc := context.WithCancelCause(ctx) executor.CancelFunc = cancelFunc + t.Cleanup(func() { cancelFunc(nil) }) executor.Execute(workCtx) executor.Completer.Wait() diff --git a/producer.go b/producer.go index a73b557e..a5e4db46 100644 --- a/producer.go +++ b/producer.go @@ -358,6 +358,7 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype. workUnit = workInfo.workUnitFactory.MakeUnit(job) } + // jobCancel will always be called by the executor to prevent leaks. jobCtx, jobCancel := context.WithCancelCause(workCtx) executor := baseservice.Init(&p.Archetype, &jobExecutor{