/
room.go
97 lines (88 loc) · 1.69 KB
/
room.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package comet
import (
"sync"
"github.com/nanpingli/qqqg/api/protocol"
"github.com/nanpingli/qqqg/internal/comet/errors"
)
// Room is a room and store channel room info.
type Room struct {
ID string
rLock sync.RWMutex
next *Channel
drop bool
Online int32 // dirty read is ok
AllOnline int32
}
// NewRoom new a room struct, store channel room info.
func NewRoom(id string) (r *Room) {
r = new(Room)
r.ID = id
r.drop = false
r.next = nil
r.Online = 0
return
}
// Put put channel into the room.
func (r *Room) Put(ch *Channel) (err error) {
r.rLock.Lock()
if !r.drop {
if r.next != nil {
r.next.Prev = ch
}
ch.Next = r.next
ch.Prev = nil
r.next = ch // insert to header
r.Online++
} else {
err = errors.ErrRoomDroped
}
r.rLock.Unlock()
return
}
// Del delete channel from the room.
func (r *Room) Del(ch *Channel) bool {
r.rLock.Lock()
if ch.Prev == nil && ch.Next == nil {
r.rLock.Unlock()
return false
}
if ch.Next != nil {
// if not footer
ch.Next.Prev = ch.Prev
}
if ch.Prev != nil {
// if not header
ch.Prev.Next = ch.Next
} else {
r.next = ch.Next
}
ch.Next = nil
ch.Prev = nil
r.Online--
r.drop = r.Online == 0
r.rLock.Unlock()
return r.drop
}
// Push push msg to the room, if chan full discard it.
func (r *Room) Push(p *protocol.Proto) {
r.rLock.RLock()
for ch := r.next; ch != nil; ch = ch.Next {
_ = ch.Push(p)
}
r.rLock.RUnlock()
}
// Close close the room.
func (r *Room) Close() {
r.rLock.RLock()
for ch := r.next; ch != nil; ch = ch.Next {
ch.Close()
}
r.rLock.RUnlock()
}
// OnlineNum the room all online.
func (r *Room) OnlineNum() int32 {
if r.AllOnline > 0 {
return r.AllOnline
}
return r.Online
}