Skip to content

Commit

Permalink
Add toggle voicce server
Browse files Browse the repository at this point in the history
  • Loading branch information
qnkhuat committed Jul 23, 2021
1 parent 0c408d6 commit 62e2691
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 59 deletions.
2 changes: 0 additions & 2 deletions tstream/pkg/room/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,14 +385,12 @@ func (r *Room) Broadcast(msg message.Wrapper, roles []message.CRole, IDExclude [
func (r *Room) Stop(status message.RoomStatus) {
log.Printf("Stopping room: %s, with Status: %s", r.name, status)
r.status = status
r.lock.Lock()
for id, client := range r.clients {
client.Close()
r.RemoveClient(id)
}
r.sfu.Stop()
r.streamer.Close()
r.lock.Unlock()
}

func (r *Room) PrepareRoomInfo() message.RoomInfo {
Expand Down
1 change: 0 additions & 1 deletion tstream/pkg/room/sfu.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func (s *SFU) AddPeer(cl *Client) error {
return
}
}
log.Printf("The enddddddddddd")
})

}
Expand Down
10 changes: 3 additions & 7 deletions tstream/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,9 @@ func (s *Server) Stop() {
// interval : scan for every interval time
// ildeThreshold : room with idle time above this threshold will be killed
func (s *Server) repeatedlyCleanRooms(interval, idleThreshold int) {
tick := time.NewTicker(time.Duration(interval) * time.Second)
for {
select {
case <-tick.C:
c := s.scanAndCleanRooms(idleThreshold)
log.Printf("Cleaned %d rooms", c)
}
for _ = range time.Tick(time.Duration(interval) * time.Second) {
c := s.scanAndCleanRooms(idleThreshold)
log.Printf("Cleaned %d rooms", c)
}
}

Expand Down
162 changes: 113 additions & 49 deletions tstream/pkg/streamer/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
"github.com/gorilla/websocket"
"github.com/pion/webrtc/v3"
"github.com/qnkhuat/mediadevices"
"github.com/qnkhuat/mediadevices/pkg/codec/opus" // This is required to use opus audio encoder
"github.com/qnkhuat/mediadevices/pkg/codec/opus"
_ "github.com/qnkhuat/mediadevices/pkg/driver/microphone" // This is required to register microphone adapter
"github.com/qnkhuat/mediadevices/pkg/prop"
"github.com/qnkhuat/tstream/pkg/message"
"github.com/rivo/tview"
"log"
Expand All @@ -22,13 +23,21 @@ import (

var decoder = schema.NewDecoder()

const mtu int = 1600

type MediaSession struct {
stream mediadevices.MediaStream
engine *webrtc.MediaEngine
}

type Chat struct {
username string
sessionId string
serverAddr string
color string
wsConn *websocket.Conn // for chat and roominfo
peerConn *webrtc.PeerConnection // for voice
mediaSession *MediaSession
app *tview.Application
startedTime time.Time
chatTextView *tview.TextView
Expand All @@ -37,6 +46,8 @@ type Chat struct {
titleTextView *tview.TextView
muteBtn *tview.Button
mute bool

lastToggleMute time.Time
}

func NewChat(sessionId, serverAddr, username string) *Chat {
Expand All @@ -58,11 +69,6 @@ func (c *Chat) Start() error {
return err
}

if err := c.StartVoiceService(); err != nil {
log.Printf("Failed to start voice service : %s", err)
c.addNoti(fmt.Sprintf("Failed to start voice service : %s", err))
}

// blocking call
if err := c.app.EnableMouse(true).Run(); err != nil {
log.Printf("Error in UI app: %s", err)
Expand Down Expand Up @@ -129,6 +135,7 @@ func (c *Chat) StartChatService() error {
go func() {
time.Sleep(1 * time.Second)
c.addNoti("[yellow]Type /help to get list of available commands[white]")
c.addNoti("[yellow]Voice chat is off. Type /unmute to turn on voice chat[white]")
}()

go func() {
Expand Down Expand Up @@ -162,46 +169,71 @@ func (c *Chat) StartChatService() error {
return nil
}

func (c *Chat) StartVoiceService() error {
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{{
URLs: []string{"stun:stun.l.google.com:19302"}},
},
SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
}

func GetMediaSession() (*MediaSession, error) {
// Init microphone
mediaEngine := webrtc.MediaEngine{}
var mediaSession *MediaSession
engine := &webrtc.MediaEngine{}
opusParams, err := opus.NewParams()
if err != nil {
return err
return mediaSession, err
}
codecSelector := mediadevices.NewCodecSelector(
mediadevices.WithAudioEncoders(&opusParams),
)
codecSelector.Populate(&mediaEngine)
codecSelector.Populate(engine)

api := webrtc.NewAPI(webrtc.WithMediaEngine(&mediaEngine))
peerConn, err := api.NewPeerConnection(config)
if err != nil {
log.Printf("Failed to start webrtc connection %s", err)
return err
}

s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
Audio: func(c *mediadevices.MediaTrackConstraints) {},
stream, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
Audio: func(c *mediadevices.MediaTrackConstraints) {
c.SampleRate = prop.Int(44100)
c.Latency = prop.Duration(0)
},
Codec: codecSelector,
})

if err != nil {
log.Printf("Failed to get user media %s", err)
return mediaSession, err
}

mediaSession = &MediaSession{
engine: engine,
stream: stream,
}
return mediaSession, nil
}

func (c *Chat) StartVoiceService() error {
// media engine can only created once during the process
// in order to turn on/off audio track we init it once and use many times
if c.mediaSession == nil {
mediaSession, err := GetMediaSession()
if err != nil {
log.Printf("Failed to open media engine: %s", err)
return err
}
c.mediaSession = mediaSession
}

config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{{
URLs: []string{"stun:stun.l.google.com:19302"}},
},
SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
}

api := webrtc.NewAPI(webrtc.WithMediaEngine(c.mediaSession.engine))

peerConn, err := api.NewPeerConnection(config)
if err != nil {
log.Printf("Failed to start webrtc connection %s", err)
return err
}

c.peerConn = peerConn

// add tracks to peer connection
for _, track := range s.GetTracks() {
log.Printf("%s, %s, %s", track.ID(), track.Kind(), track.Kind)
log.Printf("adding track")
for _, track := range c.mediaSession.stream.GetTracks() {

// TODO: we probably want to stop the chat here
// reproduce steps: try open a producer page while having chat on
track.OnEnded(func(err error) {
Expand Down Expand Up @@ -229,9 +261,10 @@ func (c *Chat) StartVoiceService() error {
switch p {

case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed, webrtc.PeerConnectionStateDisconnected:
c.Stop(fmt.Sprintf("Voice server status: %s", p))
wsConn.Close()

case webrtc.PeerConnectionStateConnected:
c.addNoti("[yellow]Voice server connected[white]")

default:
log.Printf("Not implemented: %s", p)
Expand Down Expand Up @@ -269,7 +302,6 @@ func (c *Chat) StartVoiceService() error {
err := wsConn.ReadJSON(&msg)
if err != nil {
log.Printf("Failed to read message: %s", err)
c.Stop("Failed to read message form server")
return
}

Expand Down Expand Up @@ -345,6 +377,28 @@ func (c *Chat) StartVoiceService() error {

}

func (c *Chat) StopVoiceServer() error {
if c.peerConn == nil {
return nil
}
log.Printf("Stoppign")

for _, sender := range c.peerConn.GetSenders() {
log.Printf("removed: %v", sender)
if err := sender.Stop(); err != nil {
log.Printf("Failed to stop voice: %s", err)
}
c.peerConn.RemoveTrack(sender)
}
if err := c.peerConn.Close(); err != nil {
log.Printf("Failed to stop peerconenction: %s", err)
return err
}
log.Printf("stopped")
c.peerConn = nil
return nil
}

func (c *Chat) requestServer(msgType message.MType) error {
payload := message.Wrapper{
Type: msgType,
Expand Down Expand Up @@ -424,7 +478,7 @@ func (c *Chat) initUI() error {
// Default is mute
c.muteBtn = tview.NewButton("🔇").
SetSelectedFunc(func() {
c.toggleMute()
c.toggleMute(!c.mute)
})
c.muteBtn.SetBackgroundColor(tcell.ColorBlack)

Expand Down Expand Up @@ -454,12 +508,12 @@ func (c *Chat) HandleCommand(command string) error {
args := strings.Split(command, " ")
switch args[0] {
case "help":
c.addNoti(`
TStream - Streaming from terimnal
c.addNoti(`TStream - Streaming from terimnal
[green]/title[yellow] title[white] - to change stream title
[green]/mute[white] - to turn on microphone
[green]/unmute[white] - to turn off microphone
[green]/exit[white] - to exit chat room`)
[green]/exit[white] - to exit chat room
`)

case "title":
if len(args) > 1 {
Expand All @@ -480,13 +534,10 @@ func (c *Chat) HandleCommand(command string) error {
}

case "mute":
if !c.mute {
c.toggleMute()
}
c.toggleMute(true)

case "unmute":
if c.mute {
c.toggleMute()
}
c.toggleMute(false)

case "exit":
c.Stop("Bye!")
Expand Down Expand Up @@ -518,7 +569,6 @@ func (c *Chat) connectWS(role message.CRole) (*websocket.Conn, error) {
Role: role,
Secret: GetSecret(CONFIG_PATH),
}
log.Printf("clientinfo %s", clientInfo)

payload := message.Wrapper{Type: message.TClientInfo, Data: clientInfo}
err = conn.WriteJSON(payload)
Expand Down Expand Up @@ -547,16 +597,30 @@ func (c *Chat) connectWS(role message.CRole) (*websocket.Conn, error) {
return conn, nil
}

func (c *Chat) toggleMute() {
c.mute = !c.mute
if c.mute {
func (c *Chat) toggleMute(mute bool) {
if mute {
c.StopVoiceServer()
c.muteBtn.SetLabel("🔇")
c.addNoti(`[yellow]Microphone: On[white]`)
c.addNoti(`[yellow]Microphone: Off[white]`)
} else {
c.addNoti(`[yellow]Connecting to voice server...[white]`)
// BUG
// there is a bug if user toggle audio too fast, it won't able to connect to audio
// A quick fix for this is too make sure user wait enought time before turn it back on

threshold := 8 * time.Second
time.Sleep(threshold - time.Now().Sub(c.lastToggleMute))
if err := c.StartVoiceService(); err != nil {
log.Printf("Failed to start voice service : %s", err)
c.addNoti(fmt.Sprintf("Failed to start voice service : %s", err))
return
}

c.muteBtn.SetLabel("🔈")
c.addNoti(`[yellow]Microphone: Off[white]`)
c.addNoti(`[yellow]Microphone: On[white]`)
}

c.mute = mute
c.lastToggleMute = time.Now()
}

func (c *Chat) addNoti(msg string) {
Expand Down Expand Up @@ -591,14 +655,14 @@ func (c *Chat) addChatMsgs(chatList []message.Chat) {
}

func (c *Chat) Stop(msg string) {
fmt.Printf(msg)
if c.wsConn != nil {
c.wsConn.Close()
}
if c.peerConn != nil {
c.peerConn.Close()
}
c.app.Stop()
fmt.Println(msg)
}

func FormatChat(name, content, color string) string {
Expand Down

0 comments on commit 62e2691

Please sign in to comment.