Skip to content

fix: emit OTel-spec structured messages on invoke_agent spans #164

Open
hectorhdzg wants to merge 1 commit into
microsoft:mainfrom
hectorhdzg:fix/langchain-invoke-agent-message-format
Open

fix: emit OTel-spec structured messages on invoke_agent spans #164
hectorhdzg wants to merge 1 commit into
microsoft:mainfrom
hectorhdzg:fix/langchain-invoke-agent-message-format

Conversation

@hectorhdzg
Copy link
Copy Markdown
Member

Fixes #160

  • Use InputMessage/OutputMessage objects from opentelemetry.util.genai.types
  • Add A365 LangChain enricher to convert structured messages back to plain content arrays for A365 backend compatibility
  • Extract shared enricher_utils to a365/core/ (used by AF and LC enrichers)
  • Add comprehensive tests for enricher and extraction utilities

…oft#160)

- Use InputMessage/OutputMessage objects from opentelemetry.util.genai.types
- Add A365 LangChain enricher to convert structured messages back to
  plain content arrays for A365 backend compatibility
- Extract shared enricher_utils to a365/core/ (used by AF and LC enrichers)
- Add comprehensive tests for enricher and extraction utilities
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the LangChain invoke_agent span attributes to emit OpenTelemetry GenAI structured message objects (instead of legacy plain-string arrays / IDs), while adding an A365 export-time enricher to convert structured messages back into the plain-string arrays expected by the A365 backend. It also factors common enricher logic into shared A365 core utilities and adds tests around the new behavior.

Changes:

  • Emit gen_ai.input.messages / gen_ai.output.messages on LangChain invoke_agent spans as OTel-spec structured messages (InputMessage/OutputMessage).
  • Add an A365 LangChain span enricher + shared enricher_utils to down-convert structured messages to plain content arrays for backend compatibility.
  • Add/adjust LangChain tests for agent message extraction, tracer aggregation, and the new enricher utilities.

Reviewed changes

Copilot reviewed 9 out of 10 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
tests/langchain/test_utils.py Adds unit tests for new agent message extraction helpers.
tests/langchain/test_tracer.py Updates expectations for tool output aggregation to structured message objects.
tests/langchain/test_span_enricher.py New tests for the shared enricher utilities and LangChain span enricher behavior.
src/microsoft/opentelemetry/a365/langchain/_span_enricher.py New A365 exporter enricher to convert structured messages to plain-string arrays on invoke_agent spans.
src/microsoft/opentelemetry/a365/langchain/init.py Declares the A365 LangChain package (empty init).
src/microsoft/opentelemetry/a365/core/enricher_utils.py New shared helper(s) for extracting plain text content from structured message JSON.
src/microsoft/opentelemetry/_genai/_langchain/_utils.py Adds agent-level structured message extraction helpers.
src/microsoft/opentelemetry/_genai/_langchain/_tracer.py Switches agent span serialization to structured messages and registers tool outputs as structured messages.
src/microsoft/opentelemetry/_genai/_langchain/_tracer_instrumentor.py Registers/unregisters the A365 LangChain enricher when available.
src/microsoft/opentelemetry/_agent_framework/_utils.py Re-exports message extraction helpers from the new shared A365 core utilities for backwards compatibility.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +32 to +53
try:
messages = json.loads(messages_json)
if isinstance(messages, list):
contents = []
for msg in messages:
if isinstance(msg, dict):
role = msg.get("role", "")

if role_filter and role != role_filter:
continue

parts = msg.get("parts")
if parts and isinstance(parts, list):
for part in parts:
if isinstance(part, dict):
part_type = part.get("type", "")
if part_type == "text" and "content" in part:
contents.append(part["content"])
return json.dumps(contents)
return messages_json
except (json.JSONDecodeError, TypeError):
return messages_json
Comment on lines +1018 to +1044
def _extract_agent_input_messages(
inputs: Mapping[str, Any] | None,
) -> list[InputMessage]:
"""Convert agent-level input messages to OTel ``InputMessage`` list.

Agent runs store messages as a flat list under the ``messages`` key,
unlike LLM runs which nest them as list-of-lists.
"""
if not inputs or not isinstance(inputs, Mapping):
return []
messages = inputs.get("messages")
if not messages or not isinstance(messages, list):
return []
# Handle potential nested lists
if len(messages) > 0 and isinstance(messages[0], list):
messages = messages[0]
results: list[InputMessage] = []
for msg in messages:
role = _langchain_role(msg)
parts: list[Any] = []
content = _langchain_content(msg)
if content:
parts.append(Text(content=content))
parts.extend(_langchain_tool_calls(msg))
if parts:
results.append(InputMessage(role=role, parts=parts))
return results
Comment on lines +1047 to +1075
def _extract_agent_output_messages(
outputs: Mapping[str, Any] | None,
) -> list[OutputMessage]:
"""Convert agent-level output messages to OTel ``OutputMessage`` list.

Agent runs store output as a flat messages list. Extracts the last
assistant/AI message as the agent output.
"""
if not outputs or not isinstance(outputs, Mapping):
return []
messages = outputs.get("messages")
if not messages or not isinstance(messages, list):
return []
# Handle potential nested lists
if len(messages) > 0 and isinstance(messages[0], list):
messages = messages[0]
results: list[OutputMessage] = []
for msg in reversed(messages):
role = _langchain_role(msg)
if role and role.lower() in ("ai", "assistant"):
parts: list[Any] = []
content = _langchain_content(msg)
if content and isinstance(content, str) and content.strip():
parts.append(Text(content=content))
parts.extend(_langchain_tool_calls(msg))
if parts:
results.append(OutputMessage(role=role, parts=parts, finish_reason="stop"))
break
return results
Comment on lines 511 to +537
# Set aggregated input/output messages only when content capture is enabled
if _should_capture_content_on_spans():
if msgs := content.get("input_messages"):
span.set_attribute(GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps(msgs))
span.set_attribute(
GEN_AI_INPUT_MESSAGES_KEY,
safe_json_dumps([asdict(m) for m in msgs]),
)
else:
for _, val in invoke_agent_input_message(run.inputs):
span.set_attribute(GEN_AI_INPUT_MESSAGES_KEY, val)
break
agent_msgs = _extract_agent_input_messages(run.inputs)
if agent_msgs:
span.set_attribute(
GEN_AI_INPUT_MESSAGES_KEY,
safe_json_dumps([asdict(m) for m in agent_msgs]),
)

if msgs := content.get("output_messages"):
span.set_attribute(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(msgs))
if out_msgs := content.get("output_messages"):
span.set_attribute(
GEN_AI_OUTPUT_MESSAGES_KEY,
safe_json_dumps([asdict(m) for m in out_msgs]),
)
else:
for _, val in invoke_agent_output_message(run.outputs):
span.set_attribute(GEN_AI_OUTPUT_MESSAGES_KEY, val)
break
agent_out_msgs = _extract_agent_output_messages(run.outputs)
if agent_out_msgs:
span.set_attribute(
GEN_AI_OUTPUT_MESSAGES_KEY,
safe_json_dumps([asdict(m) for m in agent_out_msgs]),
)
Comment on lines +650 to +707
class TestExtractAgentInputMessages(TestCase):
def test_extracts_structured_human_message(self):
inputs = {"messages": [{"role": "human", "content": "What is 2+2?"}]}
result = _extract_agent_input_messages(inputs)
self.assertEqual(len(result), 1)
self.assertEqual(result[0].role, "human")
self.assertEqual(len(result[0].parts), 1)
self.assertEqual(result[0].parts[0].content, "What is 2+2?")

def test_extracts_multiple_messages(self):
inputs = {
"messages": [
{"role": "system", "content": "You are helpful"},
{"role": "human", "content": "Hello"},
]
}
result = _extract_agent_input_messages(inputs)
self.assertEqual(len(result), 2)
self.assertEqual(result[0].role, "system")
self.assertEqual(result[1].role, "human")

def test_extracts_from_nested_list(self):
inputs = {"messages": [[{"role": "human", "content": "Hello"}]]}
result = _extract_agent_input_messages(inputs)
self.assertEqual(len(result), 1)
self.assertEqual(result[0].parts[0].content, "Hello")

def test_returns_empty_on_none(self):
self.assertEqual(_extract_agent_input_messages(None), [])

def test_returns_empty_on_no_messages(self):
self.assertEqual(_extract_agent_input_messages({"other": "data"}), [])


class TestExtractAgentOutputMessages(TestCase):
def test_extracts_structured_ai_message(self):
outputs = {"messages": [{"role": "ai", "content": "The answer is 4"}]}
result = _extract_agent_output_messages(outputs)
self.assertEqual(len(result), 1)
self.assertEqual(result[0].role, "ai")
self.assertEqual(result[0].parts[0].content, "The answer is 4")
self.assertEqual(result[0].finish_reason, "stop")

def test_returns_empty_on_none(self):
self.assertEqual(_extract_agent_output_messages(None), [])

def test_extracts_last_ai_message(self):
outputs = {
"messages": [
{"role": "ai", "content": "First"},
{"role": "human", "content": "Again"},
{"role": "ai", "content": "Second"},
]
}
result = _extract_agent_output_messages(outputs)
self.assertEqual(len(result), 1)
self.assertEqual(result[0].parts[0].content, "Second")

@JacksonWeber
Copy link
Copy Markdown
Contributor

Copilot comments look legitimate here, once the high severity ones are addressed I can review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

InvokeAgent spans emit non-spec gen_ai input/output message format

3 participants