/
room.go
156 lines (127 loc) · 3.08 KB
/
room.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package server
import (
"io"
"sync"
"github.com/juju/errors"
"github.com/peer-calls/peer-calls/v4/server/identifiers"
)
// NewAdapterFunc builds the Adapter for the given room. AdapterRoomManager
// invokes it from Enter when the room has no entry yet.
type NewAdapterFunc func(room identifiers.RoomID) Adapter
// adapterCounter pairs a room's Adapter with a usage counter maintained by
// AdapterRoomManager.
type adapterCounter struct {
// count is set to 1 when the entry is created, incremented by each further
// Enter and decremented by each Exit.
count uint64
// adapter is the room's Adapter; Exit closes it when count reaches zero.
adapter Adapter
}
// AdapterRoomManager keeps one reference-counted Adapter per room. The adapter
// is created on the first Enter for a room and closed when the matching number
// of Exit calls brings its counter back to zero.
type AdapterRoomManager struct {
// rooms maps a room ID to its adapter and usage counter.
rooms map[identifiers.RoomID]*adapterCounter
// roomsMu guards rooms; Enter and Exit both take the write lock.
roomsMu sync.RWMutex
// newAdapter is the factory used to create the Adapter of a new room.
newAdapter NewAdapterFunc
}
// Compile-time check that *AdapterRoomManager implements RoomManager.
var _ RoomManager = (*AdapterRoomManager)(nil)
// NewAdapterRoomManager returns an AdapterRoomManager with no rooms. The
// newAdapter factory is called by Enter whenever a room needs a fresh Adapter.
func NewAdapterRoomManager(newAdapter NewAdapterFunc) *AdapterRoomManager {
	manager := new(AdapterRoomManager)
	manager.rooms = make(map[identifiers.RoomID]*adapterCounter)
	manager.newAdapter = newAdapter

	return manager
}
// Enter registers one more user of room and returns the room's Adapter.
//
// When the room already has an entry its counter is incremented and isNew is
// false. Otherwise a new Adapter is created through the newAdapter factory,
// stored with a counter of 1, and isNew is true.
func (r *AdapterRoomManager) Enter(room identifiers.RoomID) (adapter Adapter, isNew bool) {
	r.roomsMu.Lock()
	defer r.roomsMu.Unlock()

	// Existing room: just account for the additional user.
	if entry, found := r.rooms[room]; found {
		entry.count++

		return entry.adapter, false
	}

	// First user of the room: create and remember its adapter.
	entry := &adapterCounter{
		count:   1,
		adapter: r.newAdapter(room),
	}
	r.rooms[room] = entry

	return entry.adapter, true
}
// Exit releases one user of room. When the counter reaches zero the room is
// removed from the manager, its Adapter is closed, and isRemoved is true.
// Calling Exit for a room that has no entry is a no-op that returns false.
func (r *AdapterRoomManager) Exit(room identifiers.RoomID) (isRemoved bool) {
	r.roomsMu.Lock()
	defer r.roomsMu.Unlock()

	entry, found := r.rooms[room]
	if !found {
		return false
	}

	entry.count--
	if entry.count != 0 {
		// Other users are still in the room.
		return false
	}

	// Last user left: forget the room before closing its adapter.
	delete(r.rooms, room)
	entry.adapter.Close() // FIXME log error

	return true
}
// ChannelRoomManager wraps another RoomManager and publishes a RoomEvent on a
// channel whenever the wrapped manager reports that a room was newly created
// (Enter) or removed (Exit).
type ChannelRoomManager struct {
// roomManager is the wrapped manager that Enter and Exit delegate to.
roomManager RoomManager
// roomEventsChan carries the add/remove events; NewChannelRoomManager
// creates it unbuffered.
roomEventsChan chan RoomEvent
// closedChan is polled by isClosed: once it is closed, isClosed reports true.
closedChan chan struct{}
// closedChanCloseOnce ensures the shutdown done in Close runs only once.
closedChanCloseOnce sync.Once
// mu is held by Enter, Exit and Close for their whole duration.
mu sync.Mutex
}
// NewChannelRoomManager wraps roomManager in a ChannelRoomManager. Both the
// events channel and the closed-signal channel are created unbuffered.
func NewChannelRoomManager(roomManager RoomManager) *ChannelRoomManager {
	events := make(chan RoomEvent)
	closed := make(chan struct{})

	return &ChannelRoomManager{
		roomManager:    roomManager,
		roomEventsChan: events,
		closedChan:     closed,
	}
}
// Close shuts the manager down. It closes closedChan, so that isClosed reports
// true, and closes the room events channel, so that AcceptEvent returns an
// error and receivers of RoomEventsChannel see the channel closed.
//
// Close takes mu, so it waits for any in-flight Enter or Exit to finish. It is
// safe to call more than once; only the first call has an effect.
//
// Close exists for tests. The channel should always stay open in production.
func (r *ChannelRoomManager) Close() {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.closedChanCloseOnce.Do(func() {
		// closedChan must be closed too: Enter and Exit consult isClosed (which
		// reads closedChan) before sending, and a send on the closed events
		// channel would panic.
		close(r.closedChan)
		close(r.roomEventsChan)
	})
}
// isClosed reports whether closedChan has been closed. The check never blocks:
// if nothing can be received from closedChan right now, it returns false.
func (r *ChannelRoomManager) isClosed() bool {
	closed := false

	select {
	case <-r.closedChan:
		closed = true
	default:
	}

	return closed
}
// Enter delegates to the wrapped RoomManager and returns its result unchanged.
// When the wrapped manager reports the room as new and isClosed is false, a
// RoomEventTypeAdd event for the room is sent on the events channel first.
//
// The send happens while mu is held; on the unbuffered channel created by
// NewChannelRoomManager it blocks until a receiver takes the event.
func (r *ChannelRoomManager) Enter(room identifiers.RoomID) (adapter Adapter, isNew bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	adapter, isNew = r.roomManager.Enter(room)

	// Nothing to announce for an existing room or a closed manager.
	if !isNew || r.isClosed() {
		return adapter, isNew
	}

	r.roomEventsChan <- RoomEvent{
		Type:     RoomEventTypeAdd,
		RoomName: room,
	}

	return adapter, isNew
}
// Exit delegates to the wrapped RoomManager and returns its result unchanged.
// When the wrapped manager reports the room as removed and isClosed is false,
// a RoomEventTypeRemove event for the room is sent on the events channel first.
//
// The send happens while mu is held; on the unbuffered channel created by
// NewChannelRoomManager it blocks until a receiver takes the event.
func (r *ChannelRoomManager) Exit(room identifiers.RoomID) (isRemoved bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	isRemoved = r.roomManager.Exit(room)

	// Nothing to announce while the room lives on or the manager is closed.
	if !isRemoved || r.isClosed() {
		return isRemoved
	}

	r.roomEventsChan <- RoomEvent{
		Type:     RoomEventTypeRemove,
		RoomName: room,
	}

	return isRemoved
}
// AcceptEvent blocks until the next RoomEvent is available and returns it.
// Once the events channel has been closed it returns the zero RoomEvent and an
// error annotating io.ErrClosedPipe.
func (r *ChannelRoomManager) AcceptEvent() (RoomEvent, error) {
	if event, open := <-r.roomEventsChan; open {
		return event, nil
	}

	// A receive from a closed channel yields the zero value, so returning an
	// empty RoomEvent here matches what the receive produced.
	return RoomEvent{}, errors.Annotatef(io.ErrClosedPipe, "ChannelRoomManager closed")
}
// RoomEventsChannel exposes the events channel as receive-only, for callers
// that prefer to select on it instead of calling AcceptEvent.
func (r *ChannelRoomManager) RoomEventsChannel() <-chan RoomEvent {
	var events <-chan RoomEvent = r.roomEventsChan

	return events
}
// RoomEvent describes a change in the set of rooms, as published by
// ChannelRoomManager.
type RoomEvent struct {
// RoomName is the ID of the room the event refers to.
RoomName identifiers.RoomID
// Type tells whether the room was added or removed.
Type RoomEventType
}
// RoomEventType identifies the kind of a RoomEvent.
type RoomEventType int

// Room event kinds. Numbering starts at 1, so the zero value of RoomEventType
// matches none of them.
const (
	_ RoomEventType = iota
	// RoomEventTypeAdd is the type of the event sent when a room is created.
	RoomEventTypeAdd
	// RoomEventTypeRemove is the type of the event sent when a room is removed.
	RoomEventTypeRemove
)