# AI Compliance Report Generator for Analytics Code & Business Decks

This notebook demonstrates a fully local, multi-agent compliance review pipeline built with the **Google ADK** package. It scans either analytics code or business presentation text, maps potential issues to synthetic policy rules, and emits both machine-readable and human-readable compliance reports.


In [None]:
# Setup: install dependencies and import ADK components
!pip install -q google-adk

import json
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

from google.adk.agents import Agent
from google.adk.sessions import Session
from google.adk.tools.function_tool import FunctionTool

# Print the installed ADK version for traceability
import importlib.metadata
print('google-adk version:', importlib.metadata.version('google-adk'))


## Regulation & Policy Knowledge Base

We define a compact, in-notebook policy and regulation dictionary alongside policy bundles for the two supported business contexts.


In [None]:
# Synthetic rule catalogue keyed by rule ID. Every entry carries the same four
# string fields: a display name, a category, a description, and the default
# severity ('high' or 'medium' in the entries below).
REGULATION_KB: Dict[str, Dict[str, str]] = {
    # Privacy rules (referenced by the 'analytics' bundle below).
    'GDPR-PII-LOGGING': {
        'name': 'GDPR – PII in Logs',
        'category': 'privacy',
        'description': 'Avoid logging directly identifiable personal data (emails, phone numbers, IDs).',
        'severity_default': 'high',
    },
    'INT-PII-MASKING': {
        'name': 'Internal – Mask PII',
        'category': 'privacy',
        'description': 'PII must be masked or hashed before storage or export.',
        'severity_default': 'medium',
    },
    # Marketing rules (referenced by the 'marketing_deck' bundle below).
    'MKT-CLAIMS-01': {
        'name': 'Marketing – No Guaranteed Returns',
        'category': 'marketing',
        'description': 'Marketing material must not promise guaranteed returns or risk-free performance.',
        'severity_default': 'high',
    },
    'MKT-DISCLAIMER-01': {
        'name': 'Marketing – Required Disclaimer',
        'category': 'marketing',
        'description': 'Presentations containing performance metrics must include a standard disclaimer slide.',
        'severity_default': 'medium',
    },
}

# Business context -> list of REGULATION_KB rule IDs that apply in that context.
POLICY_BUNDLES: Dict[str, List[str]] = {
    'analytics': ['GDPR-PII-LOGGING', 'INT-PII-MASKING'],
    'marketing_deck': ['MKT-CLAIMS-01', 'MKT-DISCLAIMER-01'],
}


## Tool Definitions (FunctionTool)

Each tool encapsulates a deterministic piece of the compliance workflow and is wrapped with `FunctionTool` for use by ADK agents.


In [None]:


def detect_file_type_tool(text_or_code: str) -> Dict[str, Any]:
    """Heuristically classify the input as SQL code, Python code, or deck text.

    Args:
        text_or_code: Raw artifact text to classify.

    Returns:
        A dict with 'file_type' ('code' or 'deck'), 'code_language'
        ('sql', 'python', or None for decks) and a human-readable 'reason'.
    """
    lowered = text_or_code.lower()

    # Require the whole words SELECT ... FROM, in that order. A plain substring
    # test would classify prose such as "Selected results from Q1" as SQL.
    if re.search(r'\bselect\b.*?\bfrom\b', lowered, re.DOTALL):
        return {'file_type': 'code', 'code_language': 'sql', 'reason': 'SQL keywords detected'}

    # Keywords are matched as whole words so that e.g. "undef " does not count.
    has_python_keyword = re.search(r'\b(?:def|import) ', text_or_code) is not None
    # A dotted call such as logger.info(...) identifies short snippets that
    # contain neither `def` nor `import`.
    has_dotted_call = re.search(
        r'\b[A-Za-z_]\w*(?:\.[A-Za-z_]\w*)+\(', text_or_code
    ) is not None
    if has_python_keyword or has_dotted_call:
        return {'file_type': 'code', 'code_language': 'python', 'reason': 'Python syntax detected'}

    if 'slide 1' in lowered or 'slide 2' in lowered:
        return {'file_type': 'deck', 'code_language': None, 'reason': 'Slide markers detected'}

    # Nothing looked like code, so treat the input as presentation text.
    return {'file_type': 'deck', 'code_language': None, 'reason': 'Defaulting to deck when code cues absent'}


def select_policies_tool(context: Dict[str, Any]) -> Dict[str, Any]:
    """Look up the policy bundle for the business context named in ``context``.

    Args:
        context: Mapping that may carry a 'business_context' key; when the key
            is absent, 'analytics' is used.

    Returns:
        A dict with the selected 'rules' (empty when the context has no
        bundle), the resolved 'business_context', and an 'explanation' string.
    """
    chosen_context = context.get('business_context', 'analytics')
    bundle = POLICY_BUNDLES.get(chosen_context, [])
    return {
        'rules': bundle,
        'business_context': chosen_context,
        'explanation': f"Selected {len(bundle)} rules for context '{chosen_context}'.",
    }


def scan_code_for_violations_tool(code: str, rules: List[str]) -> Dict[str, Any]:
    """Scan Python or SQL source text, line by line, for simple privacy red flags.

    A line yields at most one 'PII_LOGGED' finding (it contains an email-like
    token, an SSN-like number, or one of the words phone/ssn/email) and at most
    one 'DATA_EXPORT' finding (it posts via ``requests`` or mentions both
    'http' and 'requests').

    Args:
        code: Source text to scan.
        rules: Rule IDs selected for the run. Accepted so the signature matches
            the deck scanner; the checks below do not consult it.

    Returns:
        ``{'findings': [...]}`` where each finding has 'id' (F1, F2, ...),
        'kind', 'location' ('line N', 1-based) and 'snippet' (stripped line).
    """
    # Compiled once, outside the per-line loop.
    pii_patterns = [
        re.compile(pattern, re.IGNORECASE)
        for pattern in (r'[\w.-]+@[\w.-]+', r'\d{3}-\d{2}-\d{4}', r'phone', r'ssn', r'email')
    ]
    findings: List[Dict[str, Any]] = []

    def record(kind: str, line_no: int, line: str) -> None:
        findings.append({
            'id': f'F{len(findings)+1}',
            'kind': kind,
            'location': f'line {line_no}',
            'snippet': line.strip(),
        })

    for line_no, line in enumerate(code.splitlines(), start=1):
        lowered = line.lower()
        # The keyword patterns already cover "logger + email/phone/ssn" lines,
        # so a separate logger check would only report the same line twice.
        if any(pattern.search(line) for pattern in pii_patterns):
            record('PII_LOGGED', line_no, line)
        if 'requests.post' in lowered or ('http' in lowered and 'requests' in lowered):
            record('DATA_EXPORT', line_no, line)
    return {'findings': findings}


def scan_deck_for_violations_tool(deck_text: str, rules: List[str]) -> Dict[str, Any]:
    """Scan deck-style text for risky marketing claims and a missing disclaimer.

    The text is split on the literal marker 'Slide'; each non-empty chunk is
    treated as one slide of the form '<id>: <content>'.

    Args:
        deck_text: Presentation text to scan.
        rules: Rule IDs selected for the run; the disclaimer check only runs
            when 'MKT-DISCLAIMER-01' is among them.

    Returns:
        ``{'findings': [...]}`` where each finding has 'id', 'kind',
        'location' and 'snippet'.
    """
    risky_phrases = ('guaranteed return', 'risk-free')
    stripped_chunks = (chunk.strip() for chunk in deck_text.split('Slide'))
    slide_chunks = [chunk for chunk in stripped_chunks if chunk]

    results: List[Dict[str, Any]] = []
    for chunk in slide_chunks:
        # Text before the first colon is the slide label; without a colon the
        # whole chunk serves as both label and content.
        label, colon, remainder = chunk.partition(':')
        body = remainder if colon else chunk
        body_lower = body.lower()
        if not any(phrase in body_lower for phrase in risky_phrases):
            continue
        results.append({
            'id': f'F{len(results)+1}',
            'kind': 'UNAPPROVED_CLAIM',
            'location': f'slide {label.strip()}',
            'snippet': body.strip(),
        })

    disclaimer_required = 'MKT-DISCLAIMER-01' in rules
    if disclaimer_required and not any('disclaimer' in chunk.lower() for chunk in slide_chunks):
        results.append({
            'id': f'F{len(results)+1}',
            'kind': 'MISSING_DISCLAIMER',
            'location': 'deck',
            'snippet': 'No disclaimer slide found',
        })
    return {'findings': results}


def map_findings_to_rules_tool(
    findings: List[Dict[str, Any]],
    rules: List[str],
    file_type: str,
    code_language: Optional[str] = None,
    business_context: Optional[str] = None,
) -> Dict[str, Any]:
    """Attach rule metadata to each finding and compute an aggregate risk score.

    Args:
        findings: Raw findings, each a dict that may carry a 'kind'.
        rules: Rule IDs selected for the run.
        file_type: Detected artifact type, echoed into the result.
        code_language: Detected language, echoed into the result.
        business_context: Business context, echoed into the result.

    Returns:
        A dict echoing the inputs plus the enriched 'findings', an
        'overall_score' capped at 100, and an 'overall_risk' label.
    """
    weights = {'high': 3, 'medium': 2, 'low': 1}
    privacy_kinds = {'PII_LOGGED', 'DATA_EXPORT'}
    fixed_rule_for_kind = {
        'UNAPPROVED_CLAIM': 'MKT-CLAIMS-01',
        'MISSING_DISCLAIMER': 'MKT-DISCLAIMER-01',
    }
    # Used whenever a kind has no dedicated rule in this run.
    fallback_rule = rules[0] if rules else 'UNKNOWN'

    def resolve_rule(kind: str) -> str:
        if kind in privacy_kinds and 'GDPR-PII-LOGGING' in rules:
            return 'GDPR-PII-LOGGING'
        return fixed_rule_for_kind.get(kind, fallback_rule)

    annotated = []
    weight_sum = 0
    for item in findings:
        matched_id = resolve_rule(item.get('kind', ''))
        rule_meta = REGULATION_KB.get(matched_id, {})
        level = rule_meta.get('severity_default', 'medium')
        weight_sum += weights.get(level, 1)
        annotated.append({
            **item,
            'rule_id': matched_id,
            'rule_name': rule_meta.get('name', 'Unknown rule'),
            'severity': level,
            'rationale': f"Finding '{item.get('kind')}' maps to policy '{matched_id}'.",
        })

    # Each weight unit is worth 15 points; the score is capped at 100.
    score = min(100, weight_sum * 15)
    if score >= 60:
        risk_label = 'high'
    elif score >= 30:
        risk_label = 'medium'
    else:
        risk_label = 'low'

    return {
        'file_type': file_type,
        'code_language': code_language,
        'business_context': business_context,
        'rules_applied': rules,
        'findings': annotated,
        'overall_score': score,
        'overall_risk': risk_label,
    }


def render_compliance_report_tool(result: Dict[str, Any]) -> Dict[str, Any]:
    """Render Markdown and JSON reports from mapped findings.

    Args:
        result: Either the mapped analysis dict itself, or a wrapper of the
            form ``{'analysis': <mapped analysis>, 'policy_explanation': str}``.

    Returns:
        ``{'report_markdown': str, 'report_json': dict}``. The JSON part holds
        'generated_at' (UTC ISO-8601 with a trailing 'Z'), the 'analysis', and
        the 'policy_explanation'.
    """
    # Function-scope import: the file's top-level import brings in `datetime` only.
    from datetime import timezone

    analysis = result.get('analysis', result)
    policy_note = result.get('policy_explanation', '')
    findings = analysis.get('findings') or []
    # Timezone-aware replacement for the deprecated datetime.utcnow(); tzinfo is
    # dropped again so the text keeps the "<naive ISO time>Z" shape.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z'

    def table_cell(value: Any) -> str:
        # A raw '|' or line break inside a cell would break the Markdown table row.
        return str(value).replace('|', '\\|').replace('\n', ' ')

    # Kind-specific advice; any other kind gets the logging/export advice below.
    remediation_by_kind = {
        'UNAPPROVED_CLAIM': 'Remove or reword the claim so it does not promise guaranteed or risk-free returns.',
        'MISSING_DISCLAIMER': 'Add the standard disclaimer slide to the deck.',
    }

    header = f'# Compliance Report\nGenerated: {timestamp}\n\n'
    overview_lines = [
        f"- File type: **{analysis.get('file_type')}**",
        f"- Code language: **{analysis.get('code_language') or 'N/A'}**",
        f"- Business context: **{analysis.get('business_context') or 'unspecified'}**",
        f"- Policies applied: {', '.join(analysis.get('rules_applied') or []) or 'None'}",
        f"- Policy selection note: {policy_note or 'N/A'}",
        f"- Overall risk: **{analysis.get('overall_risk')}** (score={analysis.get('overall_score')})",
    ]

    summary_rows = [
        f"| {table_cell(f.get('rule_id'))} | {table_cell(f.get('severity'))} "
        f"| {table_cell(f.get('location'))} | {table_cell(f.get('snippet'))} |"
        for f in findings
    ]
    summary_table = '\n'.join([
        '| Rule | Severity | Location | Snippet |',
        '| --- | --- | --- | --- |',
        *summary_rows,
    ]) if summary_rows else 'No violations detected.'

    detail_blocks = []
    for f in findings:
        rule = REGULATION_KB.get(f.get('rule_id'), {})
        remediation = remediation_by_kind.get(
            f.get('kind'),
            f"Align logging/export with {rule.get('name', 'the rule')} and mask PII.",
        )
        detail_blocks.append(
            '\n'.join([
                f"### Finding {f.get('id')} — {f.get('kind')}",
                f"- **Rule:** {f.get('rule_id')} — {rule.get('name', 'Unknown')}",
                f"- **Severity:** {f.get('severity')}",
                f"- **Location:** {f.get('location')}",
                f"- **Snippet:** `{f.get('snippet')}`",
                f"- **Description:** {rule.get('description', 'No description')}",
                f"- **Recommended remediation:** {remediation}",
            ])
        )

    report_markdown = '\n\n'.join([
        header,
        '## Overview',
        '\n'.join(overview_lines),
        '## Summary of Violations',
        summary_table,
        '## Detailed Findings',
        '\n\n'.join(detail_blocks) if detail_blocks else 'No detailed findings.',
    ])

    report_json = {
        'generated_at': timestamp,
        'analysis': analysis,
        'policy_explanation': policy_note,
    }

    return {'report_markdown': report_markdown, 'report_json': report_json}


def evaluate_on_test_cases_tool(test_cases: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Run the pipeline on synthetic cases and compute simple metrics.

    Args:
        test_cases: Dicts with a required 'input_text' and optional 'name',
            'business_context' (default 'analytics') and
            'expected_violation_count' (default 0).

    Returns:
        A dict with per-case 'case_results' (name, predicted and expected
        counts, and the mapped overall risk), 'mae' (mean absolute error of
        the violation count), 'accuracy_any' (share of cases where "any
        violation found" matches the expectation) and an 'explanation' string.

    Raises:
        KeyError: If a case has no 'input_text'.
    """
    case_results = []
    abs_error_total = 0.0
    any_violation_hits = 0

    for case in test_cases:
        text = case['input_text']
        context = case.get('business_context', 'analytics')
        expected = case.get('expected_violation_count', 0)

        detection = detect_file_type_tool(text)
        policies = select_policies_tool({'business_context': context})
        if detection['file_type'] == 'code':
            findings = scan_code_for_violations_tool(text, policies['rules'])['findings']
        else:
            findings = scan_deck_for_violations_tool(text, policies['rules'])['findings']

        # Keep the mapped result so the risk rating appears in the per-case
        # output instead of being computed and thrown away.
        mapped = map_findings_to_rules_tool(
            findings=findings,
            rules=policies['rules'],
            file_type=detection['file_type'],
            code_language=detection.get('code_language'),
            business_context=context,
        )

        predicted = len(findings)
        case_results.append({
            'case': case.get('name', 'case'),
            'predicted': predicted,
            'expected': expected,
            'overall_risk': mapped['overall_risk'],
        })
        abs_error_total += abs(predicted - expected)
        if (predicted > 0) == (expected > 0):
            any_violation_hits += 1

    total = len(test_cases)
    # Avoid dividing by zero on an empty suite; both metrics stay floats.
    mae = abs_error_total / total if total else 0.0
    accuracy = any_violation_hits / total if total else 0.0
    explanation = f'Evaluated {total} cases: MAE={mae:.2f}, accuracy(any_violation)={accuracy:.2%}.'
    return {'case_results': case_results, 'mae': mae, 'accuracy_any': accuracy, 'explanation': explanation}


# Wrap each plain function so ADK agents can call it as a tool.
# FunctionTool takes the callable via `func`; it has no `fn` or `name`
# parameter and derives the tool name from the wrapped function's __name__,
# so the tool names stay the same as before.
DETECT_FILE_TYPE = FunctionTool(func=detect_file_type_tool)
SELECT_POLICIES = FunctionTool(func=select_policies_tool)
SCAN_CODE = FunctionTool(func=scan_code_for_violations_tool)
SCAN_DECK = FunctionTool(func=scan_deck_for_violations_tool)
MAP_FINDINGS = FunctionTool(func=map_findings_to_rules_tool)
RENDER_REPORT = FunctionTool(func=render_compliance_report_tool)
EVALUATE_CASES = FunctionTool(func=evaluate_on_test_cases_tool)


## Agent Definitions

The ADK agents are declared to mirror the multi-agent architecture. For offline execution we drive the tools directly, but these agent objects illustrate how orchestration, delegation, and reporting could be coordinated in an ADK workflow.


In [None]:
# Model identifier shared by every agent declared in this cell.
MODEL_NAME = 'gemini-2.5-flash'

# NOTE: the ADK Agent field for the system prompt is `instruction` (singular);
# the agent model rejects unknown keyword arguments such as `instructions`.

# Entry-point agent: classifies the artifact, selects policies, renders the report.
orchestrator_agent = Agent(
    name='orchestrator',
    model=MODEL_NAME,
    instruction=(
        'You are the orchestration agent. Given user input you classify the artifact, '
        'select policies, delegate to specialized agents, and consolidate their outputs into a compliance report. '
        'Prefer calling tools and sub-agents over free-form answers.'
    ),
    tools=[DETECT_FILE_TYPE, SELECT_POLICIES, RENDER_REPORT],
)

# Specialist for Python/SQL artifacts: scans the code and maps findings to rules.
code_compliance_agent = Agent(
    name='code_compliance_agent',
    model=MODEL_NAME,
    instruction=(
        'You analyze analytics code (Python or SQL) for policy and regulatory risks. '
        'Use the scanning tools and return only tool outputs.'
    ),
    tools=[SCAN_CODE, MAP_FINDINGS],
)

# Specialist for presentation text: scans the deck and maps findings to rules.
deck_compliance_agent = Agent(
    name='deck_compliance_agent',
    model=MODEL_NAME,
    instruction=(
        'You analyze business decks for risky claims and missing disclaimers using tools only.'
    ),
    tools=[SCAN_DECK, MAP_FINDINGS],
)

# Runs the synthetic evaluation suite through its single tool.
evaluation_agent = Agent(
    name='evaluation_agent',
    model=MODEL_NAME,
    instruction=(
        'You run evaluation suites on the compliance system using the provided tool, summarizing metrics only with tool outputs.'
    ),
    tools=[EVALUATE_CASES],
)


## Service Wrapper Using a Session

The helper below runs the end-to-end pipeline by calling the tools directly and records the most recent report on an ADK `Session` object, illustrating where per-run state would live in an ADK workflow.


In [None]:


def run_compliance_service(
    input_text: str,
    business_context: str = 'analytics',
    session: Optional[Session] = None,
) -> Dict[str, Any]:
    """Run the compliance review end to end and return the rendered report.

    The tools are called directly, in order: detect file type, select
    policies, scan (code or deck), map findings to rules, render.

    Args:
        input_text: Analytics code or deck text to review.
        business_context: Name of the policy bundle to apply.
        session: Optional ADK session in which to record the latest report.
            Pass the same session to successive calls to reuse its state;
            when omitted, a fresh session is created for this call.

    Returns:
        ``{'report_markdown': str, 'report_json': dict}`` as produced by
        ``render_compliance_report_tool``.
    """
    if session is None:
        # Session declares id, app_name and user_id as required fields.
        session = Session(
            id='compliance-notebook-session',
            app_name='compliance_report_generator',
            user_id='notebook-user',
        )

    detection = detect_file_type_tool(input_text)
    policies = select_policies_tool({'business_context': business_context})

    if detection['file_type'] == 'code':
        scanner = scan_code_for_violations_tool
    else:
        scanner = scan_deck_for_violations_tool
    raw_findings = scanner(input_text, policies['rules']).get('findings', [])

    mapped = map_findings_to_rules_tool(
        findings=raw_findings,
        rules=policies['rules'],
        file_type=detection['file_type'],
        code_language=detection.get('code_language'),
        business_context=policies.get('business_context'),
    )

    report_payload = {'analysis': mapped, 'policy_explanation': policies.get('explanation')}
    rendered = render_compliance_report_tool(report_payload)

    # Session does not accept undeclared attributes, so the report is stored
    # in its `state` dict rather than assigned as a new attribute.
    session.state['last_report'] = rendered
    return rendered


## Demo: Running the Multi-Agent Compliance Service

We exercise the pipeline on two synthetic artifacts: a Python analytics snippet and a marketing deck with intentional compliance issues.


In [None]:
# Example 1: Python analytics code with PII logging and an external export
# The sample is held in a string and handed to the pipeline as text; this cell
# does not execute it.
analytics_code = '''
import requests

def send_report(user_email, score):
    logger.info(f"user_email={user_email}")
    requests.post('http://external-collector.example.com', json={'email': user_email, 'score': score})
    return 'sent'
'''

# Review the sample under the 'analytics' policy bundle.
analytics_report = run_compliance_service(analytics_code, business_context='analytics')
# Print the machine-readable part of the result as indented JSON.
print(json.dumps(analytics_report['report_json'], indent=2))


In [None]:
# Display Markdown report for analytics example (rendering handled by notebooks)
from IPython.display import Markdown, display

# Wrap the Markdown string from the previous cell so it is shown as formatted output.
display(Markdown(analytics_report['report_markdown']))


In [None]:
# Example 2: Marketing deck text with unapproved claims and no disclaimer
# Slide 2 carries the "guaranteed returns" claim, and no slide mentions a disclaimer.
marketing_deck = '''
Slide 1: Product launch overview
Slide 2: Our customers enjoy guaranteed returns with zero risk
Slide 3: Performance metrics from Q1
'''

# Review the deck under the 'marketing_deck' policy bundle.
marketing_report = run_compliance_service(marketing_deck, business_context='marketing_deck')
# Print the machine-readable part of the result as indented JSON.
print(json.dumps(marketing_report['report_json'], indent=2))


In [None]:
# Display Markdown report for marketing example
# The import is repeated here so this cell does not rely on the analytics display cell.
from IPython.display import Markdown, display

# Wrap the Markdown string from the previous cell so it is shown as formatted output.
display(Markdown(marketing_report['report_markdown']))


## Evaluation & Discussion

A handful of synthetic cases are used to illustrate how evaluation tooling can summarize performance of the end-to-end compliance flow.


In [None]:
# Synthetic evaluation cases. Each case supplies the text to review, the policy
# bundle to apply, and the number of findings the pipeline is expected to emit.
# Multi-line inputs use explicit "\n" escapes: a single- or double-quoted
# Python string literal cannot contain a raw line break.
TEST_CASES = [
    {
        'name': 'clean_python',
        'input_text': "def add(a, b):\n    return a + b",
        'expected_violation_count': 0,
        'business_context': 'analytics',
    },
    {
        'name': 'pii_in_logs',
        'input_text': "logger.info('email=alice@example.com')",
        'expected_violation_count': 1,
        'business_context': 'analytics',
    },
    {
        # Expects two findings: the claim itself plus the missing disclaimer.
        'name': 'marketing_claims',
        'input_text': 'Slide 1: guaranteed returns for all customers',
        'expected_violation_count': 2,
        'business_context': 'marketing_deck',
    },
    {
        'name': 'disclaimer_present',
        'input_text': 'Slide 1: Metrics\nSlide 2: Disclaimer: results vary',
        'expected_violation_count': 0,
        'business_context': 'marketing_deck',
    },
]

# Run every synthetic case through the pipeline and print the metrics as indented JSON.
eval_results = evaluate_on_test_cases_tool(TEST_CASES)
print(json.dumps(eval_results, indent=2))


The simple metrics above capture mean absolute error for predicted violation counts and accuracy for whether any violation was detected. Because the rule set and detection heuristics are intentionally lightweight, the evaluation is illustrative rather than definitive.
