// server.go
//
// NOTE: the upstream repository was archived by its owner on Apr 13, 2020 and
// is now read-only.

package eventsource
import "time"
// server manages all clients: it adds them to and removes them from the pool
// and receives incoming events to forward to the clients.
type server struct {
// add carries clients that should join the pool; listen hands each one to
// spawn.
add chan client
// remove carries clients that should leave the pool; listen hands each one to
// kill, and spawn passes this channel to every client's listen goroutine.
remove chan client
// events carries the events that listen fans out to the clients through send.
events chan Event
// hearbeat (sic) is the interval given to time.Tick in listen; a ping is sent
// to the clients on every tick.
hearbeat time.Duration
// metrics is notified about every finished event (EventDone) and, after each
// ping, about the number of durations the ping produced (ClientCount).
metrics Metrics
}
// listen is the server's event loop. It never returns. On every iteration it
// handles exactly one of the following:
//
//   - a client arriving on s.add, which is added to the pool through spawn;
//   - a client arriving on s.remove, which is dropped from the pool through kill;
//   - an event arriving on s.events, which is sent to the clients in a separate
//     goroutine that then reports the outcome through s.metrics.EventDone;
//   - a heartbeat tick, which sends a ping to the clients in a separate
//     goroutine and reports the number of results through
//     s.metrics.ClientCount, so that stale connections can be detected.
//
// The client pool is owned by this loop alone. Each sending goroutine works on
// its own copy of the pool taken at the moment the event (or tick) was
// received; it must not share the loop's slice, because the loop keeps
// reassigning it and kill rewrites its elements in place.
func (s server) listen() {
	var clients []client

	// time.Tick is fine here because the loop runs for the lifetime of the
	// server, so there is never a ticker to stop.
	tick := time.Tick(s.hearbeat)

	for {
		select {
		case c := <-s.add:
			clients = s.spawn(clients, c)
		case c := <-s.remove:
			clients = s.kill(clients, c)
		case e := <-s.events:
			// Copy before leaving this goroutine: see the note on pool
			// ownership in the function comment.
			targets := append([]client(nil), clients...)
			go func() {
				start := time.Now()
				durations := send(e, targets)
				s.metrics.EventDone(e, time.Since(start), durations)
			}()
		case <-tick:
			targets := append([]client(nil), clients...)
			go func() {
				durations := send(ping{}, targets)
				s.metrics.ClientCount(len(durations))
			}()
		}
	}
}
// send hands the text/stream bytes of e to every client selected by
// e.Clients(clients) and blocks until one duration per selected client has
// been collected. The returned slice holds those durations in the order they
// were reported, not in client order. A duration of 0 means the data was not
// sent because the client's done channel fired first. When no client is
// selected an empty, non-nil slice is returned and e.Bytes is never called.
func send(e Event, clients []client) []time.Duration {
	targets := e.Clients(clients)
	elapsed := []time.Duration{}
	if len(targets) == 0 {
		return elapsed
	}

	// One buffer slot per target, so reporting a result never blocks on the
	// collector below.
	results := make(chan time.Duration, len(targets))
	msg := payload{data: e.Bytes(), done: results}

	deliver := func(target client) {
		select {
		case target.events <- msg:
			// Handed over. Nothing is reported from here; presumably the
			// receiving side reports its own duration on msg.done — verify
			// against the client implementation.
		case <-target.done:
			msg.done <- 0
		}
	}
	for i := range targets {
		go deliver(targets[i])
	}

	for pending := len(targets); pending > 0; pending-- {
		elapsed = append(elapsed, <-results)
	}
	return elapsed
}
// spawn registers c with the pool: it returns clients with c appended and
// starts c's own listen goroutine. That goroutine is given the server's remove
// channel, which the client needs to unsubscribe itself from the server.
func (s server) spawn(clients []client, c client) []client {
	pool := append(clients, c)
	go c.listen(s.remove)
	return pool
}
// kill drops from clients the first entry whose events channel is the same as
// the given client's. The freed slot is filled with the last entry and the
// slice is shortened by one, so the order of the remaining clients is not
// preserved and the caller's backing array is modified in place. It panics
// with "client not found" when no entry matches.
func (s server) kill(clients []client, client client) []client {
	tail := len(clients) - 1
	for pos := range clients {
		if clients[pos].events != client.events {
			continue
		}
		// Overwrite the match with the last entry, unless it already is the
		// last one.
		if pos != tail {
			clients[pos] = clients[tail]
		}
		return clients[:tail]
	}
	panic("client not found")
}