Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions projects/app_picoclaw/app.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Manifest for the PicoClaw application.
id: picoclaw
name: PicoClaw
name[zh]: PicoClaw
version: 1.0.0
icon: img/icon.png
author: Sipeed Ltd
desc: PicoClaw
desc[zh]: PicoClaw
# NOTE(review): presumably the files/directories bundled with the app, written
# as "source: destination" — confirm against the packaging tool.
files:
  app.yaml: app.yaml
  asr: asr
  img: img
51 changes: 51 additions & 0 deletions projects/app_picoclaw/asr/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import importlib
import logging

from .config import load_asr_config

logger = logging.getLogger(__name__)

# (model-name prefix, backend module path relative to this package) pairs.
# _resolve_backend returns the FIRST entry whose prefix matches, so a longer
# prefix must be listed before any shorter prefix it starts with (for example
# "qwen3-asr-flash-realtime" before "qwen3-asr-flash").
_BACKEND_REGISTRY: list[tuple[str, str]] = [
    ("qwen3-asr-flash-realtime", ".qwen_realtime"),
    ("qwen3-asr-flash", ".qwen"),
    ("whisper", ".whisper"),
    ("scribe_v1", ".elevenlabs"),
]


class ASRNotConfiguredError(Exception):
    """Raised when no ASR model is configured.

    ``get_asr_backend`` raises this when the loaded configuration yields an
    empty model name.
    """


def _resolve_backend(model: str) -> str:
    """Map a model name to the relative module path of its ASR backend.

    The registry is scanned in order; the first entry whose prefix the model
    name starts with wins.

    Raises:
        ValueError: if no registered prefix matches ``model``.
    """
    candidates = (
        path for prefix, path in _BACKEND_REGISTRY if model.startswith(prefix)
    )
    resolved = next(candidates, None)
    if resolved is not None:
        return resolved

    known_prefixes = [prefix for prefix, _ in _BACKEND_REGISTRY]
    raise ValueError(
        f"No ASR backend registered for model '{model}'. "
        f"Known prefixes: {known_prefixes}"
    )


def get_asr_backend(use_cache: bool = True):
    """Return the ``asr_session`` callable of the backend for the configured model.

    Args:
        use_cache: forwarded to ``load_asr_config``.

    Raises:
        ASRNotConfiguredError: if the loaded configuration has no model name.
        ValueError: propagated from ``_resolve_backend`` when no registered
            prefix matches the configured model.
    """
    known_prefixes = [prefix for prefix, _ in _BACKEND_REGISTRY]
    model, _api_key = load_asr_config(prefixes=known_prefixes, use_cache=use_cache)

    if model:
        backend_path = _resolve_backend(model)
        logger.info("ASR routing: model=%s → %s", model, backend_path)
        # Backend paths are relative (".qwen", ...), so anchor them on this package.
        backend = importlib.import_module(backend_path, package=__name__)
        return backend.asr_session

    raise ASRNotConfiguredError(
        "No ASR model configured."
    )


# Resolve the default backend once at import time. ``asr_session`` is None
# when no backend could be selected; call get_asr_backend() to retry later.
try:
    asr_session = get_asr_backend()
except ASRNotConfiguredError:
    asr_session = None
except ValueError as exc:
    # An unrecognised model name is a configuration problem; report it instead
    # of making the whole package unimportable.
    logger.warning("ASR backend unavailable: %s", exc)
    asr_session = None

__all__ = ["asr_session", "get_asr_backend", "ASRNotConfiguredError"]
110 changes: 110 additions & 0 deletions projects/app_picoclaw/asr/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import logging
import os
from pathlib import Path

logger = logging.getLogger(__name__)

# Location of the credentials file. The PICOCLAW_SECURITY_YML environment
# variable overrides the default path.
SECURITY_YML_PATH = Path(
    os.environ.get("PICOCLAW_SECURITY_YML", "/root/.picoclaw/.security.yml")
)

# Last complete (model, api_key) pair resolved by load_asr_config; None until
# a complete pair has been found.
_cached_config: tuple[str, str] | None = None


def load_asr_config(prefixes: list[str] | None = None, use_cache: bool = True) -> tuple[str, str]:
global _cached_config
if use_cache and _cached_config is not None:
return _cached_config
env_model = os.environ.get("ASR_MODEL", "").strip()
env_key = os.environ.get("DASHSCOPE_API_KEY", "").strip()

if env_model and env_key:
_cached_config = (env_model, env_key)
return _cached_config

# Try .security.yml
yml_result = _load_from_yml(prefixes)
if yml_result is not None:
yml_model, yml_key = yml_result
model = env_model or yml_model
key = env_key or yml_key
if model and key:
_cached_config = (model, key)
return _cached_config

# Fallback
model = env_model or ""
key = env_key or ""
result = (model, key)
if model and key:
_cached_config = result
return result


def _load_from_yml(prefixes: list[str] | None = None) -> tuple[str, str] | None:
    """Read the security YAML file and extract a ``(model, api_key)`` pair.

    Returns ``None`` when the file does not exist, when reading or parsing it
    raises (the error is logged at debug level), or when the parser finds no
    matching model block with a key.
    """
    try:
        if SECURITY_YML_PATH.exists():
            contents = SECURITY_YML_PATH.read_text(encoding="utf-8")
            return _parse_yml(contents, prefixes)
    except Exception as exc:
        logger.debug("Failed to read %s: %s", SECURITY_YML_PATH, exc)
    return None


def _parse_yml(text: str, prefixes: list[str] | None = None) -> tuple[str, str] | None:
"""Extract first model block (matching prefixes) with an api_key.

Expected structure (indent = 2 spaces per level):
<model-name>:0:
api_keys:
- <key>
"""
lines = text.splitlines()
found_model = ""
in_model = False
in_api_keys = False

for raw in lines:
line = raw.rstrip()
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue

indent = len(line) - len(line.lstrip(" "))

if not in_model:
if indent == 2 and stripped.endswith(":0:"):
candidate = stripped[:-3]
if prefixes is None or any(candidate.startswith(p) for p in prefixes):
found_model = candidate
in_model = True
continue

# Moved to another top-level model block
if indent <= 2 and stripped.endswith(":0:"):
# Check if this new block also matches
candidate = stripped[:-3]
in_model = False
in_api_keys = False
if prefixes is None or any(candidate.startswith(p) for p in prefixes):
found_model = candidate
in_model = True
continue
if indent == 0:
break

if indent == 4 and stripped == "api_keys:":
in_api_keys = True
continue

if in_api_keys:
if indent <= 4:
in_api_keys = False
continue
if stripped.startswith("- "):
key = stripped[2:].strip().strip('"').strip("'")
if key:
return found_model, key

return None
71 changes: 71 additions & 0 deletions projects/app_picoclaw/asr/elevenlabs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import asyncio
import io
import logging
import wave

import numpy as np
import requests

from .config import load_asr_config

logger = logging.getLogger(__name__)

# ElevenLabs speech-to-text endpoint that asr_session POSTs the audio to.
API_URL = "https://api.elevenlabs.io/v1/speech-to-text"


def _pcm_to_wav_bytes(pcm_int16: np.ndarray, sample_rate: int = 16000) -> bytes:
    """Serialize int16 PCM samples as an in-memory mono, 16-bit WAV file."""
    stream = io.BytesIO()
    writer = wave.open(stream, "wb")
    try:
        writer.setframerate(sample_rate)
        writer.setsampwidth(2)  # bytes per sample, i.e. 16-bit
        writer.setnchannels(1)
        writer.writeframes(pcm_int16.tobytes())
    finally:
        # Closing patches the frame count into the header.
        writer.close()
    return stream.getvalue()


async def asr_session(pcm_data: np.ndarray) -> str:
    """Transcribe one utterance with the ElevenLabs speech-to-text API.

    Args:
        pcm_data: audio samples. The int16 conversion below treats them as
            floats normalized to [-1.0, 1.0]; the WAV is written with the
            16 kHz mono default of ``_pcm_to_wav_bytes``
            (NOTE(review): confirm both against the capture code).

    Returns:
        The stripped transcript, or ``""`` when the audio is too short, no API
        key is available, the request fails, the API answers with a non-200
        status, or the response cannot be parsed.
    """
    # Fewer than 3200 samples (0.2 s at 16 kHz) is not sent for recognition.
    if len(pcm_data) < 3200:
        logger.info("Audio too short (%d samples), skipping recognition", len(pcm_data))
        return ""

    # Convert normalized float PCM to int16 PCM, saturating at the int16 range.
    pcm_int16 = (pcm_data * 32768).clip(-32768, 32767).astype(np.int16)

    model, api_key = load_asr_config()
    if not api_key:
        logger.error("API key not found")
        return ""

    logger.debug("ASR model: %s (ElevenLabs)", model)

    wav_bytes = _pcm_to_wav_bytes(pcm_int16)

    headers = {
        "Xi-Api-Key": api_key,
    }
    files = {
        "file": ("audio.wav", wav_bytes, "audio/wav"),
    }
    data = {
        "model_id": model,
    }

    # requests is blocking; run it in a worker thread so the event loop stays
    # responsive. Network failures are reported like every other failure.
    try:
        resp = await asyncio.to_thread(
            requests.post,
            API_URL,
            headers=headers,
            files=files,
            data=data,
            timeout=120,
        )
    except requests.RequestException as exc:
        logger.error("ElevenLabs request failed: %s", exc)
        return ""

    if resp.status_code != 200:
        logger.error("ElevenLabs API error %d: %s", resp.status_code, resp.text)
        return ""

    try:
        transcript = resp.json()["text"]
        cleaned = transcript.strip()
    except (KeyError, TypeError, ValueError, AttributeError) as exc:
        # ValueError covers a non-JSON body; AttributeError a non-string "text".
        logger.error("Failed to parse ElevenLabs response: %s — %s", exc, resp.text)
        return ""

    logger.info("Recognized: %s", transcript)
    return cleaned
83 changes: 83 additions & 0 deletions projects/app_picoclaw/asr/qwen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import base64
import io
import logging
import wave

import numpy as np
import requests

from .config import load_asr_config

logger = logging.getLogger(__name__)

# DashScope compatible-mode chat-completions endpoint that asr_session POSTs to.
API_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions"


def _pcm_to_base64_wav(pcm_int16: np.ndarray, sample_rate: int = 16000) -> str:
    """Encode int16 PCM samples as a mono 16-bit WAV wrapped in a base64 data URI."""
    stream = io.BytesIO()
    writer = wave.open(stream, "wb")
    try:
        writer.setframerate(sample_rate)
        writer.setsampwidth(2)  # bytes per sample, i.e. 16-bit
        writer.setnchannels(1)
        writer.writeframes(pcm_int16.tobytes())
    finally:
        # Closing patches the frame count into the header.
        writer.close()
    encoded = base64.b64encode(stream.getvalue()).decode("utf-8")
    return "data:audio/wav;base64," + encoded


async def asr_session(pcm_data: np.ndarray) -> str:
    """Transcribe one utterance with the non-realtime Qwen ASR model.

    The audio is sent as a base64 WAV data URI inside a chat-completions
    request to ``API_URL``.

    Args:
        pcm_data: audio samples. The int16 conversion below treats them as
            floats normalized to [-1.0, 1.0]; the WAV is written with the
            16 kHz mono default of ``_pcm_to_base64_wav``
            (NOTE(review): confirm both against the capture code).

    Returns:
        The stripped transcript, or ``""`` when the audio is too short, no API
        key is available, the request fails, the API answers with a non-200
        status, or the response cannot be parsed.
    """
    # Local import: this module does not import asyncio at file level.
    import asyncio

    # Fewer than 3200 samples (0.2 s at 16 kHz) is not sent for recognition.
    if len(pcm_data) < 3200:
        logger.info("Audio too short (%d samples), skipping recognition", len(pcm_data))
        return ""

    # Convert normalized float PCM to int16 PCM, saturating at the int16 range.
    pcm_int16 = (pcm_data * 32768).clip(-32768, 32767).astype(np.int16)

    model, api_key = load_asr_config()
    if not api_key:
        logger.error("API key not found (DASHSCOPE_API_KEY / .security.yml)")
        return ""

    logger.debug("ASR model: %s (non-realtime)", model)

    data_uri = _pcm_to_base64_wav(pcm_int16)

    payload = {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "input_audio", "input_audio": {"data": data_uri, "format": "wav"}}
                ],
            }
        ],
        "stream": False,
        "asr_options": {"enable_itn": False},
    }

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    # requests is blocking; run it in a worker thread so the event loop stays
    # responsive. Network failures are reported like every other failure.
    try:
        resp = await asyncio.to_thread(
            requests.post,
            API_URL,
            json=payload,
            headers=headers,
            timeout=120,
        )
    except requests.RequestException as exc:
        logger.error("ASR request failed: %s", exc)
        return ""

    if resp.status_code != 200:
        logger.error("ASR API error %d: %s", resp.status_code, resp.text)
        return ""

    try:
        data = resp.json()
        transcript = data["choices"][0]["message"]["content"]
        cleaned = transcript.strip()
    except (KeyError, IndexError, TypeError, ValueError, AttributeError) as exc:
        # ValueError covers a non-JSON body; AttributeError a non-string content.
        logger.error("Failed to parse ASR response: %s — %s", exc, resp.text)
        return ""

    logger.info("Recognized: %s", transcript)
    return cleaned
Loading
Loading