From cb2dbdb3a1024116be7727903b6fe5d85892ec20 Mon Sep 17 00:00:00 2001 From: Steven C Date: Mon, 3 Nov 2025 17:06:01 -0500 Subject: [PATCH 1/6] Use Presidio for masking --- docs/ref/checks/competitors.md | 4 +- docs/ref/checks/custom_prompt_check.md | 4 +- docs/ref/checks/hallucination_detection.md | 4 +- docs/ref/checks/jailbreak.md | 4 +- docs/ref/checks/keywords.md | 4 +- docs/ref/checks/moderation.md | 4 +- docs/ref/checks/nsfw.md | 4 +- docs/ref/checks/off_topic_prompts.md | 4 +- docs/ref/checks/prompt_injection_detection.md | 4 +- docs/ref/checks/secret_keys.md | 4 +- docs/ref/checks/urls.md | 6 +- pyproject.toml | 1 + src/guardrails/_base_client.py | 215 +++++++++++------- .../checks/text/hallucination_detection.py | 3 - src/guardrails/checks/text/keywords.py | 1 - src/guardrails/checks/text/llm_base.py | 5 - src/guardrails/checks/text/moderation.py | 1 - src/guardrails/checks/text/pii.py | 58 +++-- .../checks/text/prompt_injection_detection.py | 2 - src/guardrails/checks/text/secret_keys.py | 1 - src/guardrails/checks/text/urls.py | 1 - tests/unit/checks/test_keywords.py | 1 - tests/unit/test_base_client.py | 158 +++++++++++-- 23 files changed, 331 insertions(+), 162 deletions(-) diff --git a/docs/ref/checks/competitors.md b/docs/ref/checks/competitors.md index 919d21e..27d7651 100644 --- a/docs/ref/checks/competitors.md +++ b/docs/ref/checks/competitors.md @@ -30,11 +30,9 @@ Returns a `GuardrailResult` with the following `info` dictionary: { "guardrail_name": "Competitor Detection", "competitors_found": ["competitor1"], - "checked_competitors": ["competitor1", "rival-company.com"], - "checked_text": "Original input text" + "checked_competitors": ["competitor1", "rival-company.com"] } ``` - **`competitors_found`**: List of competitors detected in the text - **`checked_competitors`**: List of competitors that were configured for detection -- **`checked_text`**: Original input text diff --git a/docs/ref/checks/custom_prompt_check.md b/docs/ref/checks/custom_prompt_check.md index d21b194..a8512ff 100644 --- a/docs/ref/checks/custom_prompt_check.md +++ b/docs/ref/checks/custom_prompt_check.md @@ -35,12 +35,10 @@ Returns a `GuardrailResult` with the following `info` dictionary: "guardrail_name": "Custom Prompt Check", "flagged": true, "confidence": 0.85, - "threshold": 0.7, - "checked_text": "Original input text" + "threshold": 0.7 } ``` - **`flagged`**: Whether the custom validation criteria were met - **`confidence`**: Confidence score (0.0 to 1.0) for the validation - **`threshold`**: The confidence threshold that was configured -- **`checked_text`**: Original input text diff --git a/docs/ref/checks/hallucination_detection.md b/docs/ref/checks/hallucination_detection.md index a73e9b3..0ad63f4 100644 --- a/docs/ref/checks/hallucination_detection.md +++ b/docs/ref/checks/hallucination_detection.md @@ -113,8 +113,7 @@ Returns a `GuardrailResult` with the following `info` dictionary: "hallucination_type": "factual_error", "hallucinated_statements": ["Our premium plan costs $299/month"], "verified_statements": ["We offer customer support"], - "threshold": 0.7, - "checked_text": "Our premium plan costs $299/month and we offer customer support" + "threshold": 0.7 } ``` @@ -125,7 +124,6 @@ Returns a `GuardrailResult` with the following `info` dictionary: - **`hallucinated_statements`**: Specific statements that are contradicted or unsupported - **`verified_statements`**: Statements that are supported by your documents - **`threshold`**: The confidence threshold that was configured -- **`checked_text`**: Original input text Tip: `hallucination_type` is typically one of `factual_error`, `unsupported_claim`, or `none`. diff --git a/docs/ref/checks/jailbreak.md b/docs/ref/checks/jailbreak.md index ca58dfb..b493f22 100644 --- a/docs/ref/checks/jailbreak.md +++ b/docs/ref/checks/jailbreak.md @@ -56,15 +56,13 @@ Returns a `GuardrailResult` with the following `info` dictionary: "guardrail_name": "Jailbreak", "flagged": true, "confidence": 0.85, - "threshold": 0.7, - "checked_text": "Original input text" + "threshold": 0.7 } ``` - **`flagged`**: Whether a jailbreak attempt was detected - **`confidence`**: Confidence score (0.0 to 1.0) for the detection - **`threshold`**: The confidence threshold that was configured -- **`checked_text`**: Original input text ## Related checks diff --git a/docs/ref/checks/keywords.md b/docs/ref/checks/keywords.md index 440fb32..bc2b354 100644 --- a/docs/ref/checks/keywords.md +++ b/docs/ref/checks/keywords.md @@ -25,11 +25,9 @@ Returns a `GuardrailResult` with the following `info` dictionary: { "guardrail_name": "Keyword Filter", "matched": ["confidential", "secret"], - "checked": ["confidential", "secret", "internal only"], - "checked_text": "This is confidential information that should be kept secret" + "checked": ["confidential", "secret", "internal only"] } ``` - **`matched`**: List of keywords found in the text - **`checked`**: List of keywords that were configured for detection -- **`checked_text`**: Original input text diff --git a/docs/ref/checks/moderation.md b/docs/ref/checks/moderation.md index 597b65a..2a7b590 100644 --- a/docs/ref/checks/moderation.md +++ b/docs/ref/checks/moderation.md @@ -57,12 +57,10 @@ Returns a `GuardrailResult` with the following `info` dictionary: "violence": 0.12, "self-harm": 0.08, "sexual": 0.03 - }, - "checked_text": "Original input text" + } } ``` - **`flagged`**: Whether any category violation was detected - **`categories`**: Boolean flags for each category indicating violations - **`category_scores`**: Confidence scores (0.0 to 1.0) for each category -- **`checked_text`**: Original input text diff --git a/docs/ref/checks/nsfw.md b/docs/ref/checks/nsfw.md index 2341096..da6acfb 100644 --- a/docs/ref/checks/nsfw.md +++ b/docs/ref/checks/nsfw.md @@ -44,15 +44,13 @@ Returns a `GuardrailResult` with the following `info` dictionary: "guardrail_name": "NSFW Text", "flagged": true, "confidence": 0.85, - "threshold": 0.7, - "checked_text": "Original input text" + "threshold": 0.7 } ``` - **`flagged`**: Whether NSFW content was detected - **`confidence`**: Confidence score (0.0 to 1.0) for the detection - **`threshold`**: The confidence threshold that was configured -- **`checked_text`**: Original input text ### Examples diff --git a/docs/ref/checks/off_topic_prompts.md b/docs/ref/checks/off_topic_prompts.md index cf31999..75297f5 100644 --- a/docs/ref/checks/off_topic_prompts.md +++ b/docs/ref/checks/off_topic_prompts.md @@ -35,12 +35,10 @@ Returns a `GuardrailResult` with the following `info` dictionary: "guardrail_name": "Off Topic Prompts", "flagged": false, "confidence": 0.85, - "threshold": 0.7, - "checked_text": "Original input text" + "threshold": 0.7 } ``` - **`flagged`**: Whether the content aligns with your business scope - **`confidence`**: Confidence score (0.0 to 1.0) for the prompt injection detection assessment - **`threshold`**: The confidence threshold that was configured -- **`checked_text`**: Original input text diff --git a/docs/ref/checks/prompt_injection_detection.md b/docs/ref/checks/prompt_injection_detection.md index a9f01af..8aac17f 100644 --- a/docs/ref/checks/prompt_injection_detection.md +++ b/docs/ref/checks/prompt_injection_detection.md @@ -73,8 +73,7 @@ Returns a `GuardrailResult` with the following `info` dictionary: "name": "get_weather", "arguments": "{'location': 'Tokyo'}" } - ], - "checked_text": "[{'role': 'user', 'content': 'What is the weather in Tokyo?'}]" + ] } ``` @@ -84,7 +83,6 @@ Returns a `GuardrailResult` with the following `info` dictionary: - **`threshold`**: The confidence threshold that was configured - **`user_goal`**: The tracked user intent from conversation - **`action`**: The list of function calls or tool outputs analyzed for alignment -- **`checked_text`**: Serialized conversation history inspected during analysis ## Benchmark Results diff --git a/docs/ref/checks/secret_keys.md b/docs/ref/checks/secret_keys.md index eb7a917..a3eaf6f 100644 --- a/docs/ref/checks/secret_keys.md +++ b/docs/ref/checks/secret_keys.md @@ -34,10 +34,8 @@ Returns a `GuardrailResult` with the following `info` dictionary: ```json { "guardrail_name": "Secret Keys", - "detected_secrets": ["sk-abc123...", "Bearer xyz789..."], - "checked_text": "Original input text" + "detected_secrets": ["sk-abc123...", "Bearer xyz789..."] } ``` - **`detected_secrets`**: List of potential secrets detected in the text -- **`checked_text`**: Original input text (unchanged) diff --git a/docs/ref/checks/urls.md b/docs/ref/checks/urls.md index a2c99e1..25e7047 100644 --- a/docs/ref/checks/urls.md +++ b/docs/ref/checks/urls.md @@ -64,8 +64,7 @@ Returns a `GuardrailResult` with the following `info` dictionary: "detected": ["https://example.com", "https://user:pass@malicious.com"], "allowed": ["https://example.com"], "blocked": ["https://user:pass@malicious.com"], - "blocked_reasons": ["https://user:pass@malicious.com: Contains userinfo (potential credential injection)"], - "checked_text": "Visit https://example.com or login at https://user:pass@malicious.com" + "blocked_reasons": ["https://user:pass@malicious.com: Contains userinfo (potential credential injection)"] } ``` @@ -76,5 +75,4 @@ Returns a `GuardrailResult` with the following `info` dictionary: - **`detected`**: All URLs detected in the text using regex patterns - **`allowed`**: URLs that passed all security checks and allow list validation - **`blocked`**: URLs that were blocked due to security policies or allow list restrictions -- **`blocked_reasons`**: Detailed explanations for why each URL was blocked -- **`checked_text`**: Original input text that was scanned \ No newline at end of file +- **`blocked_reasons`**: Detailed explanations for why each URL was blocked \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 7f5fb70..09a5b69 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ dependencies = [ "openai-agents>=0.3.3", "pip>=25.0.1", "presidio-analyzer>=2.2.360", + "presidio-anonymizer>=2.2.360", ] classifiers = [ "Typing :: Typed", diff --git a/src/guardrails/_base_client.py b/src/guardrails/_base_client.py index a599a3d..a225855 100644 --- a/src/guardrails/_base_client.py +++ b/src/guardrails/_base_client.py @@ -97,13 +97,13 @@ def _content_to_text(content) -> str: if isinstance(part, dict): part_type = part.get("type") text_val = part.get("text", "") - if part_type in {"input_text", "text", "output_text", "summary_text"} and isinstance(text_val, str): + if part_type in {"text", "input_text", "output_text"} and isinstance(text_val, str): parts.append(text_val) else: # Object-like content part ptype = getattr(part, "type", None) ptext = getattr(part, "text", "") - if ptype in {"input_text", "text", "output_text", "summary_text"} and isinstance(ptext, str): + if ptype in {"text", "input_text", "output_text"} and isinstance(ptext, str): parts.append(ptext) return " ".join(parts).strip() return "" @@ -153,99 +153,160 @@ def _apply_preflight_modifications( preflight_results: Results from pre-flight guardrails Returns: - Modified data with pre-flight changes applied + Modified data with PII masking applied if PII was detected """ if not preflight_results: return data - # Get PII mappings from preflight results for individual text processing - pii_mappings = {} + # Look specifically for PII guardrail results with actual modifications + pii_result = None for result in preflight_results: - if "detected_entities" in result.info: - detected = result.info["detected_entities"] - for entity_type, entities in detected.items(): - for entity in entities: - # Map original PII to masked token - pii_mappings[entity] = f"<{entity_type}>" - - if not pii_mappings: - return data + # Only PII guardrail modifies text - check name first (faster) + if result.info.get("guardrail_name") == "Contains PII" and result.info.get("pii_detected"): + pii_result = result + break # PII is the only guardrail that modifies text - def _mask_text(text: str) -> str: - """Apply PII masking to individual text with robust replacement.""" - if not isinstance(text, str): - return text - - masked_text = text + # If no PII modifications were made, return original data + if pii_result is None: + return data - # Sort PII entities by length (longest first) to avoid partial replacements - # (shouldn't need this as Presidio should handle this, but just in case) - sorted_pii = sorted(pii_mappings.items(), key=lambda x: len(x[0]), reverse=True) + # Apply PII-masked text to data + if isinstance(data, str): + # Simple case: string input (Responses API) + checked_text = pii_result.info.get("checked_text") + return checked_text if checked_text is not None else data - for original_pii, masked_token in sorted_pii: - if original_pii in masked_text: - # Use replace() which handles special characters safely - masked_text = masked_text.replace(original_pii, masked_token) + # Complex case: message list (Chat API) + _, latest_user_idx = self._extract_latest_user_message(data) + if latest_user_idx == -1: + return data - return masked_text + # Get current content + current_content = ( + data[latest_user_idx]["content"] + if isinstance(data[latest_user_idx], dict) + else getattr(data[latest_user_idx], "content", None) + ) - if isinstance(data, str): - # Handle string input (for responses API) - return _mask_text(data) - else: - # Handle message list input (primarily for chat API and structured Responses API) - _, latest_user_idx = self._extract_latest_user_message(data) - if latest_user_idx == -1: + # Apply PII-masked text based on content type + if isinstance(current_content, str): + # Plain string content - replace with masked version + checked_text = pii_result.info.get("checked_text") + if checked_text is None: return data + return self._update_message_content(data, latest_user_idx, checked_text) + + if isinstance(current_content, list): + # Structured content - mask each text part individually using Presidio + return self._apply_pii_masking_to_structured_content(data, pii_result, latest_user_idx, current_content) + + # Unknown content type, return unchanged + return data + + def _update_message_content( + self, data: list[dict[str, str]], user_idx: int, new_content: Any + ) -> list[dict[str, str]]: + """Update message content at the specified index. - # Use shallow copy for efficiency - we only modify the content field of one message - modified_messages = data.copy() - - # Extract current content safely - current_content = ( - data[latest_user_idx]["content"] if isinstance(data[latest_user_idx], dict) else getattr(data[latest_user_idx], "content", None) - ) - - # Apply modifications based on content type - if isinstance(current_content, str): - # Plain string content - mask individually - modified_content = _mask_text(current_content) - elif isinstance(current_content, list): - # Structured content - mask each text part individually - modified_content = [] - for part in current_content: - if isinstance(part, dict): - part_type = part.get("type") - if part_type in {"input_text", "text", "output_text", "summary_text"} and "text" in part: - # Mask this specific text part individually - original_text = part["text"] - masked_text = _mask_text(original_text) - modified_content.append({**part, "text": masked_text}) - else: - # Keep non-text parts unchanged - modified_content.append(part) - else: - # Keep unknown parts unchanged - modified_content.append(part) + Args: + data: Message list + user_idx: Index of message to update + new_content: New content value + + Returns: + Modified message list or original if update fails + """ + modified_messages = data.copy() + try: + if isinstance(modified_messages[user_idx], dict): + modified_messages[user_idx] = { + **modified_messages[user_idx], + "content": new_content, + } else: - # Unknown content type - skip modifications - return data + modified_messages[user_idx].content = new_content + except Exception: + return data + return modified_messages + + def _apply_pii_masking_to_structured_content( + self, + data: list[dict[str, str]], + pii_result: GuardrailResult, + user_idx: int, + current_content: list, + ) -> list[dict[str, str]]: + """Apply PII masking to structured content parts using Presidio. + + Args: + data: Message list with structured content + pii_result: PII guardrail result containing detected entities + user_idx: Index of the user message to modify + current_content: The structured content list (already extracted) - # Only modify the specific message that needs content changes - if modified_content != current_content: - if isinstance(modified_messages[latest_user_idx], dict): - modified_messages[latest_user_idx] = { - **modified_messages[latest_user_idx], - "content": modified_content, - } + Returns: + Modified messages with PII masking applied to each text part + """ + from presidio_anonymizer import AnonymizerEngine + from presidio_anonymizer.entities import OperatorConfig + + # Extract detected entity types + detected = pii_result.info.get("detected_entities", {}) + if not detected: + return data + + # Get Presidio engines - entity types are guaranteed valid from detection + from .checks.text.pii import _get_analyzer_engine + + analyzer = _get_analyzer_engine() + anonymizer = AnonymizerEngine() + entity_types = list(detected.keys()) + + # Create operators for each entity type + operators = { + entity_type: OperatorConfig("replace", {"new_value": f"<{entity_type}>"}) + for entity_type in entity_types + } + + def _mask_text(text: str) -> str: + """Mask using Presidio's analyzer and anonymizer.""" + if not text: + return text + + analyzer_results = analyzer.analyze(text, entities=entity_types, language="en") + if analyzer_results: + return anonymizer.anonymize(text=text, analyzer_results=analyzer_results, operators=operators).text + return text + + # Mask each text part + modified_content = [] + for part in current_content: + if isinstance(part, dict): + part_text = part.get("text") + if ( + part.get("type") in {"text", "input_text", "output_text"} + and isinstance(part_text, str) + and part_text + ): + modified_content.append({**part, "text": _mask_text(part_text)}) else: - # Fallback: if it's an object-like, set attribute when possible + modified_content.append(part) + else: + # Handle object-based content parts + if ( + hasattr(part, "type") + and hasattr(part, "text") + and part.type in {"text", "input_text", "output_text"} + and isinstance(part.text, str) + and part.text + ): try: - modified_messages[latest_user_idx].content = modified_content + part.text = _mask_text(part.text) except Exception: - return data + pass + modified_content.append(part) - return modified_messages + return self._update_message_content(data, user_idx, modified_content) def _instantiate_all_guardrails(self) -> dict[str, list]: """Instantiate guardrails for all stages.""" diff --git a/src/guardrails/checks/text/hallucination_detection.py b/src/guardrails/checks/text/hallucination_detection.py index 82a7795..93b33a8 100644 --- a/src/guardrails/checks/text/hallucination_detection.py +++ b/src/guardrails/checks/text/hallucination_detection.py @@ -233,7 +233,6 @@ async def hallucination_detection( "guardrail_name": "Hallucination Detection", **analysis.model_dump(), "threshold": config.confidence_threshold, - "checked_text": candidate, # Hallucination Detection doesn't modify text, pass through unchanged }, ) @@ -248,7 +247,6 @@ async def hallucination_detection( return create_error_result( guardrail_name="Hallucination Detection", analysis=error_output, - checked_text=candidate, additional_info={ "threshold": config.confidence_threshold, "reasoning": f"Validation failed: {str(e)}", @@ -268,7 +266,6 @@ async def hallucination_detection( return create_error_result( guardrail_name="Hallucination Detection", analysis=error_output, - checked_text=candidate, additional_info={ "threshold": config.confidence_threshold, "reasoning": f"Analysis failed: {str(e)}", diff --git a/src/guardrails/checks/text/keywords.py b/src/guardrails/checks/text/keywords.py index dce9618..297bf96 100644 --- a/src/guardrails/checks/text/keywords.py +++ b/src/guardrails/checks/text/keywords.py @@ -116,7 +116,6 @@ def match_keywords( "matched": unique, "checked": config.keywords, "sanitized_keywords": sanitized_keywords, - "checked_text": data, # Keyword filtering doesn't modify text, pass through unchanged }, ) diff --git a/src/guardrails/checks/text/llm_base.py b/src/guardrails/checks/text/llm_base.py index ed6a71f..e776006 100644 --- a/src/guardrails/checks/text/llm_base.py +++ b/src/guardrails/checks/text/llm_base.py @@ -126,7 +126,6 @@ class LLMErrorOutput(LLMOutput): def create_error_result( guardrail_name: str, analysis: LLMErrorOutput, - checked_text: str, additional_info: dict[str, Any] | None = None, ) -> GuardrailResult: """Create a standardized GuardrailResult from an LLM error output. @@ -134,7 +133,6 @@ def create_error_result( Args: guardrail_name: Name of the guardrail that failed. analysis: The LLM error output. - checked_text: The text that was being checked. additional_info: Optional additional fields to include in info dict. Returns: @@ -145,7 +143,6 @@ def create_error_result( result_info: dict[str, Any] = { "guardrail_name": guardrail_name, - "checked_text": checked_text, "error": error_message, **analysis.model_dump(), } @@ -389,7 +386,6 @@ async def guardrail_func( return create_error_result( guardrail_name=name, analysis=analysis, - checked_text=data, ) # Compare severity levels @@ -400,7 +396,6 @@ async def guardrail_func( "guardrail_name": name, **analysis.model_dump(), "threshold": config.confidence_threshold, - "checked_text": data, # LLM-based guardrails don't modify text, pass through unchanged }, ) diff --git a/src/guardrails/checks/text/moderation.py b/src/guardrails/checks/text/moderation.py index c536669..4cf33de 100644 --- a/src/guardrails/checks/text/moderation.py +++ b/src/guardrails/checks/text/moderation.py @@ -218,7 +218,6 @@ async def moderation( "flagged_categories": flagged_categories, "categories_checked": config.categories, "category_details": category_details, - "checked_text": data, # Moderation doesn't modify text, pass through unchanged }, ) diff --git a/src/guardrails/checks/text/pii.py b/src/guardrails/checks/text/pii.py index d8dc90f..2f12144 100644 --- a/src/guardrails/checks/text/pii.py +++ b/src/guardrails/checks/text/pii.py @@ -84,6 +84,8 @@ from presidio_analyzer.predefined_recognizers.country_specific.korea.kr_rrn_recognizer import ( KrRrnRecognizer, ) +from presidio_anonymizer import AnonymizerEngine +from presidio_anonymizer.entities import OperatorConfig from pydantic import BaseModel, ConfigDict, Field from guardrails.registry import default_spec_registry @@ -129,6 +131,19 @@ def _get_analyzer_engine() -> AnalyzerEngine: return engine +@functools.lru_cache(maxsize=1) +def _get_anonymizer_engine() -> AnonymizerEngine: + """Return a cached AnonymizerEngine for PII masking. + + Uses Presidio's built-in anonymization for optimal performance and + correct handling of overlapping entities, Unicode, and special characters. + + Returns: + AnonymizerEngine: Configured anonymizer for replacing PII entities. + """ + return AnonymizerEngine() + + class PIIEntity(str, Enum): """Supported PII entity types for detection. @@ -283,13 +298,10 @@ def _detect_pii(text: str, config: PIIConfig) -> PiiDetectionResult: def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> str: - """Mask detected PII from text by replacing with entity type markers. + """Mask detected PII using Presidio's AnonymizerEngine. - Handles overlapping entities using these rules: - 1. Full overlap: Use entity with higher score - 2. One contained in another: Use larger text span - 3. Partial intersection: Replace each individually - 4. No overlap: Replace normally + Uses Presidio's built-in anonymization for optimal performance and + correct handling of overlapping entities, Unicode, and special characters. Args: text (str): The text to mask. @@ -305,21 +317,25 @@ def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> st if not text: raise ValueError("Text cannot be empty or None") - # Sort by start position and score for consistent handling - sorted_results = sorted(detection.analyzer_results, key=lambda x: (x.start, -x.score, -x.end)) + if not detection.analyzer_results: + return text - # Process results in order, tracking text offsets - result = text - offset = 0 + # Use Presidio's optimized anonymizer with replace operator + anonymizer = _get_anonymizer_engine() - for res in sorted_results: - start = res.start + offset - end = res.end + offset - replacement = f"<{res.entity_type}>" - result = result[:start] + replacement + result[end:] - offset += len(replacement) - (end - start) + # Create operators mapping each entity type to a replace operator + operators = { + entity_type: OperatorConfig("replace", {"new_value": f"<{entity_type}>"}) + for entity_type in detection.mapping.keys() + } - return result + result = anonymizer.anonymize( + text=text, + analyzer_results=detection.analyzer_results, + operators=operators, + ) + + return result.text def _as_result(detection: PiiDetectionResult, config: PIIConfig, name: str, text: str) -> GuardrailResult: @@ -335,11 +351,11 @@ def _as_result(detection: PiiDetectionResult, config: PIIConfig, name: str, text GuardrailResult: Always includes checked_text. Triggers tripwire only if PII found AND block=True. """ - # Mask the text if PII is found - checked_text = _mask_pii(text, detection, config) if detection.mapping else text - # Only trigger tripwire if PII is found AND block=True tripwire_triggered = bool(detection.mapping) and config.block + + # Mask the text if PII is found + checked_text = _mask_pii(text, detection, config) if detection.mapping else text return GuardrailResult( tripwire_triggered=tripwire_triggered, diff --git a/src/guardrails/checks/text/prompt_injection_detection.py b/src/guardrails/checks/text/prompt_injection_detection.py index ea4ebf5..50ebfcb 100644 --- a/src/guardrails/checks/text/prompt_injection_detection.py +++ b/src/guardrails/checks/text/prompt_injection_detection.py @@ -296,7 +296,6 @@ async def prompt_injection_detection( "evidence": analysis.evidence, "user_goal": user_goal_text, "action": recent_messages, - "checked_text": str(conversation_history), }, ) return result @@ -377,7 +376,6 @@ def _create_skip_result( "evidence": None, "user_goal": user_goal, "action": action or [], - "checked_text": data, }, ) diff --git a/src/guardrails/checks/text/secret_keys.py b/src/guardrails/checks/text/secret_keys.py index b14c477..ebae557 100644 --- a/src/guardrails/checks/text/secret_keys.py +++ b/src/guardrails/checks/text/secret_keys.py @@ -338,7 +338,6 @@ def _detect_secret_keys(text: str, cfg: SecretCfg, custom_regex: list[str] | Non info={ "guardrail_name": "Secret Keys", "detected_secrets": secrets, - "checked_text": text, # Secret key detection doesn't modify text, pass through unchanged }, ) diff --git a/src/guardrails/checks/text/urls.py b/src/guardrails/checks/text/urls.py index b059c15..f8a4b89 100644 --- a/src/guardrails/checks/text/urls.py +++ b/src/guardrails/checks/text/urls.py @@ -293,7 +293,6 @@ async def urls(ctx: Any, data: str, config: URLConfig) -> GuardrailResult: "allowed": allowed, "blocked": blocked, "blocked_reasons": blocked_reasons, - "checked_text": data, }, ) diff --git a/tests/unit/checks/test_keywords.py b/tests/unit/checks/test_keywords.py index b9175dd..9ac19a0 100644 --- a/tests/unit/checks/test_keywords.py +++ b/tests/unit/checks/test_keywords.py @@ -19,7 +19,6 @@ def test_match_keywords_sanitizes_trailing_punctuation() -> None: assert result.info["sanitized_keywords"] == ["token", "secret", "KEY"] # noqa: S101 assert result.info["matched"] == ["token"] # noqa: S101 assert result.info["guardrail_name"] == "Test Guardrail" # noqa: S101 - assert result.info["checked_text"] == "Leaked token appears here." # noqa: S101 def test_match_keywords_deduplicates_case_insensitive_matches() -> None: diff --git a/tests/unit/test_base_client.py b/tests/unit/test_base_client.py index 1d97db1..cca811e 100644 --- a/tests/unit/test_base_client.py +++ b/tests/unit/test_base_client.py @@ -36,7 +36,7 @@ def test_extract_latest_user_message_content_parts() -> None: "role": "user", "content": [ {"type": "input_text", "text": "first"}, - {"type": "summary_text", "text": "second"}, + {"type": "output_text", "text": "second"}, ], }, ] @@ -58,12 +58,17 @@ def test_extract_latest_user_message_missing_user() -> None: def test_apply_preflight_modifications_masks_user_message() -> None: - """Mask PII tokens for the most recent user message.""" + """Mask PII tokens for the most recent user message using PII guardrail.""" client = GuardrailsBaseClient() guardrail_results = [ GuardrailResult( tripwire_triggered=False, - info={"detected_entities": {"PERSON": ["Alice Smith"]}}, + info={ + "guardrail_name": "Contains PII", + "pii_detected": True, + "detected_entities": {"PERSON": ["Alice Smith"]}, + "checked_text": "My name is .", + }, ) ] messages = [ @@ -78,12 +83,17 @@ def test_apply_preflight_modifications_masks_user_message() -> None: def test_apply_preflight_modifications_handles_strings() -> None: - """Apply masking for string payloads.""" + """Apply masking for string payloads using PII guardrail.""" client = GuardrailsBaseClient() guardrail_results = [ GuardrailResult( tripwire_triggered=False, - info={"detected_entities": {"PHONE": ["+1-555-0100"]}}, + info={ + "guardrail_name": "Contains PII", + "pii_detected": True, + "detected_entities": {"PHONE": ["+1-555-0100"]}, + "checked_text": "", + }, ) ] @@ -104,19 +114,24 @@ def test_apply_preflight_modifications_skips_when_no_entities() -> None: def test_apply_preflight_modifications_structured_content() -> None: - """Structured content parts should be masked individually.""" + """Structured content parts should be masked individually using PII guardrail.""" client = GuardrailsBaseClient() guardrail_results = [ GuardrailResult( tripwire_triggered=False, - info={"detected_entities": {"PHONE": ["123-456"]}}, + info={ + "guardrail_name": "Contains PII", + "pii_detected": True, + "detected_entities": {"PHONE_NUMBER": ["123-456-7890"]}, + "checked_text": "Call ", + }, ) ] messages = [ { "role": "user", "content": [ - {"type": "input_text", "text": "Call 123-456"}, + {"type": "input_text", "text": "Call 123-456-7890"}, {"type": "json", "value": {"raw": "no change"}}, ], } @@ -124,7 +139,7 @@ def test_apply_preflight_modifications_structured_content() -> None: modified = client._apply_preflight_modifications(messages, guardrail_results) - assert modified[0]["content"][0]["text"] == "Call " # noqa: S101 + assert modified[0]["content"][0]["text"] == "Call " # noqa: S101 assert modified[0]["content"][1]["value"] == {"raw": "no change"} # noqa: S101 @@ -134,7 +149,12 @@ def test_apply_preflight_modifications_object_message_handles_failure() -> None: guardrail_results = [ GuardrailResult( tripwire_triggered=False, - info={"detected_entities": {"NAME": ["Alice"]}}, + info={ + "guardrail_name": "Contains PII", + "pii_detected": True, + "detected_entities": {"NAME": ["Alice"]}, + "checked_text": "", + }, ) ] @@ -159,7 +179,17 @@ def __setattr__(self, key: str, value: Any) -> None: def test_apply_preflight_modifications_no_user_message() -> None: """When no user message exists, data should be returned unchanged.""" client = GuardrailsBaseClient() - guardrail_results = [GuardrailResult(tripwire_triggered=False, info={"detected_entities": {"NAME": ["Alice"]}})] + guardrail_results = [ + GuardrailResult( + tripwire_triggered=False, + info={ + "guardrail_name": "Contains PII", + "pii_detected": True, + "detected_entities": {"NAME": ["Alice"]}, + "checked_text": "", + }, + ) + ] messages = [{"role": "assistant", "content": "hi"}] modified = client._apply_preflight_modifications(messages, guardrail_results) @@ -168,9 +198,19 @@ def test_apply_preflight_modifications_no_user_message() -> None: def test_apply_preflight_modifications_non_dict_part_preserved() -> None: - """Non-dict content parts should be preserved as-is.""" + """Non-dict content parts should be preserved as-is when PII guardrail runs.""" client = GuardrailsBaseClient() - guardrail_results = [GuardrailResult(tripwire_triggered=False, info={"detected_entities": {"NAME": ["Alice"]}})] + guardrail_results = [ + GuardrailResult( + tripwire_triggered=False, + info={ + "guardrail_name": "Contains PII", + "pii_detected": True, + "detected_entities": {"NAME": ["Alice"]}, + "checked_text": "raw text", + }, + ) + ] messages = [ { "role": "user", @@ -180,6 +220,8 @@ def test_apply_preflight_modifications_non_dict_part_preserved() -> None: modified = client._apply_preflight_modifications(messages, guardrail_results) + # Content is a list (not string), so structured content path is used + # which preserves non-dict parts assert modified[0]["content"][0] == "raw text" # noqa: S101 @@ -316,7 +358,15 @@ def fake_validate(gr: Any, ctx: Any) -> None: def test_apply_preflight_modifications_leaves_unknown_content() -> None: """Unknown content types should remain untouched.""" client = GuardrailsBaseClient() - result = GuardrailResult(tripwire_triggered=False, info={"detected_entities": {"NAME": ["Alice"]}}) + result = GuardrailResult( + tripwire_triggered=False, + info={ + "guardrail_name": "Contains PII", + "pii_detected": True, + "detected_entities": {"NAME": ["Alice"]}, + "checked_text": "", + }, + ) messages = [{"role": "user", "content": {"unknown": "value"}}] modified = client._apply_preflight_modifications(messages, [result]) @@ -327,7 +377,15 @@ def test_apply_preflight_modifications_leaves_unknown_content() -> None: def test_apply_preflight_modifications_non_string_text_retained() -> None: """Content parts without string text should remain unchanged.""" client = GuardrailsBaseClient() - result = GuardrailResult(tripwire_triggered=False, info={"detected_entities": {"PHONE": ["123"]}}) + result = GuardrailResult( + tripwire_triggered=False, + info={ + "guardrail_name": "Contains PII", + "pii_detected": True, + "detected_entities": {"PHONE": ["123"]}, + "checked_text": "", + }, + ) messages = [ { "role": "user", @@ -400,3 +458,73 @@ def test_create_default_context_uses_existing_context() -> None: assert client._create_default_context() is existing # noqa: S101 finally: guardrails_context.clear_context() + + +def test_apply_preflight_modifications_ignores_non_pii_guardrails() -> None: + """Non-PII guardrails should not trigger text modifications.""" + client = GuardrailsBaseClient() + guardrail_results = [ + GuardrailResult( + tripwire_triggered=False, + info={ + "guardrail_name": "Moderation", + "detected_entities": {"PERSON": ["Alice"]}, # Should be ignored + }, + ) + ] + messages = [{"role": "user", "content": "Hello Alice"}] + + modified = client._apply_preflight_modifications(messages, guardrail_results) + + # Should return original - no PII guardrail present + assert modified is messages # noqa: S101 + + +def test_apply_preflight_modifications_only_uses_pii_checked_text() -> None: + """Only PII guardrail's checked_text should be used.""" + client = GuardrailsBaseClient() + guardrail_results = [ + # Moderation result (should be ignored) + GuardrailResult( + tripwire_triggered=False, + info={ + "guardrail_name": "Moderation", + }, + ), + # PII result (should be used) + GuardrailResult( + tripwire_triggered=False, + info={ + "guardrail_name": "Contains PII", + "pii_detected": True, + "detected_entities": {"EMAIL_ADDRESS": ["user@example.com"]}, + "checked_text": "Contact ", + }, + ), + ] + + masked = client._apply_preflight_modifications("Contact user@example.com", guardrail_results) + + # Should use PII's checked_text, not moderation's + assert masked == "Contact " # noqa: S101 + + +def test_apply_preflight_modifications_no_pii_detected() -> None: + """When PII guardrail runs but finds nothing, don't modify text.""" + client = GuardrailsBaseClient() + guardrail_results = [ + GuardrailResult( + tripwire_triggered=False, + info={ + "guardrail_name": "Contains PII", + "pii_detected": False, # No PII found + "detected_entities": {}, + "checked_text": "Clean text", + }, + ), + ] + + result = client._apply_preflight_modifications("Clean text", guardrail_results) + + # Should return original since no PII was detected + assert result == "Clean text" # noqa: S101 From d992fc5860fad5977f92036932eb7f2bee5a94f6 Mon Sep 17 00:00:00 2001 From: Steven C Date: Tue, 4 Nov 2025 17:18:48 -0500 Subject: [PATCH 2/6] Improve PII to handle encoded content --- docs/ref/checks/pii.md | 51 ++- examples/basic/pii_mask_example.py | 3 + src/guardrails/_base_client.py | 37 +- src/guardrails/checks/text/pii.py | 411 ++++++++++++++++++++-- src/guardrails/utils/safety_identifier.py | 1 - tests/unit/checks/test_pii.py | 294 +++++++++++++++- tests/unit/test_safety_identifier.py | 1 - 7 files changed, 742 insertions(+), 56 deletions(-) diff --git a/docs/ref/checks/pii.md b/docs/ref/checks/pii.md index f51791e..3392d58 100644 --- a/docs/ref/checks/pii.md +++ b/docs/ref/checks/pii.md @@ -2,22 +2,33 @@ Detects personally identifiable information (PII) such as SSNs, phone numbers, credit card numbers, and email addresses using Microsoft's [Presidio library](https://microsoft.github.io/presidio/). Will automatically mask detected PII or block content based on configuration. +**Advanced Security Features:** + +- **Unicode normalization**: Prevents bypasses using fullwidth characters (@) or zero-width spaces +- **Encoded PII detection**: Optionally detects PII hidden in Base64, URL-encoded, or hex strings +- **URL context awareness**: Detects emails in query parameters (e.g., `GET /api?user=john@example.com`) +- **Custom recognizers**: Includes CVV/CVC codes and BIC/SWIFT codes beyond Presidio defaults + ## Configuration ```json { "name": "Contains PII", "config": { - "entities": ["EMAIL_ADDRESS", "US_SSN", "CREDIT_CARD", "PHONE_NUMBER"], - "block": false + "entities": ["EMAIL_ADDRESS", "US_SSN", "CREDIT_CARD", "PHONE_NUMBER", "CVV", "BIC_SWIFT"], + "block": false, + "detect_encoded_pii": false } } ``` ### Parameters -- **`entities`** (required): List of PII entity types to detect. See the full list of [supported entities](https://microsoft.github.io/presidio/supported_entities/). +- **`entities`** (required): List of PII entity types to detect. Includes: + - Standard Presidio entities: See the full list of [supported entities](https://microsoft.github.io/presidio/supported_entities/) + - Custom entities: `CVV` (credit card security codes), `BIC_SWIFT` (bank identification codes) - **`block`** (optional): Whether to block content or just mask PII (default: `false`) +- **`detect_encoded_pii`** (optional): If `true`, detects PII in Base64/URL-encoded/hex strings (default: `false`) ## Implementation Notes @@ -41,6 +52,8 @@ Detects personally identifiable information (PII) such as SSNs, phone numbers, c Returns a `GuardrailResult` with the following `info` dictionary: +### Basic Example (Plain PII) + ```json { "guardrail_name": "Contains PII", @@ -55,8 +68,34 @@ Returns a `GuardrailResult` with the following `info` dictionary: } ``` -- **`detected_entities`**: Detected entities and their values +### With Encoded PII Detection Enabled + +When `detect_encoded_pii: true`, the guardrail also detects and masks encoded PII: + +```json +{ + "guardrail_name": "Contains PII", + "detected_entities": { + "EMAIL_ADDRESS": [ + "user@email.com", + "am9obkBleGFtcGxlLmNvbQ==", + "%6a%6f%65%40domain.com", + "6a6f686e406578616d706c652e636f6d" + ] + }, + "entity_types_checked": ["EMAIL_ADDRESS"], + "checked_text": "Contact or or ", + "block_mode": false, + "pii_detected": true +} +``` + +Note: Encoded PII is masked with `` to distinguish it from plain text PII. + +### Field Descriptions + +- **`detected_entities`**: Detected entities and their values (includes both plain and encoded forms when `detect_encoded_pii` is enabled) - **`entity_types_checked`**: List of entity types that were configured for detection -- **`checked_text`**: Text with PII masked (if PII was found) or original text (if no PII was found) +- **`checked_text`**: Text with PII masked. Plain PII uses ``, encoded PII uses `` - **`block_mode`**: Whether the check was configured to block or mask -- **`pii_detected`**: Boolean indicating if any PII was found +- **`pii_detected`**: Boolean indicating if any PII was found (plain or encoded) \ No newline at end of file diff --git a/examples/basic/pii_mask_example.py b/examples/basic/pii_mask_example.py index 58ca48d..5d4dd4b 100644 --- a/examples/basic/pii_mask_example.py +++ b/examples/basic/pii_mask_example.py @@ -33,8 +33,11 @@ "PHONE_NUMBER", "US_SSN", "CREDIT_CARD", + "CVV", + "BIC_SWIFT", ], "block": False, # Default - won't block, just mask + "detect_encoded_pii": True, }, } ], diff --git a/src/guardrails/_base_client.py b/src/guardrails/_base_client.py index a225855..9e26149 100644 --- a/src/guardrails/_base_client.py +++ b/src/guardrails/_base_client.py @@ -183,9 +183,7 @@ def _apply_preflight_modifications( # Get current content current_content = ( - data[latest_user_idx]["content"] - if isinstance(data[latest_user_idx], dict) - else getattr(data[latest_user_idx], "content", None) + data[latest_user_idx]["content"] if isinstance(data[latest_user_idx], dict) else getattr(data[latest_user_idx], "content", None) ) # Apply PII-masked text based on content type @@ -195,17 +193,15 @@ def _apply_preflight_modifications( if checked_text is None: return data return self._update_message_content(data, latest_user_idx, checked_text) - + if isinstance(current_content, list): # Structured content - mask each text part individually using Presidio return self._apply_pii_masking_to_structured_content(data, pii_result, latest_user_idx, current_content) - + # Unknown content type, return unchanged return data - def _update_message_content( - self, data: list[dict[str, str]], user_idx: int, new_content: Any - ) -> list[dict[str, str]]: + def _update_message_content(self, data: list[dict[str, str]], user_idx: int, new_content: Any) -> list[dict[str, str]]: """Update message content at the specified index. Args: @@ -263,31 +259,30 @@ def _apply_pii_masking_to_structured_content( entity_types = list(detected.keys()) # Create operators for each entity type - operators = { - entity_type: OperatorConfig("replace", {"new_value": f"<{entity_type}>"}) - for entity_type in entity_types - } + operators = {entity_type: OperatorConfig("replace", {"new_value": f"<{entity_type}>"}) for entity_type in entity_types} def _mask_text(text: str) -> str: - """Mask using Presidio's analyzer and anonymizer.""" + """Mask using Presidio's analyzer and anonymizer with Unicode normalization.""" if not text: return text - analyzer_results = analyzer.analyze(text, entities=entity_types, language="en") + # Import normalization function from pii module + from .checks.text.pii import _normalize_unicode + + # Normalize to prevent bypasses + normalized = _normalize_unicode(text) + + analyzer_results = analyzer.analyze(normalized, entities=entity_types, language="en") if analyzer_results: - return anonymizer.anonymize(text=text, analyzer_results=analyzer_results, operators=operators).text - return text + return anonymizer.anonymize(text=normalized, analyzer_results=analyzer_results, operators=operators).text + return normalized # Mask each text part modified_content = [] for part in current_content: if isinstance(part, dict): part_text = part.get("text") - if ( - part.get("type") in {"text", "input_text", "output_text"} - and isinstance(part_text, str) - and part_text - ): + if part.get("type") in {"text", "input_text", "output_text"} and isinstance(part_text, str) and part_text: modified_content.append({**part, "text": _mask_text(part_text)}) else: modified_content.append(part) diff --git a/src/guardrails/checks/text/pii.py b/src/guardrails/checks/text/pii.py index 2f12144..aa4adda 100644 --- a/src/guardrails/checks/text/pii.py +++ b/src/guardrails/checks/text/pii.py @@ -71,15 +71,20 @@ from __future__ import annotations +import base64 +import binascii import functools import logging +import re +import unicodedata +import urllib.parse from collections import defaultdict from collections.abc import Sequence from dataclasses import dataclass from enum import Enum from typing import Any, Final -from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, RecognizerResult +from presidio_analyzer import AnalyzerEngine, Pattern, PatternRecognizer, RecognizerRegistry, RecognizerResult from presidio_analyzer.nlp_engine import NlpEngineProvider from presidio_analyzer.predefined_recognizers.country_specific.korea.kr_rrn_recognizer import ( KrRrnRecognizer, @@ -96,14 +101,24 @@ logger = logging.getLogger(__name__) +# Zero-width and invisible Unicode characters that can be used to bypass detection +_ZERO_WIDTH_CHARS = re.compile( + r"[\u200b\u200c\u200d\u2060\ufeff]" # Zero-width space, ZWNJ, ZWJ, word joiner, BOM +) + +# Patterns for detecting encoded content +# Note: Hex must be checked BEFORE Base64 since hex strings can match Base64 pattern +_HEX_PATTERN = re.compile(r"\b[0-9a-fA-F]{24,}\b") # Reduced from 32 to 24 (12 bytes min) +_BASE64_PATTERN = re.compile(r"(?:data:|base64,)?([A-Za-z0-9+/]{16,}={0,2})") # Handle data: URI, min 16 chars +_URL_ENCODED_PATTERN = re.compile(r"(?:%[0-9A-Fa-f]{2})+") # Match all consecutive sequences + @functools.lru_cache(maxsize=1) def _get_analyzer_engine() -> AnalyzerEngine: """Return a cached AnalyzerEngine configured with Presidio recognizers. The engine loads Presidio's default recognizers for English and explicitly - registers the built-in KR_RRN recognizer to make it available alongside - other PII detectors within the guardrail. + registers custom recognizers for KR_RRN, CVV/CVC codes, and BIC/SWIFT codes. Returns: AnalyzerEngine: Analyzer configured with English NLP support and @@ -121,8 +136,54 @@ def _get_analyzer_engine() -> AnalyzerEngine: registry = RecognizerRegistry(supported_languages=["en"]) registry.load_predefined_recognizers(languages=["en"], nlp_engine=nlp_engine) + + # Add custom recognizers registry.add_recognizer(KrRrnRecognizer(supported_language="en")) + # CVV/CVC recognizer (3-4 digits, often near credit card context) + cvv_pattern = Pattern( + name="cvv_pattern", + regex=r"\b(?:cvv|cvc|security\s*code|card\s*code)[:\s=]*(\d{3,4})\b", + score=0.85, + ) + registry.add_recognizer( + PatternRecognizer( + supported_entity="CVV", + patterns=[cvv_pattern], + supported_language="en", + ) + ) + + # BIC/SWIFT code recognizer (8 or 11 characters: 4 bank + 2 country + 2 location + 3 branch) + bic_pattern = Pattern( + name="bic_swift_pattern", + regex=r"\b[A-Z]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?\b", + score=0.75, + ) + registry.add_recognizer( + PatternRecognizer( + supported_entity="BIC_SWIFT", + patterns=[bic_pattern], + supported_language="en", + ) + ) + + # Email in URL/query parameter context (Presidio's default fails in these contexts) + # Matches: user=john@example.com, email=test@domain.org, etc. + # Uses lookbehind to avoid capturing delimiters + email_in_url_pattern = Pattern( + name="email_in_url_pattern", + regex=r"(?<=[\?&=\/])[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", + score=0.9, + ) + registry.add_recognizer( + PatternRecognizer( + supported_entity="EMAIL_ADDRESS", + patterns=[email_in_url_pattern], + supported_language="en", + ) + ) + engine = AnalyzerEngine( registry=registry, nlp_engine=nlp_engine, @@ -148,7 +209,7 @@ class PIIEntity(str, Enum): """Supported PII entity types for detection. Includes global and region-specific types (US, UK, Spain, Italy, etc.). - These map to Presidio's supported entity labels. + These map to Presidio's supported entity labels, plus custom recognizers. Example values: "US_SSN", "EMAIL_ADDRESS", "IP_ADDRESS", "IN_PAN", etc. """ @@ -167,6 +228,10 @@ class PIIEntity(str, Enum): MEDICAL_LICENSE = "MEDICAL_LICENSE" URL = "URL" + # Custom recognizers + CVV = "CVV" + BIC_SWIFT = "BIC_SWIFT" + # USA US_BANK_NUMBER = "US_BANK_NUMBER" US_DRIVER_LICENSE = "US_DRIVER_LICENSE" @@ -227,6 +292,9 @@ class PIIConfig(BaseModel): block (bool): If True, triggers tripwire when PII is detected (blocking behavior). If False, only masks PII without blocking. Defaults to False. + detect_encoded_pii (bool): If True, detects PII in Base64/URL-encoded/hex strings. + Adds ~30-40ms latency but catches obfuscated PII. + Defaults to False. """ entities: list[PIIEntity] = Field( @@ -237,6 +305,10 @@ class PIIConfig(BaseModel): default=False, description="If True, triggers tripwire when PII is detected (blocking mode). If False, masks PII without blocking (masking mode, only works in pre-flight stage).", # noqa: E501 ) + detect_encoded_pii: bool = Field( + default=False, + description="If True, detects PII in encoded content (Base64, URL-encoded, hex). Adds latency but improves security.", # noqa: E501 + ) model_config = ConfigDict(extra="forbid") @@ -248,10 +320,12 @@ class PiiDetectionResult: Attributes: mapping (dict[str, list[str]]): Mapping from entity type to list of detected strings. analyzer_results (Sequence[RecognizerResult]): Raw analyzer results for position information. + encoded_detections (dict[str, list[str]] | None): Optional mapping of encoded PII detections. """ mapping: dict[str, list[str]] analyzer_results: Sequence[RecognizerResult] + encoded_detections: dict[str, list[str]] | None = None def to_dict(self) -> dict[str, list[str]]: """Convert the result to a dictionary. @@ -261,10 +335,22 @@ def to_dict(self) -> dict[str, list[str]]: """ return {k: v.copy() for k, v in self.mapping.items()} + def has_pii(self) -> bool: + """Check if any PII was detected (plain or encoded). + + Returns: + bool: True if PII was detected. + """ + return bool(self.mapping) or bool(self.encoded_detections) + def _detect_pii(text: str, config: PIIConfig) -> PiiDetectionResult: """Run Presidio analysis and collect findings by entity type. + Applies Unicode normalization before analysis to prevent bypasses using + fullwidth characters or zero-width spaces. This ensures that obfuscation + techniques cannot evade PII detection. + Supports detection of Korean (KR_RRN) and other region-specific entities via Presidio recognizers registered with the analyzer engine. @@ -281,35 +367,217 @@ def _detect_pii(text: str, config: PIIConfig) -> PiiDetectionResult: if not text: raise ValueError("Text cannot be empty or None") + # Normalize Unicode to prevent detection bypasses + normalized_text = _normalize_unicode(text) + engine = _get_analyzer_engine() # Run analysis for all configured entities # Region-specific recognizers (e.g., KR_RRN) are registered with language="en" - analyzer_results = engine.analyze(text, entities=[e.value for e in config.entities], language="en") + entity_values = [e.value for e in config.entities] + analyzer_results = engine.analyze(normalized_text, entities=entity_values, language="en") - # Filter results and create mapping - entity_values = {e.value for e in config.entities} - filtered_results = [res for res in analyzer_results if res.entity_type in entity_values] + # Group results by entity type + # Note: No filtering needed as engine.analyze already returns only requested entities grouped: dict[str, list[str]] = defaultdict(list) - for res in filtered_results: - grouped[res.entity_type].append(text[res.start : res.end]) + for res in analyzer_results: + grouped[res.entity_type].append(normalized_text[res.start : res.end]) + + return PiiDetectionResult(mapping=dict(grouped), analyzer_results=analyzer_results) + + +def _normalize_unicode(text: str) -> str: + """Normalize Unicode text to prevent detection bypasses. + + Applies NFKC normalization to convert fullwidth and other variant characters + to their canonical forms, then strips zero-width characters that could be + used to corrupt detection spans. + + Security rationale: + - Fullwidth characters (e.g., @ → @, 0 → 0) bypass regex patterns + - Zero-width spaces (\u200b) corrupt entity spans and cause leaks + - NFKC normalization handles ligatures, superscripts, circled chars, etc. + + Args: + text (str): The text to normalize. + + Returns: + str: Normalized text safe for PII detection. + + Examples: + >>> _normalize_unicode("test@example.com") # Fullwidth @ and . + 'test@example.com' + >>> _normalize_unicode("192\u200b.168.1.1") # Zero-width space in IP + '192.168.1.1' + """ + if not text: + return text + + # Step 1: NFKC normalization converts fullwidth → ASCII and decomposes ligatures + normalized = unicodedata.normalize("NFKC", text) + + # Step 2: Strip zero-width and invisible characters + cleaned = _ZERO_WIDTH_CHARS.sub("", normalized) + + return cleaned + + +@dataclass(frozen=True, slots=True) +class EncodedCandidate: + """Represents a potentially encoded string found in text. + + Attributes: + encoded_text: The encoded string as it appears in original text. + decoded_text: The decoded version (may be None if decoding failed). + encoding_type: Type of encoding (base64, url, hex). + start: Start position in original text. + end: End position in original text. + """ + + encoded_text: str + decoded_text: str | None + encoding_type: str + start: int + end: int + + +def _try_decode_base64(text: str) -> str | None: + """Attempt to decode Base64 string. + + Args: + text: String that looks like Base64. + + Returns: + Decoded string if valid, None otherwise. + """ + try: + decoded_bytes = base64.b64decode(text, validate=True) + # Check if result is valid UTF-8 + return decoded_bytes.decode("utf-8", errors="strict") + except (binascii.Error, UnicodeDecodeError, ValueError): + return None + - return PiiDetectionResult(mapping=dict(grouped), analyzer_results=filtered_results) +def _try_decode_hex(text: str) -> str | None: + """Attempt to decode hex string. + Args: + text: String that looks like hex. + + Returns: + Decoded string if valid, None otherwise. + """ + try: + decoded_bytes = bytes.fromhex(text) + return decoded_bytes.decode("utf-8", errors="strict") + except (ValueError, UnicodeDecodeError): + return None + + +def _build_decoded_text(text: str) -> tuple[str, list[EncodedCandidate]]: + """Build a fully decoded version of text by decoding all encoded chunks. + + Strategy: + 1. Find all encoded chunks (Hex, Base64, URL) + 2. Decode each chunk in place to build a fully decoded sentence + 3. Track mappings from decoded positions → original encoded spans + + This handles partial encodings like %6a%61%6e%65%40securemail.net → jane@securemail.net + + Args: + text: Text that may contain encoded chunks. -def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> str: + Returns: + Tuple of (decoded_text, candidates_with_positions) + """ + candidates = [] + used_spans = set() + + # Find hex candidates FIRST (most specific pattern) + for match in _HEX_PATTERN.finditer(text): + decoded = _try_decode_hex(match.group()) + if decoded and len(decoded) > 3: + candidates.append( + EncodedCandidate( + encoded_text=match.group(), + decoded_text=decoded, + encoding_type="hex", + start=match.start(), + end=match.end(), + ) + ) + used_spans.add((match.start(), match.end())) + + # Find Base64 candidates + for match in _BASE64_PATTERN.finditer(text): + if (match.start(), match.end()) in used_spans: + continue + + b64_string = match.group(1) if match.lastindex else match.group() + decoded = _try_decode_base64(b64_string) + if decoded and len(decoded) > 3: + candidates.append( + EncodedCandidate( + encoded_text=match.group(), + decoded_text=decoded, + encoding_type="base64", + start=match.start(), + end=match.end(), + ) + ) + used_spans.add((match.start(), match.end())) + + # Build fully decoded text by replacing Hex and Base64 chunks first + candidates.sort(key=lambda c: c.start, reverse=True) + decoded_text = text + for candidate in candidates: + if candidate.decoded_text: + decoded_text = decoded_text[: candidate.start] + candidate.decoded_text + decoded_text[candidate.end :] + + # URL decode the ENTIRE text (handles partial encodings like %6a%61%6e%65%40securemail.net) + # This must happen AFTER Base64/Hex replacement to handle mixed encodings correctly + url_decoded = urllib.parse.unquote(decoded_text) + + # If URL decoding changed the text, track encoded spans for masking + if url_decoded != decoded_text: + # Find URL-encoded spans in the ORIGINAL text for masking purposes + for match in _URL_ENCODED_PATTERN.finditer(text): + if any(start <= match.start() < end or start < match.end() <= end for start, end in used_spans): + continue + + decoded_chunk = urllib.parse.unquote(match.group()) + if decoded_chunk != match.group(): + candidates.append( + EncodedCandidate( + encoded_text=match.group(), + decoded_text=decoded_chunk, + encoding_type="url", + start=match.start(), + end=match.end(), + ) + ) + decoded_text = url_decoded + + return decoded_text, candidates + + +def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> tuple[str, dict[str, list[str]]]: """Mask detected PII using Presidio's AnonymizerEngine. + Normalizes Unicode before masking to ensure consistency with detection. Uses Presidio's built-in anonymization for optimal performance and correct handling of overlapping entities, Unicode, and special characters. + If detect_encoded_pii is enabled, also detects and masks PII in + Base64/URL-encoded/hex strings using a hybrid approach. + Args: text (str): The text to mask. detection (PiiDetectionResult): Results from PII detection. config (PIIConfig): PII detection configuration. Returns: - str: Text with PII replaced by entity type markers. + Tuple of (masked_text, encoded_detections_mapping). Raises: ValueError: If text is empty or None. @@ -317,25 +585,107 @@ def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> st if not text: raise ValueError("Text cannot be empty or None") + # Normalize Unicode to match detection normalization + normalized_text = _normalize_unicode(text) + if not detection.analyzer_results: - return text + # Check encoded content even if no direct PII found + if config.detect_encoded_pii: + return _mask_encoded_pii(normalized_text, config) + return normalized_text, {} # Use Presidio's optimized anonymizer with replace operator anonymizer = _get_anonymizer_engine() # Create operators mapping each entity type to a replace operator - operators = { - entity_type: OperatorConfig("replace", {"new_value": f"<{entity_type}>"}) - for entity_type in detection.mapping.keys() - } + operators = {entity_type: OperatorConfig("replace", {"new_value": f"<{entity_type}>"}) for entity_type in detection.mapping.keys()} result = anonymizer.anonymize( - text=text, + text=normalized_text, analyzer_results=detection.analyzer_results, operators=operators, ) - return result.text + masked_text = result.text + encoded_detections = {} + + # If enabled, also check for encoded PII + if config.detect_encoded_pii: + masked_text, encoded_detections = _mask_encoded_pii(masked_text, config) + + return masked_text, encoded_detections + + +def _mask_encoded_pii(text: str, config: PIIConfig) -> tuple[str, dict[str, list[str]]]: + """Detect and mask PII in encoded content (Base64, URL-encoded, hex). + + Strategy: + 1. Build fully decoded text by decoding all encoded chunks in place + 2. Pass the entire decoded text to Presidio once + 3. Map detections back to mask the encoded versions in original text + + Args: + text: Text potentially containing encoded PII. + config: PII configuration specifying which entities to detect. + + Returns: + Tuple of (masked_text, encoded_detections_mapping). + """ + # Build fully decoded text and get candidate mappings + decoded_text, candidates = _build_decoded_text(text) + + if not candidates: + return text, {} + + # Pass fully decoded text to Presidio ONCE + engine = _get_analyzer_engine() + analyzer_results = engine.analyze(decoded_text, entities=[e.value for e in config.entities], language="en") + + if not analyzer_results: + return text, {} + + # Map detections back to encoded chunks in original text + # Strategy: Check if the decoded chunk contributed to any PII detection + masked_text = text + encoded_detections: dict[str, list[str]] = defaultdict(list) + + # For each candidate, check if any PII was detected that includes its decoded content + # Sort candidates by start position (reverse) to mask from end to start + for candidate in sorted(candidates, key=lambda c: c.start, reverse=True): + if not candidate.decoded_text: + continue + + found_entities = set() + for res in analyzer_results: + detected_value = decoded_text[res.start : res.end] + candidate_lower = candidate.decoded_text.lower() + detected_lower = detected_value.lower() + + # Check if candidate's decoded text overlaps with the detection + # Handle partial encodings where encoded span may include extra characters + # e.g., %3A%6a%6f%65%40 → ":joe@" but only "joe@" is in email "joe@domain.com" + has_overlap = ( + candidate_lower in detected_lower # Candidate is substring of detection + or detected_lower in candidate_lower # Detection is substring of candidate + or ( + len(candidate_lower) >= 3 + and any( # Any 3-char chunk overlaps + candidate_lower[i : i + 3] in detected_lower + for i in range(0, len(candidate_lower) - 2, 2) # Step by 2 for efficiency + ) + ) + ) + + if has_overlap: + found_entities.add(res.entity_type) + encoded_detections[res.entity_type].append(candidate.encoded_text) + + if found_entities: + # Mask the encoded version in original text + entity_marker = f"<{next(iter(found_entities))}_ENCODED>" + masked_text = masked_text[: candidate.start] + entity_marker + masked_text[candidate.end :] + + return masked_text, dict(encoded_detections) def _as_result(detection: PiiDetectionResult, config: PIIConfig, name: str, text: str) -> GuardrailResult: @@ -351,21 +701,30 @@ def _as_result(detection: PiiDetectionResult, config: PIIConfig, name: str, text GuardrailResult: Always includes checked_text. Triggers tripwire only if PII found AND block=True. """ + # Mask the text (returns masked text and any encoded detections) + checked_text, encoded_detections = _mask_pii(text, detection, config) if detection.mapping or config.detect_encoded_pii else (text, {}) + + # Merge plain and encoded detections + all_detections = dict(detection.mapping) + for entity_type, values in encoded_detections.items(): + if entity_type in all_detections: + all_detections[entity_type].extend(values) + else: + all_detections[entity_type] = values + # Only trigger tripwire if PII is found AND block=True - tripwire_triggered = bool(detection.mapping) and config.block - - # Mask the text if PII is found - checked_text = _mask_pii(text, detection, config) if detection.mapping else text + has_pii = bool(all_detections) + tripwire_triggered = has_pii and config.block return GuardrailResult( tripwire_triggered=tripwire_triggered, info={ "guardrail_name": name, - "detected_entities": detection.mapping, + "detected_entities": all_detections, "entity_types_checked": config.entities, "checked_text": checked_text, "block_mode": config.block, - "pii_detected": bool(detection.mapping), + "pii_detected": has_pii, }, ) diff --git a/src/guardrails/utils/safety_identifier.py b/src/guardrails/utils/safety_identifier.py index 50a87ff..5a8a181 100644 --- a/src/guardrails/utils/safety_identifier.py +++ b/src/guardrails/utils/safety_identifier.py @@ -65,4 +65,3 @@ def supports_safety_identifier( # Default OpenAI client (no custom base_url) supports it return True - diff --git a/tests/unit/checks/test_pii.py b/tests/unit/checks/test_pii.py index 04a508b..71a5f82 100644 --- a/tests/unit/checks/test_pii.py +++ b/tests/unit/checks/test_pii.py @@ -8,7 +8,7 @@ import pytest -from guardrails.checks.text.pii import PIIConfig, PIIEntity, pii +from guardrails.checks.text.pii import PIIConfig, PIIEntity, _normalize_unicode, pii from guardrails.types import GuardrailResult @@ -232,3 +232,295 @@ async def test_pii_accepts_valid_korean_rrn_dates() -> None: # Should detect if date is valid assert result.info["pii_detected"] is True # noqa: S101 assert "KR_RRN" in result.info["detected_entities"] # noqa: S101 + + +# Security Tests: Unicode Normalization + + +def test_normalize_unicode_fullwidth_characters() -> None: + """Fullwidth characters should be normalized to ASCII.""" + # Fullwidth @ and . (@ . → @ .) + text = "test@example.com" + normalized = _normalize_unicode(text) + assert normalized == "test@example.com" # noqa: S101 + + +def test_normalize_unicode_zero_width_space() -> None: + """Zero-width spaces should be stripped.""" + # Zero-width space (\u200b) inserted in IP address + text = "192\u200b.168\u200b.1\u200b.1" + normalized = _normalize_unicode(text) + assert normalized == "192.168.1.1" # noqa: S101 + + +def test_normalize_unicode_mixed_obfuscation() -> None: + """Mixed obfuscation techniques should be normalized.""" + # Fullwidth digits + zero-width spaces + text = "SSN: 123\u200b-45\u200b-6789" + normalized = _normalize_unicode(text) + assert normalized == "SSN: 123-45-6789" # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_email_with_fullwidth_at_sign() -> None: + """Email with fullwidth @ should be detected after normalization.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False) + # Fullwidth @ (@) + text = "Contact: test@example.com" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "EMAIL_ADDRESS" in result.info["detected_entities"] # noqa: S101 + assert "" in result.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_phone_with_zero_width_spaces() -> None: + """Phone number with zero-width spaces should be detected after normalization.""" + config = PIIConfig(entities=[PIIEntity.PHONE_NUMBER], block=False) + # Zero-width spaces inserted between digits + text = "Call: 212\u200b-555\u200b-1234" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "PHONE_NUMBER" in result.info["detected_entities"] # noqa: S101 + assert "" in result.info["checked_text"] # noqa: S101 + + +# Custom Recognizer Tests: CVV and BIC/SWIFT + + +@pytest.mark.asyncio +async def test_pii_detects_cvv_code() -> None: + """CVV codes should be detected by custom recognizer.""" + config = PIIConfig(entities=[PIIEntity.CVV], block=False) + text = "Card CVV: 123" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "CVV" in result.info["detected_entities"] # noqa: S101 + assert "" in result.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_cvc_variant() -> None: + """CVC variant should also be detected.""" + config = PIIConfig(entities=[PIIEntity.CVV], block=False) + text = "Security code 4567" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "CVV" in result.info["detected_entities"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_cvv_with_equals() -> None: + """CVV with equals sign should be detected (from red team feedback).""" + config = PIIConfig(entities=[PIIEntity.CVV], block=False) + text = "Payment: cvv=533" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "CVV" in result.info["detected_entities"] # noqa: S101 + assert "" in result.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_bic_swift_code() -> None: + """BIC/SWIFT codes should be detected by custom recognizer.""" + config = PIIConfig(entities=[PIIEntity.BIC_SWIFT], block=False) + text = "Bank code: DEUTDEFF500" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "BIC_SWIFT" in result.info["detected_entities"] # noqa: S101 + assert "" in result.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_8char_bic() -> None: + """8-character BIC codes (without branch) should be detected.""" + config = PIIConfig(entities=[PIIEntity.BIC_SWIFT], block=False) + text = "Transfer to CHASUS33" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "BIC_SWIFT" in result.info["detected_entities"] # noqa: S101 + + +# Encoded PII Detection Tests + + +@pytest.mark.asyncio +async def test_pii_detects_base64_encoded_email() -> None: + """Base64-encoded email should be detected when flag is enabled.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False, detect_encoded_pii=True) + # am9obkBleGFtcGxlLmNvbQ== is base64 for john@example.com + text = "Contact: am9obkBleGFtcGxlLmNvbQ==" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "EMAIL_ADDRESS" in result.info["detected_entities"] # noqa: S101 + assert "" in result.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_ignores_base64_when_flag_disabled() -> None: + """Base64-encoded email should NOT be detected when flag is disabled (default).""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False, detect_encoded_pii=False) + text = "Contact: am9obkBleGFtcGxlLmNvbQ==" + result = await pii(None, text, config) + + # Should not detect because flag is off + assert result.info["pii_detected"] is False # noqa: S101 + assert "am9obkBleGFtcGxlLmNvbQ==" in result.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_url_encoded_email() -> None: + """URL-encoded email should be detected when flag is enabled.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False, detect_encoded_pii=True) + # %6a%61%6e%65%40%65%78%61%6d%70%6c%65%2e%63%6f%6d is URL-encoded jane@example.com + text = "Email: %6a%61%6e%65%40%65%78%61%6d%70%6c%65%2e%63%6f%6d" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "EMAIL_ADDRESS" in result.info["detected_entities"] # noqa: S101 + assert "" in result.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_hex_encoded_email() -> None: + """Hex-encoded email should be detected when flag is enabled.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False, detect_encoded_pii=True) + # 6a6f686e406578616d706c652e636f6d is hex for john@example.com + text = "Hex: 6a6f686e406578616d706c652e636f6d" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "EMAIL_ADDRESS" in result.info["detected_entities"] # noqa: S101 + assert "" in result.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_respects_entity_config_for_encoded() -> None: + """Encoded content should only be masked if entity is in config.""" + # Config only looks for PERSON, not EMAIL + config = PIIConfig(entities=[PIIEntity.PERSON], block=False, detect_encoded_pii=True) + # Base64 contains email, not person name + text = "Name: John. Email: am9obkBleGFtcGxlLmNvbQ==" + result = await pii(None, text, config) + + # Should detect John but NOT the base64 email + assert result.info["pii_detected"] is True # noqa: S101 + assert "PERSON" in result.info["detected_entities"] # noqa: S101 + assert "" in result.info["checked_text"] # noqa: S101 + # Base64 should remain unchanged + assert "am9obkBleGFtcGxlLmNvbQ==" in result.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_both_plain_and_encoded() -> None: + """Should detect both plain and encoded PII in same text.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False, detect_encoded_pii=True) + text = "Plain: alice@example.com and encoded: am9obkBleGFtcGxlLmNvbQ==" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "EMAIL_ADDRESS" in result.info["detected_entities"] # noqa: S101 + # Should have both markers + assert "" in result.info["checked_text"] # noqa: S101 + assert "" in result.info["checked_text"] # noqa: S101 + # Original email values should be masked + assert "alice@example.com" not in result.info["checked_text"] # noqa: S101 + assert "am9obkBleGFtcGxlLmNvbQ==" not in result.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_data_uri_base64() -> None: + """Data URI format Base64 should be detected.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False, detect_encoded_pii=True) + # data:Ym9iQHNlcnZlci5uZXQ= contains bob@server.net + text = "URI: data:Ym9iQHNlcnZlci5uZXQ=" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "EMAIL_ADDRESS" in result.info["detected_entities"] # noqa: S101 + assert "" in result.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_30char_hex() -> None: + """Hex strings of 24+ chars should be detected (lowered from 32).""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False, detect_encoded_pii=True) + # 6a6f686e406578616d706c652e636f6d is hex for john@example.com (30 chars) + text = "Hex: 6a6f686e406578616d706c652e636f6d" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "EMAIL_ADDRESS" in result.info["detected_entities"] # noqa: S101 + assert "" in result.info["checked_text"] # noqa: S101 + # Hex string should be removed + assert "6a6f686e406578616d706c652e636f6d" not in result.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_partial_url_encoded_email() -> None: + """Test detection of partially URL-encoded email addresses.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False, detect_encoded_pii=True) + # %6a%61%6e%65%40 = jane@ + text = "%6a%61%6e%65%40securemail.net" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "EMAIL_ADDRESS" in result.info["detected_entities"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_mixed_url_encoded_email() -> None: + """Test detection of mixed URL-encoded email with text.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False, detect_encoded_pii=True) + # partial%2Dencode%3A = partial-encode: + # %6a%6f%65%40 = joe@ + text = "partial%2Dencode%3A%6a%6f%65%40domain.com" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "EMAIL_ADDRESS" in result.info["detected_entities"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_url_encoded_prefix() -> None: + """Test detection of URL-encoded email with encoded prefix.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False, detect_encoded_pii=True) + # %3A%6a%6f%65%40 = :joe@ + text = "%3A%6a%6f%65%40domain.com" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "EMAIL_ADDRESS" in result.info["detected_entities"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_hex_encoded_email_in_url_context() -> None: + """Test detection of hex-encoded email in URL query parameters.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False, detect_encoded_pii=True) + # 6a6f686e406578616d706c652e636f6d = john@example.com + text = "GET /api?user=6a6f686e406578616d706c652e636f6d" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "EMAIL_ADDRESS" in result.info["detected_entities"] # noqa: S101 + assert "" in result.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_plain_email_in_url_context() -> None: + """Test detection of plain email in URL query parameters.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False, detect_encoded_pii=False) + text = "GET /api?user=john@example.com" + result = await pii(None, text, config) + + assert result.info["pii_detected"] is True # noqa: S101 + assert "EMAIL_ADDRESS" in result.info["detected_entities"] # noqa: S101 + assert "john@example.com" in result.info["detected_entities"]["EMAIL_ADDRESS"] # noqa: S101 diff --git a/tests/unit/test_safety_identifier.py b/tests/unit/test_safety_identifier.py index b427098..723f74f 100644 --- a/tests/unit/test_safety_identifier.py +++ b/tests/unit/test_safety_identifier.py @@ -70,4 +70,3 @@ def test_does_not_support_safety_identifier_for_alternative_provider() -> None: mock_client.__class__.__name__ = "AsyncOpenAI" assert supports_safety_identifier(mock_client) is False # noqa: S101 - From 40aa80f210b0b029af387f030b6cf08cc99ab933 Mon Sep 17 00:00:00 2001 From: Steven C Date: Wed, 5 Nov 2025 10:29:34 -0500 Subject: [PATCH 3/6] Reject large encoded content as DOS --- src/guardrails/_base_client.py | 18 +++++++---- src/guardrails/checks/text/pii.py | 51 +++++++++++++++++++++++++------ 2 files changed, 53 insertions(+), 16 deletions(-) diff --git a/src/guardrails/_base_client.py b/src/guardrails/_base_client.py index 9e26149..a0b6a7c 100644 --- a/src/guardrails/_base_client.py +++ b/src/guardrails/_base_client.py @@ -9,7 +9,7 @@ import logging from dataclasses import dataclass from pathlib import Path -from typing import Any, Union +from typing import Any, Final, Union from openai.types import Completion from openai.types.chat import ChatCompletion, ChatCompletionChunk @@ -26,6 +26,9 @@ # Type alias for OpenAI response types OpenAIResponseType = Union[Completion, ChatCompletion, ChatCompletionChunk, Response] # noqa: UP007 +# Text content types recognized in message content parts +_TEXT_CONTENT_TYPES: Final[set[str]] = {"text", "input_text", "output_text"} + @dataclass(frozen=True, slots=True) class GuardrailResults: @@ -97,13 +100,13 @@ def _content_to_text(content) -> str: if isinstance(part, dict): part_type = part.get("type") text_val = part.get("text", "") - if part_type in {"text", "input_text", "output_text"} and isinstance(text_val, str): + if part_type in _TEXT_CONTENT_TYPES and isinstance(text_val, str): parts.append(text_val) else: # Object-like content part ptype = getattr(part, "type", None) ptext = getattr(part, "text", "") - if ptype in {"text", "input_text", "output_text"} and isinstance(ptext, str): + if ptype in _TEXT_CONTENT_TYPES and isinstance(ptext, str): parts.append(ptext) return " ".join(parts).strip() return "" @@ -282,7 +285,7 @@ def _mask_text(text: str) -> str: for part in current_content: if isinstance(part, dict): part_text = part.get("text") - if part.get("type") in {"text", "input_text", "output_text"} and isinstance(part_text, str) and part_text: + if part.get("type") in _TEXT_CONTENT_TYPES and isinstance(part_text, str) and part_text: modified_content.append({**part, "text": _mask_text(part_text)}) else: modified_content.append(part) @@ -291,7 +294,7 @@ def _mask_text(text: str) -> str: if ( hasattr(part, "type") and hasattr(part, "text") - and part.type in {"text", "input_text", "output_text"} + and part.type in _TEXT_CONTENT_TYPES and isinstance(part.text, str) and part.text ): @@ -299,7 +302,10 @@ def _mask_text(text: str) -> str: part.text = _mask_text(part.text) except Exception: pass - modified_content.append(part) + modified_content.append(part) + else: + # Preserve non-dict, non-object parts (e.g., raw strings) + modified_content.append(part) return self._update_message_content(data, user_idx, modified_content) diff --git a/src/guardrails/checks/text/pii.py b/src/guardrails/checks/text/pii.py index aa4adda..ce0b88f 100644 --- a/src/guardrails/checks/text/pii.py +++ b/src/guardrails/checks/text/pii.py @@ -444,33 +444,57 @@ class EncodedCandidate: def _try_decode_base64(text: str) -> str | None: """Attempt to decode Base64 string. + Limits decoded output to 10KB to prevent DoS attacks via memory exhaustion. + Fails closed: raises error if decoded content exceeds limit to prevent PII leaks. + Args: text: String that looks like Base64. Returns: - Decoded string if valid, None otherwise. + Decoded string if valid and under size limit, None if invalid encoding. + + Raises: + ValueError: If decoded content exceeds 10KB (security limit). """ try: decoded_bytes = base64.b64decode(text, validate=True) + # Security: Fail closed - reject content > 10KB to prevent memory DoS and PII bypass + if len(decoded_bytes) > 10_000: + msg = ( + f"Base64 decoded content too large ({len(decoded_bytes):,} bytes). " + ) + raise ValueError(msg) # Check if result is valid UTF-8 return decoded_bytes.decode("utf-8", errors="strict") - except (binascii.Error, UnicodeDecodeError, ValueError): + except (binascii.Error, UnicodeDecodeError): return None def _try_decode_hex(text: str) -> str | None: """Attempt to decode hex string. + Limits decoded output to 10KB to prevent DoS attacks via memory exhaustion. + Fails closed: raises error if decoded content exceeds limit to prevent PII leaks. + Args: text: String that looks like hex. Returns: - Decoded string if valid, None otherwise. + Decoded string if valid and under size limit, None if invalid encoding. + + Raises: + ValueError: If decoded content exceeds 10KB (security limit). """ try: decoded_bytes = bytes.fromhex(text) + # Security: Fail closed - reject content > 10KB to prevent memory DoS and PII bypass + if len(decoded_bytes) > 10_000: + msg = ( + f"Hex decoded content too large ({len(decoded_bytes):,} bytes). " + ) + raise ValueError(msg) return decoded_bytes.decode("utf-8", errors="strict") - except (ValueError, UnicodeDecodeError): + except UnicodeDecodeError: return None @@ -591,8 +615,13 @@ def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> tu if not detection.analyzer_results: # Check encoded content even if no direct PII found if config.detect_encoded_pii: - return _mask_encoded_pii(normalized_text, config) - return normalized_text, {} + masked_text, encoded_detections = _mask_encoded_pii(normalized_text, config, original_text=text) + # If no encoded PII found, return original text to preserve special characters + if not encoded_detections: + return text, {} + return masked_text, encoded_detections + # No PII detected - return original text to preserve special characters + return text, {} # Use Presidio's optimized anonymizer with replace operator anonymizer = _get_anonymizer_engine() @@ -616,7 +645,7 @@ def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> tu return masked_text, encoded_detections -def _mask_encoded_pii(text: str, config: PIIConfig) -> tuple[str, dict[str, list[str]]]: +def _mask_encoded_pii(text: str, config: PIIConfig, original_text: str | None = None) -> tuple[str, dict[str, list[str]]]: """Detect and mask PII in encoded content (Base64, URL-encoded, hex). Strategy: @@ -625,24 +654,26 @@ def _mask_encoded_pii(text: str, config: PIIConfig) -> tuple[str, dict[str, list 3. Map detections back to mask the encoded versions in original text Args: - text: Text potentially containing encoded PII. + text: Normalized text potentially containing encoded PII. config: PII configuration specifying which entities to detect. + original_text: Original (non-normalized) text to return if no PII found. Returns: Tuple of (masked_text, encoded_detections_mapping). + Returns original_text if provided and no PII found, otherwise text. """ # Build fully decoded text and get candidate mappings decoded_text, candidates = _build_decoded_text(text) if not candidates: - return text, {} + return original_text or text, {} # Pass fully decoded text to Presidio ONCE engine = _get_analyzer_engine() analyzer_results = engine.analyze(decoded_text, entities=[e.value for e in config.entities], language="en") if not analyzer_results: - return text, {} + return original_text or text, {} # Map detections back to encoded chunks in original text # Strategy: Check if the decoded chunk contributed to any PII detection From 62ed5e094d41946c84e637fb03b84b434d37f3cc Mon Sep 17 00:00:00 2001 From: Steven C Date: Wed, 5 Nov 2025 10:49:16 -0500 Subject: [PATCH 4/6] Handle hex errors --- src/guardrails/checks/text/pii.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/guardrails/checks/text/pii.py b/src/guardrails/checks/text/pii.py index ce0b88f..ba79a46 100644 --- a/src/guardrails/checks/text/pii.py +++ b/src/guardrails/checks/text/pii.py @@ -461,7 +461,7 @@ def _try_decode_base64(text: str) -> str | None: # Security: Fail closed - reject content > 10KB to prevent memory DoS and PII bypass if len(decoded_bytes) > 10_000: msg = ( - f"Base64 decoded content too large ({len(decoded_bytes):,} bytes). " + f"Base64 decoded content too large ({len(decoded_bytes):,} bytes). Maximum allowed is 10KB." ) raise ValueError(msg) # Check if result is valid UTF-8 @@ -487,12 +487,16 @@ def _try_decode_hex(text: str) -> str | None: """ try: decoded_bytes = bytes.fromhex(text) - # Security: Fail closed - reject content > 10KB to prevent memory DoS and PII bypass - if len(decoded_bytes) > 10_000: - msg = ( - f"Hex decoded content too large ({len(decoded_bytes):,} bytes). " - ) - raise ValueError(msg) + except ValueError: + # Invalid hex string - return None + return None + + # Security: Fail closed - reject content > 10KB to prevent memory DoS and PII bypass + if len(decoded_bytes) > 10_000: + msg = f"Hex decoded content too large ({len(decoded_bytes):,} bytes). Maximum allowed is 10KB." + raise ValueError(msg) + + try: return decoded_bytes.decode("utf-8", errors="strict") except UnicodeDecodeError: return None From a901d25ea9ab8c9ab0019f69bd191952c2220f12 Mon Sep 17 00:00:00 2001 From: Steven C Date: Wed, 5 Nov 2025 16:44:27 -0500 Subject: [PATCH 5/6] fix structured output masking path --- src/guardrails/_base_client.py | 61 +++++++++++-- src/guardrails/checks/text/pii.py | 3 +- tests/unit/test_base_client.py | 137 ++++++++++++++++++++++++++++++ 3 files changed, 193 insertions(+), 8 deletions(-) diff --git a/src/guardrails/_base_client.py b/src/guardrails/_base_client.py index a0b6a7c..fea3cc3 100644 --- a/src/guardrails/_base_client.py +++ b/src/guardrails/_base_client.py @@ -249,11 +249,13 @@ def _apply_pii_masking_to_structured_content( from presidio_anonymizer import AnonymizerEngine from presidio_anonymizer.entities import OperatorConfig - # Extract detected entity types + # Extract detected entity types and config detected = pii_result.info.get("detected_entities", {}) if not detected: return data + detect_encoded_pii = pii_result.info.get("detect_encoded_pii", False) + # Get Presidio engines - entity types are guaranteed valid from detection from .checks.text.pii import _get_analyzer_engine @@ -265,20 +267,65 @@ def _apply_pii_masking_to_structured_content( operators = {entity_type: OperatorConfig("replace", {"new_value": f"<{entity_type}>"}) for entity_type in entity_types} def _mask_text(text: str) -> str: - """Mask using Presidio's analyzer and anonymizer with Unicode normalization.""" + """Mask using Presidio's analyzer and anonymizer with Unicode normalization. + + Handles both plain and encoded PII consistently with main detection path. + """ if not text: return text - # Import normalization function from pii module - from .checks.text.pii import _normalize_unicode + # Import functions from pii module + from .checks.text.pii import _build_decoded_text, _normalize_unicode # Normalize to prevent bypasses normalized = _normalize_unicode(text) + # Check for plain PII analyzer_results = analyzer.analyze(normalized, entities=entity_types, language="en") - if analyzer_results: - return anonymizer.anonymize(text=normalized, analyzer_results=analyzer_results, operators=operators).text - return normalized + has_plain_pii = bool(analyzer_results) + + # Check for encoded PII if enabled + has_encoded_pii = False + encoded_candidates = [] + decoded_text = normalized + + if detect_encoded_pii: + decoded_text, encoded_candidates = _build_decoded_text(normalized) + if encoded_candidates: + # Analyze decoded text + decoded_results = analyzer.analyze(decoded_text, entities=entity_types, language="en") + has_encoded_pii = bool(decoded_results) + + # If no PII found at all, return original text + if not has_plain_pii and not has_encoded_pii: + return text + + # Mask plain PII + masked = normalized + if has_plain_pii: + masked = anonymizer.anonymize(text=masked, analyzer_results=analyzer_results, operators=operators).text + + # Mask encoded PII if found + if has_encoded_pii: + # Re-analyze to get positions in the (potentially) masked text + decoded_text_for_masking, candidates_for_masking = _build_decoded_text(masked) + decoded_results = analyzer.analyze(decoded_text_for_masking, entities=entity_types, language="en") + + if decoded_results: + # Map detections back to mask encoded chunks + for result in decoded_results: + detected_value = decoded_text_for_masking[result.start : result.end] + entity_type = result.entity_type + + # Find candidate that contains this PII + for candidate in candidates_for_masking: + if detected_value in candidate.decoded_text: + # Mask the encoded version + entity_marker = f"<{entity_type}_ENCODED>" + masked = masked[: candidate.start] + entity_marker + masked[candidate.end :] + break + + return masked # Mask each text part modified_content = [] diff --git a/src/guardrails/checks/text/pii.py b/src/guardrails/checks/text/pii.py index ba79a46..d2ec90f 100644 --- a/src/guardrails/checks/text/pii.py +++ b/src/guardrails/checks/text/pii.py @@ -294,7 +294,7 @@ class PIIConfig(BaseModel): Defaults to False. detect_encoded_pii (bool): If True, detects PII in Base64/URL-encoded/hex strings. Adds ~30-40ms latency but catches obfuscated PII. - Defaults to False. + Defaults to False. """ entities: list[PIIEntity] = Field( @@ -760,6 +760,7 @@ def _as_result(detection: PiiDetectionResult, config: PIIConfig, name: str, text "checked_text": checked_text, "block_mode": config.block, "pii_detected": has_pii, + "detect_encoded_pii": config.detect_encoded_pii, }, ) diff --git a/tests/unit/test_base_client.py b/tests/unit/test_base_client.py index cca811e..d1f0622 100644 --- a/tests/unit/test_base_client.py +++ b/tests/unit/test_base_client.py @@ -68,6 +68,7 @@ def test_apply_preflight_modifications_masks_user_message() -> None: "pii_detected": True, "detected_entities": {"PERSON": ["Alice Smith"]}, "checked_text": "My name is .", + "detect_encoded_pii": False, }, ) ] @@ -93,6 +94,7 @@ def test_apply_preflight_modifications_handles_strings() -> None: "pii_detected": True, "detected_entities": {"PHONE": ["+1-555-0100"]}, "checked_text": "", + "detect_encoded_pii": False, }, ) ] @@ -124,6 +126,7 @@ def test_apply_preflight_modifications_structured_content() -> None: "pii_detected": True, "detected_entities": {"PHONE_NUMBER": ["123-456-7890"]}, "checked_text": "Call ", + "detect_encoded_pii": False, }, ) ] @@ -154,6 +157,7 @@ def test_apply_preflight_modifications_object_message_handles_failure() -> None: "pii_detected": True, "detected_entities": {"NAME": ["Alice"]}, "checked_text": "", + "detect_encoded_pii": False, }, ) ] @@ -187,6 +191,7 @@ def test_apply_preflight_modifications_no_user_message() -> None: "pii_detected": True, "detected_entities": {"NAME": ["Alice"]}, "checked_text": "", + "detect_encoded_pii": False, }, ) ] @@ -197,6 +202,133 @@ def test_apply_preflight_modifications_no_user_message() -> None: assert modified is messages # noqa: S101 +def test_apply_preflight_modifications_structured_content_with_encoded_pii() -> None: + """Structured content should detect Base64 encoded PII when flag enabled.""" + client = GuardrailsBaseClient() + guardrail_results = [ + GuardrailResult( + tripwire_triggered=False, + info={ + "guardrail_name": "Contains PII", + "pii_detected": True, + "detected_entities": {"EMAIL_ADDRESS": []}, # Will be detected from encoded + "checked_text": "Email: ", + "detect_encoded_pii": True, + }, + ) + ] + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": "Email: am9obkBleGFtcGxlLmNvbQ=="}, # john@example.com + {"type": "json", "value": {"raw": "no change"}}, + ], + } + ] + + modified = client._apply_preflight_modifications(messages, guardrail_results) + + # Should mask the encoded email with _ENCODED suffix + assert "" in modified[0]["content"][0]["text"] # noqa: S101 + assert "am9obkBleGFtcGxlLmNvbQ==" not in modified[0]["content"][0]["text"] # noqa: S101 + assert modified[0]["content"][1]["value"] == {"raw": "no change"} # noqa: S101 + + +def test_apply_preflight_modifications_structured_content_ignores_encoded_when_disabled() -> None: + """Structured content should ignore encoded PII when flag disabled.""" + client = GuardrailsBaseClient() + guardrail_results = [ + GuardrailResult( + tripwire_triggered=False, + info={ + "guardrail_name": "Contains PII", + "pii_detected": True, + "detected_entities": {"PHONE_NUMBER": ["212-555-1234"]}, + "checked_text": "Call ", + "detect_encoded_pii": False, # Disabled + }, + ) + ] + messages = [ + { + "role": "user", + "content": [ + # Contains both plain and encoded email - should only mask plain phone + {"type": "text", "text": "Call 212-555-1234 or email am9obkBleGFtcGxlLmNvbQ=="}, + ], + } + ] + + modified = client._apply_preflight_modifications(messages, guardrail_results) + + # Should mask phone but NOT encoded email (since detect_encoded_pii=False) + assert "" in modified[0]["content"][0]["text"] # noqa: S101 + assert "am9obkBleGFtcGxlLmNvbQ==" in modified[0]["content"][0]["text"] # noqa: S101 + + +def test_apply_preflight_modifications_structured_content_with_unicode_obfuscation() -> None: + """Structured content should detect Unicode-obfuscated PII after normalization.""" + client = GuardrailsBaseClient() + guardrail_results = [ + GuardrailResult( + tripwire_triggered=False, + info={ + "guardrail_name": "Contains PII", + "pii_detected": True, + "detected_entities": {"EMAIL_ADDRESS": []}, + "checked_text": "Contact: ", + "detect_encoded_pii": False, + }, + ) + ] + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": "Contact: user@example.com"}, # Fullwidth @ and . + ], + } + ] + + modified = client._apply_preflight_modifications(messages, guardrail_results) + + # Should detect and mask the obfuscated email + assert "" in modified[0]["content"][0]["text"] # noqa: S101 + assert "@" not in modified[0]["content"][0]["text"] or "@" not in modified[0]["content"][0]["text"] # noqa: S101 + + +def test_apply_preflight_modifications_structured_content_with_url_encoded_pii() -> None: + """Structured content should detect URL-encoded PII when flag enabled.""" + client = GuardrailsBaseClient() + guardrail_results = [ + GuardrailResult( + tripwire_triggered=False, + info={ + "guardrail_name": "Contains PII", + "pii_detected": True, + "detected_entities": {"EMAIL_ADDRESS": []}, + "checked_text": "User: ", + "detect_encoded_pii": True, + }, + ) + ] + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": "User: %6a%6f%68%6e%40%65%78%61%6d%70%6c%65%2e%63%6f%6d"}, # john@example.com + ], + } + ] + + modified = client._apply_preflight_modifications(messages, guardrail_results) + + # Should mask the URL-encoded email with _ENCODED suffix + assert "" in modified[0]["content"][0]["text"] # noqa: S101 + assert "%6a%6f%68%6e" not in modified[0]["content"][0]["text"] # noqa: S101 + + def test_apply_preflight_modifications_non_dict_part_preserved() -> None: """Non-dict content parts should be preserved as-is when PII guardrail runs.""" client = GuardrailsBaseClient() @@ -208,6 +340,7 @@ def test_apply_preflight_modifications_non_dict_part_preserved() -> None: "pii_detected": True, "detected_entities": {"NAME": ["Alice"]}, "checked_text": "raw text", + "detect_encoded_pii": False, }, ) ] @@ -365,6 +498,7 @@ def test_apply_preflight_modifications_leaves_unknown_content() -> None: "pii_detected": True, "detected_entities": {"NAME": ["Alice"]}, "checked_text": "", + "detect_encoded_pii": False, }, ) messages = [{"role": "user", "content": {"unknown": "value"}}] @@ -384,6 +518,7 @@ def test_apply_preflight_modifications_non_string_text_retained() -> None: "pii_detected": True, "detected_entities": {"PHONE": ["123"]}, "checked_text": "", + "detect_encoded_pii": False, }, ) messages = [ @@ -499,6 +634,7 @@ def test_apply_preflight_modifications_only_uses_pii_checked_text() -> None: "pii_detected": True, "detected_entities": {"EMAIL_ADDRESS": ["user@example.com"]}, "checked_text": "Contact ", + "detect_encoded_pii": False, }, ), ] @@ -520,6 +656,7 @@ def test_apply_preflight_modifications_no_pii_detected() -> None: "pii_detected": False, # No PII found "detected_entities": {}, "checked_text": "Clean text", + "detect_encoded_pii": False, }, ), ] From 7be1b0440420b47ff391fb89bfab22046b429156 Mon Sep 17 00:00:00 2001 From: Steven C Date: Wed, 5 Nov 2025 17:52:23 -0500 Subject: [PATCH 6/6] address copilot nits --- src/guardrails/_base_client.py | 1 - tests/unit/test_base_client.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/guardrails/_base_client.py b/src/guardrails/_base_client.py index fea3cc3..c3894a5 100644 --- a/src/guardrails/_base_client.py +++ b/src/guardrails/_base_client.py @@ -287,7 +287,6 @@ def _mask_text(text: str) -> str: # Check for encoded PII if enabled has_encoded_pii = False encoded_candidates = [] - decoded_text = normalized if detect_encoded_pii: decoded_text, encoded_candidates = _build_decoded_text(normalized) diff --git a/tests/unit/test_base_client.py b/tests/unit/test_base_client.py index d1f0622..18242af 100644 --- a/tests/unit/test_base_client.py +++ b/tests/unit/test_base_client.py @@ -295,7 +295,7 @@ def test_apply_preflight_modifications_structured_content_with_unicode_obfuscati # Should detect and mask the obfuscated email assert "" in modified[0]["content"][0]["text"] # noqa: S101 - assert "@" not in modified[0]["content"][0]["text"] or "@" not in modified[0]["content"][0]["text"] # noqa: S101 + assert "@" not in modified[0]["content"][0]["text"] and "@" not in modified[0]["content"][0]["text"] # noqa: S101 def test_apply_preflight_modifications_structured_content_with_url_encoded_pii() -> None: