Skip to content

feat(voice): Add v2 spike handler for 45-min behavioral interviews#58

Merged
xCatG merged 20 commits into
mainfrom
feature/voice-handler-v2-spike
Mar 21, 2026
Merged

feat(voice): Add v2 spike handler for 45-min behavioral interviews#58
xCatG merged 20 commits into
mainfrom
feature/voice-handler-v2-spike

Conversation

@xCatG
Copy link
Copy Markdown
Owner

@xCatG xCatG commented Mar 21, 2026

Summary

Prototype voice handler (handler_v2_spike.py) addressing all gaps from the voice handler gap report:

  • Session resumption — persistent Gemini handle storage via StorageBackend (REQ-1/4)
  • Context window compression — token-count trigger with sliding window strategy (REQ-2)
  • Transparent GoAway handling — automatic reconnection (REQ-3)
  • Wall clock timer — 45-min session limit with 5-min warning injection (REQ-6)
  • Unified termination — 5 reasons (USER_ENDED, TIME_LIMIT, AI_CONCLUDED, AI_EARLY_TERMINATION, DISCONNECTED) funnel through _handle_termination() (REQ-5)
  • Transcript suppression — blocks transcript events from reaching frontend (REQ-7)

Architecture highlights

  • Four-coroutine task racing: _receive_from_client, _send_to_client, _session_timer, _heartbeat orchestrated via asyncio.wait(FIRST_COMPLETED)
  • Sync classifier + async router: Pure classify_adk_event()_send_to_client() routing
  • Typed state: VoiceSessionState dataclass replaces v1's Dict[str, Any]
  • ADK type stubs: Module-level _Blob/_Content/_Part avoid hard google.genai dependency in tests
  • No existing files modified — all new code in 2 new files

Files

File Lines Purpose
src/python/role_play/voice/handler_v2_spike.py 707 V2 handler implementation
test/python/unit/voice/test_handler_v2_spike.py 745 60 unit tests across 12 test classes

Test plan

  • 60/60 unit tests passing (pytest -o "addopts=" test/python/unit/voice/)
  • Code review + simplification pass completed
  • Integration test with real WebSocket connection (future)
  • Manual test against Gemini Live API (future)
  • Load test for concurrent sessions (future)

🤖 Generated with Claude Code

xCatG and others added 15 commits March 20, 2026 09:48
Design for handler_v2_spike.py addressing all gaps from the voice
handler gap report: session resumption, context compression, GoAway
handling, wall clock timer, five termination reasons, and transcript
suppression. Validated through two rounds of spec review.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add transcript_buffer field to VoiceSessionState for evaluation
- Heartbeat ConnectionError now calls _handle_termination(DISCONNECTED)
- Flag session state warning approach as "likely won't work"
- Use RPS_ prefix on sentinels to avoid false positives
- Guard WebSocket send in _handle_termination with try/except
- Compression config gracefully skipped if SDK lacks support

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
11-task TDD plan covering config, dataclasses, event classifier,
termination handler, timer, heartbeat, event router, client receiver,
orchestrator, entry point, and final verification. Validated through
plan review with fixes for imports, stats counter, and spec deviation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…persistence

Implements _send_to_client routing audio/transcript/turn_status/session_resumption/go_away/unknown ADK events; transcripts are logged but not forwarded to frontend (REQ-7). Also fixes _make_state factory to use AsyncMock for chat_logger so awaitable calls work correctly in all tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements _receive_from_client for VoiceHandlerV2: routes audio/pcm
blobs to send_realtime, text/plain to send_content with logging,
end_session to USER_ENDED termination, and WebSocketDisconnect to
DISCONNECTED termination. Invalid JSON increments error stats.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…full test suite

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… handling

- Extract _Blob/_Content/_Part stubs to module level (was duplicated 3x)
- Fix `except (ConnectionError, Exception)` → split into proper blocks
- Use setdefault+increment for stats instead of .get() fallback
- Share _make_event() test helper at module level across test classes
- Simplify TestSendToClient mock setup using shared helper

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@claude
Copy link
Copy Markdown

claude Bot commented Mar 21, 2026

Claude encountered an error —— View job


I'll analyze this and get back to you.

@gemini-code-assist
Copy link
Copy Markdown

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new, experimental voice handler (V2 spike) specifically designed for 45-minute behavioral interviews. The primary goal is to address several critical limitations identified in the previous voice handler, focusing on improving session stability, resilience, and intelligent interaction with the underlying ADK Live API. Key improvements include advanced session state management, robust error handling, and a modular, concurrent architecture that allows for better control over the streaming lifecycle.

Highlights

  • Enhanced Session Management: Implemented robust session resumption, a 45-minute wall clock timer with a 5-minute warning, and unified termination handling for various exit scenarios.
  • Optimized ADK Interaction: Introduced context window compression and transparent handling of GoAway events for improved stability and efficiency with the ADK Live API.
  • Streamlined Event Processing: Developed a clear separation between synchronous event classification and asynchronous event routing, including suppression of raw transcripts from the frontend.
  • Concurrent Architecture: Utilized a four-coroutine task racing model (receive, send, timer, heartbeat) with asyncio.Event for coordinated session lifecycle management.
  • New Codebase: All features are implemented in new files (handler_v2_spike.py, voice_config_v2.py, and corresponding tests) to ensure no regressions in existing functionality.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a prototype v2 voice handler, which is a significant feature addition. The implementation is well-structured, particularly with the use of a typed session state dataclass and the separation of concerns in event processing. The asynchronous task orchestration is robust. My review focuses on improving type safety, consistency with existing project utilities, and ensuring test accuracy. I've identified a few areas for improvement, including leveraging Pydantic models for request validation, standardizing imports and utility function usage, and refining a test mock to better reflect production behavior.

Comment on lines +434 to +517
async def _receive_from_client(
self,
websocket,
state: VoiceSessionState,
env_info,
) -> None:
"""Receive from client and forward to ADK."""
try:
while not state.stop_event.is_set():
data = await websocket.receive_text()

try:
request = json.loads(data)
except (json.JSONDecodeError, ValueError) as e:
logger.warning(f"Invalid JSON from client: {e}")
state.stats.setdefault("errors", 0)
state.stats["errors"] += 1
continue

mime_type = request.get("mime_type", "")
raw_data = request.get("data", "")
end_session = request.get("end_session", False)

if end_session:
await self._handle_termination(
websocket, state, VoiceConfigV2.REASON_USER_ENDED,
)
return

try:
decoded = base64.b64decode(raw_data)
except Exception:
state.stats.setdefault("errors", 0)
state.stats["errors"] += 1
continue

try:
if mime_type == "audio/pcm":
# PCM logging in non-production
if not getattr(env_info, "is_production", True):
try:
await state.chat_logger.log_pcm_audio(
user_id=state.user_id,
session_id=state.session_id,
audio_data=decoded,
)
except (AttributeError, Exception) as e:
logger.debug(f"PCM logging skipped: {e}")

blob = _Blob(mime_type=mime_type, data=decoded)
state.live_request_queue.send_realtime(blob)
state.stats.setdefault("audio_chunks_sent", 0)
state.stats["audio_chunks_sent"] += 1

elif mime_type == "text/plain":
text_data = decoded.decode("utf-8")
await state.chat_logger.log_message(
user_id=state.user_id,
session_id=state.session_id,
role="user",
content=text_data,
message_number=-1,
)
content = _Content(parts=[_Part(text=text_data)])
state.live_request_queue.send_content(content)
state.stats.setdefault("text_chunks_sent", 0)
state.stats["text_chunks_sent"] += 1

except Exception as e:
logger.error(f"Error processing client input: {e}")
state.stats.setdefault("errors", 0)
state.stats["errors"] += 1

except WebSocketDisconnect:
logger.info(f"Client disconnected from session {state.session_id}")
await self._handle_termination(
websocket, state, VoiceConfigV2.REASON_DISCONNECTED,
)
except Exception as e:
logger.error(f"Error in _receive_from_client: {e}")
await self._handle_termination(
websocket, state, VoiceConfigV2.REASON_DISCONNECTED,
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation manually parses the incoming JSON and uses dict.get() to extract fields. This bypasses the Pydantic VoiceRequest model which is designed for this purpose and includes validation for mime_type and data decoding. Using the Pydantic model would make the code safer, more readable, and consistent with the project's standards. Please also move the from .models import VoiceRequest import to the top of the file.

    async def _receive_from_client(
        self,
        websocket,
        state: VoiceSessionState,
        env_info,
    ) -> None:
        """Receive from client and forward to ADK."""
        try:
            while not state.stop_event.is_set():
                data = await websocket.receive_text()

                try:
                    from .models import VoiceRequest
                    request = VoiceRequest.model_validate_json(data)
                except (json.JSONDecodeError, ValueError) as e:
                    logger.warning(f"Invalid JSON from client: {e}")
                    state.stats.setdefault("errors", 0)
                    state.stats["errors"] += 1
                    continue

                if request.end_session:
                    await self._handle_termination(
                        websocket, state, VoiceConfigV2.REASON_USER_ENDED,
                    )
                    return

                try:
                    decoded = request.decode_data()
                except ValueError as e:
                    logger.warning(f"Data decoding error: {e}")
                    state.stats.setdefault("errors", 0)
                    state.stats["errors"] += 1
                    continue

                try:
                    if request.mime_type == "audio/pcm":
                        # PCM logging in non-production
                        if not getattr(env_info, "is_production", True):
                            try:
                                await state.chat_logger.log_pcm_audio(
                                    user_id=state.user_id,
                                    session_id=state.session_id,
                                    audio_data=decoded,
                                )
                            except (AttributeError, Exception) as e:
                                logger.debug(f"PCM logging skipped: {e}")

                        blob = _Blob(mime_type=request.mime_type, data=decoded)
                        state.live_request_queue.send_realtime(blob)
                        state.stats.setdefault("audio_chunks_sent", 0)
                        state.stats["audio_chunks_sent"] += 1

                    elif request.mime_type == "text/plain":
                        text_data = decoded
                        await state.chat_logger.log_message(
                            user_id=state.user_id,
                            session_id=state.session_id,
                            role="user",
                            content=text_data,
                            message_number=-1,
                        )
                        content = _Content(parts=[_Part(text=text_data)])
                        state.live_request_queue.send_content(content)
                        state.stats.setdefault("text_chunks_sent", 0)
                        state.stats["text_chunks_sent"] += 1

                except Exception as e:
                    logger.error(f"Error processing client input: {e}")
                    state.stats.setdefault("errors", 0)
                    state.stats["errors"] += 1

        except WebSocketDisconnect:
            logger.info(f"Client disconnected from session {state.session_id}")
            await self._handle_termination(
                websocket, state, VoiceConfigV2.REASON_DISCONNECTED,
            )
        except Exception as e:
            logger.error(f"Error in _receive_from_client: {e}")
            await self._handle_termination(
                websocket, state, VoiceConfigV2.REASON_DISCONNECTED,
            )

"type": "session_ended",
"reason": reason,
"detail": detail,
"timestamp": datetime.now(timezone.utc).isoformat(),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with the existing codebase and to centralize time-related logic, please use the utc_now_isoformat utility function from role_play.common.time_utils instead of datetime.now(timezone.utc).isoformat(). This applies to several places in this file (e.g., lines 325, 360, 402, 658, 678). Please also add the import from ..common.time_utils import utc_now_isoformat at the top of the file.

Suggested change
"timestamp": datetime.now(timezone.utc).isoformat(),
"timestamp": utc_now_isoformat(),

Comment on lines +579 to +588
try:
from ..common.exceptions import TokenExpiredError, AuthenticationError
from ..server.dependencies import get_auth_manager
auth_manager = get_auth_manager(storage)
token_data = auth_manager.verify_token(token)
user = await storage.get_user(token_data.user_id)
return user
except Exception as e:
logger.error(f"JWT validation error: {e}")
return None
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Imports should be at the top of the file, as per PEP 8 guidelines. Placing them inside a function can lead to circular import issues, slower execution on repeated calls, and makes dependencies harder to track. Please move these imports to the top of the file.

Comment on lines +679 to +681
async def fast_receive(*a, **kw):
state.stop_event.set()
state.termination_reason = "USER_ENDED"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This mock for fast_receive manually sets stop_event and termination_reason. To make the test more accurately reflect the production logic, it would be better to have the mock call handler._handle_termination, which is the designated single point for session termination.

Suggested change
async def fast_receive(*a, **kw):
state.stop_event.set()
state.termination_reason = "USER_ENDED"
async def fast_receive(*a, **kw):
await handler._handle_termination(ws, state, "USER_ENDED")

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 542d417a15

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +406 to +409
handle_key = VoiceConfigV2.HANDLE_KEY_TEMPLATE.format(
user_id=state.user_id, session_id=state.session_id,
)
await self._storage.write(handle_key, classification.data)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Implement the read side of session resumption

This only persists the Gemini resumption handle; I couldn't find any corresponding read/use path in handler_v2_spike.py (repo-wide search for HANDLE_KEY_TEMPLATE only returns this write and the constant). If a websocket drops mid-interview, the reconnect path has no way to pass the saved handle back into ADK, so the user starts a fresh live session instead of resuming the existing one.

Useful? React with 👍 / 👎.

Comment on lines +266 to +268
await asyncio.wait_for(
state.stop_event.wait(),
timeout=state.warning_seconds,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Enforce the timeout once the session is already past warning

If elapsed >= session_timeout - warning_seconds—for example with any custom timeout shorter than 5 minutes, or after reconnecting to a session that's already near/over the 45-minute cap—phase 1 times out immediately and this branch still waits the full warning_seconds. That lets expired sessions continue for up to another 300 seconds before TIME_LIMIT fires, which breaks the advertised absolute wall-clock limit.

Useful? React with 👍 / 👎.

Comment on lines +648 to +651
# Note: _initialize_adk would be called here in production
# For the spike tests, we test the auth flow only
# state = await self._initialize_adk(...)
# await self._handle_streaming(websocket, state, env_info)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Start ADK streaming after successful authentication

After auth succeeds, handle_voice_session() sends the initial connecting status and then returns because the ADK initialization and _handle_streaming() call are commented out. Any client that tries this handler will get a stuck connection with no config/ready message and none of the v2 timer, transcript, or termination logic will ever execute.

Useful? React with 👍 / 👎.

Four smoke tests validating real API assumptions:
1. Type stub compatibility (_Blob/_Content/_Part vs google.genai types)
2. Live session round-trip (send text, receive audio/transcript)
3. Session resumption handle emission
4. classify_adk_event against real Gemini events

Excluded from default test runs via pytest marker (-m "not smoke").
Run with: GOOGLE_API_KEY=<key> pytest -m smoke test/python/smoke/ -v -s

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@claude
Copy link
Copy Markdown

claude Bot commented Mar 21, 2026

Claude encountered an error —— View job


I'll analyze this and get back to you.

Use gemini-2.5-flash-live-preview (matching v1 handler) instead of
non-existent gemini-2.5-flash-preview-native-audio-dialog.
Model is now configurable via GEMINI_LIVE_MODEL env var.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@claude
Copy link
Copy Markdown

claude Bot commented Mar 21, 2026

Claude encountered an error —— View job


I'll analyze this and get back to you.

…o-preview-12-2025

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@claude
Copy link
Copy Markdown

claude Bot commented Mar 21, 2026

Claude encountered an error —— View job


I'll analyze this and get back to you.

…w_isoformat

- Fix session timer edge case where reconnecting to a near-expired
  session could run 300s past the wall clock limit (Codex #2).
  Timer now terminates immediately if already past deadline, and
  caps phase 2 wait to actual remaining time.
- Replace manual JSON parsing with VoiceRequest Pydantic model in
  _receive_from_client for validation consistency (Gemini #1).
- Replace datetime.now(timezone.utc).isoformat() with
  utc_now_isoformat() from time_utils across 7 call sites (Gemini #2).
- Add 2 new timer tests: already-expired session, past-warning-threshold.

54 tests passing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@claude
Copy link
Copy Markdown

claude Bot commented Mar 21, 2026

Claude encountered an error —— View job


I'll analyze this and get back to you.

- Add _initialize_adk() method that creates Runner, LiveRequestQueue,
  and returns a typed VoiceSessionState
- Add router property with WebSocket endpoint at /voice-v2/ws/{session_id}
- Extend BaseHandler for compatibility with dynamic handler registration
- Register voice_v2 handler in dev.yaml config
- Entry point now runs: auth → ADK init → config → streaming → cleanup
- Constructor accepts optional storage (injectable for tests, defaults
  to get_storage_backend() for production)

Uses gemini-2.5-flash-native-audio-preview-12-2025 model.
54 unit tests + 20 v1 tests passing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@claude
Copy link
Copy Markdown

claude Bot commented Mar 21, 2026

Claude encountered an error —— View job


I'll analyze this and get back to you.

@xCatG xCatG merged commit 356396f into main Mar 21, 2026
2 of 3 checks passed
@xCatG xCatG deleted the feature/voice-handler-v2-spike branch March 21, 2026 19:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant