Skip to content

Commit

Permalink
Script-driven tts loadtesting, using Polly (#632)
Browse files Browse the repository at this point in the history
* script-driven tts loadtesting; using Polly

* linting

* changing some consts to flags

* capitalization matters

* update script.txt: more crosstalk to test overloading transcribers

* add script-no-crosstalk.txt

* add setup flag

* userPassword configurable (for community testing); improve crosstalk

* adding some overlapping speach at the start as an edge case

* recState -> jobState

* add a longer monologue to test windowing

* add profile var

* remove google translate tts

* send unmute/mute events

* revert changes to server files
  • Loading branch information
cpoile committed Apr 11, 2024
1 parent cc0336c commit 815e93d
Show file tree
Hide file tree
Showing 9 changed files with 875 additions and 78 deletions.
141 changes: 83 additions & 58 deletions lt/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/service/polly"
"github.com/pion/webrtc/v3"
"io"
"log"
"net/http"
Expand All @@ -23,7 +26,6 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/pion/webrtc/v3/pkg/media/ivfreader"
"github.com/pion/webrtc/v3/pkg/media/oggreader"
Expand Down Expand Up @@ -81,20 +83,26 @@ type Config struct {
Simulcast bool
Setup bool
SpeechFile string
PollySession *polly.Polly
PollyVoiceID *string
}

type User struct {
userID string
cfg Config
client *model.Client4
pc *webrtc.PeerConnection
dc *webrtc.DataChannel
connectedCh chan struct{}
doneCh chan struct{}
iceCh chan webrtc.ICECandidateInit
initCh chan struct{}
isHost bool
speechTextCh chan string
userID string
cfg Config
client *model.Client4
pc *webrtc.PeerConnection
dc *webrtc.DataChannel
connectedCh chan struct{}
doneCh chan struct{}
iceCh chan webrtc.ICECandidateInit
initCh chan struct{}
isHost bool

pollySession *polly.Polly
pollyVoiceID *string
speechTextCh chan string
doneSpeakingCh chan struct{}

// WebSocket
wsCloseCh chan struct{}
Expand All @@ -109,14 +117,17 @@ type wsMsg struct {

func NewUser(cfg Config) *User {
return &User{
cfg: cfg,
connectedCh: make(chan struct{}),
doneCh: make(chan struct{}),
iceCh: make(chan webrtc.ICECandidateInit, 10),
wsCloseCh: make(chan struct{}),
wsSendCh: make(chan wsMsg, 256),
initCh: make(chan struct{}),
speechTextCh: make(chan string, 8),
cfg: cfg,
connectedCh: make(chan struct{}),
doneCh: make(chan struct{}),
iceCh: make(chan webrtc.ICECandidateInit, 10),
wsCloseCh: make(chan struct{}),
wsSendCh: make(chan wsMsg, 256),
initCh: make(chan struct{}),
speechTextCh: make(chan string, 8),
doneSpeakingCh: make(chan struct{}),
pollySession: cfg.PollySession,
pollyVoiceID: cfg.PollyVoiceID,
}
}

Expand Down Expand Up @@ -423,57 +434,71 @@ func (u *User) transmitSpeech() {
}

for text := range u.speechTextCh {
log.Printf("%s: received text to speak: %q", u.cfg.Username, text)

rd, rate, err := textToSpeech(text)
if err != nil {
log.Printf("%s: textToSpeech failed: %s", u.cfg.Username, err.Error())
continue
}
func() {
defer func() {
u.doneSpeakingCh <- struct{}{}
}()
log.Printf("%s: received text to speak: %q", u.cfg.Username, text)

var rd io.Reader
var rate int
var err error
if u.pollySession != nil {
rd, rate, err = u.pollyToSpeech(text)
}
if err != nil {
log.Printf("%s: textToSpeech failed: %s", u.cfg.Username, err.Error())
return
}

log.Printf("%s: raw speech samples decoded (%d)", u.cfg.Username, rate)
log.Printf("%s: raw speech samples decoded (%d)", u.cfg.Username, rate)

audioSamplesDataBuf := bytes.NewBuffer([]byte{})
if _, err := audioSamplesDataBuf.ReadFrom(rd); err != nil {
log.Printf("%s: failed to read samples data: %s", u.cfg.Username, err.Error())
continue
}
audioSamplesDataBuf := bytes.NewBuffer([]byte{})
if _, err := audioSamplesDataBuf.ReadFrom(rd); err != nil {
log.Printf("%s: failed to read samples data: %s", u.cfg.Username, err.Error())
return
}

log.Printf("read %d samples bytes", audioSamplesDataBuf.Len())
log.Printf("read %d samples bytes", audioSamplesDataBuf.Len())

sampleDuration := time.Millisecond * 20
ticker := time.NewTicker(sampleDuration)
audioSamplesData := make([]byte, 480*4)
audioSamples := make([]int16, 480)
opusData := make([]byte, 8192)
for ; true; <-ticker.C {
n, err := audioSamplesDataBuf.Read(audioSamplesData)
if err != nil {
log.Printf("%s: failed to read audio samples: %s", u.cfg.Username, err.Error())
break
}
sampleDuration := time.Millisecond * 20
ticker := time.NewTicker(sampleDuration)
audioSamplesData := make([]byte, 480*4)
audioSamples := make([]int16, 480)
opusData := make([]byte, 8192)
for ; true; <-ticker.C {
n, err := audioSamplesDataBuf.Read(audioSamplesData)
if err != nil {
if !errors.Is(err, io.EOF) {
log.Printf("%s: failed to read audio samples: %s", u.cfg.Username, err.Error())
}
break
}

// Convert []byte to []int16
for i := 0; i < n; i += 4 {
audioSamples[i/4] = int16(binary.LittleEndian.Uint16(audioSamplesData[i : i+4]))
}
// Convert []byte to []int16
for i := 0; i < n; i += 4 {
audioSamples[i/4] = int16(binary.LittleEndian.Uint16(audioSamplesData[i : i+4]))
}

n, err = enc.Encode(audioSamples, opusData)
if err != nil {
log.Printf("%s: failed to encode: %s", u.cfg.Username, err.Error())
continue
}
n, err = enc.Encode(audioSamples, opusData)
if err != nil {
log.Printf("%s: failed to encode: %s", u.cfg.Username, err.Error())
continue
}

if err := track.WriteSample(media.Sample{Data: opusData[:n], Duration: sampleDuration}); err != nil {
log.Printf("%s: failed to write audio sample: %s", u.cfg.Username, err.Error())
if err := track.WriteSample(media.Sample{Data: opusData[:n], Duration: sampleDuration}); err != nil {
log.Printf("%s: failed to write audio sample: %s", u.cfg.Username, err.Error())
}
}
}
}()
}
}()
}

func (u *User) Speak(text string) {
func (u *User) Speak(text string) chan struct{} {
u.Unmute()
u.speechTextCh <- text
return u.doneSpeakingCh
}

func (u *User) initRTC() error {
Expand Down
22 changes: 14 additions & 8 deletions lt/client/tts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,28 @@ package client

import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/polly"
mp3 "github.com/hajimehoshi/go-mp3"
"io"
"net/http"
"net/url"
)

func textToSpeech(text string) (io.Reader, int, error) {
resp, err := http.Get(fmt.Sprintf("http://translate.google.com/translate_tts?ie=UTF-8&total=1&idx=0&textlen=32&client=tw-ob&q=%s&tl=%s",
url.QueryEscape(text), "en"))
func (u *User) pollyToSpeech(text string) (io.Reader, int, error) {
input := &polly.SynthesizeSpeechInput{
Engine: aws.String("neural"),
LanguageCode: aws.String(polly.LanguageCodeEnUs),
OutputFormat: aws.String("mp3"),
SampleRate: aws.String("24000"),
VoiceId: u.pollyVoiceID,
Text: aws.String(text),
}
output, err := u.pollySession.SynthesizeSpeech(input)
if err != nil {
return nil, 0, fmt.Errorf("request failed: %w", err)
return nil, 0, fmt.Errorf("failed to synthesize speech with polly, err: %v", err)
}

dec, err := mp3.NewDecoder(resp.Body)
dec, err := mp3.NewDecoder(output.AudioStream)
if err != nil {
resp.Body.Close()
return nil, 0, fmt.Errorf("failed to create decoder: %w", err)
}

Expand Down

0 comments on commit 815e93d

Please sign in to comment.