/
organization_pool.go
148 lines (133 loc) · 3.39 KB
/
organization_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package notifyhub
import (
"fmt"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
)
// OrganizationPool is a slice of connection pointers. Its methods look
// connections up by their Uuid field and write JSON messages to them.
// NOTE(review): presumably one pool exists per organization — confirm against
// the hub that owns the pools.
type OrganizationPool []*Connection
// Sentinel errors reported by the pool operations.
var (
// ErrUserInPool signals that a connection with the same user uuid is
// already part of the pool.
ErrUserInPool = fmt.Errorf("user already present in pool")
// ErrUserNotFound signals that no connection with the requested user uuid
// exists in the pool.
ErrUserNotFound = fmt.Errorf("could not find user in pool")
// ErrNoRecevierFound signals that a message was addressed to a receiver
// uuid that has no connection in the pool. (The identifier keeps its
// historical spelling because it is part of the exported API.)
ErrNoRecevierFound = fmt.Errorf("could not find receiver for message to send to")
// ErrWriteToConn signals that writing a message to a connection failed.
ErrWriteToConn = fmt.Errorf("could not write to connection")
)
// SendBatch delivers msgs, encoded as JSON, to the first connection in the
// pool whose Uuid equals receiver.
//
// It returns ErrWriteToConn when the write fails and ErrNoRecevierFound when
// no connection in the pool matches receiver.
func (pool *OrganizationPool) SendBatch(receiver string, msgs BatchNotification) error {
	members := *pool
	for i := range members {
		target := members[i]
		if target.Uuid != receiver {
			continue
		}
		// Only the first match is served; the search ends here either way.
		if writeErr := target.Conn.WriteJSON(msgs); writeErr != nil {
			return ErrWriteToConn
		}
		return nil
	}
	return ErrNoRecevierFound
}
// Broadcast writes msg, encoded as JSON, to every connection in the pool.
//
// A failed write does not abort the broadcast: the remaining connections are
// still served so that a single broken client cannot keep the message from
// everyone that follows it in the pool. ErrWriteToConn is returned after the
// loop if at least one write failed; nil is returned when every write
// succeeded, which includes the empty-pool case.
func (pool *OrganizationPool) Broadcast(msg *IncomingEvent) error {
	failed := false
	for _, conn := range *pool {
		if err := conn.Conn.WriteJSON(msg); err != nil {
			// Remember the failure but keep delivering to the others.
			failed = true
		}
	}
	if failed {
		return ErrWriteToConn
	}
	return nil
}
// BroadcastOnline announces a user coming online and then brings that user's
// own connection up to date.
//
// Step 1: msg is written to every connection whose Uuid differs from
// msg.UserUuid. A failed write is remembered but does not stop the loop, so
// one broken client cannot prevent the others (or the new connection) from
// being informed.
//
// Step 2: if a connection with Uuid == msg.UserUuid is in the pool, it
// receives one event per pool member (its own entry included). Each event
// copies Organization, Timestamp, Mutation and Event from msg, carries the
// member's Uuid as UserUuid and has a nil Value. Because all of these writes
// go to the same connection, the first failure ends the function.
//
// It returns ErrWriteToConn if any write failed and nil otherwise.
func (pool *OrganizationPool) BroadcastOnline(msg *IncomingEvent) error {
	var (
		newConn *Connection
		failed  bool
	)

	// Tell everyone else who came online.
	for _, conn := range *pool {
		if conn.Uuid == msg.UserUuid {
			// This is the connection being announced; it is served below.
			newConn = conn
			continue
		}
		if err := conn.Conn.WriteJSON(msg); err != nil {
			failed = true
		}
	}

	// Tell the new connection who is online.
	if newConn != nil {
		for _, conn := range *pool {
			if err := newConn.Conn.WriteJSON(&IncomingEvent{
				UserUuid:     conn.Uuid,
				Organization: msg.Organization,
				Timestamp:    msg.Timestamp,
				Mutation:     msg.Mutation,
				Event:        msg.Event,
				Value:        nil,
			}); err != nil {
				// Same socket for every iteration: retrying is pointless.
				return ErrWriteToConn
			}
		}
	}

	if failed {
		return ErrWriteToConn
	}
	return nil
}
// Send walks the pool and writes msg, encoded as JSON, to the first
// connection whose Uuid equals receiver.
//
// It returns ErrWriteToConn when the write fails and ErrNoRecevierFound when
// no connection in the pool matches receiver.
func (pool *OrganizationPool) Send(receiver string, msg *IncomingEvent) error {
	members := *pool
	for i := range members {
		target := members[i]
		if target.Uuid != receiver {
			continue
		}
		// First match wins; the search ends here whether or not the write works.
		if writeErr := target.Conn.WriteJSON(msg); writeErr != nil {
			return ErrWriteToConn
		}
		return nil
	}
	return ErrNoRecevierFound
}
// Add appends newConn to the pool unless a connection with the same Uuid is
// already registered. In that case the pool is left untouched and
// ErrUserInPool is returned; otherwise Add returns nil.
func (pool *OrganizationPool) Add(newConn *Connection) error {
	members := *pool
	for i := range members {
		if members[i].Uuid == newConn.Uuid {
			return ErrUserInPool
		}
	}
	*pool = append(members, newConn)
	return nil
}
// Remove deletes from the pool the first connection whose Uuid equals
// delConn.Uuid, preserving the order of the remaining connections.
//
// It returns nil when a connection was removed and ErrUserNotFound when
// delConn is nil or no connection with that Uuid is in the pool. In the
// not-found case the pool is left unchanged.
func (pool *OrganizationPool) Remove(delConn *Connection) error {
	if delConn == nil {
		return ErrUserNotFound
	}

	conns := *pool
	for i, conn := range conns {
		if conn.Uuid != delConn.Uuid {
			continue
		}
		last := len(conns) - 1
		// Shift the tail one slot to the left over the removed entry.
		copy(conns[i:], conns[i+1:])
		// Clear the now-duplicated last slot so the backing array does not
		// keep a stale *Connection reachable.
		conns[last] = nil
		*pool = conns[:last]
		return nil
	}

	// No match: do not touch the pool. Without this guard the zero-valued
	// index would remove the first connection, or panic on an empty pool.
	return ErrUserNotFound
}
// Length reports how many connections the pool currently holds.
func (pool *OrganizationPool) Length() int {
	members := *pool
	return len(members)
}
// Find returns the first connection in the pool whose Uuid equals uuid, or
// nil when the pool holds no such connection.
func (pool *OrganizationPool) Find(uuid string) *Connection {
	members := *pool
	for i := range members {
		if candidate := members[i]; candidate.Uuid == uuid {
			return candidate
		}
	}
	return nil
}
// Connection couples a websocket connection with the identifiers the pool
// uses to address it.
type Connection struct {
// Conn is the underlying gorilla websocket connection that messages are
// written to and read from.
Conn *websocket.Conn
// Uuid is the key the pool methods compare against when looking up a
// connection. NOTE(review): presumably the uuid of the connected user —
// confirm against the code that creates connections.
Uuid string
// Organization is not read anywhere in this file.
// NOTE(review): presumably identifies the organization (and thus the pool)
// the connection belongs to — confirm against the hub.
Organization string
}
// Health watches the connection until it breaks and then hands conn to
// hub.unsubscribe so the hub can remove it.
//
// The method blocks in conn.Conn.ReadMessage. Clients are not expected to
// send data, but if one does, the message is discarded and reading continues:
// a successful read means the connection is alive, so it must not be
// unsubscribed. Only a read error ends the loop; the error is logged and the
// deferred send to hub.unsubscribe runs.
//
// Health blocks for the lifetime of the connection, so callers will normally
// want to run it in its own goroutine.
func (conn *Connection) Health(hub *NotifyHub) {
	defer func() {
		hub.unsubscribe <- conn
	}()

	for {
		// Block until the peer sends something or the connection fails.
		if _, _, err := conn.Conn.ReadMessage(); err != nil {
			logrus.Warnf("[connection.Health] closed: %v\n", err)
			return
		}
		// Unexpected client message: ignore it and keep watching.
	}
}