Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from livekit.agents import (
APIConnectionError,
APIConnectOptions,
APIError,
APIStatusError,
APITimeoutError,
LanguageCode,
Expand Down Expand Up @@ -335,6 +336,8 @@ async def _run(self) -> None:
tasks_group.cancel()
tasks_group.exception()

except APIError:
raise
except asyncio.TimeoutError as e:
logger.error(
f"Timeout during Soniox Speech-to-Text API connection/initialization: {e}"
Expand Down Expand Up @@ -486,7 +489,8 @@ def send_endpoint_transcript() -> None:

try:
content = json.loads(msg.data)
tokens = content["tokens"]
has_error = bool(content.get("error_code") or content.get("error_message"))
tokens = content.get("tokens", []) if has_error else content["tokens"]

non_final = _TokenAccumulator()
non_final_original = _TokenAccumulator()
Expand Down Expand Up @@ -572,29 +576,36 @@ def send_endpoint_transcript() -> None:
)

# 3) on error or finish, flush any remaining final tokens.
if (
content.get("finished")
or content.get("error_code")
or content.get("error_message")
):
if content.get("finished") or has_error:
send_endpoint_transcript()
self._report_processed_audio_duration(total_audio_proc_ms)

if content.get("error_code") or content.get("error_message"):
logger.error(
f"WebSocket error: {content.get('error_code')}"
f" - {content.get('error_message')}"
if has_error:
err_code = content.get("error_code")
err_msg = content.get("error_message", "Unknown Soniox STT error")
logger.error(f"WebSocket error: {err_code} - {err_msg}")
status_code = int(err_code) if isinstance(err_code, int) else -1
if isinstance(err_code, str) and err_code.isdigit():
status_code = int(err_code)
raise APIStatusError(
f"Soniox STT error: {err_code} - {err_msg}",
status_code=status_code,
body=content,
)

if content.get("finished"):
logger.debug("Transcription finished")

except APIError:
raise
except Exception as e:
logger.exception(f"Error processing message: {e}")

except asyncio.CancelledError:
# Normal shutdown — don't trigger reconnect.
raise
except APIError:
raise
except aiohttp.ClientError as e:
logger.error(f"WebSocket error while receiving: {e}")
except Exception as e:
Expand Down
22 changes: 22 additions & 0 deletions tests/test_plugin_soniox_stt.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
from unittest.mock import MagicMock, patch

import aiohttp
import pytest

from livekit.agents import APIStatusError
from livekit.agents.language import LanguageCode
from livekit.agents.stt import SpeechData, SpeechEventType

Expand Down Expand Up @@ -423,6 +425,26 @@ async def test_interim_transcript_no_translation_populates_source_runs():
assert sd.target_texts is None


async def test_recv_messages_raises_on_server_error_frame():
stream = _make_stream(translation=None)
stream._ws = _FakeWebSocket(
[
{
"error_code": 401,
"error_message": "Incorrect API key provided",
"total_audio_proc_ms": 0,
}
]
)

with pytest.raises(APIStatusError) as exc_info:
await asyncio.wait_for(stream._recv_messages_task(), timeout=1.0)

assert exc_info.value.status_code == 401
assert exc_info.value.retryable is False
assert exc_info.value.body is not None


# --- Non-translation mode -------------------------------------------------


Expand Down
Loading