Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/conductor/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,24 @@ class AgentDef(BaseModel):
workflow: ./research-pipeline.yaml
"""

input_mapping: dict[str, str] | None = None
"""Optional mapping of sub-workflow input names to Jinja2 expressions.

Each key is a sub-workflow input parameter name. Each value is a Jinja2
template expression evaluated against the parent workflow's context.

When present, the rendered values are passed as the sub-workflow's inputs
instead of forwarding the parent's workflow.input.* values.

Only valid for type='workflow' agents.

Example::

input_mapping:
work_item_id: "{{ task_manager.output.current_issue_id }}"
title: "{{ task_manager.output.current_issue_title }}"
"""

max_session_seconds: float | None = Field(None, ge=1.0)
"""Maximum wall-clock duration for this agent's session in seconds.

Expand Down Expand Up @@ -529,6 +547,8 @@ def validate_agent_type(self) -> AgentDef:
raise ValueError("human_gate agents require 'options'")
if not self.prompt:
raise ValueError("human_gate agents require 'prompt'")
if self.input_mapping is not None:
raise ValueError("human_gate agents cannot have 'input_mapping'")
elif self.type == "script":
if not self.command:
raise ValueError("script agents require 'command'")
Expand All @@ -555,6 +575,8 @@ def validate_agent_type(self) -> AgentDef:
raise ValueError("script agents cannot have 'max_agent_iterations'")
if self.retry is not None:
raise ValueError("script agents cannot have 'retry'")
if self.input_mapping is not None:
raise ValueError("script agents cannot have 'input_mapping'")
elif self.type == "workflow":
if not self.workflow:
raise ValueError("workflow agents require 'workflow' path")
Expand All @@ -578,6 +600,13 @@ def validate_agent_type(self) -> AgentDef:
raise ValueError("workflow agents cannot have 'max_agent_iterations'")
if self.retry is not None:
raise ValueError("workflow agents cannot have 'retry'")
else:
# Regular agent or human_gate — input_mapping is not valid
if self.input_mapping is not None:
raise ValueError(
f"'{self.type or 'agent'}' agents cannot have 'input_mapping' "
"(only workflow agents support input_mapping)"
)
return self


Expand Down
26 changes: 21 additions & 5 deletions src/conductor/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,11 +545,27 @@ async def _execute_subworkflow(
) from exc

# Build sub-workflow inputs from the parent context
# Extract workflow.input.* values from the parent context
workflow_ctx = context.get("workflow", {})
sub_inputs: dict[str, Any] = (
dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {}
)
sub_inputs: dict[str, Any]
if agent.input_mapping is not None:
# Dynamic inputs: render each Jinja2 expression against parent context
renderer = TemplateRenderer()
sub_inputs = {}
for key, template_expr in agent.input_mapping.items():
try:
rendered = renderer.render(template_expr, context)
except Exception as e:
raise ExecutionError(
f"Failed to render input_mapping key '{key}' for agent '{agent.name}': {e}",
suggestion=f"Check that the expression '{template_expr}' "
"references valid context variables.",
) from e
sub_inputs[key] = rendered
else:
# Default: forward parent's workflow.input.* values
workflow_ctx = context.get("workflow", {})
sub_inputs = (
dict(workflow_ctx.get("input", {})) if isinstance(workflow_ctx, dict) else {}
)

# Create child engine inheriting provider/registry but with deeper depth
child_engine = WorkflowEngine(
Expand Down
55 changes: 55 additions & 0 deletions tests/test_config/test_workflow_type_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,58 @@ def test_workflow_with_routes_to_agents(self) -> None:
# Should not raise
warnings = validate_workflow_config(config)
assert isinstance(warnings, list)


class TestInputMapping:
"""Tests for input_mapping on workflow agents."""

def test_valid_input_mapping(self) -> None:
"""Test that input_mapping is accepted on workflow agents."""
agent = AgentDef(
name="sub_wf",
type="workflow",
workflow="./sub.yaml",
input_mapping={
"work_item_id": "{{ intake.output.epic_id }}",
"title": "{{ intake.output.epic_title }}",
},
)
assert agent.input_mapping is not None
assert len(agent.input_mapping) == 2

def test_workflow_without_input_mapping(self) -> None:
"""Test that workflow agents work without input_mapping (backward compat)."""
agent = AgentDef(name="sub_wf", type="workflow", workflow="./sub.yaml")
assert agent.input_mapping is None

def test_input_mapping_on_regular_agent_raises(self) -> None:
"""Test that input_mapping on a regular agent raises ValidationError."""
with pytest.raises(ValidationError, match="input_mapping"):
AgentDef(
name="regular",
prompt="do something",
input_mapping={"key": "{{ value }}"},
)

def test_input_mapping_on_human_gate_raises(self) -> None:
"""Test that input_mapping on a human_gate raises ValidationError."""
with pytest.raises(ValidationError, match="input_mapping"):
AgentDef(
name="gate",
type="human_gate",
prompt="Choose",
options=[
GateOption(label="Yes", value="yes", route="next"),
],
input_mapping={"key": "{{ value }}"},
)

def test_input_mapping_on_script_raises(self) -> None:
"""Test that input_mapping on a script agent raises ValidationError."""
with pytest.raises(ValidationError, match="input_mapping"):
AgentDef(
name="script",
type="script",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important: No runtime tests for the core feature

These 5 schema tests are good, but there are zero tests for the 34 new runtime lines in workflow.py. Missing coverage:

  1. Jinja2 rendering of input_mapping expressions against parent context
  2. JSON type coercion behavior ("42" → int, "true" → bool)
  3. Error when Jinja2 references an undefined variable
  4. Parent context injection and namespace collision with child agents
  5. End-to-end: parent output → input_mapping → child sub-workflow receives correct inputs
  6. Backward compat: no input_mapping → existing behavior preserved

Recommend adding these in tests/test_engine/test_subworkflow.py using the existing mock provider pattern.

command="echo hi",
input_mapping={"key": "{{ value }}"},
)
Loading
Loading