forked from webrocket/webrocket
/
channel.go
285 lines (263 loc) · 7.95 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
// Copyright (C) 2011 by Krzysztof Kowalik <chris@nu7hat.ch>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package webrocket
import (
"errors"
"regexp"
"strings"
"sync"
)
// Pattern used to validate a channel name.
var validChannelNamePattern = regexp.MustCompile("^[\\w\\d\\_][\\w\\d\\-\\_\\.]*$")
// ChannelType represents a type of the channel. Can be normal, private
// or presence.
type ChannelType int
// Possible channel types
const (
ChannelNormal = 0
ChannelPrivate = 2
ChannelPresence = 3
)
// Channel keeps information about specified channel and it's subscriptions.
// It's hub is used to broadcast messages.
type Channel struct {
// ID of the persisted record.
_id int
// The name of the channel.
name string
// A type of the channel.
kind ChannelType
// List of subscribers.
subscribers map[string]*Subscription
// Channel's state.
alive bool
// Internal semaphore.
mtx sync.Mutex
}
// Internal constructor
// -----------------------------------------------------------------------------
// newChannel creates and configures new channel under the specified vhost.
// The channel name limitations are the same as in case of user names: channel
// name can be contain letters, numbers, dashes, underscores and dots.
//
// Each channel's broadcasting loop works in its own goroutine.
//
// name - The name of a new channel.
// kind - The type of a new channel.
//
// Returns new channel or error if something went wrong.
func newChannel(name string, kind ChannelType) (ch *Channel, err error) {
if !validChannelNamePattern.MatchString(name) {
err = errors.New("invalid channel name")
return
}
ch = &Channel{
name: name,
kind: kind,
subscribers: make(map[string]*Subscription),
alive: true,
}
return
}
// Internal
// -----------------------------------------------------------------------------
// channelTypeFromName parses given channel name and discover's what is
// its type.
//
// name - The name to be parsed.
//
// Returns the channel type.
func channelTypeFromName(name string) (t ChannelType) {
if parts := strings.Split(name, "-"); len(parts) > 1 {
switch parts[0] {
case "presence":
return ChannelPresence
case "private":
return ChannelPrivate
}
}
return ChannelNormal
}
// subscribe appends given client to the list of subscribers. If hidden
// is true then he will be invisible fot the other subscribers of the
// presence channel. Threadsafe, May be called from many websocket
// connection's handlers.
//
// client - The websocket client to be subscribed.
// hidden - If true then subscription will be invisible.
// data - The user specific data attached to the presence channel identity.
//
func (ch *Channel) subscribe(client *WebsocketConnection, hidden bool, data map[string]interface{}) {
if client != nil && ch.IsAlive() {
ch.mtx.Lock()
sid := client.Id()
s, ok := ch.subscribers[sid]
if ok {
// Already subscribing this channel...
ch.mtx.Unlock()
return
} else {
s = newSubscription(client, hidden, data)
}
data["channel"] = ch.name
var subscribers []interface{}
if ch.IsPresence() {
data["uid"] = s.Uid()
subscribers = make([]interface{}, len(ch.subscribers))
i := 0
for _, s := range ch.subscribers {
subscribers[i] = s.Data()
i += 1
}
}
// Confirm subscription.
sdata := map[string]interface{}{"channel": ch.name}
if ch.IsPresence() {
sdata["subscribers"] = subscribers
}
if ch.IsPrivate() {
sdata["uid"] = s.Uid()
}
client.Send(map[string]interface{}{":subscribed": sdata})
ch.subscribers[sid] = s
client.subscriptions[ch.Name()] = ch
ch.mtx.Unlock()
if ch.IsPresence() && !hidden {
// Tell everyone that someone joined the channel.
ch.Broadcast(map[string]interface{}{":memberJoined": data}, true)
}
}
}
// unsubscribe removes specified client from the subscribers list. Threadsafe,
// may be called from many websocket connection's handlers.
//
// client - The websocket client to be subscribed.
// data - The user specific data passed to other subscribers.
//
func (ch *Channel) unsubscribe(client *WebsocketConnection, data map[string]interface{}, confirm bool) {
if client != nil && ch.IsAlive() {
var s *Subscription
var ok bool
ch.mtx.Lock()
sid := client.Id()
if s, ok = ch.subscribers[sid]; !ok {
ch.mtx.Unlock()
return
}
// Confirm unsubscription.
if confirm {
client.Send(map[string]interface{}{
":unsubscribed": map[string]interface{}{
"channel": ch.name,
},
})
}
delete(ch.subscribers, sid)
delete(client.subscriptions, ch.Name())
ch.mtx.Unlock()
if ch.IsPrivate() {
data["uid"] = s.Uid()
}
if ch.IsPresence() && !s.IsHidden() {
// Tell the others that this guy is not subscribing the
// channel anymore.
merged := s.Data()
for k, v := range data {
merged[k] = v
}
data["channel"] = ch.name
ch.Broadcast(map[string]interface{}{":memberLeft": merged}, true)
}
}
}
// Exported
// -----------------------------------------------------------------------------
// Name returns name of the channel.
func (ch *Channel) Name() string {
return ch.name
}
// Type returns flag representing the channel's type.
func (ch *Channel) Type() ChannelType {
return ch.kind
}
// IsPrivate returns whether this channel requires authenticaion or not.
func (ch *Channel) IsPrivate() bool {
return ch.kind&ChannelPrivate == ChannelPrivate
}
// IsPresence returns whether this channel is a presence one or not.
func (ch *Channel) IsPresence() bool {
return ch.kind&ChannelPresence == ChannelPresence
}
// HasSubscriber checks whether specified client is subscribing to this
// channel or not. Threadsafe, May be called from many places and depends
// on the Subscribe and Unsubscribe funcs.
//
// client - The websocket client to be checked.
//
// Returns whether client is subscribing this channel or not.
func (ch *Channel) HasSubscriber(client *WebsocketConnection) bool {
if client != nil {
ch.mtx.Lock()
defer ch.mtx.Unlock()
_, ok := ch.subscribers[client.Id()]
return ok
}
return false
}
// Subscribers returns list of the clients subsribing to the channel.
// Threadsafe, May be called from many places and depends on the Subscribe
// and Unsubscribe funcs.
func (ch *Channel) Subscribers() map[string]*Subscription {
ch.mtx.Lock()
defer ch.mtx.Unlock()
return ch.subscribers
}
// Broadcast sends given payload to all active subscribers of this channel.
// Threadsafe, May be called from many websocket client's handlers.
//
// x - The data to be broadcasted to all the subscribers.
//
func (ch *Channel) Broadcast(x map[string]interface{}, includeHidden bool) {
go func(subscribers map[string]*Subscription) {
for _, s := range subscribers {
if s.IsHidden() && !includeHidden {
continue
}
if client := s.Client(); client != nil {
client.Send(x)
}
}
}(ch.Subscribers())
}
// IsAlive returns whether the channels is alive or not. Threadsafe, May be
// called from many places and depends on the Kill func.
func (ch *Channel) IsAlive() bool {
ch.mtx.Lock()
defer ch.mtx.Unlock()
return ch.alive
}
// Kill closes the channel's broadcaster and marks it as dead. Threadsafe,
// May be called from the backend protocol or admin interface and
// affects the IsAlive func.
func (ch *Channel) Kill() {
ch.mtx.Lock()
defer ch.mtx.Unlock()
if ch.alive {
ch.alive = false
for _, s := range ch.subscribers {
ch.unsubscribe(s.Client(), map[string]interface{}{}, false)
}
}
}