diff --git a/src/microsoft/opentelemetry/a365/constants.py b/src/microsoft/opentelemetry/a365/constants.py index 3afd4437..1c8acd35 100644 --- a/src/microsoft/opentelemetry/a365/constants.py +++ b/src/microsoft/opentelemetry/a365/constants.py @@ -15,6 +15,7 @@ EXECUTE_TOOL_OPERATION_NAME = "execute_tool" OUTPUT_MESSAGES_OPERATION_NAME = "output_messages" CHAT_OPERATION_NAME = "chat" +APPLY_GUARDRAIL_OPERATION_NAME = "apply_guardrail" # --- OpenTelemetry semantic conventions --- ERROR_TYPE_KEY = "error.type" @@ -109,6 +110,31 @@ CHANNEL_NAME_KEY = "microsoft.channel.name" CHANNEL_LINK_KEY = "microsoft.channel.link" +# --- Guardrail / Security --- +GEN_AI_GUARDIAN_ID_KEY = "microsoft.guardian.id" +GEN_AI_GUARDIAN_NAME_KEY = "microsoft.guardian.name" +GEN_AI_GUARDIAN_PROVIDER_NAME_KEY = "microsoft.guardian.provider.name" +GEN_AI_GUARDIAN_VERSION_KEY = "microsoft.guardian.version" +GEN_AI_SECURITY_DECISION_TYPE_KEY = "microsoft.security.decision.type" +GEN_AI_SECURITY_DECISION_REASON_KEY = "microsoft.security.decision.reason" +GEN_AI_SECURITY_DECISION_CODE_KEY = "microsoft.security.decision.code" +GEN_AI_SECURITY_TARGET_TYPE_KEY = "microsoft.security.target.type" +GEN_AI_SECURITY_TARGET_ID_KEY = "microsoft.security.target.id" +GEN_AI_SECURITY_POLICY_ID_KEY = "microsoft.security.policy.id" +GEN_AI_SECURITY_POLICY_NAME_KEY = "microsoft.security.policy.name" +GEN_AI_SECURITY_POLICY_VERSION_KEY = "microsoft.security.policy.version" +GEN_AI_SECURITY_CONTENT_INPUT_HASH_KEY = "microsoft.security.content.input.hash" +GEN_AI_SECURITY_CONTENT_MODIFIED_KEY = "microsoft.security.content.modified" +GEN_AI_SECURITY_EXTERNAL_EVENT_ID_KEY = "microsoft.security.external_event_id" +GEN_AI_SECURITY_CONTENT_INPUT_VALUE_KEY = "microsoft.security.content.input.value" +GEN_AI_SECURITY_CONTENT_OUTPUT_VALUE_KEY = "microsoft.security.content.output.value" +GEN_AI_SECURITY_FINDING_EVENT_NAME = "microsoft.security.finding" +GEN_AI_SECURITY_RISK_CATEGORY_KEY = "microsoft.security.risk.category" 
+GEN_AI_SECURITY_RISK_SEVERITY_KEY = "microsoft.security.risk.severity" +GEN_AI_SECURITY_RISK_SCORE_KEY = "microsoft.security.risk.score" +GEN_AI_SECURITY_RISK_METADATA_KEY = "microsoft.security.risk.metadata" +GEN_AI_SECURITY_POLICY_DECISION_TYPE_KEY = "microsoft.security.policy.decision.type" + # --- Telemetry SDK attributes --- TELEMETRY_SDK_NAME_KEY = "telemetry.sdk.name" TELEMETRY_SDK_LANGUAGE_KEY = "telemetry.sdk.language" diff --git a/src/microsoft/opentelemetry/a365/core/__init__.py b/src/microsoft/opentelemetry/a365/core/__init__.py index ca6a4a67..ce9fb582 100644 --- a/src/microsoft/opentelemetry/a365/core/__init__.py +++ b/src/microsoft/opentelemetry/a365/core/__init__.py @@ -4,7 +4,13 @@ # Microsoft Agent 365 Python SDK for OpenTelemetry tracing. from microsoft.opentelemetry.a365.core.agent_details import AgentDetails +from microsoft.opentelemetry.a365.core.apply_guardrail_scope import ApplyGuardrailScope from microsoft.opentelemetry.a365.core.execute_tool_scope import ExecuteToolScope +from microsoft.opentelemetry.a365.core.guardrail_decision_type import GuardrailDecisionType +from microsoft.opentelemetry.a365.core.guardrail_details import GuardrailDetails +from microsoft.opentelemetry.a365.core.guardrail_finding import GuardrailFinding +from microsoft.opentelemetry.a365.core.guardrail_risk_severity import GuardrailRiskSeverity +from microsoft.opentelemetry.a365.core.guardrail_target_type import GuardrailTargetType from microsoft.opentelemetry.a365.core.inference_call_details import InferenceCallDetails from microsoft.opentelemetry.a365.core.models.service_endpoint import ServiceEndpoint from microsoft.opentelemetry.a365.core.inference_operation_type import InferenceOperationType @@ -49,10 +55,17 @@ # Base scope class "OpenTelemetryScope", # Specific scope classes + "ApplyGuardrailScope", "ExecuteToolScope", "InvokeAgentScope", "InferenceScope", "OutputScope", + # Guardrail data classes and constants + "GuardrailDecisionType", + "GuardrailDetails", + 
"GuardrailFinding", + "GuardrailRiskSeverity", + "GuardrailTargetType", # Middleware "BaggageBuilder", # Data classes diff --git a/src/microsoft/opentelemetry/a365/core/apply_guardrail_scope.py b/src/microsoft/opentelemetry/a365/core/apply_guardrail_scope.py new file mode 100644 index 00000000..4e06cad0 --- /dev/null +++ b/src/microsoft/opentelemetry/a365/core/apply_guardrail_scope.py @@ -0,0 +1,260 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""OpenTelemetry tracing scope for guardrail (security guardian) evaluations.""" + +from __future__ import annotations + +from opentelemetry.trace import SpanKind + +from microsoft.opentelemetry.a365.core.agent_details import AgentDetails +from microsoft.opentelemetry.a365.core.constants import ( + APPLY_GUARDRAIL_OPERATION_NAME, + CHANNEL_LINK_KEY, + CHANNEL_NAME_KEY, + GEN_AI_CALLER_CLIENT_IP_KEY, + GEN_AI_CONVERSATION_ID_KEY, + GEN_AI_GUARDIAN_ID_KEY, + GEN_AI_GUARDIAN_NAME_KEY, + GEN_AI_GUARDIAN_PROVIDER_NAME_KEY, + GEN_AI_GUARDIAN_VERSION_KEY, + GEN_AI_SECURITY_CONTENT_INPUT_HASH_KEY, + GEN_AI_SECURITY_CONTENT_INPUT_VALUE_KEY, + GEN_AI_SECURITY_CONTENT_MODIFIED_KEY, + GEN_AI_SECURITY_CONTENT_OUTPUT_VALUE_KEY, + GEN_AI_SECURITY_DECISION_CODE_KEY, + GEN_AI_SECURITY_DECISION_REASON_KEY, + GEN_AI_SECURITY_DECISION_TYPE_KEY, + GEN_AI_SECURITY_EXTERNAL_EVENT_ID_KEY, + GEN_AI_SECURITY_FINDING_EVENT_NAME, + GEN_AI_SECURITY_POLICY_DECISION_TYPE_KEY, + GEN_AI_SECURITY_POLICY_ID_KEY, + GEN_AI_SECURITY_POLICY_NAME_KEY, + GEN_AI_SECURITY_POLICY_VERSION_KEY, + GEN_AI_SECURITY_RISK_CATEGORY_KEY, + GEN_AI_SECURITY_RISK_METADATA_KEY, + GEN_AI_SECURITY_RISK_SCORE_KEY, + GEN_AI_SECURITY_RISK_SEVERITY_KEY, + GEN_AI_SECURITY_TARGET_ID_KEY, + GEN_AI_SECURITY_TARGET_TYPE_KEY, + USER_EMAIL_KEY, + USER_ID_KEY, + USER_NAME_KEY, +) +from microsoft.opentelemetry.a365.core.guardrail_details import GuardrailDetails +from microsoft.opentelemetry.a365.core.guardrail_finding import GuardrailFinding +from 
class ApplyGuardrailScope(OpenTelemetryScope):
    """Provides OpenTelemetry tracing scope for security guardrail evaluations.

    Guardian spans SHOULD be children of the operation span they are protecting
    (e.g., inference or execute_tool spans). Multiple guardian spans MAY exist
    under a single operation span if multiple guardians are chained.

    Example usage::

        from microsoft.opentelemetry.a365.core import (
            ApplyGuardrailScope, GuardrailDetails, AgentDetails,
            GuardrailDecisionType, GuardrailTargetType, GuardrailRiskSeverity,
            GuardrailFinding,
        )

        details = GuardrailDetails(
            target_type=GuardrailTargetType.LLM_INPUT,
            decision_type=GuardrailDecisionType.ALLOW,
            guardian_name="Azure Content Safety",
        )

        with ApplyGuardrailScope.start(details, agent_details) as scope:
            # ... run guardrail evaluation ...
            scope.record_finding(GuardrailFinding(
                risk_category="hate_speech",
                risk_severity=GuardrailRiskSeverity.HIGH,
                risk_score=0.95,
            ))
            scope.record_decision(GuardrailDecisionType.DENY, "Blocked by policy")
    """

    @staticmethod
    def start(
        details: GuardrailDetails,
        agent_details: AgentDetails,
        request: Request | None = None,
        user_details: UserDetails | None = None,
        span_details: SpanDetails | None = None,
    ) -> "ApplyGuardrailScope":
        """Create and start a new scope for guardrail evaluation tracing.

        Thin factory around the constructor; all arguments are forwarded
        positionally and unchanged.

        Args:
            details: Guardrail evaluation details (required).
            agent_details: Agent identity details (required).
            request: Optional request context (conversation ID, channel, content).
            user_details: Optional human user details.
            span_details: Optional span configuration (parent context, timing, kind).

        Returns:
            A new ApplyGuardrailScope instance.
        """
        return ApplyGuardrailScope(details, agent_details, request, user_details, span_details)

    def __init__(
        self,
        details: GuardrailDetails,
        agent_details: AgentDetails,
        request: Request | None = None,
        user_details: UserDetails | None = None,
        span_details: SpanDetails | None = None,
    ) -> None:
        """Initialize the guardrail scope and stamp the initial span attributes.

        Args:
            details: Guardrail evaluation details.
            agent_details: Agent identity details (forwarded to the base scope).
            request: Optional request context.
            user_details: Optional human user details.
            span_details: Optional span configuration. When the caller supplies
                no span kind, ``SpanKind.INTERNAL`` is used.
        """
        # Never mutate the caller's SpanDetails: build a copy whose span kind
        # falls back to INTERNAL, or a fresh INTERNAL-only instance when the
        # caller passed nothing.
        if span_details:
            resolved_span_details = SpanDetails(
                span_kind=span_details.span_kind or SpanKind.INTERNAL,
                parent_context=span_details.parent_context,
                start_time=span_details.start_time,
                end_time=span_details.end_time,
                span_links=span_details.span_links,
            )
        else:
            resolved_span_details = SpanDetails(span_kind=SpanKind.INTERNAL)

        super().__init__(
            operation_name=APPLY_GUARDRAIL_OPERATION_NAME,
            activity_name=self._build_activity_name(details),
            agent_details=agent_details,
            span_details=resolved_span_details,
        )

        # Guardrail-specific attributes. The accompanying tests assert that
        # optional fields left as None produce no attribute, so set_tag_maybe
        # is relied on to skip None values.
        self.set_tag_maybe(GEN_AI_SECURITY_TARGET_TYPE_KEY, details.target_type)
        self.set_tag_maybe(GEN_AI_SECURITY_DECISION_TYPE_KEY, details.decision_type)
        self.set_tag_maybe(GEN_AI_GUARDIAN_ID_KEY, details.guardian_id)
        self.set_tag_maybe(GEN_AI_GUARDIAN_NAME_KEY, details.guardian_name)
        self.set_tag_maybe(GEN_AI_GUARDIAN_PROVIDER_NAME_KEY, details.guardian_provider_name)
        self.set_tag_maybe(GEN_AI_GUARDIAN_VERSION_KEY, details.guardian_version)
        self.set_tag_maybe(GEN_AI_SECURITY_TARGET_ID_KEY, details.target_id)
        self.set_tag_maybe(GEN_AI_SECURITY_DECISION_REASON_KEY, details.decision_reason)
        self.set_tag_maybe(GEN_AI_SECURITY_DECISION_CODE_KEY, details.decision_code)
        self.set_tag_maybe(GEN_AI_SECURITY_POLICY_ID_KEY, details.policy_id)
        self.set_tag_maybe(GEN_AI_SECURITY_POLICY_NAME_KEY, details.policy_name)
        self.set_tag_maybe(GEN_AI_SECURITY_POLICY_VERSION_KEY, details.policy_version)
        self.set_tag_maybe(GEN_AI_SECURITY_CONTENT_INPUT_HASH_KEY, details.content_input_hash)
        # NOTE(review): content_modified is a bool; confirm set_tag_maybe
        # records an explicit False instead of treating it as "missing".
        self.set_tag_maybe(GEN_AI_SECURITY_CONTENT_MODIFIED_KEY, details.content_modified)
        self.set_tag_maybe(GEN_AI_SECURITY_EXTERNAL_EVENT_ID_KEY, details.external_event_id)

        # Request context (conversation and channel), when provided.
        if request:
            self.set_tag_maybe(GEN_AI_CONVERSATION_ID_KEY, request.conversation_id)
            if request.channel:
                self.set_tag_maybe(CHANNEL_NAME_KEY, request.channel.name)
                self.set_tag_maybe(CHANNEL_LINK_KEY, request.channel.link)

        # Human caller identity, when provided.
        if user_details:
            self.set_tag_maybe(USER_ID_KEY, user_details.user_id)
            self.set_tag_maybe(USER_EMAIL_KEY, user_details.user_email)
            self.set_tag_maybe(USER_NAME_KEY, user_details.user_name)
            self.set_tag_maybe(
                GEN_AI_CALLER_CLIENT_IP_KEY,
                validate_and_normalize_ip(user_details.user_client_ip),
            )

    @staticmethod
    def _build_activity_name(details: GuardrailDetails) -> str:
        """Build the span display name from guardrail details.

        Returns ``"<operation> <guardian_name> <target_type>"`` when a guardian
        name is present, otherwise ``"<operation> <target_type>"``.
        """
        if details.guardian_name:
            return f"{APPLY_GUARDRAIL_OPERATION_NAME} {details.guardian_name} {details.target_type}"
        return f"{APPLY_GUARDRAIL_OPERATION_NAME} {details.target_type}"

    def record_decision(self, decision_type: str, reason: str | None = None) -> None:
        """Update the guardrail decision on the span.

        Use this to overwrite the decision supplied at construction time once
        the evaluation has produced its final verdict.

        Args:
            decision_type: The decision type (use GuardrailDecisionType constants).
            reason: Optional human-readable reason for the decision. When
                omitted, the reason attribute is not touched, so a reason
                recorded earlier stays on the span; pass the new reason
                whenever the decision changes.
        """
        self.set_tag_maybe(GEN_AI_SECURITY_DECISION_TYPE_KEY, decision_type)
        if reason is not None:
            self.set_tag_maybe(GEN_AI_SECURITY_DECISION_REASON_KEY, reason)

    def record_content_output(self, output_value: str) -> None:
        """Record the sanitized/modified output content (opt-in).

        This is an opt-in field for recording output content after guardrail
        processing. Only set this when content capture is explicitly enabled.

        Args:
            output_value: The output content string.
        """
        self.set_tag_maybe(GEN_AI_SECURITY_CONTENT_OUTPUT_VALUE_KEY, output_value)

    def record_content_input(self, input_value: InputMessagesParam | str) -> None:
        """Record the input content being evaluated (opt-in).

        This is an opt-in field for recording input content sent to the
        guardrail. Only set this when content capture is explicitly enabled.

        Accepts plain strings or structured ``InputMessages`` containers.
        Plain strings are stored as-is; anything else is passed through
        ``normalize_input_messages`` and ``serialize_messages`` before being
        set as an attribute.

        Args:
            input_value: The input content as a string or InputMessagesParam.
        """
        if isinstance(input_value, str):
            self.set_tag_maybe(GEN_AI_SECURITY_CONTENT_INPUT_VALUE_KEY, input_value)
        else:
            wrapper = normalize_input_messages(input_value)
            self.set_tag_maybe(GEN_AI_SECURITY_CONTENT_INPUT_VALUE_KEY, serialize_messages(wrapper))

    def record_finding(self, finding: GuardrailFinding) -> None:
        """Record a security finding as a span event.

        Each call adds a separate ``microsoft.security.finding`` event to the
        span. Multiple findings can be recorded for a single guardrail
        evaluation. Nothing is recorded when the scope has no span or
        telemetry is disabled.

        Args:
            finding: The security finding to record.
        """
        if not self._span or not self._is_telemetry_enabled():
            return

        # Category and severity are always emitted.
        attributes: dict[str, str | float | list[str]] = {
            GEN_AI_SECURITY_RISK_CATEGORY_KEY: finding.risk_category,
            GEN_AI_SECURITY_RISK_SEVERITY_KEY: finding.risk_severity,
        }

        # Optional fields are emitted only when explicitly provided; a score
        # of 0.0 or an empty metadata list is still a value and is kept.
        optional_attributes = {
            GEN_AI_SECURITY_RISK_SCORE_KEY: finding.risk_score,
            GEN_AI_SECURITY_RISK_METADATA_KEY: finding.risk_metadata,
            GEN_AI_SECURITY_POLICY_DECISION_TYPE_KEY: finding.policy_decision_type,
            GEN_AI_SECURITY_POLICY_ID_KEY: finding.policy_id,
            GEN_AI_SECURITY_POLICY_NAME_KEY: finding.policy_name,
            GEN_AI_SECURITY_POLICY_VERSION_KEY: finding.policy_version,
        }
        attributes.update(
            {key: value for key, value in optional_attributes.items() if value is not None}
        )

        self._span.add_event(GEN_AI_SECURITY_FINDING_EVENT_NAME, attributes=attributes)
+GEN_AI_SECURITY_DECISION_REASON_KEY = "microsoft.security.decision.reason" +GEN_AI_SECURITY_DECISION_CODE_KEY = "microsoft.security.decision.code" +GEN_AI_SECURITY_TARGET_TYPE_KEY = "microsoft.security.target.type" +GEN_AI_SECURITY_TARGET_ID_KEY = "microsoft.security.target.id" +GEN_AI_SECURITY_POLICY_ID_KEY = "microsoft.security.policy.id" +GEN_AI_SECURITY_POLICY_NAME_KEY = "microsoft.security.policy.name" +GEN_AI_SECURITY_POLICY_VERSION_KEY = "microsoft.security.policy.version" +GEN_AI_SECURITY_CONTENT_INPUT_HASH_KEY = "microsoft.security.content.input.hash" +GEN_AI_SECURITY_CONTENT_MODIFIED_KEY = "microsoft.security.content.modified" +GEN_AI_SECURITY_EXTERNAL_EVENT_ID_KEY = "microsoft.security.external_event_id" +GEN_AI_SECURITY_CONTENT_INPUT_VALUE_KEY = "microsoft.security.content.input.value" +GEN_AI_SECURITY_CONTENT_OUTPUT_VALUE_KEY = "microsoft.security.content.output.value" +GEN_AI_SECURITY_FINDING_EVENT_NAME = "microsoft.security.finding" +GEN_AI_SECURITY_RISK_CATEGORY_KEY = "microsoft.security.risk.category" +GEN_AI_SECURITY_RISK_SEVERITY_KEY = "microsoft.security.risk.severity" +GEN_AI_SECURITY_RISK_SCORE_KEY = "microsoft.security.risk.score" +GEN_AI_SECURITY_RISK_METADATA_KEY = "microsoft.security.risk.metadata" +GEN_AI_SECURITY_POLICY_DECISION_TYPE_KEY = "microsoft.security.policy.decision.type" + # --- Telemetry SDK attributes --- TELEMETRY_SDK_NAME_KEY = "telemetry.sdk.name" TELEMETRY_SDK_LANGUAGE_KEY = "telemetry.sdk.language" diff --git a/src/microsoft/opentelemetry/a365/core/exporters/utils.py b/src/microsoft/opentelemetry/a365/core/exporters/utils.py index decdfb28..eeb295f0 100644 --- a/src/microsoft/opentelemetry/a365/core/exporters/utils.py +++ b/src/microsoft/opentelemetry/a365/core/exporters/utils.py @@ -36,6 +36,7 @@ A365_SERVICE_TENANT_ID_ENV, A365_SUPPRESS_INVOKE_AGENT_INPUT_ENV, A365_USE_S2S_ENDPOINT_ENV, + APPLY_GUARDRAIL_OPERATION_NAME, CHAT_OPERATION_NAME, ENABLE_A365_OBSERVABILITY_EXPORTER, EXECUTE_TOOL_OPERATION_NAME, @@ -61,6 
class GuardrailDecisionType:
    """Well-known guardrail decision type values.

    Plain string constants; the values are written verbatim to the
    ``microsoft.security.decision.type`` span attribute.
    """

    ALLOW = "allow"
    AUDIT = "audit"
    DENY = "deny"
    MODIFY = "modify"
    WARN = "warn"


# frozen=True enforces the "immutable input contract" promised by the
# docstring: fields are fixed at construction and assignment raises
# dataclasses.FrozenInstanceError.
@dataclass(frozen=True)
class GuardrailDetails:
    """Immutable input contract describing a guardrail evaluation.

    Only ``target_type`` and ``decision_type`` are required; every other
    field defaults to ``None``.

    Attributes:
        target_type: What content is being guarded (e.g., "llm_input", "tool_call").
        decision_type: The guardian's decision (e.g., "allow", "deny").
        guardian_name: Human-readable guardian name.
        guardian_id: Unique guardian identifier.
        guardian_provider_name: Provider name (e.g., "azure.ai.content_safety").
        guardian_version: Guardian version string.
        target_id: ID of the targeted content.
        decision_reason: Human-readable decision reason.
        decision_code: Machine-readable decision code.
        policy_id: Triggered policy ID.
        policy_name: Triggered policy name.
        policy_version: Policy version.
        content_input_hash: Hash of input content for forensic correlation.
        content_modified: Whether the content was altered by the guardrail.
        external_event_id: External event ID for SIEM correlation.
    """

    target_type: str
    decision_type: str
    guardian_name: str | None = None
    guardian_id: str | None = None
    guardian_provider_name: str | None = None
    guardian_version: str | None = None
    target_id: str | None = None
    decision_reason: str | None = None
    decision_code: str | None = None
    policy_id: str | None = None
    policy_name: str | None = None
    policy_version: str | None = None
    content_input_hash: str | None = None
    content_modified: bool | None = None
    external_event_id: str | None = None


@dataclass
class GuardrailFinding:
    """A single security finding from a guardrail evaluation.

    Only ``risk_category`` and ``risk_severity`` are required; every other
    field defaults to ``None``.

    Attributes:
        risk_category: Category of risk detected (e.g., "hate_speech", "pii", "jailbreak").
        risk_severity: Severity level (use GuardrailRiskSeverity constants).
        policy_decision_type: Per-finding decision override.
        policy_id: Policy that triggered this finding.
        policy_name: Policy name.
        policy_version: Policy version.
        risk_score: Confidence score from 0.0 to 1.0 (the range is documented
            only; it is not validated here).
        risk_metadata: Non-PII structural metadata about the finding.
    """

    risk_category: str
    risk_severity: str
    policy_decision_type: str | None = None
    policy_id: str | None = None
    policy_name: str | None = None
    policy_version: str | None = None
    risk_score: float | None = None
    risk_metadata: list[str] | None = None


class GuardrailRiskSeverity:
    """Well-known risk severity level values (plain string constants)."""

    NONE = "none"
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class GuardrailTargetType:
    """Well-known guardrail target type values.

    Users may also supply custom string values not listed here.
    """

    LLM_INPUT = "llm_input"
    LLM_OUTPUT = "llm_output"
    TOOL_CALL = "tool_call"
    TOOL_DEFINITION = "tool_definition"
    MEMORY_STORE = "memory_store"
    MEMORY_RETRIEVE = "memory_retrieve"
    KNOWLEDGE_QUERY = "knowledge_query"
    KNOWLEDGE_RESULT = "knowledge_result"
    MESSAGE = "message"
All rights reserved. +# Licensed under the MIT License. + +import os +import unittest +from unittest.mock import MagicMock, patch + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.trace import SpanKind + +from microsoft.opentelemetry.a365.core.agent_details import AgentDetails +from microsoft.opentelemetry.a365.core.apply_guardrail_scope import ApplyGuardrailScope +from microsoft.opentelemetry.a365.core.channel import Channel +from microsoft.opentelemetry.a365.core.constants import ( + APPLY_GUARDRAIL_OPERATION_NAME, + CHANNEL_LINK_KEY, + CHANNEL_NAME_KEY, + GEN_AI_CONVERSATION_ID_KEY, + GEN_AI_GUARDIAN_ID_KEY, + GEN_AI_GUARDIAN_NAME_KEY, + GEN_AI_GUARDIAN_PROVIDER_NAME_KEY, + GEN_AI_GUARDIAN_VERSION_KEY, + GEN_AI_OPERATION_NAME_KEY, + GEN_AI_SECURITY_CONTENT_INPUT_HASH_KEY, + GEN_AI_SECURITY_CONTENT_INPUT_VALUE_KEY, + GEN_AI_SECURITY_CONTENT_MODIFIED_KEY, + GEN_AI_SECURITY_CONTENT_OUTPUT_VALUE_KEY, + GEN_AI_SECURITY_DECISION_CODE_KEY, + GEN_AI_SECURITY_DECISION_REASON_KEY, + GEN_AI_SECURITY_DECISION_TYPE_KEY, + GEN_AI_SECURITY_EXTERNAL_EVENT_ID_KEY, + GEN_AI_SECURITY_FINDING_EVENT_NAME, + GEN_AI_SECURITY_POLICY_DECISION_TYPE_KEY, + GEN_AI_SECURITY_POLICY_ID_KEY, + GEN_AI_SECURITY_POLICY_NAME_KEY, + GEN_AI_SECURITY_POLICY_VERSION_KEY, + GEN_AI_SECURITY_RISK_CATEGORY_KEY, + GEN_AI_SECURITY_RISK_METADATA_KEY, + GEN_AI_SECURITY_RISK_SCORE_KEY, + GEN_AI_SECURITY_RISK_SEVERITY_KEY, + GEN_AI_SECURITY_TARGET_ID_KEY, + GEN_AI_SECURITY_TARGET_TYPE_KEY, + USER_EMAIL_KEY, + USER_ID_KEY, + USER_NAME_KEY, +) +from microsoft.opentelemetry.a365.core.exporters.utils import ( + GEN_AI_OPERATION_NAMES, + filter_and_partition_by_identity, +) +from microsoft.opentelemetry.a365.core.guardrail_decision_type import GuardrailDecisionType +from microsoft.opentelemetry.a365.core.guardrail_details import GuardrailDetails +from microsoft.opentelemetry.a365.core.guardrail_finding import GuardrailFinding +from 
microsoft.opentelemetry.a365.core.guardrail_risk_severity import GuardrailRiskSeverity +from microsoft.opentelemetry.a365.core.guardrail_target_type import GuardrailTargetType +from microsoft.opentelemetry.a365.core.models.user_details import UserDetails +from microsoft.opentelemetry.a365.core.opentelemetry_scope import OpenTelemetryScope +from microsoft.opentelemetry.a365.core.request import Request + + +@patch.dict(os.environ, {"ENABLE_OBSERVABILITY": "true"}) +class TestApplyGuardrailScope(unittest.TestCase): + """Tests for ApplyGuardrailScope.""" + + def setUp(self): + """Set up a real TracerProvider so spans are recorded.""" + self._provider = TracerProvider() + trace.set_tracer_provider(self._provider) + # Reset the cached tracer so the scope picks up the new provider + OpenTelemetryScope._tracer = None + + def tearDown(self): + self._provider.shutdown() + OpenTelemetryScope._tracer = None + + def _make_agent_details(self): + return AgentDetails( + agent_id="agent-123", + agent_name="Test Agent", + tenant_id="tenant-456", + ) + + def _make_guardrail_details(self, **kwargs): + defaults = { + "target_type": GuardrailTargetType.LLM_INPUT, + "decision_type": GuardrailDecisionType.ALLOW, + "guardian_name": "Azure Content Safety", + } + defaults.update(kwargs) + return GuardrailDetails(**defaults) + + def test_span_name_with_guardian_name(self): + """Span name includes guardian name and target type.""" + details = self._make_guardrail_details() + scope = ApplyGuardrailScope.start(details, self._make_agent_details()) + try: + self.assertIsNotNone(scope._span) + self.assertEqual(scope._span.name, "apply_guardrail Azure Content Safety llm_input") + finally: + scope.dispose() + + def test_span_name_without_guardian_name(self): + """Span name falls back to target type only when no guardian name.""" + details = self._make_guardrail_details(guardian_name=None) + scope = ApplyGuardrailScope.start(details, self._make_agent_details()) + try: + 
self.assertIsNotNone(scope._span) + self.assertEqual(scope._span.name, "apply_guardrail llm_input") + finally: + scope.dispose() + + def test_span_kind_defaults_to_internal(self): + """Span kind defaults to INTERNAL.""" + details = self._make_guardrail_details() + scope = ApplyGuardrailScope.start(details, self._make_agent_details()) + try: + self.assertIsNotNone(scope._span) + self.assertEqual(scope._span.kind, SpanKind.INTERNAL) + finally: + scope.dispose() + + def test_operation_name_attribute(self): + """Span has gen_ai.operation.name = 'apply_guardrail'.""" + details = self._make_guardrail_details() + scope = ApplyGuardrailScope.start(details, self._make_agent_details()) + try: + attrs = dict(scope._span.attributes) + self.assertEqual(attrs[GEN_AI_OPERATION_NAME_KEY], APPLY_GUARDRAIL_OPERATION_NAME) + finally: + scope.dispose() + + def test_guardrail_attributes_set(self): + """All guardrail-specific attributes are set on the span.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.TOOL_CALL, + decision_type=GuardrailDecisionType.DENY, + guardian_name="My Guardian", + guardian_id="guardian-1", + guardian_provider_name="azure.ai.content_safety", + guardian_version="1.0.0", + target_id="target-abc", + decision_reason="Blocked", + decision_code="BLOCKED_HATE", + policy_id="pol-1", + policy_name="Hate Speech Policy", + policy_version="2.0", + content_input_hash="sha256:abc123", + content_modified=True, + external_event_id="evt-999", + ) + scope = ApplyGuardrailScope.start(details, self._make_agent_details()) + try: + attrs = dict(scope._span.attributes) + self.assertEqual(attrs[GEN_AI_SECURITY_TARGET_TYPE_KEY], "tool_call") + self.assertEqual(attrs[GEN_AI_SECURITY_DECISION_TYPE_KEY], "deny") + self.assertEqual(attrs[GEN_AI_GUARDIAN_ID_KEY], "guardian-1") + self.assertEqual(attrs[GEN_AI_GUARDIAN_NAME_KEY], "My Guardian") + self.assertEqual(attrs[GEN_AI_GUARDIAN_PROVIDER_NAME_KEY], "azure.ai.content_safety") + 
self.assertEqual(attrs[GEN_AI_GUARDIAN_VERSION_KEY], "1.0.0") + self.assertEqual(attrs[GEN_AI_SECURITY_TARGET_ID_KEY], "target-abc") + self.assertEqual(attrs[GEN_AI_SECURITY_DECISION_REASON_KEY], "Blocked") + self.assertEqual(attrs[GEN_AI_SECURITY_DECISION_CODE_KEY], "BLOCKED_HATE") + self.assertEqual(attrs[GEN_AI_SECURITY_POLICY_ID_KEY], "pol-1") + self.assertEqual(attrs[GEN_AI_SECURITY_POLICY_NAME_KEY], "Hate Speech Policy") + self.assertEqual(attrs[GEN_AI_SECURITY_POLICY_VERSION_KEY], "2.0") + self.assertEqual(attrs[GEN_AI_SECURITY_CONTENT_INPUT_HASH_KEY], "sha256:abc123") + self.assertEqual(attrs[GEN_AI_SECURITY_CONTENT_MODIFIED_KEY], True) + self.assertEqual(attrs[GEN_AI_SECURITY_EXTERNAL_EVENT_ID_KEY], "evt-999") + finally: + scope.dispose() + + def test_request_context_attributes(self): + """Request context sets conversation_id and channel attributes.""" + details = self._make_guardrail_details() + request = Request( + conversation_id="conv-1", + channel=Channel(name="teams", link="https://teams.example.com/conv-1"), + ) + scope = ApplyGuardrailScope.start(details, self._make_agent_details(), request=request) + try: + attrs = dict(scope._span.attributes) + self.assertEqual(attrs[GEN_AI_CONVERSATION_ID_KEY], "conv-1") + self.assertEqual(attrs[CHANNEL_NAME_KEY], "teams") + self.assertEqual(attrs[CHANNEL_LINK_KEY], "https://teams.example.com/conv-1") + finally: + scope.dispose() + + def test_user_details_attributes(self): + """User details are set on the span.""" + details = self._make_guardrail_details() + user = UserDetails(user_id="user-1", user_email="user@example.com", user_name="Test User") + scope = ApplyGuardrailScope.start(details, self._make_agent_details(), user_details=user) + try: + attrs = dict(scope._span.attributes) + self.assertEqual(attrs[USER_ID_KEY], "user-1") + self.assertEqual(attrs[USER_EMAIL_KEY], "user@example.com") + self.assertEqual(attrs[USER_NAME_KEY], "Test User") + finally: + scope.dispose() + + def 
test_record_decision_updates_attributes(self): + """record_decision updates the decision type and reason.""" + details = self._make_guardrail_details(decision_type=GuardrailDecisionType.ALLOW) + scope = ApplyGuardrailScope.start(details, self._make_agent_details()) + try: + scope.record_decision(GuardrailDecisionType.DENY, "Content blocked") + attrs = dict(scope._span.attributes) + self.assertEqual(attrs[GEN_AI_SECURITY_DECISION_TYPE_KEY], "deny") + self.assertEqual(attrs[GEN_AI_SECURITY_DECISION_REASON_KEY], "Content blocked") + finally: + scope.dispose() + + def test_record_content_output(self): + """record_content_output sets the output value attribute.""" + details = self._make_guardrail_details() + scope = ApplyGuardrailScope.start(details, self._make_agent_details()) + try: + scope.record_content_output("sanitized output") + attrs = dict(scope._span.attributes) + self.assertEqual(attrs[GEN_AI_SECURITY_CONTENT_OUTPUT_VALUE_KEY], "sanitized output") + finally: + scope.dispose() + + def test_record_content_input(self): + """record_content_input sets the input value attribute.""" + details = self._make_guardrail_details() + scope = ApplyGuardrailScope.start(details, self._make_agent_details()) + try: + scope.record_content_input("user message content") + attrs = dict(scope._span.attributes) + self.assertEqual(attrs[GEN_AI_SECURITY_CONTENT_INPUT_VALUE_KEY], "user message content") + finally: + scope.dispose() + + def test_record_content_input_structured_messages(self): + """record_content_input serializes structured InputMessages to JSON string.""" + from microsoft.opentelemetry.a365.core.models.messages import ( + ChatMessage, + InputMessages, + MessageRole, + TextPart, + ) + + details = self._make_guardrail_details() + scope = ApplyGuardrailScope.start(details, self._make_agent_details()) + try: + messages = InputMessages(messages=[ChatMessage(role=MessageRole.USER, parts=[TextPart(content="Hello")])]) + scope.record_content_input(messages) + attrs = 
dict(scope._span.attributes) + value = attrs[GEN_AI_SECURITY_CONTENT_INPUT_VALUE_KEY] + self.assertIsInstance(value, str) + self.assertIn("Hello", value) + finally: + scope.dispose() + + def test_record_finding_adds_event(self): + """record_finding adds a security finding event to the span.""" + details = self._make_guardrail_details() + scope = ApplyGuardrailScope.start(details, self._make_agent_details()) + try: + finding = GuardrailFinding( + risk_category="hate_speech", + risk_severity=GuardrailRiskSeverity.HIGH, + risk_score=0.95, + risk_metadata=["token_index:42"], + policy_decision_type=GuardrailDecisionType.DENY, + policy_id="pol-1", + policy_name="Hate Policy", + policy_version="1.0", + ) + scope.record_finding(finding) + + events = scope._span.events + self.assertEqual(len(events), 1) + event = events[0] + self.assertEqual(event.name, GEN_AI_SECURITY_FINDING_EVENT_NAME) + event_attrs = dict(event.attributes) + self.assertEqual(event_attrs[GEN_AI_SECURITY_RISK_CATEGORY_KEY], "hate_speech") + self.assertEqual(event_attrs[GEN_AI_SECURITY_RISK_SEVERITY_KEY], "high") + self.assertAlmostEqual(event_attrs[GEN_AI_SECURITY_RISK_SCORE_KEY], 0.95) + self.assertEqual(event_attrs[GEN_AI_SECURITY_RISK_METADATA_KEY], ("token_index:42",)) + self.assertEqual(event_attrs[GEN_AI_SECURITY_POLICY_DECISION_TYPE_KEY], "deny") + self.assertEqual(event_attrs[GEN_AI_SECURITY_POLICY_ID_KEY], "pol-1") + self.assertEqual(event_attrs[GEN_AI_SECURITY_POLICY_NAME_KEY], "Hate Policy") + self.assertEqual(event_attrs[GEN_AI_SECURITY_POLICY_VERSION_KEY], "1.0") + finally: + scope.dispose() + + def test_record_multiple_findings(self): + """Multiple findings can be recorded on a single span.""" + details = self._make_guardrail_details() + scope = ApplyGuardrailScope.start(details, self._make_agent_details()) + try: + scope.record_finding(GuardrailFinding("pii", GuardrailRiskSeverity.MEDIUM)) + scope.record_finding(GuardrailFinding("jailbreak", GuardrailRiskSeverity.CRITICAL)) + + events = 
scope._span.events + self.assertEqual(len(events), 2) + self.assertEqual(dict(events[0].attributes)[GEN_AI_SECURITY_RISK_CATEGORY_KEY], "pii") + self.assertEqual(dict(events[1].attributes)[GEN_AI_SECURITY_RISK_CATEGORY_KEY], "jailbreak") + finally: + scope.dispose() + + def test_context_manager_usage(self): + """Scope works as a context manager.""" + details = self._make_guardrail_details() + with ApplyGuardrailScope.start(details, self._make_agent_details()) as scope: + self.assertIsNotNone(scope._span) + scope.record_decision(GuardrailDecisionType.ALLOW) + # Span should be ended after context exit + self.assertTrue(scope._has_ended) + + def test_optional_fields_not_set_when_none(self): + """Optional None fields are not set as attributes.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_INPUT, + decision_type=GuardrailDecisionType.ALLOW, + ) + scope = ApplyGuardrailScope.start(details, self._make_agent_details()) + try: + attrs = dict(scope._span.attributes) + self.assertNotIn(GEN_AI_GUARDIAN_ID_KEY, attrs) + self.assertNotIn(GEN_AI_GUARDIAN_NAME_KEY, attrs) + self.assertNotIn(GEN_AI_GUARDIAN_PROVIDER_NAME_KEY, attrs) + self.assertNotIn(GEN_AI_SECURITY_POLICY_ID_KEY, attrs) + self.assertNotIn(GEN_AI_SECURITY_CONTENT_INPUT_HASH_KEY, attrs) + self.assertNotIn(GEN_AI_SECURITY_CONTENT_MODIFIED_KEY, attrs) + self.assertNotIn(GEN_AI_SECURITY_EXTERNAL_EVENT_ID_KEY, attrs) + finally: + scope.dispose() + + +class TestGuardrailExporterEligibility(unittest.TestCase): + """Tests that apply_guardrail spans are eligible for export.""" + + def test_apply_guardrail_in_operation_names(self): + """apply_guardrail is in the exporter operation name allowlist.""" + self.assertIn(APPLY_GUARDRAIL_OPERATION_NAME, GEN_AI_OPERATION_NAMES) + + def test_filter_partition_includes_guardrail_spans(self): + """filter_and_partition_by_identity includes apply_guardrail spans.""" + span = MagicMock() + span.attributes = { + "gen_ai.operation.name": "apply_guardrail", + 
"microsoft.tenant.id": "tenant-1", + "gen_ai.agent.id": "agent-1", + } + + groups = filter_and_partition_by_identity([span]) + self.assertIn(("tenant-1", "agent-1"), groups) + self.assertEqual(len(groups[("tenant-1", "agent-1")]), 1) + + def test_filter_partition_excludes_non_eligible_spans(self): + """Non-eligible operation names are filtered out.""" + guardrail_span = MagicMock() + guardrail_span.attributes = { + "gen_ai.operation.name": "apply_guardrail", + "microsoft.tenant.id": "tenant-1", + "gen_ai.agent.id": "agent-1", + } + http_span = MagicMock() + http_span.attributes = { + "gen_ai.operation.name": "http_request", + "microsoft.tenant.id": "tenant-1", + "gen_ai.agent.id": "agent-1", + } + + groups = filter_and_partition_by_identity([guardrail_span, http_span]) + spans_in_group = groups[("tenant-1", "agent-1")] + self.assertEqual(len(spans_in_group), 1) + self.assertEqual(spans_in_group[0], guardrail_span) + + +class TestGuardrailDataClasses(unittest.TestCase): + """Tests for guardrail data class constants.""" + + def test_decision_type_values(self): + self.assertEqual(GuardrailDecisionType.ALLOW, "allow") + self.assertEqual(GuardrailDecisionType.AUDIT, "audit") + self.assertEqual(GuardrailDecisionType.DENY, "deny") + self.assertEqual(GuardrailDecisionType.MODIFY, "modify") + self.assertEqual(GuardrailDecisionType.WARN, "warn") + + def test_risk_severity_values(self): + self.assertEqual(GuardrailRiskSeverity.NONE, "none") + self.assertEqual(GuardrailRiskSeverity.LOW, "low") + self.assertEqual(GuardrailRiskSeverity.MEDIUM, "medium") + self.assertEqual(GuardrailRiskSeverity.HIGH, "high") + self.assertEqual(GuardrailRiskSeverity.CRITICAL, "critical") + + def test_target_type_values(self): + self.assertEqual(GuardrailTargetType.LLM_INPUT, "llm_input") + self.assertEqual(GuardrailTargetType.LLM_OUTPUT, "llm_output") + self.assertEqual(GuardrailTargetType.TOOL_CALL, "tool_call") + self.assertEqual(GuardrailTargetType.TOOL_DEFINITION, "tool_definition") + 
self.assertEqual(GuardrailTargetType.MEMORY_STORE, "memory_store") + self.assertEqual(GuardrailTargetType.MEMORY_RETRIEVE, "memory_retrieve") + self.assertEqual(GuardrailTargetType.KNOWLEDGE_QUERY, "knowledge_query") + self.assertEqual(GuardrailTargetType.KNOWLEDGE_RESULT, "knowledge_result") + self.assertEqual(GuardrailTargetType.MESSAGE, "message") + + def test_guardrail_details_required_fields(self): + details = GuardrailDetails(target_type="llm_input", decision_type="allow") + self.assertEqual(details.target_type, "llm_input") + self.assertEqual(details.decision_type, "allow") + self.assertIsNone(details.guardian_name) + + def test_guardrail_finding_required_fields(self): + finding = GuardrailFinding(risk_category="pii", risk_severity="high") + self.assertEqual(finding.risk_category, "pii") + self.assertEqual(finding.risk_severity, "high") + self.assertIsNone(finding.risk_score) + self.assertIsNone(finding.risk_metadata) + + +@patch.dict(os.environ, {"ENABLE_OBSERVABILITY": "false"}) +class TestApplyGuardrailScopeDisabled(unittest.TestCase): + """Tests that scope is a no-op when telemetry is disabled.""" + + def test_no_span_when_disabled(self): + """No span is created when telemetry is disabled.""" + # Reset the enabled_by_distro flag + OpenTelemetryScope._enabled_by_distro = False + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_INPUT, + decision_type=GuardrailDecisionType.ALLOW, + ) + agent_details = AgentDetails(agent_id="a", tenant_id="t") + scope = ApplyGuardrailScope.start(details, agent_details) + try: + self.assertIsNone(scope._span) + # These should not raise + scope.record_decision(GuardrailDecisionType.DENY, "blocked") + scope.record_content_output("output") + scope.record_content_input("input") + scope.record_finding(GuardrailFinding("pii", "high")) + finally: + scope.dispose() + + +if __name__ == "__main__": + unittest.main()