
Fix task enqueue race during job preparation#198

Merged
NumericalAdvantage merged 2 commits into main from jobStateMachineBug
Feb 16, 2026

Conversation

@NumericalAdvantage
Collaborator

@NumericalAdvantage NumericalAdvantage commented Feb 9, 2026

When a Job takes a long time to PREPARE, many of its tasks may already be created and even be ready to be picked up by the worker. In that case an assertion fails which checks that the Job is PENDING (and then switches it to IN_PROGRESS), not PREPARING: if the Job is still being prepared, we raise an assertion error because a task was queued for the worker before the job it belongs to had fully finished preparing. The fix is to finish preparing all of the Job's tasks in the DB first, and only then enqueue them in the procrastinate worker.
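The ordering the fix enforces can be sketched without Django or procrastinate (the Job class and queue list below are hypothetical stand-ins for the real models and worker queue, not the actual radis code):

```python
from dataclasses import dataclass, field

@dataclass
class Job:
    status: str = "PREPARING"
    tasks: list = field(default_factory=list)

queue = []  # stands in for the procrastinate worker queue

def process_job(job, payloads):
    # Phase 1: create ALL tasks while the job is still PREPARING.
    # Nothing is enqueued yet, so a fast worker can never pick up
    # a task whose parent job has not finished preparing.
    for payload in payloads:
        job.tasks.append({"payload": payload, "status": "PENDING"})

    # Phase 2: only after preparation is complete does the job
    # transition to PENDING, and only then are the tasks enqueued.
    job.status = "PENDING"
    for task in job.tasks:
        queue.append(task)

job = Job()
process_job(job, ["report-1", "report-2"])
assert job.status == "PENDING" and len(queue) == 2
```

The buggy behavior corresponds to interleaving the two phases: enqueuing each task as it is created, which allows a worker to dequeue a task while the job is still PREPARING.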

Summary by CodeRabbit

  • Bug Fixes

    • Fixed task queuing and processing behavior for extraction and subscription jobs, improving reliability of job retries and re-queueing operations while ensuring proper task state management.
  • Tests

    • Added regression tests to verify correct task processing and queuing behavior for extraction and subscription jobs.

@gemini-code-assist

Summary of Changes

Hello @NumericalAdvantage, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical race condition within the job processing system where tasks could be dispatched to workers before their associated parent job had completed its initial preparation phase. By strategically delaying the enqueuing of tasks until the job's status is confirmed as PENDING, the changes prevent premature task execution and eliminate assertion errors that previously occurred due to inconsistent job states. This ensures robust and reliable job processing, particularly for long-running preparation steps.

Highlights

  • Race Condition Fix: Addressed a race condition where tasks were enqueued prematurely while their parent job was still in a 'PREPARING' state, leading to assertion failures.
  • Deferred Task Enqueueing: Modified job processing logic for both ExtractionJob and SubscriptionJob to ensure tasks are only enqueued after the job has fully transitioned from 'PREPARING' to 'PENDING'.
  • queued_job_id Management: The 'queued_job_id' for ExtractionTask is now explicitly cleared upon task completion, allowing for re-queueing if needed.
  • New Regression Tests: Introduced dedicated regression tests for both extraction and subscription job processing to validate the fix and prevent future occurrences of this race condition.
Changelog
  • radis/extractions/tasks.py
    • Implemented logic to clear 'queued_job_id' for ExtractionTask upon successful or failed processing, enabling task re-queueing.
    • Refactored 'process_extraction_job' to ensure tasks are only enqueued after the job's preparation phase is complete and its status is PENDING, preventing premature task execution.
    • Added inline comments to clarify the invariant regarding task enqueuing during the 'PREPARING' state.
  • radis/extractions/tests/test_tasks.py
    • New file added containing 'test_process_extraction_job_only_enqueues_tasks_after_job_is_pending'. This test verifies that ExtractionTask instances are only enqueued when their parent ExtractionJob is in the PENDING state, confirming the race condition fix.
  • radis/subscriptions/tasks.py
    • Modified 'process_subscription_job' to delay the enqueuing of SubscriptionTask instances until the parent SubscriptionJob has completed its preparation and is in the PENDING state.
    • Added inline comments to explain the necessity of this delayed enqueuing to avoid assertion errors.
  • radis/subscriptions/tests/test_tasks.py
    • New file added containing 'test_process_subscription_job_only_enqueues_tasks_after_job_is_pending'. This test confirms that SubscriptionTask instances are correctly enqueued only when their parent SubscriptionJob is in the PENDING state, mirroring the fix for extraction jobs.
Activity
  • The pull request was initiated by NumericalAdvantage to address a race condition in job preparation.
  • The author provided a clear description of the problem, where tasks were being queued before the job was fully prepared, leading to assertion failures.
  • The proposed solution involves ensuring tasks are only enqueued after the job's preparation is complete.
  • New test files were added to validate the fix for both extraction and subscription jobs.

@claude

claude Bot commented Feb 9, 2026

Pull Request Review: Fix task enqueue race during job preparation

Summary

This PR addresses a critical race condition (#197) in the Job/Task processing model where tasks could be enqueued and picked up by workers while the parent job was still in PREPARING status, causing assertion failures in AnalysisTaskProcessor.start().

✅ Positive Findings

Code Quality

  • Clear intent: The fix establishes a well-documented invariant that tasks must only be enqueued after job preparation is complete
  • Comprehensive comments: Excellent inline documentation explaining why the change is necessary (lines 39-42 in extractions/tasks.py)
  • Consistent implementation: The same pattern is applied to both extractions and subscriptions apps
  • Proper separation of concerns: Tasks are now created first, then enqueued in a separate step

Bug Fix Approach

  • Root cause addressed: The fix correctly identifies that the race occurs when task creation is slow and workers are fast
  • Resume/retry path preserved: The code properly handles existing tasks (resume/retry scenario) vs new job preparation
  • Status flow maintained: Jobs stay in PENDING for resume/retry, only enter PREPARING for new job preparation

Test Coverage

  • Regression tests added: Both apps now have tests explicitly verifying the invariant
  • Test design: Tests use monkeypatching to capture job status at enqueue time, which is an effective way to verify the invariant
  • Test coverage: Tests verify that ALL tasks are enqueued with job in PENDING status

🔍 Areas for Improvement

1. Duplicate Database Queries

In radis/extractions/tasks.py:25-29 and radis/subscriptions/tasks.py:28-30:

task = ExtractionTask.objects.get(id=task_id)  # Re-fetching after processing
task.queued_job_id = None
task.save()

Issue: The task is re-fetched from the database after processing. This is inefficient and could cause issues if the task was modified during processing.

Recommendation: Consider accessing the task from the processor:

processor = ExtractionTaskProcessor(task)
processor.start()
task.queued_job_id = None  # task is already in scope
task.save()

However, if processor.start() modifies the task internally, the re-fetch may be intentional to avoid stale data. Please clarify if this is the case.

2. Redundant Job Status Check in Extractions

In radis/extractions/tasks.py:105-107:

if job.queued_job_id is not None:
    job.queued_job_id = None
    job.save()

Issue: This check appears redundant since:

  • For new jobs: queued_job_id is already cleared on line 99
  • For resume/retry: queued_job_id would already be cleared from the previous run

Recommendation: Remove lines 105-107 unless there's a specific edge case this handles. If it's a defensive check, add a comment explaining when this would trigger.

3. Missing queued_job_id Cleanup in Task Processor

In radis/extractions/tasks.py:25-29, the task's queued_job_id is cleared after processing. However, the same cleanup appears to be missing from the job processing function for the job itself.

Question: Should process_extraction_job also clear its own queued_job_id after completion? Line 99 clears it, but only for new jobs, not for resume/retry paths.

4. Potential N+1 Query Issue

In radis/extractions/tasks.py:109-111:

for task in tasks_to_enqueue:
    if not task.is_queued:
        task.delay()

Issue: Each task.delay() call likely triggers a database write. For jobs with many tasks, this could be inefficient.

Recommendation: Consider if there's a way to batch the delay operations or use bulk updates if applicable. This may not be possible with Procrastinate's API, but worth investigating for performance optimization.

5. Test Assertion Could Be More Specific

In both test files, the final assertion:

assert enqueue_job_statuses
assert all(status == ExtractionJob.Status.PENDING for status in enqueue_job_statuses)

Suggestion: Consider adding an assertion on the count to ensure the expected number of tasks were enqueued:

assert len(enqueue_job_statuses) == 2  # We created 2 reports
assert all(status == ExtractionJob.Status.PENDING for status in enqueue_job_statuses)

🔒 Security Considerations

  • No security issues identified: The changes don't introduce any security vulnerabilities
  • Race condition properly handled: The fix prevents an inconsistent state that could have led to operational issues

⚡ Performance Considerations

  • Small overhead added: Re-fetching tasks adds one extra query per task processed, but this is negligible
  • Improved reliability: The fix prevents assertion errors and potential system instability, which outweighs the minor performance cost
  • Potential optimization: See point 4 above regarding batch enqueuing

📊 Test Coverage

  • Good coverage: Regression tests added for the specific race condition
  • Test quality: Tests effectively verify the invariant using monkeypatching
  • Suggestion: Consider adding an integration test that actually runs workers in parallel to verify the race is fixed end-to-end (though this may be too complex for unit tests)

📝 Code Style & Conventions

  • Line length: All lines within 100 character limit ✓
  • Google Python Style Guide: Code follows the project's style guide ✓
  • Type hints: Could add return type hints to test functions, but not critical for tests
  • Comments: Excellent use of comments to explain the invariant

✨ Summary

This is a solid fix for a critical race condition. The approach is sound, well-documented, and includes proper test coverage. The main suggestions are around minor efficiency improvements and code clarity, not correctness issues.

Recommendation: ✅ Approve with minor suggestions

The race condition fix is correct and the tests verify the invariant properly. The suggestions above are optimizations and clarifications that can be addressed in follow-up work or in this PR based on your preference.

Great work identifying and fixing this subtle concurrency issue! 🎉

@gemini-code-assist Bot left a comment

Code Review

This pull request addresses a race condition during job preparation by ensuring that tasks are only enqueued after the job preparation is complete and the job status is PENDING. The changes primarily involve refactoring the logic in process_extraction_job and process_subscription_job to delay task enqueuing. New regression tests have been added to verify this behavior.

My review focuses on the implementation details. I've found a potential improvement in radis/extractions/tasks.py to make the code more efficient and robust against race conditions when clearing the queued_job_id. The rest of the changes look solid and effectively fix the described issue.

Comment on lines +27 to +29
task = ExtractionTask.objects.get(id=task_id)
task.queued_job_id = None
task.save()

medium

Re-fetching the task object and then calling save() without update_fields can be inefficient and may lead to race conditions. A concurrent update to another field on this task could be overwritten. It's better to update only the necessary field. The task object is already in memory from the start of the function.

Suggested change
- task = ExtractionTask.objects.get(id=task_id)
- task.queued_job_id = None
- task.save()
+ task.queued_job_id = None
+ task.save(update_fields=["queued_job_id"])
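The lost-update hazard behind this suggestion can be illustrated without the ORM; the dicts below are a toy stand-in for a database row and a stale in-memory copy:

```python
# A row in the DB and a stale copy fetched earlier.
row = {"queued_job_id": "j1", "attempts": 0}
stale = dict(row)

# A concurrent transaction bumps another field.
row["attempts"] = 3

# save() without update_fields writes back EVERY field from the
# stale copy, clobbering the concurrent change.
stale["queued_job_id"] = None
full_save_result = dict(stale)
assert full_save_result["attempts"] == 0  # concurrent update lost

# save(update_fields=["queued_job_id"]) touches only one column,
# so the concurrent change survives.
row["queued_job_id"] = None
assert row["attempts"] == 3  # concurrent update preserved
```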

@coderabbitai

coderabbitai Bot commented Feb 9, 2026

📝 Walkthrough

Walkthrough

The pull request refactors task enqueuing behavior in both extraction and subscription job processing. Tasks are now enqueued only after the job transitions to PENDING state, rather than during task creation. The job's queued_job_id is cleared post-preparation to prevent stale queue state. Regression tests verify that tasks are enqueued only when the job reaches PENDING status.

Changes

Cohort / File(s) Summary
Extraction Task Processing
radis/extractions/tasks.py
Modified process_extraction_task to clear queued_job_id after processing. Modified process_extraction_job to defer task enqueuing until after job transitions from PREPARING to PENDING, collecting pending tasks and enqueuing them only when not already queued. Added safeguard to clear stale queued_job_id.
Subscription Task Processing
radis/subscriptions/tasks.py
Removed immediate task enqueuing within task creation loop. Added deferred enqueue logic after job state transition to collect and enqueue all PENDING tasks only after job completes PREPARING phase.
Extraction Tests
radis/extractions/tests/test_tasks.py
New regression test verifying that tasks are enqueued with PENDING status when process_extraction_job is invoked. Monkeypatches ExtractionTask.delay to capture enqueuing events and validates job state.
Subscription Tests
radis/subscriptions/tests/test_tasks.py
New test (test_process_subscription_job_only_enqueues_tasks_after_job_is_pending) validating that tasks transition from PREPARING to PENDING before enqueuing. Patches SubscriptionTask.delay and asserts task states are PENDING when enqueued.
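The monkeypatching pattern these tests use — replacing the task's delay with a stub that records the parent job's status at enqueue time — can be sketched framework-free (all class names below are illustrative stand-ins, not the real test code):

```python
class FakeJob:
    def __init__(self):
        self.status = "PREPARING"

class FakeTask:
    def __init__(self, job):
        self.job = job
    def delay(self):
        raise AssertionError("real enqueue must be patched out in tests")

# Record the job's status at the moment each task is enqueued
# (this is what monkeypatch.setattr achieves in the real tests).
enqueue_job_statuses = []
FakeTask.delay = lambda self: enqueue_job_statuses.append(self.job.status)

job = FakeJob()
tasks = [FakeTask(job), FakeTask(job)]

# Simulate the fixed ordering: transition to PENDING, then enqueue.
job.status = "PENDING"
for task in tasks:
    task.delay()

# The invariant under test: no task was enqueued while PREPARING.
assert enqueue_job_statuses == ["PENDING", "PENDING"]
```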

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant Job
    participant TaskQueue as Task Queue
    
    User->>Job: process_extraction_job / process_subscription_job
    activate Job
    alt existing tasks (resume/retry)
        Job->>Job: collect pending tasks
    else no existing tasks
        Job->>Job: set PREPARING state
        Job->>Job: create new tasks
    end
    
    Job->>Job: complete preparation
    Job->>Job: transition to PENDING
    Job->>Job: clear queued_job_id
    Job->>Job: collect pending tasks
    
    loop for each pending task not queued
        Job->>TaskQueue: task.delay()
    end
    
    deactivate Job
    TaskQueue->>User: tasks ready to process

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

🐰 Tasks must wait with patient cheer,
Till PENDING calls them loud and clear!
No queue-ing while we're PREPARING still—
Good state machines bend to our will! 🎪

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 28.57% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Fix task enqueue race during job preparation' is specific, concise, and directly reflects the main change: addressing a race condition where tasks are enqueued before job preparation completes.


No actionable comments were generated in the recent review. 🎉

🧹 Recent nitpick comments
radis/subscriptions/tests/test_tasks.py (1)

38-39: fake_delay reads self.job.status — verify it hits the DB, not a cached instance.

When task.delay() is called from the loop in tasks.py (line 110), the task objects come from job.tasks.filter(...). Accessing self.job on those queryset results will trigger a fresh DB fetch, so this correctly reflects the saved PENDING status. However, if the ORM ever prefetches or caches the FK (e.g., via select_related), this test could silently pass even if the invariant were broken.

Consider adding self.job.refresh_from_db() before reading the status to make the assertion airtight:

Suggested hardening
     def fake_delay(self: SubscriptionTask) -> None:
+        self.job.refresh_from_db()
         enqueue_job_statuses.append(self.job.status)


@claude

claude Bot commented Feb 9, 2026

Code Review

Summary

This PR fixes a race condition where tasks could be enqueued while their parent job is still in the PREPARING state, causing assertion failures in AnalysisTaskProcessor.start() (radis/core/processors.py:46). The fix ensures tasks are only enqueued after the job transitions to PENDING.

✅ Strengths

  1. Correct Root Cause Analysis: The fix properly identifies and addresses the race condition between job preparation and task enqueuing.

  2. Clear Documentation: Excellent inline comments explaining the invariant that tasks must not be enqueued during PREPARING state.

  3. Comprehensive Test Coverage: Both affected modules (extractions and subscriptions) now have regression tests that verify the fix by monkeypatching delay() to capture job status at enqueue time.

  4. Consistent Pattern: The fix is applied consistently across both extractions/tasks.py and subscriptions/tasks.py.

  5. Proper Cleanup: The addition of queued_job_id = None cleanup in process_extraction_task() (lines 26-28) enables proper task re-queuing on reset/retry.

🔍 Code Quality Observations

Positive

  • Code follows Google Python Style Guide (100 char line length)
  • Type hints would be inferred correctly by pyright
  • Log messages are clear and informative
  • Test factories used appropriately

Minor Suggestions

  1. DRY Principle (radis/extractions/tasks.py:105-108 and 111-114):
    The queued_job_id clearing logic appears twice. Consider consolidating into a single location.

  2. Test Assertion Messages (test files):
    The assertions in tests could benefit from failure messages for easier debugging when they fail.

  3. Potential Edge Case (radis/extractions/tasks.py:41-48):
    When resuming/retrying, consider explicitly filtering out already-queued tasks in the query rather than relying only on the is_queued check in the loop.

🐛 Potential Issues

None identified - The logic appears sound and the fix addresses the root cause correctly.

🔒 Security Considerations

No security concerns. The changes are internal state management improvements.

⚡ Performance Considerations

Minor improvement: The fix batches task creation (during PREPARING) separately from task enqueuing (after PENDING), which is more efficient than interleaving them.

Database queries: The current implementation makes multiple queries in the enqueuing loop. Consider using bulk_update() if task modifications are needed, though the current approach is fine for typical workloads.
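For reference, Django's QuerySet.bulk_update(objs, fields) collapses N per-object saves into a small number of UPDATE statements. The framework-free sketch below just counts round-trips to show the difference (query_count is a stand-in for actual database writes):

```python
tasks = [{"queued_job_id": "j1"} for _ in range(50)]

# Per-task save(): one write per task.
query_count = 0
for task in tasks:
    task["queued_job_id"] = None
    query_count += 1
assert query_count == 50

# bulk_update-style: mutate in memory, flush once.
query_count = 0
for task in tasks:
    task["queued_job_id"] = None
query_count += 1  # single batched UPDATE
assert query_count == 1
```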

🧪 Test Coverage

Excellent: Both modules have comprehensive regression tests that:

  • Mock the exact failure scenario from issue #197 (Assertion Error on Extraction Jobs)
  • Verify the invariant holds (tasks only enqueued when job is PENDING)
  • Use realistic factories and providers
  • Are properly isolated with monkeypatching

Suggestion: Consider adding an integration test that verifies the end-to-end behavior with actual Procrastinate workers, though the current unit tests are sufficient for regression prevention.

📋 Additional Notes

  1. Migration: No database migrations needed ✓
  2. Backwards compatibility: The fix is fully backward compatible ✓
  3. Documentation: The inline comments serve as good documentation. Consider adding this pattern to KNOWLEDGE.md for future reference.
  4. Related code: The same pattern should be verified in any other job preparation code paths to ensure consistency.

Verdict

LGTM

This is a well-designed fix with excellent test coverage. The minor suggestions above are optional improvements, not blockers. The PR successfully resolves issue #197 and prevents the assertion error by enforcing the proper state machine invariant.


Review generated by Claude Code

@NumericalAdvantage NumericalAdvantage merged commit cacb8de into main Feb 16, 2026
3 checks passed
@NumericalAdvantage NumericalAdvantage deleted the jobStateMachineBug branch February 16, 2026 13:08
