diff --git a/examples/speech-gateway/.golangci.yml b/examples/speech-gateway/.golangci.yml new file mode 100644 index 00000000..e4ee18ab --- /dev/null +++ b/examples/speech-gateway/.golangci.yml @@ -0,0 +1,26 @@ +version: "2" +linters: + default: none + enable: + - bidichk + - errcheck + - govet + - ineffassign + - makezero + - misspell + - revive + - staticcheck + - unconvert + - unqueryvet + - unused + - whitespace + settings: + govet: + disable: + - fieldalignment + enable-all: true + unqueryvet: + check-sql-builders: true +issues: + max-issues-per-linter: 0 # no maximum + max-same-issues: 0 # no maximum diff --git a/examples/speech-gateway/README.md b/examples/speech-gateway/README.md new file mode 100644 index 00000000..98c0d316 --- /dev/null +++ b/examples/speech-gateway/README.md @@ -0,0 +1,55 @@ + + +# Speech Gateway + +Thin HTTP gateway that rewrites simple STT/TTS requests into the multipart oneshot format expected by a StreamKit backend. + +## Prereqs + +- StreamKit server running locally (default assumed: `http://127.0.0.1:4545`). +- Go 1.24+. + +## Run the gateway + +```sh +cd examples/streamkit-cli-gateway +go run ./cmd/gateway --listen :8080 --skit-url http://127.0.0.1:4545 +``` + +Environment equivalents: + +- `GATEWAY_LISTEN` (default `:8080`) +- `SKIT_URL` (default `http://127.0.0.1:4545`) +- `SKIT_TOKEN` (optional bearer sent to Skit) +- `GATEWAY_MAX_CONCURRENCY` (default 10) +- `GATEWAY_MAX_BODY_BYTES` (default 10MB) + +## STT via curl (Ogg/Opus) + +Transcribe a file: + +```sh +curl -H "Content-Type: audio/ogg" --data-binary @speech.ogg http://127.0.0.1:8080/stt +``` + +Transcribe from microphone (requires ffmpeg): + +```sh +./stt.sh +``` + +Press Ctrl-C when done speaking. The script captures audio, sends it to the gateway, and displays the transcription. + +Response is NDJSON (one JSON object per line). + +## TTS via curl (plain text) + +```sh +curl -H "Content-Type: text/plain" --data 'Hello from StreamKit' http://127.0.0.1:8080/tts | ffplay -nodisp -autoexit - +``` + +Response is `audio/ogg` (Opus mono). diff --git a/examples/speech-gateway/cmd/gateway/main.go b/examples/speech-gateway/cmd/gateway/main.go new file mode 100644 index 00000000..0eaa6dfe --- /dev/null +++ b/examples/speech-gateway/cmd/gateway/main.go @@ -0,0 +1,449 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +// Gateway translates simple STT/TTS requests into StreamKit oneshot multipart calls. +package main + +import ( + "bytes" + "context" + "errors" + "flag" + "fmt" + "io" + "log" + "mime/multipart" + "net" + "net/http" + "net/textproto" + "os" + "strconv" + "strings" + "time" +) + +const ( + defaultSkitURL = "http://127.0.0.1:4545" + defaultListenAddr = ":8080" + + sttPipelineYAML = ` +name: stt-ogg-opus +description: STT over streamed Ogg/Opus +mode: oneshot +steps: + - kind: streamkit::http_input + + - kind: containers::ogg::demuxer + + - kind: audio::opus::decoder + + - kind: audio::resampler + params: + chunk_frames: 960 + output_frame_size: 960 + target_sample_rate: 16000 + + - kind: plugin::native::whisper + params: + model_path: models/ggml-base.en-q5_1.bin + language: en + vad_model_path: models/silero_vad.onnx + vad_threshold: 0.5 + min_silence_duration_ms: 700 + max_segment_duration_secs: 30.0 + + - kind: core::json_serialize + params: + pretty: false + newline_delimited: true + + - kind: streamkit::http_output + params: + content_type: application/json +` + + ttsPipelineYAML = ` +name: tts-ogg-opus +description: TTS to streamed Ogg/Opus +mode: oneshot +steps: + - kind: streamkit::http_input + - kind: core::text_chunker + params: + min_length: 10 + - kind: plugin::native::kokoro + params: + model_dir: "models/kokoro-multi-lang-v1_1" + speaker_id: 0 + speed: 1.0 + num_threads: 4 + - kind: audio::resampler + params: + chunk_frames: 960 + output_frame_size: 960 + target_sample_rate: 48000 + - kind: audio::opus::encoder + - kind: containers::ogg::muxer + params: + channels: 1 + codec: opus + chunk_size: 32768 + - kind: streamkit::http_output + params: + content_type: audio/ogg +` +) + +type gateway struct { + client *http.Client + skitURL string + authToken string + maxBodySize int64 + sem chan struct{} +} + +type config struct { + skitURL string + authToken string + listenAddr string + maxConcurrency int + maxBodySize int64 +} + +func main() { + cfg := loadConfig() + gw := &gateway{ + client: newHTTPClient(), + skitURL: cfg.skitURL, + authToken: cfg.authToken, + maxBodySize: cfg.maxBodySize, + sem: make(chan struct{}, cfg.maxConcurrency), + } + + mux := http.NewServeMux() + mux.HandleFunc("/stt", gw.handleSTT) + mux.HandleFunc("/tts", gw.handleTTS) + + server := &http.Server{ + Addr: cfg.listenAddr, + Handler: logging(mux), + ReadHeaderTimeout: 5 * time.Second, + } + + log.Printf("StreamKit speech gateway listening on %s -> %s", cfg.listenAddr, cfg.skitURL) + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatalf("server error: %v", err) + } +} + +func loadConfig() config { + listen := flagString("listen", getEnvDefault("GATEWAY_LISTEN", defaultListenAddr), "Listen address for the gateway") + skit := flagString("skit-url", getEnvDefault("SKIT_URL", defaultSkitURL), "Skit backend URL") + token := flagString("token", os.Getenv("SKIT_TOKEN"), "Bearer token for Skit (overrides SKIT_TOKEN env)") + maxConc := flagInt("max-concurrency", envInt("GATEWAY_MAX_CONCURRENCY", 10), "Maximum concurrent in-flight requests") + maxBody := flagInt64("max-body-bytes", envInt64("GATEWAY_MAX_BODY_BYTES", 10*1024*1024), "Maximum request body size") + + flag.Parse() + + return config{ + skitURL: *skit, + authToken: *token, + listenAddr: *listen, + maxConcurrency: *maxConc, + maxBodySize: *maxBody, + } +} + +func flagString(name, def, usage string) *string { + return flag.String(name, def, usage) +} + +func flagInt(name string, def int, usage string) *int { + return flag.Int(name, def, usage) +} + +func flagInt64(name string, def int64, usage string) *int64 { + return flag.Int64(name, def, usage) +} + +func envInt(key string, def int) int { + if v := os.Getenv(key); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + return n + } + } + return def +} + +func envInt64(key string, def int64) int64 { + if v := os.Getenv(key); v != "" { + if n, err := strconv.ParseInt(v, 10, 64); err == nil && n > 0 { + return n + } + } + return def +} + +func getEnvDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +func newHTTPClient() *http.Client { + tr := &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 5 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + MaxIdleConns: 0, + MaxConnsPerHost: 0, + MaxIdleConnsPerHost: 0, + IdleConnTimeout: 0, + DisableKeepAlives: true, + ExpectContinueTimeout: 0, + ForceAttemptHTTP2: false, + } + return &http.Client{Transport: tr} +} + +func (gw *gateway) acquire() func() { + gw.sem <- struct{}{} + return func() { <-gw.sem } +} + +func (gw *gateway) handleSTT(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost && r.Method != http.MethodPut { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + ct := r.Header.Get("Content-Type") + switch strings.ToLower(ct) { + case "", "application/octet-stream", "application/x-www-form-urlencoded", "text/plain": + ct = "audio/ogg" + log.Printf("stt request missing/unknown Content-Type, assuming %s", ct) + } + log.Printf("stt request start remote=%s ct=%s", r.RemoteAddr, ct) + defer func() { + _ = r.Body.Close() + }() + if !strings.HasPrefix(ct, "audio/ogg") { + log.Printf("stt unsupported content type: %s", ct) + http.Error(w, "Content-Type must be audio/ogg (Opus mono 48k)", http.StatusUnsupportedMediaType) + return + } + release := gw.acquire() + defer release() + r.Body = http.MaxBytesReader(w, r.Body, gw.maxBodySize) + useBuffer := r.ContentLength > 0 && r.ContentLength <= gw.maxBodySize + if err := gw.proxyMultipart(w, r, sttPipelineYAML, "media", "audio/ogg", useBuffer); err != nil { + log.Printf("stt error: %v", err) + if !errors.Is(err, context.Canceled) { + http.Error(w, "upstream error", http.StatusBadGateway) + } + } +} + +func (gw *gateway) handleTTS(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost && r.Method != http.MethodPut { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + ct := r.Header.Get("Content-Type") + switch strings.ToLower(ct) { + case "", "application/octet-stream", "application/x-www-form-urlencoded": + ct = "text/plain" + log.Printf("tts request missing/unknown Content-Type, assuming %s", ct) + } + log.Printf("tts request start remote=%s ct=%s", r.RemoteAddr, ct) + defer func() { + _ = r.Body.Close() + }() + if !strings.HasPrefix(ct, "text/plain") { + http.Error(w, "Content-Type must be text/plain", http.StatusUnsupportedMediaType) + return + } + release := gw.acquire() + defer release() + r.Body = http.MaxBytesReader(w, r.Body, gw.maxBodySize) + useBuffer := r.ContentLength > 0 && r.ContentLength <= gw.maxBodySize + if err := gw.proxyMultipart(w, r, ttsPipelineYAML, "media", "text/plain", useBuffer); err != nil { + log.Printf("tts error: %v", err) + if !errors.Is(err, context.Canceled) { + http.Error(w, "upstream error", http.StatusBadGateway) + } + } +} + +func (gw *gateway) proxyMultipart(w http.ResponseWriter, r *http.Request, pipelineYAML, mediaField, mediaContentType string, bufferBody bool) error { + ctx := r.Context() + + // Optionally buffer the request body for finite uploads (helps curl -T file). + var src io.Reader = r.Body + if bufferBody { + limited := io.LimitReader(r.Body, gw.maxBodySize+1) + buf, err := io.ReadAll(limited) + if err != nil { + return fmt.Errorf("buffer request body: %w", err) + } + if int64(len(buf)) > gw.maxBodySize { + return fmt.Errorf("body too large") + } + log.Printf("buffered upload (%d bytes) before forwarding", len(buf)) + src = bytes.NewReader(buf) + } + + bodyReader, bodyWriter := io.Pipe() + mw := multipart.NewWriter(bodyWriter) + + // writer goroutine + go func() { + defer func() { + _ = bodyWriter.Close() + }() + log.Printf("-> skit /api/v1/process (pipeline=%s, field=%s)", pipelineName(pipelineYAML), mediaField) + if err := writeConfigPart(mw, pipelineYAML); err != nil { + log.Printf("multipart config error: %v", err) + bodyWriter.CloseWithError(err) + return + } + if err := writeStreamPart(mw, mediaField, mediaContentType, src); err != nil { + log.Printf("multipart stream error: %v", err) + bodyWriter.CloseWithError(err) + return + } + if err := mw.Close(); err != nil { + log.Printf("multipart close error: %v", err) + bodyWriter.CloseWithError(err) + } + }() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, gw.skitURL+"/api/v1/process", bodyReader) + if err != nil { + return fmt.Errorf("create skit request: %w", err) + } + req.Header.Set("Content-Type", mw.FormDataContentType()) + req.Close = true + if gw.authToken != "" { + req.Header.Set("Authorization", "Bearer "+gw.authToken) + } + + resp, err := gw.client.Do(req) + if err != nil { + log.Printf("call skit failed: %v", err) + return fmt.Errorf("call skit: %w", err) + } + defer func() { + _ = resp.Body.Close() + }() + + log.Printf("<- skit status=%d", resp.StatusCode) + + copyHeaders(w.Header(), resp.Header) + // Avoid forwarding length/transfer headers so Go can stream-chunk the proxied body. + w.Header().Del("Content-Length") + w.Header().Del("Transfer-Encoding") + w.WriteHeader(resp.StatusCode) + + flusher, _ := w.(http.Flusher) + + if resp.StatusCode >= 400 { + slurp, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + if len(slurp) > 0 { + log.Printf("upstream error body: %s", strings.TrimSpace(string(slurp))) + } + if _, err := w.Write(slurp); err != nil { + log.Printf("write error: %v", err) + } + return nil + } + + target := w.(io.Writer) + if flusher != nil { + target = flushWriter{w: w, f: flusher} + } + + _, copyErr := io.Copy(target, resp.Body) + if copyErr != nil { + log.Printf("copy response error: %v", copyErr) + } + + return copyErr +} + +func writeConfigPart(mw *multipart.Writer, pipelineYAML string) error { + part, err := mw.CreateFormField("config") + if err != nil { + return fmt.Errorf("create config part: %w", err) + } + if _, err := io.WriteString(part, strings.TrimSpace(pipelineYAML)); err != nil { + return fmt.Errorf("write config: %w", err) + } + return nil +} + +func writeStreamPart(mw *multipart.Writer, fieldName, contentType string, src io.Reader) error { + h := textproto.MIMEHeader{} + h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"; filename="media"`, fieldName)) + h.Set("Content-Type", contentType) + + part, err := mw.CreatePart(h) + if err != nil { + return fmt.Errorf("create media part: %w", err) + } + if n, err := io.Copy(part, src); err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + log.Printf("media stream ended early after %d bytes: %v", n, err) + return nil + } + log.Printf("media copy error after %d bytes: %v", n, err) + return fmt.Errorf("copy media: %w", err) + } + return nil +} + +func copyHeaders(dst, src http.Header) { + for k, vv := range src { + if strings.EqualFold(k, "Content-Length") || strings.EqualFold(k, "Transfer-Encoding") { + continue + } + for _, v := range vv { + dst.Add(k, v) + } + } +} + +type flushWriter struct { + w io.Writer + f http.Flusher +} + +func (fw flushWriter) Write(p []byte) (int, error) { + n, err := fw.w.Write(p) + if fw.f != nil { + fw.f.Flush() + } + return n, err +} + +// logging middleware (minimal) +func logging(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + log.Printf("recv %s %s remote=%s ct=%s", r.Method, r.URL.Path, r.RemoteAddr, r.Header.Get("Content-Type")) + start := time.Now() + next.ServeHTTP(w, r) + log.Printf("%s %s %s", r.Method, r.URL.Path, time.Since(start).Truncate(time.Millisecond)) + }) +} + +func pipelineName(yaml string) string { + for _, line := range strings.Split(yaml, "\n") { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "name:") { + return strings.TrimSpace(strings.TrimPrefix(line, "name:")) + } + } + return "unknown" +} diff --git a/examples/speech-gateway/go.mod b/examples/speech-gateway/go.mod new file mode 100644 index 00000000..a9805a17 --- /dev/null +++ b/examples/speech-gateway/go.mod @@ -0,0 +1,3 @@ +module github.com/streamer45/streamkit/examples/speech-gateway + +go 1.24 diff --git a/examples/speech-gateway/stt.sh b/examples/speech-gateway/stt.sh new file mode 100755 index 00000000..8c58f8ca --- /dev/null +++ b/examples/speech-gateway/stt.sh @@ -0,0 +1,110 @@ +#!/usr/bin/env bash +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +set -euo pipefail + +# ---------------- config ---------------- +STT_URL="${STT_URL:-https://stt.streamkit.dev}" +WORKDIR="$(mktemp -d /tmp/stt-XXXX)" +OGG="${WORKDIR}/capture.ogg" + +# ---------------- platform detection ---------------- +OS="$(uname -s)" +case "$OS" in +Linux) + AUDIO_FMT="pulse" + INPUT_DEV="default" + ;; +Darwin) + AUDIO_FMT="avfoundation" + INPUT_DEV=":0" + ;; +MINGW* | MSYS* | CYGWIN*) + AUDIO_FMT="dshow" + INPUT_DEV="audio=default" + ;; +*) + echo "Unsupported OS: ${OS}" >&2 + exit 1 + ;; +esac + +# Allow override +AUDIO_FMT="${AUDIO_FMT_OVERRIDE:-$AUDIO_FMT}" +INPUT_DEV="${INPUT_DEV_OVERRIDE:-$INPUT_DEV}" + +# ---------------- info ---------------- +echo "OS: ${OS}" +echo "Audio backend: ${AUDIO_FMT}" +echo "Input device: ${INPUT_DEV}" +echo "Output file: ${OGG}" +echo +echo "Initializing microphone…" + +# ---------------- start capture ---------------- +# Build ffmpeg args based on audio format +FFMPEG_ARGS=(-f "${AUDIO_FMT}") + +# For PulseAudio, reduce buffer sizes to minimize latency +if [ "${AUDIO_FMT}" = "pulse" ]; then + FFMPEG_ARGS+=(-fragment_size 1024) +fi + +FFMPEG_ARGS+=( + -i "${INPUT_DEV}" + -ac 1 -ar 48000 + -af "volume=2.0" + -flush_packets 1 + -c:a libopus + -frame_duration 20 + -application voip + -f ogg + -hide_banner -loglevel info + "${OGG}" +) + +# Create a named pipe for stdin control +FFMPEG_PIPE="${WORKDIR}/ffmpeg.pipe" +mkfifo "${FFMPEG_PIPE}" + +# Start ffmpeg with stdin from the pipe, keep it open in background +exec 3<>"${FFMPEG_PIPE}" +ffmpeg "${FFMPEG_ARGS[@]}" <"${FFMPEG_PIPE}" & +FFPID=$! + +# Give ffmpeg a brief moment to initialize +sleep 0.1 + +echo +printf "\a" +echo "🎙️ Recording — speak now" + +# ---------------- cleanup ---------------- +cleanup() { + echo + echo "Stopping capture…" + + # Send 'q' to ffmpeg's stdin for graceful shutdown + echo "q" >&3 2>/dev/null || true + + # Wait for ffmpeg to finish writing the file + wait "${FFPID}" 2>/dev/null || true + + # Close the pipe + exec 3>&- 2>/dev/null || true + + echo "Sending to STT…" + curl --http1.1 --no-buffer -sS \ + -H "Content-Type: audio/ogg" \ + --data-binary @"${OGG}" \ + "${STT_URL}" + echo + + echo "Capture kept at:" + echo " ${OGG}" +} + +trap cleanup INT TERM +wait "${FFPID}" diff --git a/plugins/native/whisper/Cargo.lock b/plugins/native/whisper/Cargo.lock index 1caebe10..4b3d5a9e 100644 --- a/plugins/native/whisper/Cargo.lock +++ b/plugins/native/whisper/Cargo.lock @@ -1158,7 +1158,7 @@ dependencies = [ [[package]] name = "whisper-plugin-native" -version = "0.1.0" +version = "0.1.1" dependencies = [ "ndarray", "once_cell", diff --git a/plugins/native/whisper/Cargo.toml b/plugins/native/whisper/Cargo.toml index f413e040..de618348 100644 --- a/plugins/native/whisper/Cargo.toml +++ b/plugins/native/whisper/Cargo.toml @@ -4,7 +4,7 @@ [package] name = "whisper-plugin-native" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "MPL-2.0" diff --git a/plugins/native/whisper/src/lib.rs b/plugins/native/whisper/src/lib.rs index 539dbede..28ccf833 100644 --- a/plugins/native/whisper/src/lib.rs +++ b/plugins/native/whisper/src/lib.rs @@ -575,6 +575,35 @@ impl NativeProcessorNode for WhisperPlugin { } Ok(()) } + + fn flush(&mut self, output: &OutputSender) -> Result<(), String> { + tracing::debug!( + buffered_samples = self.speech_buffer.len(), + pending_frame_samples = self.frame_buffer.len(), + "Flushing Whisper plugin buffers" + ); + + if self.speech_buffer.is_empty() { + self.frame_buffer.clear(); + return Ok(()); + } + + let mut end_time_ms = self.absolute_time_ms; + + if !self.frame_buffer.is_empty() { + let remaining: Vec = self.frame_buffer.drain(..).collect(); + + // Attach any trailing samples that never formed a full VAD frame + self.speech_buffer.extend(remaining.iter().copied()); + + // Account for the partial frame time in telemetry timestamps + let remaining_ms = (remaining.len() as u64).div_ceil(16); + end_time_ms = end_time_ms.saturating_add(remaining_ms); + self.absolute_time_ms = end_time_ms; + } + + self.transcribe_and_emit(output, end_time_ms, "stream_end", None) + } } impl WhisperPlugin {