/
wsprotocol.go
101 lines (88 loc) · 2.1 KB
/
wsprotocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package service
import (
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/underpin-korea/protocol/livekit"
"github.com/underpin-korea/protocol/logger"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"github.com/underpin-korea/livekit_server_go/pkg/rtc/types"
)
// Keepalive settings for the signal websocket.
const (
	// pingFrequency is how long pingWorker waits before sending each ping.
	pingFrequency time.Duration = 10 * time.Second

	// pingTimeout is the write deadline, measured from the moment a ping is
	// issued, that pingWorker passes to WriteControl.
	pingTimeout time.Duration = 2 * time.Second
)
// WSSignalConnection wraps a websocket client and exchanges livekit signal
// messages over it, encoded either as JSON text frames or as binary protobuf
// frames.
type WSSignalConnection struct {
// conn is the underlying websocket used for reading requests, writing
// responses and sending keepalive pings.
conn types.WebsocketClient
// mu is taken around writes to useJSON and is held by WriteResponse while
// it encodes and sends a message.
mu sync.Mutex
// useJSON selects the encoding of outgoing responses: JSON text frames when
// true, binary protobuf frames when false. The constructor sets it to true
// and ReadRequest updates it to match the frame type of each request read.
useJSON bool
}
// NewWSSignalConnection wraps conn in a WSSignalConnection that starts out in
// JSON mode, and launches a background goroutine that pings the peer
// periodically. That goroutine exits on its own once a ping write fails.
func NewWSSignalConnection(conn types.WebsocketClient) *WSSignalConnection {
	sc := new(WSSignalConnection) // mu is ready to use at its zero value
	sc.conn = conn
	sc.useJSON = true

	go sc.pingWorker()

	return sc
}
// ReadRequest reads one message from the websocket and decodes it into a
// SignalRequest.
//
// Binary frames are decoded as protobuf and text frames as JSON; in both cases
// the connection remembers the encoding so that WriteResponse replies the same
// way. A read error is returned as (nil, err). A decode error is returned
// together with the (possibly partially filled) request. Any other frame type
// is logged and reported as (nil, nil).
func (c *WSSignalConnection) ReadRequest() (*livekit.SignalRequest, error) {
	kind, data, readErr := c.conn.ReadMessage()
	if readErr != nil {
		return nil, readErr
	}

	req := new(livekit.SignalRequest)

	if kind == websocket.BinaryMessage {
		// A binary frame means the client supports protobuf: switch the
		// reply encoding over, taking the lock only when it actually changes.
		if c.useJSON {
			c.mu.Lock()
			c.useJSON = false
			c.mu.Unlock()
		}
		decodeErr := proto.Unmarshal(data, req)
		return req, decodeErr
	}

	if kind == websocket.TextMessage {
		// A text frame carries JSON, so responses go back as JSON too.
		c.mu.Lock()
		c.useJSON = true
		c.mu.Unlock()
		decodeErr := protojson.Unmarshal(data, req)
		return req, decodeErr
	}

	logger.Debugw("unsupported message", "message", kind)
	return nil, nil
}
// WriteResponse encodes msg in the connection's current mode and sends it as a
// single websocket message: a JSON text frame when useJSON is set, otherwise a
// binary protobuf frame. The lock is held for the whole encode-and-send so the
// mode cannot change midway and writes from different callers do not overlap.
// It returns the marshalling error if encoding fails, or else the result of
// the underlying write.
func (c *WSSignalConnection) WriteResponse(msg *livekit.SignalResponse) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	switch c.useJSON {
	case true:
		data, encErr := protojson.Marshal(msg)
		if encErr != nil {
			return encErr
		}
		return c.conn.WriteMessage(websocket.TextMessage, data)
	default:
		data, encErr := proto.Marshal(msg)
		if encErr != nil {
			return encErr
		}
		return c.conn.WriteMessage(websocket.BinaryMessage, data)
	}
}
// pingWorker sends a websocket ping with an empty payload every pingFrequency,
// giving each ping pingTimeout to be written. It loops until a ping write
// returns an error, then returns, ending the goroutine that runs it.
func (c *WSSignalConnection) pingWorker() {
	for {
		time.Sleep(pingFrequency)

		deadline := time.Now().Add(pingTimeout)
		if pingErr := c.conn.WriteControl(websocket.PingMessage, []byte{}, deadline); pingErr != nil {
			return
		}
	}
}