
Add sentinel to async queue in wado_retrieve#240

Merged
samuelvkwong merged 18 commits into main from wadors-improve
Oct 24, 2025

@samuelvkwong
Collaborator

In the previous implementation of wado_retrieve, fetch_task is initially scheduled on the event loop. Inside the continuous loop, a new queue_get_task is scheduled on the event loop after the previous queue_get_task has finished.
There is a possible race condition in the following case:
The event loop could finish running fetch_task, and asyncio.wait() could then return immediately, before the event loop has run queue_get_task for the last element in the queue.
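
The scenario can be reproduced in a small sketch. The loop below is an assumed reconstruction of the old control flow, based only on the description above, not the project's actual code. `old_style_retrieve` and the integer items are hypothetical stand-ins for the real function and its datasets.

```python
import asyncio


async def old_style_retrieve() -> list[int]:
    """Hypothetical reconstruction of the pre-fix loop (not the real wado_retrieve)."""
    queue: asyncio.Queue[int] = asyncio.Queue()
    results: list[int] = []

    async def fetch() -> None:
        # The producer enqueues everything and finishes right away.
        for i in range(3):
            queue.put_nowait(i)

    fetch_task = asyncio.create_task(fetch())

    while True:
        queue_get_task = asyncio.create_task(queue.get())
        done, _ = await asyncio.wait(
            [fetch_task, queue_get_task],
            return_when=asyncio.FIRST_COMPLETED,
        )
        if queue_get_task in done:
            results.append(queue_get_task.result())
        if fetch_task in done:
            # Bug: the loop stops although items may still sit in the queue.
            queue_get_task.cancel()
            break

    return results


results = asyncio.run(old_style_retrieve())
print(f"retrieved {len(results)} of 3 items")
```

Under these assumptions the producer finishes while items are still queued, so the loop exits with fewer than three items.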

My suggested fix is to schedule a task on the event loop that waits for fetch_task to finish running and then puts a sentinel in the queue. In the continuous loop, we wait for queue items, yielding each one unless it is a sentinel value in which case we break. This ensures no datasets are skipped.

@medihack
Member

@codex

@chatgpt-codex-connector

Summary

Testing

  • pytest adit/dicom_web/tests/utils/test_wado_retrieve.py (fails: ModuleNotFoundError: No module named 'nest_asyncio')


@medihack
Member

@claude

@claude

claude bot commented Oct 23, 2025

Claude finished @medihack's task


Code Review

  • Gather context and read repository guidelines
  • Review the changes in wadors_utils.py
  • Review the new test file
  • Check dependencies and testing setup
  • Provide comprehensive feedback

Summary

This PR successfully addresses a race condition in the wado_retrieve async generator by implementing a sentinel-based queue termination pattern. The fix is sound and well-implemented.

The Race Condition

Original Issue: In the previous implementation, the event loop could complete fetch_task and return from asyncio.wait() before the final queue_get_task could retrieve the last item from the queue, resulting in skipped datasets.

Root Cause: The code was using asyncio.wait() with FIRST_COMPLETED, checking if fetch_task was done, and immediately breaking. If items were still in the queue when fetch_task finished, they would be lost.

The Fix

The sentinel pattern implemented here is a textbook solution for async producer-consumer problems:

  1. Sentinel Task (add_sentinel_when_done()): Waits for fetch_task to complete, then pushes None into the queue as a termination signal
  2. Consumer Loop: Continuously pulls from the queue, yielding items until it receives the None sentinel

Key changes in adit/dicom_web/utils/wadors_utils.py:35,75-87:

  • Queue type changed from asyncio.Queue[Dataset] to asyncio.Queue[Dataset|None] to allow the sentinel
  • Replaced complex asyncio.wait() logic with a simple while True loop
  • Added sentinel task that puts None after fetch_task completes
  • Loop now breaks only when receiving None, ensuring all queued items are processed

Code Quality

Strengths:
✅ The sentinel pattern is the idiomatic solution for this problem
✅ Type annotations properly updated to reflect Dataset | None
✅ Code is cleaner and more readable than the original
✅ Exception handling preserved correctly

Minor observations:

  • The spacing in asyncio.Queue[Dataset|None] (line 35) uses the compact style without spaces around |. While Python accepts both Dataset|None and Dataset | None, PEP 604 examples typically show spacing for readability. However, this is a stylistic preference and doesn't affect functionality.

Test Implementation

The test in adit/dicom_web/tests/utils/test_wado_retrieve.py has an issue:

Problem: The test demonstrates the race condition in the old implementation but doesn't actually test the new implementation with the sentinel fix. The test function wado_retrieve_rc() still uses the old asyncio.wait() approach without a sentinel.

What the test currently does:

  • Simulates the old buggy behavior
  • Asserts that only [0, 1] are returned (showing the bug)
  • Doesn't verify that the fixed code returns [0, 1, 2]

Recommendation: The test should either:

  1. Import and test the actual wado_retrieve function to verify the sentinel fix works, OR
  2. Include two separate test functions: one demonstrating the old race condition and another verifying the sentinel-based fix

Example of what a proper test might look like:

@pytest.mark.asyncio
async def test_sentinel_prevents_race_condition():
    """Test that the sentinel pattern ensures all items are processed."""
    results = []
    
    async def wado_retrieve_with_sentinel():
        queue = asyncio.Queue()
        
        async def fetch():
            for i in range(3):
                await asyncio.sleep(0.0001)
                queue.put_nowait(i)
        
        fetch_task = asyncio.create_task(fetch())
        
        async def add_sentinel_when_done():
            await fetch_task
            await queue.put(None)
        
        asyncio.create_task(add_sentinel_when_done())
        
        while True:
            item = await queue.get()
            if item is None:
                break
            results.append(item)
            await asyncio.sleep(0.0003)  # Simulate delay
    
    await wado_retrieve_with_sentinel()
    assert results == [0, 1, 2]  # All items should be retrieved

Missing Dependency Note

The comment from @chatgpt-codex-connector mentions: ModuleNotFoundError: No module named 'nest_asyncio'

However, I can confirm that nest-asyncio>=1.6.0 is listed in the dev dependencies in pyproject.toml:68. This error likely occurred in a different environment or before dependencies were installed. The test should work fine in a properly configured development environment after running uv sync.

Recommendation

Approve with suggestions:

  1. ✅ The core fix is correct and should be merged
  2. 📝 Consider updating the test to actually verify the fixed implementation
  3. 📝 Optional: Add spacing in type annotation for PEP 604 style consistency: asyncio.Queue[Dataset | None]

The sentinel-based approach is robust and eliminates the race condition effectively. Great work identifying and fixing this subtle concurrency bug!


@medihack
Member

  1. 📝 Optional: Add spacing in type annotation for PEP 604 style consistency: asyncio.Queue[Dataset | None]

I really wonder why this was not caught by our lint rule, which normally checks for this. But you also need to install ruff, and I also recommend activating "format on save" in the VSCode settings.

@samuelvkwong
Collaborator Author

  1. 📝 Optional: Add spacing in type annotation for PEP 604 style consistency: asyncio.Queue[Dataset | None]

I really wonder why this was not caught by our lint rule, which normally checks for this. But you also need to install ruff, and I also recommend activating "format on save" in the VSCode settings.

Interesting. Setting my default formatter to ruff and setting "format on save" does take care of it. Before, I was running cli lint/cli format-code as well as uv run ruff check . --fix

@claude

claude bot commented Oct 24, 2025

Pull Request Review: Add sentinel to async queue in wado_retrieve

Summary

This PR addresses a race condition in the wado_retrieve function by implementing a sentinel pattern to ensure all queued datasets are processed before the async generator completes. The fix is well-motivated and addresses a real concurrency issue.

Code Quality and Best Practices

Strengths ✅

  1. Clear problem identification: The PR description clearly explains the race condition scenario
  2. Clean implementation: The sentinel pattern is a well-established solution for this type of problem
  3. Type annotations: Good use of Queue[Dataset | None] to indicate the sentinel value
  4. Testing: Added unit tests that demonstrate both the problem and the solution

Areas for Improvement 📋

1. Test Quality (adit/dicom_web/tests/utils/test_wado_retrieve.py:1-76)

Issue: The first test test_race_condition is problematic:

  • It relies on specific timing (asyncio.sleep(0.0001), asyncio.sleep(0.0003)) which is fragile and may fail on different systems or under different loads
  • The commented assertion # assert results in ([0, 1, 2], [0, 1], [0]) suggests uncertainty about expected behavior
  • The test doesn't actually test the production code—it reimplements the buggy logic

Recommendation:

  • Remove test_race_condition entirely, or
  • Move it to a comment/docstring explaining the original problem
  • Keep only test_sentinel_prevents_race_condition which tests the actual implementation

2. Error Handling (adit/dicom_web/utils/wadors_utils.py:75-79)

Issue: The add_sentinel_when_done() task is fire-and-forget with no error handling.

async def add_sentinel_when_done():
    await fetch_task
    await queue.put(None)

asyncio.create_task(add_sentinel_when_done())

If fetch_task raises an exception, await fetch_task re-raises it inside the helper task, queue.put(None) is never reached, and nothing retrieves the error. The sentinel is never added, causing the main loop to hang indefinitely.

Recommendation:

async def add_sentinel_when_done():
    try:
        await fetch_task
    finally:
        await queue.put(None)

asyncio.create_task(add_sentinel_when_done())

This ensures the sentinel is always added, even if fetch fails. The exception will still propagate from the main await of the generator.
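
The difference between the two variants can be checked with a minimal sketch. Everything here is illustrative: `run_consumer`, the failing `fetch`, and the 0.1 second timeout are hypothetical and only serve to detect a hanging consumer.

```python
import asyncio


async def run_consumer(use_finally: bool) -> bool:
    """Return True if the consumer loop hung (hypothetical demo, not project code)."""
    queue: asyncio.Queue[int | None] = asyncio.Queue()

    async def fetch() -> None:
        queue.put_nowait(0)
        raise RuntimeError("fetch failed")

    fetch_task = asyncio.create_task(fetch())

    async def add_sentinel_when_done() -> None:
        if use_finally:
            try:
                await fetch_task
            finally:
                await queue.put(None)  # runs on success and on failure
        else:
            await fetch_task  # raises, so the next line is never reached
            await queue.put(None)

    sentinel_task = asyncio.create_task(add_sentinel_when_done())

    async def drain() -> None:
        # Consume items until the sentinel arrives.
        while await queue.get() is not None:
            pass

    try:
        await asyncio.wait_for(drain(), timeout=0.1)
        hung = False
    except asyncio.TimeoutError:
        hung = True

    # Retrieve the helper's exception so asyncio does not log it as unhandled.
    await asyncio.gather(sentinel_task, return_exceptions=True)
    return hung


hung_without_finally = asyncio.run(run_consumer(use_finally=False))
hung_with_finally = asyncio.run(run_consumer(use_finally=True))
print(hung_without_finally, hung_with_finally)
```

In this sketch only the variant without try/finally runs into the timeout.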

3. Task Lifecycle Management (adit/dicom_web/utils/wadors_utils.py:79)

Issue: The created task is not tracked. The event loop keeps only weak references to tasks, so an unreferenced task can be garbage-collected before it finishes, and its failures are harder to debug.

Recommendation: Store the task reference:

sentinel_task = asyncio.create_task(add_sentinel_when_done())

This keeps a strong reference to the task and makes the control flow more explicit.
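
For background tasks that are not awaited directly, the asyncio documentation suggests holding them in a collection until they finish. A minimal sketch of that pattern follows; `worker`, `background_tasks`, and `finished` are hypothetical names chosen for illustration.

```python
import asyncio

# Strong references to in-flight tasks; the event loop itself holds only weak ones.
background_tasks: set[asyncio.Task[None]] = set()
finished: list[str] = []


async def worker(name: str) -> None:
    await asyncio.sleep(0)
    finished.append(name)


async def main() -> None:
    task = asyncio.create_task(worker("sentinel"))
    background_tasks.add(task)
    # Drop the reference once the task is done so the set does not grow forever.
    task.add_done_callback(background_tasks.discard)
    await task


asyncio.run(main())
print(finished, len(background_tasks))
```

A single local variable such as sentinel_task serves the same purpose when there is only one task and its scope outlives the task.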

4. Test Implementation Details (adit/dicom_web/tests/utils/test_wado_retrieve.py:45-76)

Issue: test_sentinel_prevents_race_condition doesn't test the actual wado_retrieve function—it tests a simplified mock version.

Recommendation: Consider adding an integration test that:

  • Mocks the DicomOperator and its fetch methods
  • Calls the actual wado_retrieve function
  • Verifies all datasets are yielded

Example structure:

@pytest.mark.asyncio
async def test_wado_retrieve_processes_all_datasets(mocker):
    # Mock the DicomOperator and QueryDataset
    # Call actual wado_retrieve()
    # Verify all datasets are yielded

5. Code Style (adit/dicom_web/utils/wadors_utils.py:39-41)

Minor: There's inconsistent blank line usage. The blank line was removed after the callback definition:

def callback(ds: Dataset) -> None:
    dicom_manipulator.manipulate(ds, pseudonym, trial_protocol_id, trial_protocol_name)
    loop.call_soon_threadsafe(queue.put_nowait, ds)

This is fine, but ensure it aligns with the project's style (Google Python Style Guide per CONTRIBUTING.md).
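
The hand-off in this callback, from a worker thread to the event loop's queue, can be sketched in isolation. This is an assumption-laden illustration: `blocking_fetch` is a hypothetical stand-in for the synchronous fetch, and `asyncio.to_thread` is used here in place of whatever thread wrapper the project uses.

```python
import asyncio


def blocking_fetch(callback) -> None:
    """Hypothetical synchronous fetch that reports each item via a callback."""
    for i in range(3):
        callback(i)


async def retrieve() -> list[int]:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[int | None] = asyncio.Queue()

    def callback(item: int) -> None:
        # Runs in the worker thread. asyncio.Queue is not thread-safe,
        # so the put is scheduled on the event loop instead of done directly.
        loop.call_soon_threadsafe(queue.put_nowait, item)

    async def fetch_then_signal() -> None:
        try:
            await asyncio.to_thread(blocking_fetch, callback)
        finally:
            await queue.put(None)  # sentinel: no more items will arrive

    producer = asyncio.create_task(fetch_then_signal())
    results: list[int] = []
    while (item := await queue.get()) is not None:
        results.append(item)
    await producer
    return results


thread_results = asyncio.run(retrieve())
print(thread_results)
```

Because the puts are scheduled on the loop before the thread's completion is reported, the sentinel lands behind all items in this sketch.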

Potential Bugs or Issues

Critical 🔴

  1. Missing exception handling in sentinel task (see error handling section above) - could cause hangs

Medium 🟡

  1. Race condition still possible: If an exception occurs in the callback, datasets might be lost. Consider adding error handling in the callback or around queue.put_nowait.

Performance Considerations

No performance concerns:

  • The sentinel pattern adds negligible overhead (one extra queue operation)
  • Actually improves reliability, preventing potential data loss
  • No blocking operations added

Security Concerns

No security issues identified:

  • DICOM data handling unchanged
  • No new attack vectors introduced
  • Pseudonymization logic untouched

Test Coverage

Current Coverage

  • ✅ Unit test demonstrates the sentinel pattern works
  • ⚠️ Tests don't exercise the actual wado_retrieve function
  • ⚠️ No tests for error conditions (fetch failure, callback exceptions)

Recommendations

  1. Add integration test using actual wado_retrieve with mocked dependencies
  2. Add test for exception handling:
    • What happens if fetch_task raises an exception?
    • What happens if callback raises an exception?
  3. Remove or comment out the test_race_condition function

Summary

Overall Assessment: ✅ Approve with recommendations

This PR correctly identifies and fixes a real race condition. The sentinel pattern is the right approach. However, there are some improvements needed:

Must Fix (before merge):

  1. ✅ Add try/finally block in add_sentinel_when_done() to ensure sentinel is always added

Should Fix (before merge):

  1. Remove or relocate test_race_condition
  2. Add integration test using actual wado_retrieve

Nice to Have:

  1. Add error handling tests
  2. Store task reference for better lifecycle management

The core fix is sound, but the error handling gap could cause production issues. With the try/finally addition, this would be ready to merge.


Great work identifying and fixing this subtle race condition! 🎉

@claude

claude bot commented Oct 24, 2025

Pull Request Review

Thanks for this fix @samuelvkwong! This addresses an important race condition in the async queue handling. Overall, this is a solid improvement with good test coverage.

✅ Strengths

  1. Correct Race Condition Fix: The sentinel pattern is the right approach for this problem. The old implementation using asyncio.wait() with FIRST_COMPLETED had a timing vulnerability where fetch_task could complete before the final queue_get_task was scheduled.

  2. Excellent Test Coverage: Adding 181 lines of tests for this utility function shows good engineering practice. The integration test properly mocks all dependencies and verifies the fix.

  3. Clean Implementation: The add_sentinel_when_done() helper function is well-structured and uses try/finally to ensure the sentinel is always added.

  4. Type Safety: Good use of type hints (asyncio.Queue[Dataset | None]) to indicate the queue now accepts None as a sentinel value.

🔍 Code Quality Issues

1. Potential Task Leak (adit/dicom_web/utils/wadors_utils.py:81)

asyncio.create_task(add_sentinel_when_done())

Issue: The task created here is not stored or awaited, which can lead to:

  • Task never completes if an exception occurs
  • Potential resource leak
  • The event loop keeps only a weak reference to the task, so it may be garbage-collected before it finishes

Recommendation: Store the sentinel task and ensure it completes:

sentinel_task = asyncio.create_task(add_sentinel_when_done())

Then ensure cleanup happens properly, or use task groups in Python 3.11+:

async with asyncio.TaskGroup() as tg:
    fetch_task = tg.create_task(...)
    sentinel_task = tg.create_task(add_sentinel_when_done())

2. Missing Exception Propagation (adit/dicom_web/utils/wadors_utils.py:75-79)

The try/finally in add_sentinel_when_done() does not swallow exceptions from fetch_task, but it re-raises them only inside the helper task. If nothing awaits that task, the consumer just gets a sentinel without knowing an error occurred.

Current behavior:

async def add_sentinel_when_done():
    try:
        await fetch_task  # Exception is re-raised here, inside the helper task only
    finally:
        await queue.put(None)

Recommendation: Store and check the exception:

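async def add_sentinel_when_done():
    try:
        await fetch_task
    except Exception:
        await queue.put(None)  # Unblock the consumer before re-raising
        raise  # Re-raise so the exception isn't lost
    else:
        await queue.put(None)  # Normal completion still needs the sentinel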

Actually, looking at the outer exception handling (lines 91-94), the exceptions should already be caught there. However, the current implementation might cause the generator to exit cleanly even if fetch_task fails, which could mask errors.

3. Commented-Out Test Code (adit/dicom_web/tests/utils/test_wado_retrieve.py:105-177)

The commented-out mock tests simulating the race condition should either be:

  • Removed (as the TODO comment suggests)
  • Uncommented if they provide value for regression testing
  • Moved to a separate documentation file if kept for educational purposes

Recommendation: I'd suggest uncommenting and keeping these tests. They clearly demonstrate the problem and provide regression protection. The artificial delays make them a bit fragile, but they serve as executable documentation of the race condition.

🐛 Potential Bugs

1. Edge Case: Empty Queue After Fetch Failure

If fetch_task raises an exception before putting any items in the queue:

  1. The exception is caught by the outer try/except (lines 91-94)
  2. The sentinel is added to the queue
  3. The consumer gets the sentinel immediately
  4. The generator yields nothing and exits cleanly

This is probably the desired behavior, but it's worth verifying that an empty result set is acceptable when a retriable error occurs.

2. Queue Cleanup on Exception

If an exception occurs in the consumer loop (lines 83-89), the fetch_task and sentinel task may still be running. The outer exception handlers convert the exceptions to API errors, but there's no explicit cleanup of the background tasks.

⚡ Performance Considerations

  1. Minimal Performance Impact: The sentinel pattern adds negligible overhead - just one extra queue operation at the end.

  2. Memory Usage: The queue can grow unbounded if the consumer is slower than the producer. This was true in the old implementation as well, but worth noting. Consider adding a max queue size if memory is a concern:

    queue = asyncio.Queue[Dataset | None](maxsize=100)
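
A bounded queue applies backpressure only when the producer awaits queue.put(). The sketch below illustrates this with hypothetical names and a tiny maxsize. Note that it does not match the thread-based callback discussed in this thread: put_nowait() on a full bounded queue raises asyncio.QueueFull, so that hand-off would need a different design.

```python
import asyncio


async def bounded_demo() -> tuple[list[int], int]:
    queue: asyncio.Queue[int | None] = asyncio.Queue(maxsize=2)
    max_seen = 0

    async def produce() -> None:
        nonlocal max_seen
        for i in range(10):
            await queue.put(i)  # suspends while the queue is full
            max_seen = max(max_seen, queue.qsize())
        await queue.put(None)  # sentinel

    producer = asyncio.create_task(produce())
    results: list[int] = []
    while (item := await queue.get()) is not None:
        results.append(item)
        await asyncio.sleep(0)  # simulate a consumer that yields between items
    await producer
    return results, max_seen


bounded_results, max_queue_size = asyncio.run(bounded_demo())
print(bounded_results, max_queue_size)
```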

🔒 Security Concerns

No significant security issues identified. The changes don't introduce any new attack vectors.

📝 Style & Best Practices

  1. Google Python Style Guide Compliance: The code generally follows the guide. One minor note: the multi-line string in the test file (lines 105-177) should either be removed or uncommented.

  2. Type Hints: ✅ Properly updated

  3. Docstring: The existing docstring could be updated to mention the race condition fix and sentinel pattern.

🧪 Test Coverage

Excellent! The new test:

  • Properly mocks all dependencies
  • Tests the integration flow end-to-end
  • Verifies all datasets are retrieved (the key fix)
  • Checks side effects (manipulator calls, operator calls)

Suggestion: Consider adding a test case for error handling:

async def test_wado_retrieve_with_fetch_error(monkeypatch):
    # Test that exceptions in fetch_task are properly propagated
    class FakeDicomOperator:
        def fetch_study(self, **kwargs):
            raise RetriableDicomError("Connection failed")
    # ... verify ServiceUnavailableApiError is raised

📋 Summary

Overall Assessment: ✅ Approve with minor suggestions

This is a well-thought-out fix for a real concurrency issue. The sentinel pattern is the correct solution, and the test coverage is excellent. The main concerns are:

  1. Must fix: Handle the unawaited task warning for add_sentinel_when_done()
  2. Should consider: Exception propagation behavior
  3. Nice to have: Uncomment or remove the commented test code

The fix significantly improves the reliability of the wado_retrieve function and should prevent data loss in production.


Recommendation: Approve after addressing the unawaited task issue. The other points are minor and can be addressed in follow-up work if needed.

@claude

claude bot commented Oct 24, 2025

Pull Request Review: Add sentinel to async queue in wado_retrieve

Summary

This PR addresses a legitimate race condition in the wado_retrieve function where datasets could be skipped if fetch_task completes before the final queue_get_task is scheduled. The solution using a sentinel pattern with asyncio.TaskGroup is well-designed and represents a significant improvement.

Code Quality & Best Practices ✅

Strengths:

  1. Clean refactoring: The code is more readable and maintainable than the previous implementation
  2. Modern async patterns: Uses asyncio.TaskGroup (Python 3.11+) which provides better error handling and automatic cleanup
  3. Type hints: Properly updated asyncio.Queue[Dataset | None] to reflect the sentinel value
  4. Consistent style: Follows Google Python Style Guide as specified in CONTRIBUTING.md
  5. Clear separation of concerns: The add_sentinel_when_done helper function is well-isolated

Observations:

  • The refactoring from asyncio.create_task() to task_group.create_task() is appropriate
  • The finally block in add_sentinel_when_done ensures the sentinel is always added, even if fetch_task raises an exception

Bug Fixes & Correctness ✅

Race Condition Fix:

The PR correctly identifies and fixes the race condition:

  • Old approach: Used asyncio.wait() with FIRST_COMPLETED, which could return as soon as fetch_task finished, before the last queue_get_task had run
  • New approach: Guarantees all items are processed by using a sentinel value that's only added after fetch_task completes

Potential Issues:

  1. TaskGroup exception handling: In adit/dicom_web/utils/wadors_utils.py:70-74, the add_sentinel_when_done function always adds a sentinel even if fetch_task fails. This is actually correct behavior because:

    • The exception will propagate from the TaskGroup
    • The sentinel ensures the while True loop can terminate cleanly
    • However, consider adding a comment explaining this design choice
  2. Double task creation overhead: The add_sentinel_when_done function creates an additional task that just waits for fetch_task. This is a minor overhead but acceptable for correctness. Alternative approach (for future consideration):

    fetch_task = task_group.create_task(fetch_coro)
    
    async def consume_queue():
        while True:
            queue_ds = await queue.get()
            if queue_ds is None:
                break
            yield queue_ds
    
    # Then handle adding sentinel after fetch_task in the exception handling

Performance Considerations ⚠️

  1. TaskGroup overhead: Minimal additional overhead from using TaskGroup vs manual task management
  2. Extra task creation: The add_sentinel_when_done wrapper adds one extra task, but the performance impact is negligible compared to DICOM operations
  3. Queue efficiency: No change to queue performance; still using asyncio.Queue efficiently
  4. Overall assessment: The performance trade-off is acceptable for the correctness gained

Security Concerns ✅

No security issues identified:

  • DICOM manipulation still happens via DicomManipulator with proper pseudonymization parameters
  • No changes to error handling that could leak sensitive information
  • thread_sensitive=False is appropriate for I/O-bound DICOM operations

Test Coverage ⭐️

Excellent test coverage:

  1. Integration test (test_wado_retrieve_integration):

    • Tests the full flow with mocked dependencies
    • Covers all three levels (STUDY shown, SERIES/IMAGE paths exist)
    • Validates that all datasets are retrieved and manipulated correctly
    • Good use of monkeypatching for sync_to_async to avoid actual threading
  2. Race condition tests:

    • test_race_condition: Demonstrates the old bug with artificial delays
    • test_sentinel_prevents_race_condition: Proves the sentinel pattern solves it
    • These are valuable for documentation purposes, showing the problem and solution

Test Quality Notes:

  1. The immediate_sync_to_async wrapper in tests is clever - it converts the sync-to-async pattern back to async for testing without threading complexity
  2. The use of asyncio.sleep() to simulate timing is appropriate for demonstrating race conditions
  3. The comment in the test file says the race condition tests can be removed if deemed unnecessary. I'd suggest keeping them, as they serve as documentation

Missing Test Coverage:

  1. Error handling paths:

    • Test that RetriableDicomError properly raises ServiceUnavailableApiError
    • Test that DicomError properly raises BadGatewayApiError
    • Test that exceptions in fetch_task still result in proper cleanup (sentinel added, queue consumed)
  2. Edge cases:

    • Empty result set (no datasets returned)
    • Exception thrown during dataset manipulation

Recommendations

Required Changes: None

The code is ready to merge as-is.

Suggested Improvements (optional):

  1. Add explanatory comment in wadors_utils.py:70-74:

    async def add_sentinel_when_done(task: asyncio.Task[None]) -> None:
        try:
            await task
        finally:
            # Always add sentinel even on exception to ensure the consumer loop terminates.
            # The TaskGroup will propagate any exception after cleanup.
            await queue.put(None)
  2. Consider adding error handling tests to verify exception propagation works correctly with the new TaskGroup approach

  3. Documentation: Consider adding a brief comment at the start of the try block explaining the sentinel pattern for future maintainers:

    # Use sentinel pattern: a helper task adds None to the queue once
    # fetch_task is done, ensuring all items are consumed before termination

Conclusion

Recommendation: ✅ APPROVE

This is a high-quality fix that:

  • Correctly addresses a real race condition
  • Improves code clarity and maintainability
  • Includes excellent test coverage demonstrating the problem and solution
  • Uses modern Python async patterns appropriately
  • Maintains backward compatibility in the public API

The sentinel pattern is a well-established solution for producer-consumer problems with async queues, and the implementation is solid. Great work on identifying this subtle race condition and providing a robust fix!


Note: This PR requires Python 3.11+ due to asyncio.TaskGroup. Ensure CI/deployment environments meet this requirement.

@claude

claude bot commented Oct 24, 2025

Code Review - PR #240: Add sentinel to async queue in wado_retrieve

Summary

This PR addresses a race condition in the wado_retrieve function where datasets could be lost if the fetch task completes before the final queue.get() task can be scheduled. The solution implements a sentinel pattern to ensure all datasets are processed.

Positive Findings

1. Correctly Identifies and Fixes the Race Condition

The PR accurately identifies the race condition in the original implementation:

  • In the old code, asyncio.wait() with FIRST_COMPLETED could return when fetch_task finishes before the last queue_get_task runs
  • This would cause the loop to break and cancel queue_get_task, losing the last dataset(s)

The sentinel pattern elegantly solves this by:

  • Ensuring a None sentinel is added to the queue after fetch_task completes (wadors_utils.py:70-74)
  • Processing all items until the sentinel is encountered (wadors_utils.py:80-84)

2. Excellent Test Coverage

The test file demonstrates strong testing practices:

  • Integration test (test_wado_retrieve_integration): Tests the complete flow with mocked dependencies, verifying all datasets are retrieved correctly
  • Race condition demonstration (test_race_condition): Proves the old implementation had the bug (only retrieves 2/3 items)
  • Sentinel validation (test_sentinel_prevents_race_condition): Proves the sentinel pattern fixes it (retrieves all 3 items)

The tests use appropriate delays to artificially trigger race conditions, making them reliable and meaningful.

3. Clean Refactoring

The new implementation is cleaner than the old one:

  • Removed complex asyncio.wait() logic with manual task state tracking
  • Simplified control flow with a straightforward queue consumption loop
  • Better use of TaskGroup (already used elsewhere in the codebase at adit/core/management/commands/receiver.py:50)

4. Type Safety Improvements

The queue type annotation was updated from Queue[Dataset] to Queue[Dataset | None] (wadors_utils.py:35), properly reflecting that it can contain the sentinel value.


Issues and Suggestions

1. Potential Issue: Exception Handling with TaskGroup ⚠️

Location: wadors_utils.py:76-84

The new implementation uses async with asyncio.TaskGroup() which has different exception behavior than the old code:

Current behavior:

  • If any task in the TaskGroup raises an exception, the TaskGroup will:
    1. Cancel all other tasks in the group
    2. Raise an ExceptionGroup (or BaseExceptionGroup) containing all exceptions

Problem: The existing exception handlers (lines 86-89) catch RetriableDicomError and DicomError, but they won't catch ExceptionGroup that wraps these exceptions.

Recommendation:

except* RetriableDicomError as err:
    # Use exception group syntax (Python 3.11+)
    raise ServiceUnavailableApiError(str(err.exceptions[0]))
except* DicomError as err:
    raise BadGatewayApiError(str(err.exceptions[0]))

Or explicitly handle ExceptionGroup:

except ExceptionGroup as eg:
    # Extract the first relevant exception
    for exc in eg.exceptions:
        if isinstance(exc, RetriableDicomError):
            raise ServiceUnavailableApiError(str(exc))
        if isinstance(exc, DicomError):
            raise BadGatewayApiError(str(exc))
    raise  # Re-raise if no handled exception found

Testing: Consider adding a test case where fetch_coro raises an exception to verify error handling works correctly.

2. Minor: Test Organization 💭

Location: test_wado_retrieve.py:106-109

The comment suggests these mock tests "can be removed if deemed unnecessary."

Recommendation: I actually think these tests are valuable for documentation purposes - they clearly demonstrate the bug and the fix. However, consider:

  • Moving them to a separate test class (e.g., TestRaceConditionReproduction) to indicate they're demonstrative
  • Updating the comment to be more assertive about their value as regression tests

3. Minor: Sentinel Value Documentation 📝

Location: wadors_utils.py:70-74

The sentinel pattern is well-implemented but could benefit from a brief comment explaining why it's needed.

Suggestion:

async def add_sentinel_when_done(task: asyncio.Task[None]) -> None:
    """Add a sentinel to the queue when fetch completes to signal end of data."""
    try:
        await task
    finally:
        # Always add sentinel, even if fetch fails, so the consumer can exit
        await queue.put(None)

4. Edge Case: What if callback raises an exception? 🤔

Location: wadors_utils.py:39-41

If dicom_manipulator.manipulate() or queue.put_nowait() raises an exception in the callback, it would propagate through the sync thread but might not be properly caught by the TaskGroup.

Testing recommendation: Add a test where the manipulator raises an exception to verify proper error propagation.
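
As a starting point for such a test, the sketch below shows how an exception raised in a callback on a worker thread surfaces in the awaiting coroutine. It uses `asyncio.to_thread` and hypothetical helpers. Whether the project's own thread wrapper behaves the same way is an assumption that the real test would need to confirm.

```python
import asyncio


def blocking_fetch(callback) -> None:
    """Hypothetical synchronous fetch that calls back once per item."""
    for i in range(3):
        callback(i)


def failing_callback(item: int) -> None:
    # Simulates a manipulation step that fails on the second item.
    if item == 1:
        raise ValueError(f"cannot manipulate item {item}")


async def main() -> str:
    try:
        await asyncio.to_thread(blocking_fetch, failing_callback)
    except ValueError as err:
        # The exception raised in the worker thread is re-raised here.
        return str(err)
    return "no error"


callback_error = asyncio.run(main())
print(callback_error)
```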


Performance Considerations

No performance concerns identified. The new implementation:

  • Eliminates the overhead of repeatedly calling asyncio.wait()
  • Uses the same queue-based approach for thread-safe communication
  • TaskGroup provides proper task lifecycle management

Security Considerations

No security concerns identified. The changes are purely about async control flow and don't affect:

  • DICOM data validation
  • Authentication/authorization
  • Network communication
  • Data manipulation

Code Quality

The code follows the Google Python Style Guide as required by CONTRIBUTING.md:

  • ✅ Clear variable names
  • ✅ Appropriate type hints
  • ✅ Consistent formatting
  • ✅ Good test structure

Final Recommendation

Approve with minor suggestions. This is a solid fix for a subtle but important race condition. The primary concern is ensuring exception handling works correctly with TaskGroup. I recommend:

  1. Must have: Verify exception handling with TaskGroup (add test or adjust exception handlers)
  2. Nice to have: Add brief documentation comments for the sentinel pattern
  3. Optional: Reorganize the mock race condition tests into a separate class

Great work on identifying and fixing this race condition! The test coverage is particularly impressive.

@claude

claude bot commented Oct 24, 2025

Code Review - PR #240: Add sentinel to async queue in wado_retrieve

Summary

This PR addresses a race condition in the wado_retrieve async generator function. The fix uses a sentinel pattern to ensure all datasets are processed before the generator completes. Overall, this is a solid improvement that fixes a real concurrency bug.

✅ Strengths

1. Correct Problem Identification

The race condition is real and well-documented. The old implementation using asyncio.wait() with FIRST_COMPLETED could indeed finish fetch_task before the final queue_get_task runs, causing the last dataset(s) to be skipped.

2. Elegant Solution

The sentinel pattern is a well-established approach for this type of producer-consumer problem. Adding None as a sentinel when fetching completes is cleaner and more reliable than the previous approach.
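
The pattern can be sketched in isolation. This is a minimal illustration rather than the code from this PR: `stream_with_sentinel`, `produce_then_signal`, and `collect` are hypothetical names, and it assumes `None` never occurs as a real item.

```python
import asyncio
from collections.abc import AsyncIterator


async def stream_with_sentinel(items: list[str]) -> AsyncIterator[str]:
    """Yield every queued item, stopping only when the sentinel arrives."""
    queue: asyncio.Queue[str | None] = asyncio.Queue()

    async def produce_then_signal() -> None:
        try:
            for item in items:
                await queue.put(item)
        finally:
            # The sentinel is enqueued after the last item, so the consumer
            # cannot see "producer finished" before it has seen every item.
            await queue.put(None)

    producer = asyncio.create_task(produce_then_signal())
    try:
        while (item := await queue.get()) is not None:
            yield item
    finally:
        # Surface any producer exception and avoid leaving a dangling task.
        await producer


async def collect(items: list[str]) -> list[str]:
    return [item async for item in stream_with_sentinel(items)]
```

Because termination is signalled through the same queue that carries the data, the consumer's exit condition is ordered after the final item by construction.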

3. Excellent Test Coverage

  • Integration test validates the full workflow with realistic mocking
  • Two unit tests demonstrate the race condition and its fix
  • Tests are well-structured and use appropriate async testing patterns

4. Cleaner Code

The new implementation is more readable and easier to understand than the old asyncio.wait() logic.

🔍 Issues & Concerns

1. Redundant Task in TaskGroup (Minor Issue)

File: adit/dicom_web/utils/wadors_utils.py:76-78

async with asyncio.TaskGroup() as task_group:
    fetch_task = task_group.create_task(fetch_coro)
    task_group.create_task(add_sentinel_when_done(fetch_task))

The add_sentinel_when_done helper creates an unnecessary extra task. The code can be simplified:

async def fetch_and_add_sentinel():
    await fetch_coro
    await queue.put(None)

async with asyncio.TaskGroup() as task_group:
    task_group.create_task(fetch_and_add_sentinel())
    
    while True:
        queue_ds = await queue.get()
        if queue_ds is None:
            break
        yield queue_ds

Or even simpler, don't use TaskGroup at all since there's only one background task:

async def fetch_and_add_sentinel():
    try:
        await fetch_coro
    finally:
        await queue.put(None)

fetch_task = asyncio.create_task(fetch_and_add_sentinel())

try:
    while True:
        queue_ds = await queue.get()
        if queue_ds is None:
            break
        yield queue_ds
finally:
    await fetch_task  # Ensure cleanup

2. Exception Handling Complexity

File: adit/dicom_web/utils/wadors_utils.py:86-93

The ExceptionGroup handling assumes TaskGroup behavior, but the logic is brittle:

  • Only extracts the first matching exception type
  • Falls through to re-raise if no match, which could expose internal implementation details
  • Doesn't handle the case where multiple different exceptions occur

Consider:

except ExceptionGroup as eg:
    # Prioritize specific DICOM errors
    retriable_errors = [e for e in eg.exceptions if isinstance(e, RetriableDicomError)]
    if retriable_errors:
        raise ServiceUnavailableApiError(str(retriable_errors[0]))
    
    dicom_errors = [e for e in eg.exceptions if isinstance(e, DicomError)]
    if dicom_errors:
        raise BadGatewayApiError(str(dicom_errors[0]))
    
    # Re-raise the exception group if no known error types
    raise

3. Type Annotation Precision (Nitpick)

File: adit/dicom_web/utils/wadors_utils.py:44

fetch_coro: Awaitable[None]

This variable holds a coroutine, which is technically Coroutine[Any, Any, None], not just Awaitable[None]. While this works, being more precise would be better:

from typing import Any, Coroutine
fetch_coro: Coroutine[Any, Any, None]

Or simply omit the annotation since the type is inferred from the context.

4. Test Isolation Concerns (Minor)

File: adit/dicom_web/tests/utils/test_wado_retrieve.py:110-146

The race condition tests (test_race_condition and test_sentinel_prevents_race_condition) are valuable but fragile:

  • Rely on specific sleep timings (0.0001, 0.0003)
  • May be flaky on slow CI runners or under load
  • The comment suggests these might be removed

Recommendation: Either:

  1. Keep them but add a comment explaining they demonstrate the concept (not guaranteed to fail/pass 100% of the time)
  2. Use more deterministic testing with mocks/events instead of sleep-based timing
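
As a rough sketch of the second option, an `asyncio.Event` can force the worst-case ordering without any sleeps. The function name `drain_after_producer_done` is hypothetical and the queue setup is a simplified stand-in for `wado_retrieve`, not its actual code.

```python
import asyncio


async def drain_after_producer_done(items: list[int]) -> list[int]:
    queue: asyncio.Queue[int | None] = asyncio.Queue()
    producer_done = asyncio.Event()

    async def produce() -> None:
        for item in items:
            queue.put_nowait(item)
        queue.put_nowait(None)  # sentinel goes in after the last item
        producer_done.set()

    producer = asyncio.create_task(produce())
    # Force the worst-case ordering: the producer has completely finished
    # before the consumer reads anything from the queue.
    await producer_done.wait()
    await producer

    received: list[int] = []
    while (item := await queue.get()) is not None:
        received.append(item)
    return received
```

A test built this way asserts on the received items only, so its outcome does not depend on scheduler timing or CI load.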

5. Missing Edge Case Tests

Consider adding tests for:

  • What happens if fetch_coro raises an exception immediately?
  • What happens if the queue has items when an exception occurs?
  • Concurrent iteration (though async generators typically aren't thread-safe anyway)

🔒 Security

No security concerns identified. The change doesn't introduce new attack surfaces.

⚡ Performance

Improvement: The sentinel pattern is slightly more efficient than the old asyncio.wait() approach since it avoids creating and canceling tasks in each loop iteration.

Concern: The queue still uses thread-safe operations (call_soon_threadsafe) for each dataset, which is correct but has overhead. This is inherent to the design and not introduced by this PR.
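
For context, the thread-to-loop handoff can be sketched as follows. This is an assumption-laden illustration: `collect_from_worker_thread` and `blocking_fetch` are hypothetical names, and the sentinel is enqueued from the worker thread for brevity, which differs from the `add_sentinel_when_done` helper in this PR.

```python
import asyncio


async def collect_from_worker_thread(items: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[str | None] = asyncio.Queue()

    def blocking_fetch() -> None:
        # Runs in a worker thread. asyncio.Queue is not thread-safe, so each
        # put is handed to the event loop thread instead of called directly.
        for item in items:
            loop.call_soon_threadsafe(queue.put_nowait, item)
        loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel

    fetch_task = asyncio.create_task(asyncio.to_thread(blocking_fetch))
    received: list[str] = []
    while (item := await queue.get()) is not None:
        received.append(item)
    await fetch_task  # propagate any exception raised in the thread
    return received
```

Each `call_soon_threadsafe` call schedules one callback on the loop, which is the per-dataset overhead mentioned above.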

📋 Style & Conventions

  • Follows Google Python Style Guide ✅
  • Proper async/await usage ✅
  • Good docstring on main function ✅
  • Type hints are present ✅

🎯 Recommendations

Must Fix:

  • None (the code works correctly as-is)

Should Consider:

  1. Simplify the TaskGroup usage or remove it entirely (see issue 1 above)
  2. Improve ExceptionGroup handling for robustness (see issue 2 above)

Nice to Have:

  1. Add edge case tests for exception scenarios
  2. Consider whether the race condition demo tests provide value vs. flakiness risk

Verdict

Approve with minor suggestions

The core fix is sound and the test coverage is excellent. The suggested improvements are refinements rather than blockers. Great work identifying and fixing this subtle concurrency bug!


🤖 Generated with Claude Code

@claude

claude bot commented Oct 24, 2025

Code Review for PR #240: Add sentinel to async queue in wado_retrieve

Summary

This PR fixes a race condition in the wado_retrieve function by implementing a sentinel pattern. The fix ensures all datasets are properly retrieved when the fetch task completes before the final queue item can be consumed.


Strengths

  1. Excellent Problem Identification: The PR correctly identifies a subtle race condition where fetch_task could complete before the final queue_get_task is scheduled, causing data loss.

  2. Clean Solution: The sentinel pattern is an elegant and well-established solution for this type of producer-consumer problem.

  3. Comprehensive Test Coverage: The PR includes:

    • Integration test with realistic mocking
    • Exception handling tests for both retriable and non-retriable errors
    • Unit tests demonstrating the race condition and its fix
  4. Modern asyncio Patterns: Good use of asyncio.TaskGroup (Python 3.11+) for structured concurrency.


Issues & Recommendations

🔴 Critical: Exception Handling Bug

Location: adit/dicom_web/utils/wadors_utils.py:85-92

The exception handling logic has a significant flaw:

except ExceptionGroup as eg:
    for exc in eg.exceptions:
        if isinstance(exc, RetriableDicomError):
            raise ServiceUnavailableApiError(str(exc))
        if isinstance(exc, DicomError):
            raise BadGatewayApiError(str(exc))
    raise  # Re-raise if no handled exception found

Problem: Because RetriableDicomError is a subclass of DicomError, isinstance(exc, DicomError) matches both types. The current code still behaves correctly because the RetriableDicomError check comes first and raises before the second check is reached, but this reliance on ordering is easy to miss.

Recommended fix:

except ExceptionGroup as eg:
    for exc in eg.exceptions:
        if isinstance(exc, RetriableDicomError):
            raise ServiceUnavailableApiError(str(exc))
        elif isinstance(exc, DicomError):  # Use elif instead of if
            raise BadGatewayApiError(str(exc))
    raise  # Re-raise if no handled exception found

Additional consideration: The current implementation only handles the first exception in the ExceptionGroup. If multiple exceptions occur, others will be silently ignored. Consider whether this is the intended behavior or if you need to handle multiple exceptions more explicitly.
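
To see how the two spellings behave, the following sketch runs both against the same inputs. The four exception classes are local stand-ins that mirror the names used in this PR (they are not imported from the project), and `translate_with_if`, `translate_with_elif`, and `outcome` are hypothetical helpers.

```python
class DicomError(Exception):
    """Stand-in for the project's DicomError."""


class RetriableDicomError(DicomError):
    """Stand-in for the retriable subclass."""


class ServiceUnavailableApiError(Exception):
    """Stand-in for the API error raised on retriable failures."""


class BadGatewayApiError(Exception):
    """Stand-in for the API error raised on other DICOM failures."""


def translate_with_if(excs: list[Exception]) -> None:
    for exc in excs:
        if isinstance(exc, RetriableDicomError):
            raise ServiceUnavailableApiError(str(exc))
        if isinstance(exc, DicomError):
            raise BadGatewayApiError(str(exc))


def translate_with_elif(excs: list[Exception]) -> None:
    for exc in excs:
        if isinstance(exc, RetriableDicomError):
            raise ServiceUnavailableApiError(str(exc))
        elif isinstance(exc, DicomError):
            raise BadGatewayApiError(str(exc))


def outcome(translate, excs: list[Exception]) -> str:
    """Return the name of the raised API error, or 'unhandled'."""
    try:
        translate(excs)
    except (ServiceUnavailableApiError, BadGatewayApiError) as err:
        return type(err).__name__
    return "unhandled"
```

In this sketch both versions give the same result for every input, because the first branch raises before the second check can run; the `elif` mainly documents that the branches are mutually exclusive. Both also stop at the first matching exception, so a plain DicomError listed before a RetriableDicomError wins.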

🟡 Medium: Potential Task Lifecycle Issue

Location: adit/dicom_web/utils/wadors_utils.py:69-77

async def add_sentinel_when_done(task: asyncio.Task[None]) -> None:
    try:
        await task
    finally:
        await queue.put(None)

async with asyncio.TaskGroup() as task_group:
    fetch_task = task_group.create_task(fetch_coro)
    task_group.create_task(add_sentinel_when_done(fetch_task))

Problem: You're creating a task from a coroutine (fetch_coro), then immediately awaiting that task in add_sentinel_when_done. This creates a dependency where the add_sentinel_when_done task won't complete until fetch_task completes.

However, there's a subtle issue: if fetch_task is cancelled (e.g., if the consuming loop breaks early or raises an exception), the await task in add_sentinel_when_done will raise CancelledError, but the finally block will still execute, putting the sentinel in the queue. This is actually the correct behavior for cleanup, but it's worth documenting.

Recommendation: Add a comment explaining the cancellation behavior:

async def add_sentinel_when_done(task: asyncio.Task[None]) -> None:
    """Wait for fetch task to complete and add sentinel to signal end of stream.
    
    The sentinel is added even if the task is cancelled, ensuring the consumer
    loop always terminates cleanly.
    """
    try:
        await task
    finally:
        await queue.put(None)
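
The cancellation behavior can be checked with a small standalone sketch. It assumes an unbounded queue and uses a hypothetical `never_finishes` coroutine in place of the real fetch; it does not use TaskGroup.

```python
import asyncio


async def sentinel_after_cancel() -> tuple[bool, object]:
    queue: asyncio.Queue[str | None] = asyncio.Queue()

    async def never_finishes() -> None:
        await asyncio.Event().wait()  # blocks until cancelled

    async def add_sentinel_when_done(task: asyncio.Task[None]) -> None:
        try:
            await task
        finally:
            await queue.put(None)

    fetch_task = asyncio.create_task(never_finishes())
    watcher = asyncio.create_task(add_sentinel_when_done(fetch_task))
    await asyncio.sleep(0)  # let both tasks reach their first await
    fetch_task.cancel()
    try:
        await watcher
    except asyncio.CancelledError:
        pass  # the watcher re-raises the cancellation after its finally block
    return fetch_task.cancelled(), queue.get_nowait()
```

The returned tuple shows that the fetch task ended cancelled and that the sentinel is nevertheless sitting in the queue.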

🟡 Medium: Test Quality Issues

Location: adit/dicom_web/tests/utils/test_wado_retrieve.py:251-325

The mock tests (test_race_condition and test_sentinel_prevents_race_condition) are valuable for demonstrating the issue, but they have concerns:

  1. Timing-dependent: These tests rely on asyncio.sleep with hardcoded delays (0.0001s, 0.0003s). This makes them:

    • Potentially flaky on slower CI runners
    • Not deterministic across different environments
    • Slower than necessary
  2. Comment suggests uncertainty: The docstring says "Can be removed if deemed unnecessary" - if you're not sure they add value, consider removing them. The integration tests already provide good coverage.

Recommendation: Either:

  • Remove these tests and rely on the integration tests
  • Or, make them more deterministic using mocks or by controlling the event loop more explicitly

🟢 Minor: Type Annotation Inconsistency

Location: adit/dicom_web/utils/wadors_utils.py:35

queue = asyncio.Queue[Dataset | None]()

Good use of precise typing. Subscripting asyncio.Queue at runtime requires Python 3.9+, and the Dataset | None union syntax requires Python 3.10+. The project targets Python 3.12+ (per pyproject.toml), so this is fine.

🟢 Minor: Import Statement Style

The test file uses:

from adit.dicom_web.utils import wadors_utils

Then references functions as wadors_utils.wado_retrieve. While this works, the more common pattern would be:

from adit.dicom_web.utils.wadors_utils import wado_retrieve

However, importing the module is necessary for monkeypatching, so this is actually correct. No change needed, but worth noting the reasoning.


Performance Considerations

Good: The sentinel pattern adds minimal overhead - just one extra await queue.put(None) call and one extra queue check per invocation.

Good: Removing the asyncio.wait with FIRST_COMPLETED avoids scheduling a new queue_get_task on every loop iteration, so the new implementation is both simpler and slightly cheaper.

⚠️ Consider: The TaskGroup will wait for all tasks to complete before exiting. Since add_sentinel_when_done waits for fetch_task, this creates a sequential dependency. This is correct for the use case, but it means the consumer loop must fully drain the queue before the function can return.


Security Considerations

No security concerns identified. The changes don't introduce any new attack surfaces or handle sensitive data differently.


Compliance with Google Python Style Guide

  • Good adherence to the project's stated style guide
  • Docstrings: The main function has a docstring, though the new add_sentinel_when_done helper could benefit from one (see recommendation above)
  • Naming: Clear, descriptive names following Python conventions


Recommendation

Approve with minor changes. The PR solves a real problem and includes good tests. The critical exception handling issue should be fixed before merging, but it's a simple change.

Action Items

  1. Must fix: Change if isinstance(exc, DicomError) to elif
  2. 📝 Should add: Docstring for add_sentinel_when_done explaining cancellation behavior
  3. 🤔 Consider: Remove or make deterministic the mock race condition tests

Great work on identifying and fixing this subtle race condition!

@claude

claude bot commented Oct 24, 2025

Code Review: PR #240 - Add sentinel to async queue in wado_retrieve

Summary

This PR fixes a race condition in the wado_retrieve function where datasets could be skipped during async retrieval. The solution uses a sentinel pattern combined with asyncio.TaskGroup to ensure all datasets are processed. Overall, this is a well-designed fix with excellent test coverage.

Strengths

1. Problem Identification & Solution

  • Correctly identifies and fixes a real race condition where fetch_task could complete before the final queue_get_task runs
  • The sentinel pattern is a clean, idiomatic solution for this type of producer-consumer problem
  • Great addition of tests that demonstrate the race condition and verify the fix

2. Code Quality

  • Clean, readable implementation following Python best practices
  • Good use of asyncio.TaskGroup for structured concurrency
  • Proper docstring for add_sentinel_when_done explaining the behavior
  • Type hints are properly updated (Queue[Dataset | None])

3. Test Coverage

  • Comprehensive test suite covering happy path, retriable errors, non-retriable errors, race condition demonstration, and sentinel pattern verification
  • Good use of mocking to isolate the async behavior

Issues & Recommendations

1. Critical: Deadlock Risk in add_sentinel_when_done

Location: adit/dicom_web/utils/wadors_utils.py:69-78

The current implementation awaits fetch_task inside add_sentinel_when_done, which creates a circular dependency. Both tasks are in the same TaskGroup, and if an exception occurs in fetch_task, the TaskGroup will wait for all tasks to complete before propagating the exception. This could cause the consumer loop to block indefinitely waiting for the sentinel.

Recommendation: Don't await the fetch_task directly. Instead, poll for completion or use a simpler pattern without the nested task.

2. Exception Handling Gap

Location: adit/dicom_web/utils/wadors_utils.py:90-97

Issues:

  • Only the first matching exception is raised; any other exceptions in the group are silently ignored
  • Non-DicomError exceptions are re-raised as ExceptionGroup

Recommendation: Use exception chaining (raise ... from eg) and explicitly handle all exception types.

3. Test Improvements

The test_race_condition test uses arbitrary sleep durations that may not reliably reproduce the race on all systems. Consider adding a comment noting this is a demonstration test and may not always reproduce the race consistently.

Performance & Security

  • Performance: Queue overhead is minimal and appropriate for DICOM dataset streaming
  • Security: No security concerns identified

Summary

Recommendation: Request changes before merging

The PR correctly identifies and solves a real concurrency bug, but the current implementation has a potential deadlock issue. The fix itself (sentinel pattern) is sound, but the implementation needs adjustment.

Priority fixes:

  1. High: Resolve the deadlock risk in add_sentinel_when_done
  2. Medium: Improve exception handling to avoid silently dropping exceptions
  3. Low: Add clarifying comments to the race condition test

Great work identifying this subtle race condition! The test demonstrating the problem is particularly valuable.

@samuelvkwong samuelvkwong merged commit 7ee97b6 into main Oct 24, 2025
2 checks passed
@samuelvkwong samuelvkwong deleted the wadors-improve branch October 24, 2025 21:53