Add Line Callback Support to SubprocessExecutor #27

@dugshub

Priority: Medium
Effort: 2-3 hours
Component: cli_patterns.execution.subprocess_executor.SubprocessExecutor


Description

Add optional line-by-line callback support to SubprocessExecutor.run() to enable real-time parsing of subprocess output. This allows CLI applications to update progress displays dynamically as operations execute, rather than waiting for command completion.

Use Case: Tools like dbt deps, npm install, and pip install emit progress line by line. Applications need to parse these lines in real time to show:

  • Package installation progress
  • Model compilation status
  • Test execution results
  • Live progress indicators

Current Limitation: SubprocessExecutor streams output to console but provides no interception mechanism. Applications can only access output after completion via CommandResult.stdout/stderr.


Proposed Enhancement

Add an optional line_callback parameter to the run() method:

from typing import Callable, Optional, Union

LineCallback = Callable[[str], None]

class SubprocessExecutor:
    async def run(
        self,
        command: Union[str, list[str]],
        timeout: Optional[float] = None,
        cwd: Optional[str] = None,
        env: Optional[dict[str, str]] = None,
        allow_shell_features: bool = False,
        line_callback: Optional[LineCallback] = None,  # NEW
    ) -> CommandResult:
        """
        Execute command with optional line-by-line callback.

        Args:
            command: Command to execute
            timeout: Command timeout in seconds
            cwd: Working directory
            env: Environment variables
            allow_shell_features: Enable shell features (security warning)
            line_callback: Optional callback invoked for each output line
                - Receives one line at a time (stripped, no trailing newline)
                - Called for both stdout and stderr lines
                - Should not block (executed inline)

        Returns:
            CommandResult with exit code and captured output (same as before)
        """

Implementation Requirements

Core Behavior:

  • Call callback for each line as it's received (don't wait for completion)
  • Combine stdout and stderr streams (chronological order preferred)
  • Strip trailing newlines before passing to callback
  • Preserve all output in final CommandResult.stdout/stderr (existing behavior)
  • Backward compatible: when line_callback=None, behave exactly as the current version
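
The core behavior above could be sketched roughly as follows. This is a minimal standalone illustration, not the SubprocessExecutor implementation: run_with_callback and _pump are hypothetical names, and the function returns a plain (exit_code, stdout, stderr) tuple instead of CommandResult. It drains stdout and stderr concurrently with asyncio, strips trailing newlines, forwards each line to the callback as it arrives, and still captures everything for the final result.

```python
import asyncio
import sys
from typing import Callable, Optional

LineCallback = Callable[[str], None]


async def _pump(
    stream: asyncio.StreamReader,
    sink: list[str],
    callback: Optional[LineCallback],
) -> None:
    """Read one pipe line by line, capturing and forwarding each line."""
    async for raw in stream:
        # errors="replace" keeps non-UTF-8 output from raising
        line = raw.decode(errors="replace").rstrip("\r\n")
        sink.append(line)
        if callback is not None:
            callback(line)


async def run_with_callback(
    argv: list[str],
    line_callback: Optional[LineCallback] = None,
) -> tuple[int, str, str]:
    """Hypothetical stand-in for SubprocessExecutor.run()."""
    process = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out_lines: list[str] = []
    err_lines: list[str] = []
    # Drain both pipes concurrently so neither fills up and stalls the child.
    await asyncio.gather(
        _pump(process.stdout, out_lines, line_callback),
        _pump(process.stderr, err_lines, line_callback),
    )
    exit_code = await process.wait()
    return exit_code, "\n".join(out_lines), "\n".join(err_lines)


if __name__ == "__main__":
    asyncio.run(
        run_with_callback(
            [sys.executable, "-c", "print('Installing demo')"],
            line_callback=lambda line: print(f"callback: {line}"),
        )
    )
```

Because stdout and stderr arrive over separate pipes, the interleaving seen by the callback is only approximately chronological. Passing stderr=asyncio.subprocess.STDOUT would give strict ordering, at the cost of losing the stdout/stderr split in the captured output.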

Optional Enhancements:

  • Strip ANSI color codes before callback (recommended for parsing)
  • Provide separate callbacks for stdout vs stderr (if needed)
  • Pass metadata (stream type, timestamp) in callback
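
For the ANSI-stripping enhancement, one possible shape is a small wrapper around the user's callback. The names strip_ansi and ansi_stripped are hypothetical helpers, not part of cli_patterns, and the pattern below only covers CSI sequences (colors, cursor movement, erase-line); other escape sequences such as OSC hyperlinks would need extra handling.

```python
import re
from typing import Callable

LineCallback = Callable[[str], None]

# CSI sequence: ESC [ , parameter bytes, intermediate bytes, one final byte.
_ANSI_CSI = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")


def strip_ansi(line: str) -> str:
    """Remove CSI escape sequences (e.g. color codes) from a line."""
    return _ANSI_CSI.sub("", line)


def ansi_stripped(callback: LineCallback) -> LineCallback:
    """Wrap a callback so it always receives plain text."""
    def _wrapper(line: str) -> None:
        callback(strip_ansi(line))
    return _wrapper
```

A caller who wants raw output simply passes the unwrapped callback, which keeps stripping opt-in.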

Performance:

  • Callback execution should not significantly slow down subprocess
  • Avoid buffering entire output before invoking callback
  • Handle high-frequency output (e.g., 100+ lines/sec) gracefully

Example Usage

Basic Progress Parsing:

executor = SubprocessExecutor()

packages_installed = []

def on_line(line: str):
    """Parse package installation lines."""
    if "Installing" in line:
        # Extract package name
        package = line.split("Installing ")[-1]
        packages_installed.append(package)
        print(f"📦 {package}")

result = await executor.run(
    ["dbt", "deps"],
    line_callback=on_line
)

print(f"Installed {len(packages_installed)} packages")

Live Progress Display:

from rich.live import Live
from rich.text import Text

with Live() as live:

    def on_line(line: str):
        # Show the latest line, truncated to 60 characters
        live.update(Text(line[:60]))

    await executor.run(
        ["npm", "install"],
        line_callback=on_line
    )

Implementation Approach

Option 1: Inline Callback (Simpler)

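In the existing _stream_output (or a similar method):

async def _stream_output(self, process, callback: Optional[LineCallback] = None) -> list[str]:
    """Stream stdout, invoking the callback for each line as it arrives."""
    # Collected here for illustration; the real method may store lines elsewhere
    stdout_lines: list[str] = []

    async for line in process.stdout:
        # errors="replace" keeps non-UTF-8 bytes from raising UnicodeDecodeError
        decoded = line.decode(errors="replace").rstrip()

        # Invoke callback if provided
        if callback:
            callback(decoded)

        # Stream to console (existing behavior)
        self.console.print(decoded)

        # Capture for CommandResult (existing)
        stdout_lines.append(decoded)

    return stdout_lines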

Option 2: Async Callback (More Flexible)

Allow async callbacks for I/O-bound operations:

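import inspect
from typing import Awaitable, Callable, Optional, Union

line_callback: Optional[Union[Callable[[str], None], Callable[[str], Awaitable[None]]]] = None

# In implementation:
if callback:
    # inspect.iscoroutinefunction is used because asyncio.iscoroutinefunction
    # is deprecated as of Python 3.14
    if inspect.iscoroutinefunction(callback):
        await callback(decoded)
    else:
        callback(decoded)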
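
As an alternative to checking the callback's type up front, the dispatcher could call it and await the result only when that result is awaitable. This is a sketch under the assumption that callbacks return either None or an awaitable; dispatch_line is a hypothetical helper name. The approach also covers callable objects whose __call__ is async, which a function-type check can miss.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Optional, Union

# A callback may be synchronous (returns None) or asynchronous (returns an awaitable).
LineCallback = Callable[[str], Union[None, Awaitable[None]]]


async def dispatch_line(callback: Optional[LineCallback], line: str) -> None:
    """Invoke a sync or async callback with one line of output."""
    if callback is None:
        return
    result = callback(line)
    # Async callbacks hand back an awaitable; sync callbacks have already run.
    if inspect.isawaitable(result):
        await result
```

Awaiting the callback inline means a slow async callback still delays reading the next line, so the "should not block" guidance applies to both kinds of callback.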

Testing Requirements

Unit Tests:

  • Callback invoked for each line of output
  • Callback receives stripped lines (no trailing newline)
  • Both stdout and stderr trigger callback
  • CommandResult unchanged when callback provided
  • Backward compatible (line_callback=None works as before)
  • Callback exceptions don't crash subprocess
  • ANSI codes stripped before callback (if implemented)

Integration Tests:

  • Real subprocess with multi-line output
  • High-frequency output (100+ lines/sec)
  • Mixed stdout/stderr output
  • Long-running command with streaming output

Edge Cases:

  • Empty output
  • Binary output (non-UTF8)
  • Extremely long lines (>10KB)
  • Callback raises exception
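
Several of these edge cases can be exercised without a real subprocess. The sketch below assumes lines are decoded as UTF-8 with errors="replace"; the issue does not specify a policy for binary output, so that choice is an assumption. decode_line, fake_stream, and forward_lines are hypothetical test helpers.

```python
import asyncio
from typing import AsyncIterator, Callable, Iterable


def decode_line(raw: bytes) -> str:
    """Decode one raw line; invalid UTF-8 bytes become U+FFFD instead of raising."""
    return raw.decode("utf-8", errors="replace").rstrip("\r\n")


async def fake_stream(chunks: Iterable[bytes]) -> AsyncIterator[bytes]:
    """Stand-in for process.stdout that yields prepared raw lines."""
    for chunk in chunks:
        yield chunk


async def forward_lines(
    stream: AsyncIterator[bytes],
    callback: Callable[[str], None],
) -> int:
    """Decode each line, pass it to the callback, and return the line count."""
    count = 0
    async for raw in stream:
        callback(decode_line(raw))
        count += 1
    return count


if __name__ == "__main__":
    received: list[str] = []
    asyncio.run(forward_lines(fake_stream([b"\xff\xfeok\n"]), received.append))
    print(received)
```

With a real process, note that asyncio's StreamReader applies a default buffer limit of 64 KiB per line, so lines well beyond the 10KB case would likely need a larger limit= passed to asyncio.create_subprocess_exec.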

Acceptance Criteria

  • line_callback parameter added to SubprocessExecutor.run()
  • Callback invoked for each line as received (real-time)
  • Trailing newlines stripped from lines before callback
  • CommandResult unchanged (all output still captured)
  • Backward compatible (existing code works without changes)
  • Unit tests cover callback functionality
  • Integration test with real subprocess
  • Documentation updated (docstrings + examples)
  • Type hints complete (mypy passes)
  • No performance regression when line_callback=None

Documentation Updates

Docstring Example:

async def run(...) -> CommandResult:
    """
    Execute command with themed output streaming.

    ...existing docs...

    Args:
        line_callback: Optional callback invoked for each output line.
            Receives stripped line text (stdout/stderr combined).
            Useful for parsing progress indicators or updating displays.

    Examples:
        # Parse package installations
        packages = []
        def track_package(line: str):
            if "Installing" in line:
                packages.append(line.split()[-1])

        result = await executor.run(
            ["pip", "install", "requests"],
            line_callback=track_package
        )
        print(f"Installed: {packages}")
    """

Risks & Mitigations

Risk | Mitigation
Callback blocks subprocess | Document "don't block" requirement, add timeout warning
Callback exceptions crash run | Wrap callback in try/except, log errors
Performance impact | Benchmark with/without callback, optimize if needed
ANSI codes break parsers | Strip ANSI before callback (optional but recommended)
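
The try/except mitigation might look like the wrapper below. It is a sketch, not the executor's actual error handling: guarded is a hypothetical helper, and logging via the standard logging module is an assumption about how errors would be reported.

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)

LineCallback = Callable[[str], None]


def guarded(callback: LineCallback) -> LineCallback:
    """Wrap a callback so its exceptions are logged instead of propagating."""
    def _wrapper(line: str) -> None:
        try:
            callback(line)
        except Exception:
            # Log with traceback and keep streaming; the subprocess is unaffected.
            logger.exception("line_callback raised on line %r", line)
    return _wrapper
```

Catching Exception rather than BaseException lets KeyboardInterrupt and task cancellation continue to propagate.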

Future Enhancements

After initial implementation:

  • Separate stdout/stderr callbacks
  • Metadata in callback (stream type, timestamp)
  • Async callback support
  • Built-in ANSI stripping toggle
  • Line filtering (only callback certain patterns)
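
Of these, line filtering can already be approximated on the caller's side by wrapping the callback. The sketch below assumes regex-based filtering; only_matching is a hypothetical helper and not a proposed API.

```python
import re
from typing import Callable

LineCallback = Callable[[str], None]


def only_matching(pattern: str, callback: LineCallback) -> LineCallback:
    """Return a callback that forwards only lines matching the regex."""
    compiled = re.compile(pattern)

    def _filtered(line: str) -> None:
        if compiled.search(line):
            callback(line)

    return _filtered
```

For example, only_matching(r"^Installing ", on_line) would forward installation lines and silently drop everything else.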

Related

Consumer Projects:

  • python-dbt-cli (will use this for progress parsing)
  • Other CLI tools using SubprocessExecutor

Similar Implementations:

  • asyncio.create_subprocess_exec (inspiration)
  • subprocess.Popen with line iteration
  • click.echo with real-time updates
