-
Notifications
You must be signed in to change notification settings - Fork 272
/
client.go
131 lines (112 loc) · 2.88 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package ws
import (
"context"
"fmt"
"time"
"github.com/jeremija/peer-calls/src/server/basen"
"github.com/jeremija/peer-calls/src/server/ws/wsmessage"
"nhooyr.io/websocket"
)
type WSWriter interface {
Write(ctx context.Context, typ websocket.MessageType, msg []byte) error
}
type WSReader interface {
Read(ctx context.Context) (websocket.MessageType, []byte, error)
}
type WSReadWriter interface {
WSReader
WSWriter
}
// An abstraction for sending out to websocket using channels.
type Client struct {
id string
conn WSReadWriter
metadata string
writeChannel chan wsmessage.Message
readChannel chan wsmessage.Message
serializer wsmessage.ByteSerializer
}
// Creates a new websocket client.
func NewClient(conn WSReadWriter) *Client {
return NewClientWithID(conn, "")
}
func NewClientWithID(conn WSReadWriter, id string) *Client {
if id == "" {
id = basen.NewUUIDBase62()
}
return &Client{
id: id,
conn: conn,
writeChannel: make(chan wsmessage.Message, 16),
readChannel: make(chan wsmessage.Message, 16),
}
}
func (c *Client) SetMetadata(metadata string) {
c.metadata = metadata
}
func (c *Client) Metadata() string {
return c.metadata
}
// Writes a message to websocket with timeout.
func (c *Client) WriteTimeout(ctx context.Context, timeout time.Duration, msg wsmessage.Message) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
data, err := c.serializer.Serialize(msg)
if err != nil {
return fmt.Errorf("client.WriteTimeout - error serializing message: %w", err)
}
return c.conn.Write(ctx, websocket.MessageText, data)
}
func (c *Client) ID() string {
return c.id
}
// Gets the channel to write out to. Messages sent here will be written
// to the websocket and received by the other side.
func (c *Client) WriteChannel() chan<- wsmessage.Message {
return c.writeChannel
}
// Subscribes
func (c *Client) subscribeRead(ctx context.Context) error {
for {
typ, data, err := c.conn.Read(ctx)
if err != nil {
return fmt.Errorf("client.subscribeRead - error reading data: %w", err)
}
message, err := c.serializer.Deserialize(data)
if err != nil {
return fmt.Errorf("client.subscribeRead - error deserializing data: %w", err)
}
if typ == websocket.MessageText {
c.readChannel <- message
}
}
}
func (c *Client) Close() {
close(c.readChannel)
close(c.writeChannel)
}
func (c *Client) Subscribe(ctx context.Context, handle func(wsmessage.Message)) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
readErr := make(chan error)
go func() {
readErr <- c.subscribeRead(ctx)
close(readErr)
}()
for {
select {
case msg := <-c.writeChannel:
err := c.WriteTimeout(ctx, time.Second*5, msg)
if err != nil {
return err
}
case msg := <-c.readChannel:
handle(msg)
case err := <-readErr:
return err
case <-ctx.Done():
err := ctx.Err()
return err
}
}
}