Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 95 additions & 22 deletions tests/test_agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ async def on_user_turn_completed(self, turn_ctx: ChatContext, new_message: ChatM


SESSION_TIMEOUT = 60.0
EVENT_TIMESTAMP_EARLY_TOLERANCE = 0.75
EVENT_TIMESTAMP_LATE_TOLERANCE = 1.25


async def test_events_and_metrics() -> None:
Expand Down Expand Up @@ -102,35 +104,51 @@ async def test_events_and_metrics() -> None:
assert conversation_events[1].item.type == "message"
assert conversation_events[1].item.role == "user"
assert conversation_events[1].item.text_content == "Hello, how are you?"
check_timestamp(conversation_events[1].created_at - t_origin, 3.0, speed_factor=speed)
check_wallclock_event_timestamp(
conversation_events[1].created_at - t_origin, 3.0, speed_factor=speed
)
assert conversation_events[2].item.type == "message"
assert conversation_events[2].item.role == "assistant"
assert conversation_events[2].item.text_content == "I'm doing well, thank you!"
check_timestamp(conversation_events[2].created_at - t_origin, 5.5, speed_factor=speed)
check_wallclock_event_timestamp(
conversation_events[2].created_at - t_origin, 5.5, speed_factor=speed
)

# user_input_transcribed
assert len(user_transcription_events) >= 1
assert user_transcription_events[-1].transcript == "Hello, how are you?"
assert user_transcription_events[-1].is_final is True
check_timestamp(user_transcription_events[-1].created_at - t_origin, 2.7, speed_factor=speed)
check_wallclock_event_timestamp(
user_transcription_events[-1].created_at - t_origin, 2.7, speed_factor=speed
)

# user_state_changed
assert len(user_state_events) == 2
check_timestamp(user_state_events[0].created_at - t_origin, 0.5, speed_factor=speed)
check_wallclock_event_timestamp(
user_state_events[0].created_at - t_origin, 0.5, speed_factor=speed
)
assert user_state_events[0].new_state == "speaking"
check_timestamp(user_state_events[1].created_at - t_origin, 3.0, speed_factor=speed)
check_wallclock_event_timestamp(
user_state_events[1].created_at - t_origin, 3.0, speed_factor=speed
)
assert user_state_events[1].new_state == "listening"

# agent_state_changed
assert len(agent_state_events) == 4
assert agent_state_events[0].old_state == "initializing"
assert agent_state_events[0].new_state == "listening"
assert agent_state_events[1].new_state == "thinking"
check_timestamp(agent_state_events[1].created_at - t_origin, 3.0, speed_factor=speed)
check_wallclock_event_timestamp(
agent_state_events[1].created_at - t_origin, 3.0, speed_factor=speed
)
assert agent_state_events[2].new_state == "speaking"
check_timestamp(agent_state_events[2].created_at - t_origin, 3.5, speed_factor=speed)
check_wallclock_event_timestamp(
agent_state_events[2].created_at - t_origin, 3.5, speed_factor=speed
)
assert agent_state_events[3].new_state == "listening"
check_timestamp(agent_state_events[3].created_at - t_origin, 5.5, speed_factor=speed)
check_wallclock_event_timestamp(
agent_state_events[3].created_at - t_origin, 5.5, speed_factor=speed
)

# metrics
metrics_events = [ev for ev in metrics_events if ev.metrics.type != "vad_metrics"]
Expand Down Expand Up @@ -188,7 +206,9 @@ async def test_tool_call() -> None:
assert (
agent_state_events[3].new_state == "thinking"
) # from speaking to thinking when tool call is executed
check_timestamp(agent_state_events[3].created_at - t_origin, 5.5, speed_factor=speed)
check_wallclock_event_timestamp(
agent_state_events[3].created_at - t_origin, 5.5, speed_factor=speed
)
assert agent_state_events[4].new_state == "speaking"
assert agent_state_events[5].new_state == "listening"

Expand Down Expand Up @@ -263,13 +283,17 @@ async def test_interruption(
assert agent_state_events[1].new_state == "thinking"
assert agent_state_events[2].new_state == "speaking"
assert agent_state_events[3].new_state == "listening"
check_timestamp(
check_wallclock_event_timestamp(
agent_state_events[3].created_at - t_origin, expected_interruption_time, speed_factor=speed
)
assert agent_state_events[4].new_state == "thinking"
check_timestamp(agent_state_events[4].created_at - t_origin, 6.5, speed_factor=speed)
check_wallclock_event_timestamp(
agent_state_events[4].created_at - t_origin, 6.5, speed_factor=speed
)
assert agent_state_events[5].new_state == "listening"
check_timestamp(agent_state_events[5].created_at - t_origin, 6.5, speed_factor=speed)
check_wallclock_event_timestamp(
agent_state_events[5].created_at - t_origin, 6.5, speed_factor=speed
)

assert len(playback_finished_events) == 1
assert playback_finished_events[0].interrupted is True
Expand Down Expand Up @@ -415,7 +439,7 @@ async def test_interruption_before_speaking(
assert agent_state_events[0].new_state == "listening"
assert agent_state_events[1].new_state == "thinking" # without speaking state
assert agent_state_events[2].new_state == "listening"
check_timestamp(
check_wallclock_event_timestamp(
agent_state_events[2].created_at - t_origin, expected_interruption_time, speed_factor=speed
) # interrupted at 3.5s
assert agent_state_events[3].new_state == "thinking"
Expand Down Expand Up @@ -472,7 +496,9 @@ async def test_interrupt_before_speaking_with_pausable_audio() -> None:
assert agent_state_events[0].new_state == "listening"
assert agent_state_events[1].new_state == "thinking"
assert agent_state_events[2].new_state == "listening"
check_timestamp(agent_state_events[2].created_at - t_origin, 3.5, speed_factor=speed)
check_wallclock_event_timestamp(
agent_state_events[2].created_at - t_origin, 3.5, speed_factor=speed
)

# nothing audible reached the transport — the pause cleanup emits a single
# playback_finished with interrupted=True and playback_position=0
Expand Down Expand Up @@ -520,7 +546,9 @@ async def test_false_interruption_before_speaking_resumes() -> None:

# playout was postponed: the noise ran 3.0–3.3s, so "speaking" should fire at
# ~3.8s (resume on VAD EOS=3.3s + 0.5s min_silence_duration)
check_timestamp(speaking_events[0].created_at - t_origin, 3.8, speed_factor=speed)
check_wallclock_event_timestamp(
speaking_events[0].created_at - t_origin, 3.8, speed_factor=speed
)

# the reply plays to completion (not interrupted); playback_position covers the
# full audio duration
Expand Down Expand Up @@ -584,22 +612,32 @@ async def test_generate_reply() -> None:
assert conversation_events[1].item.type == "message"
assert conversation_events[1].item.role == "assistant"
assert conversation_events[1].item.text_content == "What can I do for you!"
check_timestamp(conversation_events[1].created_at - t_origin, 2.5, speed_factor=speed)
check_wallclock_event_timestamp(
conversation_events[1].created_at - t_origin, 2.5, speed_factor=speed
)
assert conversation_events[2].item.type == "message"
assert conversation_events[2].item.role == "user"
assert conversation_events[2].item.text_content == "bye"
check_timestamp(conversation_events[2].created_at - t_origin, 4.5, speed_factor=speed)
check_wallclock_event_timestamp(
conversation_events[2].created_at - t_origin, 4.5, speed_factor=speed
)
assert conversation_events[3].item.type == "message"
assert conversation_events[3].item.role == "assistant"
assert conversation_events[3].item.text_content == "session.say from on_user_turn_completed"
check_timestamp(
conversation_events[3].created_at - t_origin, 5.5, speed_factor=speed, max_abs_diff=1.0
check_wallclock_event_timestamp(
conversation_events[3].created_at - t_origin,
5.5,
speed_factor=speed,
max_early_diff=1.0,
)
assert conversation_events[4].item.type == "message"
assert conversation_events[4].item.role == "assistant"
assert conversation_events[4].item.text_content == "Goodbye! have a nice day!"
check_timestamp(
conversation_events[4].created_at - t_origin, 9.0, speed_factor=speed, max_abs_diff=1.0
check_wallclock_event_timestamp(
conversation_events[4].created_at - t_origin,
9.0,
speed_factor=speed,
max_early_diff=1.0,
)

# chat context
Expand Down Expand Up @@ -663,7 +701,9 @@ async def test_aec_warmup() -> None:
assert agent_state_events[2].new_state == "speaking"
# interruption delayed to 5.5s (EOU), not 4.5s (VAD was blocked by warmup)
speaking_to_listening = next(e for e in agent_state_events[3:] if e.new_state == "listening")
check_timestamp(speaking_to_listening.created_at - t_origin, 5.5, speed_factor=speed)
check_wallclock_event_timestamp(
speaking_to_listening.created_at - t_origin, 5.5, speed_factor=speed
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -709,6 +749,8 @@ async def test_preemptive_generation(preemptive_generation: dict, expected_laten
assert agent_state_events[1].new_state == "thinking"
assert agent_state_events[2].new_state == "speaking"
t_agent_start_speaking = agent_state_events[2].created_at
# This compares two event timestamps from the same turn, so scheduler
# latency largely cancels out and the latency invariant can stay strict.
check_timestamp(
t_agent_start_speaking - t_user_stop_speaking,
t_target=expected_latency,
Expand Down Expand Up @@ -843,3 +885,34 @@ def check_timestamp(
assert abs(t_event - t_target) <= max_abs_diff, (
f"event timestamp {t_event} is not within {max_abs_diff} of target {t_target}"
)


def check_wallclock_event_timestamp(
t_event: float,
t_target: float,
*,
speed_factor: float = 1.0,
max_early_diff: float = EVENT_TIMESTAMP_EARLY_TOLERANCE,
max_late_diff: float = EVENT_TIMESTAMP_LATE_TOLERANCE,
) -> None:
"""
Check wall-clock event timestamps while preserving a stricter bound for early events.

These tests run the fake pipeline faster than real time. Under CI load, asyncio
callbacks can be dispatched slightly late after scaling, but an event that fires
too early is still a behavioral regression.
Use this for created_at event timestamps; keep check_timestamp for deterministic
pipeline durations, playback positions, and same-turn deltas.
"""
t_event = t_event * speed_factor
diff = t_event - t_target
print(
"check_wallclock_event_timestamp: "
f"t_event: {t_event}, t_target: {t_target}, "
f"max_early_diff: {max_early_diff}, "
f"max_late_diff: {max_late_diff}"
)
assert -max_early_diff <= diff <= max_late_diff, (
f"event timestamp {t_event} differs from target {t_target} by {diff}; "
f"allowed range is -{max_early_diff}/+{max_late_diff}"
)