
feat: display batch node progress as items complete#20

Merged
spinje merged 5 commits into main from feat/batch-progress-display
Dec 29, 2025

@spinje
Owner

@spinje spinje commented Dec 29, 2025

Summary

Add real-time progress updates for batch node execution, showing each item's completion status as it happens instead of blocking silently until all items complete.

Before:

  convert-sections... ✓ 24.9s

After:

  convert-sections... 1/8 ✓
  convert-sections... 2/8 ✓     (updates in place via \r)
  convert-sections... 3/8 ✗     (shows failures)
  ...
  convert-sections... 8/8 ✓ 24.9s

Changes

  • output_controller.py: Add _handle_batch_progress() method using carriage return for in-place line updates; extend _handle_node_complete() to handle batch nodes (only shows timing, not duplicate checkmark)
  • batch_node.py: Call progress callback after each item in _exec_sequential() and _collect_parallel_results()
  • instrumented_wrapper.py: Detect batch nodes via batch_metadata in output and pass is_batch, batch_total, batch_success_count to completion callback
  • Tests: Add TestBatchProgressCallbacks (6 tests) and TestBatchProgressDisplay (8 tests)
 src/pflow/core/output_controller.py      |  71 ++++++++++--
 src/pflow/runtime/batch_node.py          |  57 +++++++++-
 src/pflow/runtime/instrumented_wrapper.py|  15 ++-
 tests/test_core/test_output_controller.py| 193 +++++++++++++++++++++++++++++++
 tests/test_runtime/test_batch_node.py    | 185 +++++++++++++++++++++++++++++
 5 files changed, 508 insertions(+), 13 deletions(-)
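
The in-place update described above can be sketched roughly as follows. This is a minimal illustration that writes to a plain text stream, not the actual OutputController code; `format_batch_progress` and `render` are hypothetical names introduced only for this example.

```python
import sys


def format_batch_progress(node_id: str, current: int, total: int, ok: bool) -> str:
    """Build one progress line; the leading carriage return moves the cursor to column 0."""
    mark = "✓" if ok else "✗"
    return f"\r  {node_id}... {current}/{total} {mark}"


def render(stream, node_id: str, results: list) -> None:
    """Write one update per completed item without emitting a newline in between."""
    total = len(results)
    for index, ok in enumerate(results, start=1):
        stream.write(format_batch_progress(node_id, index, total, ok))
        stream.flush()  # make the update visible immediately
    stream.write("\n")  # finish the line once the whole batch is done


if __name__ == "__main__":
    # Hypothetical per-item outcomes: the third item failed.
    render(sys.stderr, "convert-sections", [True, True, False, True])
```

Because each update starts with `\r` and omits the newline, a terminal overwrites the previous count instead of scrolling. This simple form assumes later lines are never shorter than earlier ones, which holds for an increasing `current/total` counter.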

Testing

All 3677 tests pass:

make test

Fixes #17

…nput parameter

Introduces two new features:
1. A `mode: "reduce"` option for batch processing, allowing sequential chained operations where each iteration's output becomes the next iteration's input, addressing the need for fold/accumulate patterns.
2. A `files` parameter for the shell node, enabling safe data passing by writing template data to temporary files, thus avoiding shell escaping issues with special characters.
Add real-time progress updates for batch node execution showing
item completion status as they happen:

  convert-sections... 1/8 ✓
  convert-sections... 2/8 ✓   (updates in place)
  convert-sections... 8/8 ✓ 24.9s

Changes:
- Add batch_progress event to OutputController with carriage return
  line updates
- Batch node calls progress callback after each item in both
  sequential and parallel modes
- InstrumentedNodeWrapper detects batch nodes via batch_metadata
  and passes is_batch flag to completion callback
- Add comprehensive tests for batch progress callbacks

Fixes #17
Resolve conflict in template_validator.py - use more specific type annotation
from main branch (dict[str, dict[str, Any]] instead of dict[str, Any])
@claude

claude Bot commented Dec 29, 2025

Code Review - PR #20: Batch Node Progress Display

This PR adds real-time progress updates for batch processing. I've reviewed the implementation and have the following feedback organized by priority.


Critical — must fix before merge

None! The implementation is solid with no critical issues found.


⚠️ Warnings — should be addressed

1. Missing Progress Callback Handler in OutputController

Location: src/pflow/core/output_controller.py:82-105

The _handle_batch_progress() method is defined, but I don't see where it's being called. The callback signature in create_progress_callback() needs to be updated to route batch_progress events to this handler.

Expected fix:

# In create_progress_callback() method
def _callback(node_id, event, *args, **kwargs):
    if event == "node_start":
        self._handle_node_start(node_id, indent)
    elif event == "batch_progress":
        # Add this handler
        self._handle_batch_progress(
            node_id, 
            indent, 
            kwargs.get('batch_current'),
            kwargs.get('batch_total'),
            kwargs.get('batch_success')
        )
    elif event == "node_complete":
        ...  # existing code

Without this routing, the beautiful progress display code won't actually be invoked.


2. Type Coercion May Hide Schema Validation Bugs

Location: src/pflow/runtime/batch_node.py:134-209

The defensive type coercion for batch config (_coerce_bool, _coerce_int, _coerce_float) is well-intentioned, but it may mask schema validation failures upstream. If invalid types are reaching the batch node, that's a symptom of a broken validation layer, not something runtime should silently fix.

Recommendation:

  • Keep the coercion for now (defense-in-depth is good)
  • Add a TODO/comment explaining this is temporary until schema validation is strengthened
  • Consider elevating the warnings to errors in a future release

Why this matters: Silent coercion can hide bugs. "true" → True feels harmless, but "5.2" → 5 for max_concurrent could cause confusion.
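
One way to keep the harmless conversions while refusing the lossy ones is sketched below. This is an assumption about what a stricter helper could look like, not pflow's `_coerce_int`; the name `coerce_int` and its error messages are hypothetical.

```python
def coerce_int(value: object) -> int:
    """Coerce to int only when no information is lost; raise otherwise."""
    if isinstance(value, bool):
        # bool is a subclass of int, so reject it explicitly
        raise TypeError(f"expected an integer, got bool {value!r}")
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        if value.is_integer():
            return int(value)
        raise ValueError(f"lossy conversion refused: {value!r}")
    if isinstance(value, str):
        text = value.strip()
        try:
            return int(text)
        except ValueError:
            number = float(text)  # raises ValueError for non-numeric text
            if number.is_integer():
                return int(number)
            raise ValueError(f"lossy conversion refused: {value!r}") from None
    raise TypeError(f"cannot coerce {type(value).__name__} to int")
```

With this shape, `"5"` and `"5.0"` still become `5`, while `"5.2"` raises instead of being silently truncated.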


3. JSON Auto-Parsing Security Check Could Be Bypassed

Location: src/pflow/runtime/batch_node.py:248-269

The 10MB security limit is checked AFTER trimming whitespace:

trimmed = items.strip()  # Could be huge
if len(trimmed) > MAX_JSON_SIZE:  # Check happens here

If items is a 100MB string with no whitespace, .strip() will process the entire string before the size check runs, potentially causing memory issues.

Fix:

# Check size BEFORE any processing
if isinstance(items, str):
    if len(items) > MAX_JSON_SIZE:
        logger.warning(...)
        # Don't process further
    else:
        trimmed = items.strip()
        if trimmed.startswith("["):
            # ... parse

4. Batch Progress Missing from Non-Interactive Mode Check

Location: src/pflow/core/output_controller.py:82-105

The _handle_batch_progress() method doesn't check is_interactive(). If called in non-interactive mode (e.g., piped output), the carriage return escape codes would pollute stdout.

Fix:

def _handle_batch_progress(self, node_id, indent, batch_current, batch_total, batch_success):
    # Only show progress in interactive mode
    if not self.is_interactive():
        return
    
    status = click.style("✓", fg="green") if batch_success else click.style("✗", fg="red")
    click.echo(f"\r{indent}  {node_id}... {batch_current}/{batch_total} {status}", err=True, nl=False)

💡 Suggestions — optional improvements

1. Document the Progress Callback Protocol

The callback signature has evolved to support multiple event types (node_start, batch_progress, node_complete) with varying kwargs. Consider adding a docstring or type protocol to document the expected signature:

# In output_controller.py or a shared types file
from typing import Protocol, Literal

class ProgressCallback(Protocol):
    def __call__(
        self,
        node_id: str,
        event: Literal["node_start", "batch_progress", "node_complete"],
        *args,
        **kwargs
    ) -> None:
        ...

This would make it easier for future developers to understand the contract.


2. Consider Batch Progress Throttling for Large Batches

For batches with 10,000+ items, firing a callback after every single item might cause performance issues (even with contextlib.suppress). Consider throttling updates:

# Only report progress every N items or every 100ms
if completed_count % max(1, total // 100) == 0 or (time.perf_counter() - last_update) > 0.1:
    callback(...)
    last_update = time.perf_counter()

This would keep the UX smooth while reducing overhead for massive batches.
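
A self-contained version of the time-based half of that idea might look like the sketch below. `ThrottledProgress` and its parameters are hypothetical names for illustration; the injectable `clock` is an assumption added so the behaviour can be tested without sleeping.

```python
import time
from typing import Callable, Optional


class ThrottledProgress:
    """Forward progress updates at most once per `min_interval` seconds.

    The final update (completed >= total) is always forwarded so the
    display ends on the correct count.
    """

    def __init__(
        self,
        report: Callable[[int, int], None],
        min_interval: float = 0.1,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._report = report
        self._min_interval = min_interval
        self._clock = clock
        self._last_update: Optional[float] = None

    def update(self, completed: int, total: int) -> None:
        now = self._clock()
        is_final = completed >= total
        due = (
            self._last_update is None
            or (now - self._last_update) >= self._min_interval
        )
        if is_final or due:
            self._report(completed, total)
            self._last_update = now
```

The batch loop would call `update()` after every item, and only a bounded number of calls per second would reach the display callback.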


3. Test Coverage: Missing Integration Test for Progress Display

The test suite has excellent unit tests (TestBatchProgressCallbacks, TestBatchProgressDisplay), but I don't see an integration test that verifies the full chain:

  • Batch node calls callback with batch_progress
  • OutputController routes it to _handle_batch_progress()
  • Terminal shows in-place updates

Consider adding one end-to-end test that mocks click.echo and verifies the carriage return behavior.
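
The shape of such a test could be roughly as follows. This sketch uses only the standard library: `MiniController` and `run_batch` are hypothetical stand-ins for OutputController and the batch node, and the injected `echo` mock plays the role that `click.echo` would play in the real code.

```python
from unittest.mock import Mock


class MiniController:
    """Hypothetical stand-in for OutputController; not pflow's real class."""

    def __init__(self, echo):
        self._echo = echo  # in pflow this role would be played by click.echo

    def create_progress_callback(self):
        def _callback(node_id, event, **kwargs):
            if event == "batch_progress":
                mark = "✓" if kwargs["batch_success"] else "✗"
                line = (
                    f"\r  {node_id}... "
                    f"{kwargs['batch_current']}/{kwargs['batch_total']} {mark}"
                )
                self._echo(line, err=True, nl=False)

        return _callback


def run_batch(node_id, items, callback):
    """Hypothetical batch loop: report progress after each item (None = failed)."""
    for index, item in enumerate(items, start=1):
        callback(
            node_id,
            "batch_progress",
            batch_current=index,
            batch_total=len(items),
            batch_success=item is not None,
        )


def test_batch_progress_reaches_echo():
    echo = Mock()
    callback = MiniController(echo).create_progress_callback()
    run_batch("convert", ["a", None], callback)

    lines = [call.args[0] for call in echo.call_args_list]
    assert lines == ["\r  convert... 1/2 ✓", "\r  convert... 2/2 ✗"]
    # every update goes to stderr and suppresses the trailing newline
    assert all(
        call.kwargs == {"err": True, "nl": False} for call in echo.call_args_list
    )
```

In the real test suite the mock would more likely be installed by patching `click.echo` in the module under test; injecting it through the constructor just keeps this sketch independent of pflow's module layout.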


4. Minor: Inconsistent Naming

batch_success in the callback parameters vs error is None in the implementation. Consider renaming the parameter to item_succeeded for clarity:

callback(
    self.node_id,
    "batch_progress",
    duration_ms,
    depth,
    batch_current=idx + 1,
    batch_total=total,
    item_succeeded=(error is None),  # Clearer than batch_success
)

🎉 What I Liked

  1. Excellent bugfix documentation: The bugfix-log.md entries are exemplary - detailed, actionable, with reproduction steps. This is gold for future debugging.

  2. Defense-in-depth philosophy: JSON auto-parsing following the proven node_wrapper.py pattern shows good architectural consistency.

  3. Thread-safe progress reporting: Using contextlib.suppress(Exception) around callback invocations prevents callback bugs from breaking batch execution.

  4. Comprehensive test coverage: 25 new tests for JSON parsing and type coercion show thoroughness.

  5. Clear separation of concerns: Progress reporting is cleanly separated from business logic.
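
The callback isolation mentioned in point 3 can be sketched as below. This is a simplified sequential loop under assumed names (`run_items`, `process`), not the code in batch_node.py.

```python
import contextlib


def run_items(items, process, callback):
    """Process every item; a failing progress callback never aborts the batch."""
    results = []
    total = len(items)
    for index, item in enumerate(items, start=1):
        results.append(process(item))
        # Display problems are swallowed here; errors raised by process()
        # itself still propagate because they happen outside this block.
        with contextlib.suppress(Exception):
            callback(index, total)
    return results
```

The trade-off is that a buggy callback fails silently, which is why the suppression wraps only the callback call and nothing else.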


📊 Test Quality Assessment

Unit tests: ✅ Excellent

  • TestBatchProgressCallbacks - 6 tests
  • TestBatchProgressDisplay - 8 tests
  • TestItemsJsonAutoParsing - 10 tests
  • TestConfigTypeCoercion - 15 tests

What's tested well:

  • ✅ Progress callback invocation per item
  • ✅ Sequential and parallel modes
  • ✅ Error handling during progress reporting
  • ✅ JSON string auto-parsing edge cases
  • ✅ Type coercion for all config fields

What could be improved:

  • ⚠️ Missing end-to-end test verifying click.echo carriage return behavior
  • ⚠️ No test for non-interactive mode (should NOT show progress)
  • ⚠️ No test for very large batches (10k+ items) to verify performance

🔒 Security Assessment

No security concerns - the 10MB JSON limit is appropriate (though needs the early check fix mentioned above).


🏗️ Architectural Quality

Score: 8.5/10

Strengths:

  • Follows established patterns (JSON parsing from node_wrapper.py)
  • Clean separation between batch logic and display logic
  • Properly uses callbacks for loose coupling
  • Defensive programming with graceful degradation

Weaknesses:

  • Type coercion masks upstream validation issues (but documented in bugfix log)
  • Progress callback protocol could be more explicit (type hints/protocol class)

📝 Summary

This is a well-crafted PR with strong fundamentals. The main issue is the missing event routing in OutputController.create_progress_callback() - the _handle_batch_progress() method won't be called without it. Fix that plus the non-interactive mode check, and this is ready to merge.

The defensive type coercion and JSON auto-parsing are pragmatic solutions to real problems, well-documented in the bugfix log. The test coverage is excellent for unit tests but could use one integration test.

Recommendation: Fix the two warnings (missing event handler routing + non-interactive check), then merge. The suggestions are nice-to-haves for future iterations.

Great work on the comprehensive bugfix documentation - it sets a high standard for the project! 🎯

@spinje
Owner Author

spinje commented Dec 29, 2025

Response to Code Review

Thanks for the thorough review! I've carefully analyzed each point. Here's my assessment:


⚠️ Warning 1: Missing Progress Callback Handler

Status: Already Implemented ✅

The routing exists at output_controller.py:249-251:

elif event == "batch_progress":
    if batch_current is not None and batch_total is not None and batch_success is not None:
        self._handle_batch_progress(node_id, indent, batch_current, batch_total, batch_success)

The event dispatching was added as part of this PR.


⚠️ Warning 2: Type Coercion May Hide Schema Validation Bugs

Status: Pre-existing code, not part of this PR

The _coerce_bool, _coerce_int, _coerce_float methods existed before this PR (added in Task 96). This is intentional defensive programming - when workflows come from JSON or shell commands, values often arrive as strings ("true" instead of true). The coercion makes pflow more robust to real-world inputs.

Valid concern for future discussion, but out of scope for this PR.


⚠️ Warning 3: JSON Auto-Parsing Security Check

Status: Pre-existing code, not part of this PR

The JSON auto-parsing code (lines 248-269) was added in Task 96 to fix #13. The suggestion to check size before .strip() is a micro-optimization: by the time code reaches that point, Python has already allocated the string in memory. The 10MB limit exists to prevent accidental parsing of huge data; it is not a security boundary against untrusted input.


⚠️ Warning 4: Non-Interactive Mode Check

Status: Already Handled ✅

The callback is only created in interactive mode (output_controller.py:197-198):

def create_progress_callback(self) -> Optional[Callable]:
    if not self.is_interactive():
        return None  # Returns None in non-interactive mode

In non-interactive mode, __progress_callback__ is None, and batch_node.py checks if callable(callback) before calling. No carriage returns will ever reach non-interactive output.
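
The two-sided guard described here can be illustrated with a small sketch. `Display` and `report` are hypothetical stand-ins, and treating "interactive" as "the stream is a TTY" is an assumption made for the example; the real `is_interactive()` may apply other rules.

```python
from typing import Callable, Optional


class Display:
    """Hypothetical stand-in for OutputController; not pflow's real class."""

    def __init__(self, stream) -> None:
        self._stream = stream

    def is_interactive(self) -> bool:
        # Assumption: "interactive" here simply means the stream is a TTY.
        return self._stream.isatty()

    def create_progress_callback(self) -> Optional[Callable[[str, int, int], None]]:
        if not self.is_interactive():
            return None  # no callback exists, so nothing can write "\r" to a pipe

        def _callback(node_id: str, current: int, total: int) -> None:
            self._stream.write(f"\r  {node_id}... {current}/{total}")
            self._stream.flush()

        return _callback


def report(callback, node_id: str, current: int, total: int) -> None:
    """Caller-side guard: skip silently when no callback was created."""
    if callable(callback):
        callback(node_id, current, total)
```

Placing the check in the factory means individual handlers do not each need their own interactivity test, at the cost of relying on every caller to guard against `None`.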


Summary

Warning              | Assessment
#1 Missing routing   | ✅ Already implemented
#2 Type coercion     | ⚠️ Pre-existing code (Task 96)
#3 JSON security     | ⚠️ Pre-existing code (Task 96)
#4 Non-interactive   | ✅ Already handled

The PR is ready to merge. It closes #17 (Display batch node progress as items complete).

@spinje spinje merged commit a366da0 into main Dec 29, 2025
6 of 7 checks passed