diff --git a/CHANGELOG.md b/CHANGELOG.md index eb5ad957..6f795d5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fixed a memory leak caused by not always cancelling the context used to enable jobs to be cancelled remotely. [PR foo](https://github.com/riverqueue/river/pull/TODO). + ## [0.0.23] - 2024-02-29 ### Added diff --git a/job_executor.go b/job_executor.go index cbd3524d..d8be5213 100644 --- a/job_executor.go +++ b/job_executor.go @@ -138,6 +138,9 @@ func (e *jobExecutor) Cancel() { } func (e *jobExecutor) Execute(ctx context.Context) { + // Ensure that the context is cancelled no matter what, or it will leak: + defer e.CancelFunc(nil) + e.start = e.TimeNowUTC() e.stats = &jobstats.JobStatistics{ QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt), diff --git a/job_executor_test.go b/job_executor_test.go index 5cf892ae..5d46c734 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -640,6 +640,7 @@ func TestJobExecutor_Execute(t *testing.T) { workCtx, cancelFunc := context.WithCancelCause(ctx) executor.CancelFunc = cancelFunc + t.Cleanup(func() { cancelFunc(nil) }) executor.Execute(workCtx) executor.Completer.Wait() diff --git a/producer.go b/producer.go index a73b557e..a5e4db46 100644 --- a/producer.go +++ b/producer.go @@ -358,6 +358,7 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype. workUnit = workInfo.workUnitFactory.MakeUnit(job) } + // jobCancel will always be called by the executor to prevent leaks. jobCtx, jobCancel := context.WithCancelCause(workCtx) executor := baseservice.Init(&p.Archetype, &jobExecutor{