Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,6 @@ env/

# Python package management
uv.lock

# Internal files
internal_examples/
5 changes: 5 additions & 0 deletions examples/basic/agents_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
InputGuardrailTripwireTriggered,
OutputGuardrailTripwireTriggered,
Runner,
SQLiteSession,
)
from agents.run import RunConfig

Expand Down Expand Up @@ -50,6 +51,9 @@

async def main() -> None:
"""Main input loop for the customer support agent with input/output guardrails."""
# Create a session for the agent to store the conversation history
session = SQLiteSession("guardrails-session")

# Create agent with guardrails automatically configured from pipeline configuration
AGENT = GuardrailAgent(
config=PIPELINE_CONFIG,
Expand All @@ -65,6 +69,7 @@ async def main() -> None:
AGENT,
user_input,
run_config=RunConfig(tracing_disabled=True),
session=session,
)
print(f"Assistant: {result.final_output}")
except EOFError:
Expand Down
13 changes: 13 additions & 0 deletions src/guardrails/_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .runtime import load_pipeline_bundles
from .types import GuardrailLLMContextProto, GuardrailResult
from .utils.context import validate_guardrail_context
from .utils.conversation import append_assistant_response, normalize_conversation

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -257,6 +258,18 @@ def _instantiate_all_guardrails(self) -> dict[str, list]:
guardrails[stage_name] = instantiate_guardrails(stage, default_spec_registry) if stage else []
return guardrails

def _normalize_conversation(self, payload: Any) -> list[dict[str, Any]]:
"""Normalize arbitrary conversation payloads."""
return normalize_conversation(payload)

def _conversation_with_response(
self,
conversation: list[dict[str, Any]],
response: Any,
) -> list[dict[str, Any]]:
"""Append the assistant response to a normalized conversation."""
return append_assistant_response(conversation, response)

def _validate_context(self, context: Any) -> None:
"""Validate context against all guardrails."""
for stage_guardrails in self.guardrails.values():
Expand Down
47 changes: 43 additions & 4 deletions src/guardrails/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ._base_client import GuardrailsResponse
from .exceptions import GuardrailTripwireTriggered
from .types import GuardrailResult
from .utils.conversation import merge_conversation_with_items

logger = logging.getLogger(__name__)

Expand All @@ -25,6 +26,7 @@ async def _stream_with_guardrails(
llm_stream: Any, # coroutine or async iterator of OpenAI chunks
preflight_results: list[GuardrailResult],
input_results: list[GuardrailResult],
conversation_history: list[dict[str, Any]] | None = None,
Copy link

Copilot AI Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The union syntax list[dict[str, Any]] | None requires Python 3.10+. Consider using Optional[list[dict[str, Any]]] for broader compatibility.

Copilot uses AI. Check for mistakes.
check_interval: int = 100,
suppress_tripwire: bool = False,
) -> AsyncIterator[GuardrailsResponse]:
Expand All @@ -46,7 +48,16 @@ async def _stream_with_guardrails(
# Run output guardrails periodically
if chunk_count % check_interval == 0:
try:
await self._run_stage_guardrails("output", accumulated_text, suppress_tripwire=suppress_tripwire)
history = merge_conversation_with_items(
conversation_history or [],
[{"role": "assistant", "content": accumulated_text}],
)
await self._run_stage_guardrails(
"output",
accumulated_text,
conversation_history=history,
suppress_tripwire=suppress_tripwire,
)
except GuardrailTripwireTriggered:
# Clear accumulated output and re-raise
accumulated_text = ""
Expand All @@ -57,7 +68,16 @@ async def _stream_with_guardrails(

# Final output check
if accumulated_text:
await self._run_stage_guardrails("output", accumulated_text, suppress_tripwire=suppress_tripwire)
history = merge_conversation_with_items(
conversation_history or [],
[{"role": "assistant", "content": accumulated_text}],
)
await self._run_stage_guardrails(
"output",
accumulated_text,
conversation_history=history,
suppress_tripwire=suppress_tripwire,
)
# Note: This final result won't be yielded since stream is complete
# but the results are available in the last chunk

Expand All @@ -66,6 +86,7 @@ def _stream_with_guardrails_sync(
llm_stream: Any, # iterator of OpenAI chunks
preflight_results: list[GuardrailResult],
input_results: list[GuardrailResult],
conversation_history: list[dict[str, Any]] | None = None,
Copy link

Copilot AI Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The union syntax list[dict[str, Any]] | None requires Python 3.10+. Consider using Optional[list[dict[str, Any]]] for broader compatibility.

Copilot uses AI. Check for mistakes.
check_interval: int = 100,
suppress_tripwire: bool = False,
):
Expand All @@ -83,7 +104,16 @@ def _stream_with_guardrails_sync(
# Run output guardrails periodically
if chunk_count % check_interval == 0:
try:
self._run_stage_guardrails("output", accumulated_text, suppress_tripwire=suppress_tripwire)
history = merge_conversation_with_items(
conversation_history or [],
[{"role": "assistant", "content": accumulated_text}],
)
self._run_stage_guardrails(
"output",
accumulated_text,
conversation_history=history,
suppress_tripwire=suppress_tripwire,
)
except GuardrailTripwireTriggered:
# Clear accumulated output and re-raise
accumulated_text = ""
Expand All @@ -94,6 +124,15 @@ def _stream_with_guardrails_sync(

# Final output check
if accumulated_text:
self._run_stage_guardrails("output", accumulated_text, suppress_tripwire=suppress_tripwire)
history = merge_conversation_with_items(
conversation_history or [],
[{"role": "assistant", "content": accumulated_text}],
)
self._run_stage_guardrails(
"output",
accumulated_text,
conversation_history=history,
suppress_tripwire=suppress_tripwire,
)
# Note: This final result won't be yielded since stream is complete
# but the results are available in the last chunk
Loading
Loading