bug: Workflow tasks bypass `concurrent_limiter` entirely #305

@hariom888

Description

Root Cause

Both workflows.py::_run_workflow() and routes.py::run_workflow_once() call executor.execute_task(task_id) directly without acquiring a concurrency slot:

# workflows.py line 76
task_id = await executor.create_task(plugin_id, inputs, ...)
asyncio.create_task(executor.execute_task(task_id))   # ← no acquire()

# routes.py line 984
asyncio.create_task(executor.execute_task(task_id))   # ← no acquire()

The start_task route correctly calls concurrent_limiter.acquire(task_id) before scheduling. Workflow-triggered tasks skip this entirely, meaning a workflow with 10 steps launches 10 concurrent scans regardless of max_concurrent. This also means execute_task's finally block calls concurrent_limiter.release(task_id) for a slot that was never acquired — a silent no-op that masks the problem.
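
The effect can be reproduced in isolation with a toy model. The sketch below is not the project's code: `ToyLimiter`, its set-based bookkeeping, and the helper names are assumptions made for illustration, and only the `acquire`/`release` shape follows the snippets in this issue. It schedules tasks the way the workflow path does (no `acquire()`) and records the peak number running at once.

```python
import asyncio


class ToyLimiter:
    """Hypothetical stand-in for concurrent_limiter (not the real class)."""

    def __init__(self, max_concurrent):
        self.max_concurrent = max_concurrent
        self.active = set()

    async def acquire(self, task_id):
        # Returns (ok, reason), mirroring the tuple unpacked in the issue.
        if len(self.active) >= self.max_concurrent:
            return False, "limit reached"
        self.active.add(task_id)
        return True, None

    def release(self, task_id):
        # discard() ignores unknown ids, so releasing a slot that was
        # never acquired is a silent no-op.
        self.active.discard(task_id)


async def _run_without_acquire(n_steps, max_concurrent):
    limiter = ToyLimiter(max_concurrent)
    running = 0
    peak = 0

    async def execute_task(task_id):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        try:
            await asyncio.sleep(0.01)  # simulated scan
        finally:
            running -= 1
            limiter.release(task_id)  # no matching acquire()

    # Same shape as the workflow path: schedule directly, never acquire.
    tasks = [
        asyncio.create_task(execute_task(f"step-{i}")) for i in range(n_steps)
    ]
    await asyncio.gather(*tasks)
    return peak


def peak_concurrency_without_acquire(n_steps=10, max_concurrent=2):
    return asyncio.run(_run_without_acquire(n_steps, max_concurrent))
```

In this model, ten steps with a limit of two still reach a peak of ten simultaneous tasks, and no error is raised at any point.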

Minimal Fix

In both locations, acquire before scheduling and handle failure:

# workflows.py and routes.py — same pattern
can_acquire, _ = await concurrent_limiter.acquire(task_id)
if not can_acquire:
    await executor.mark_task_failed(task_id, reason="Concurrency limit reached")
    continue  # skip scheduling this step; if the call site is not inside a loop, return instead
asyncio.create_task(executor.execute_task(task_id))
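
As a rough, self-contained illustration of the pattern above, the following sketch runs the acquire-before-schedule loop against a toy limiter. `ToyLimiter`, `run_fixed_scheduling`, and the `failed` list (standing in for `executor.mark_task_failed`) are hypothetical names introduced here; the real limiter and executor may behave differently.

```python
import asyncio


class ToyLimiter:
    """Hypothetical stand-in for concurrent_limiter (not the real class)."""

    def __init__(self, max_concurrent):
        self.max_concurrent = max_concurrent
        self.active = set()

    async def acquire(self, task_id):
        if len(self.active) >= self.max_concurrent:
            return False, "limit reached"
        self.active.add(task_id)
        return True, None

    def release(self, task_id):
        self.active.discard(task_id)


async def _schedule_with_acquire(n_steps, max_concurrent):
    limiter = ToyLimiter(max_concurrent)
    scheduled = []
    failed = []  # stands in for executor.mark_task_failed(...)

    async def execute_task(task_id):
        try:
            await asyncio.sleep(0.01)  # simulated scan
        finally:
            limiter.release(task_id)  # now pairs with a real acquire()

    for i in range(n_steps):
        task_id = f"step-{i}"
        can_acquire, _ = await limiter.acquire(task_id)
        if not can_acquire:
            failed.append(task_id)
            continue  # skip scheduling this step
        scheduled.append(asyncio.create_task(execute_task(task_id)))

    await asyncio.gather(*scheduled)
    # Slots still held after completion should be zero.
    return len(scheduled), failed, len(limiter.active)


def run_fixed_scheduling(n_steps=5, max_concurrent=2):
    return asyncio.run(_schedule_with_acquire(n_steps, max_concurrent))
```

Note that with this pattern, steps beyond the limit are marked failed rather than queued, which matches the minimal fix proposed here; whether workflow steps should wait for a free slot instead is a separate design question.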
