-
Notifications
You must be signed in to change notification settings - Fork 19
/
log.go
151 lines (131 loc) · 3.21 KB
/
log.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package workload
import (
"bufio"
"context"
"encoding/json"
"fmt"
"sync"
"github.com/alphadose/haxmap"
"github.com/projecteru2/agent/types"
"github.com/projecteru2/agent/utils"
corelog "github.com/projecteru2/core/log"
coreutils "github.com/projecteru2/core/utils"
)
// subscriber is one consumer of an app's log stream.
type subscriber struct {
	// ctx is polled by isDone; once it is done the subscriber is skipped.
	ctx context.Context
	// cancel is the cancel function stored alongside ctx.
	cancel context.CancelFunc
	// buf is the buffered stream that log lines are written to.
	buf *bufio.ReadWriter
	// errChan carries errors concerning this subscriber.
	errChan chan error
}

// isDone reports, without blocking, whether the subscriber's context has
// already been cancelled or has expired.
func (s *subscriber) isDone() bool {
	// A context's Err is non-nil exactly when its Done channel is closed.
	return s.ctx.Err() != nil
}
// logBroadcaster receives logs and broadcasts them to subscribers.
// Logs arrive on logC; run hands each one to broadcast, which writes it to
// every subscriber registered under the log's Name.
type logBroadcaster struct {
// The embedded lock is taken exclusively by subscribe/unsubscribe and
// shared (read-locked) by broadcast.
sync.RWMutex
// logC is the channel run receives logs from.
logC chan *types.Log
// subscribersMap is the subscriber registry, keyed first by app and then by subscriber ID.
subscribersMap *haxmap.Map[string, map[string]*subscriber] // format: map[app string, map[ID string]*subscriber]
}
// newLogBroadcaster builds a logBroadcaster with an unbuffered log channel
// and a freshly created subscriber registry.
func newLogBroadcaster() *logBroadcaster {
	broadcaster := new(logBroadcaster)
	broadcaster.logC = make(chan *types.Log)
	broadcaster.subscribersMap = haxmap.New[string, map[string]*subscriber]()
	return broadcaster
}
// getSubscribers returns the subscriber set registered for app, keyed by
// subscriber ID. When the app has no entry yet, an empty set is stored in
// the registry first and then returned, so callers can write into the
// result directly.
func (l *logBroadcaster) getSubscribers(app string) map[string]*subscriber {
	if existing, found := l.subscribersMap.Get(app); found {
		return existing
	}

	created := make(map[string]*subscriber)
	l.subscribersMap.Set(app, created)
	return created
}
// deleteSubscribers drops app's entry from the subscriber registry by
// calling Del on subscribersMap. It does not take the lock itself; the
// visible caller (unsubscribe) already holds it.
func (l *logBroadcaster) deleteSubscribers(app string) {
l.subscribersMap.Del(app)
}
// subscribe subscribes logs of the specific app.
//
// A new subscriber is registered under app with an ID obtained from
// coreutils.RandomString(8). Its context is derived from ctx, so cancelling
// ctx also marks the subscriber as done. broadcast writes log lines to buf.
//
// It returns the subscriber ID, the subscriber's error channel (broadcast
// sends write errors on it and unsubscribe closes it), and a cleanup function
// that cancels the subscriber and removes it from the registry
// asynchronously.
func (l *logBroadcaster) subscribe(ctx context.Context, app string, buf *bufio.ReadWriter) (string, chan error, func()) {
	l.Lock()
	defer l.Unlock()

	subscribers := l.getSubscribers(app)
	ID := coreutils.RandomString(8)
	ctx, cancel := context.WithCancel(ctx)
	errChan := make(chan error)
	subscribers[ID] = &subscriber{
		ctx:     ctx,
		cancel:  cancel,
		buf:     buf,
		errChan: errChan,
	}
	corelog.Infof(ctx, "%s %s log subscribed", app, ID)

	return ID, errChan, func() {
		cancel()
		// unsubscribe needs the write lock, so it runs off the caller's
		// goroutine. If the pool refuses the task, fall back to a plain
		// goroutine; otherwise the subscriber would stay registered forever.
		if err := utils.Pool.Submit(func() { l.unsubscribe(app, ID) }); err != nil {
			corelog.Infof(ctx, "%s %s failed to schedule unsubscribe, err: %v", app, ID, err)
			go l.unsubscribe(app, ID)
		}
	}
}
// unsubscribe detaches subscriber ID from app while holding the write lock.
// If the subscriber is registered, its error channel is closed; the ID is
// then removed from the app's set, and the app's registry entry is dropped
// once that set is empty.
func (l *logBroadcaster) unsubscribe(app string, ID string) {
	l.Lock()
	defer l.Unlock()

	registered := l.getSubscribers(app)
	if sub, found := registered[ID]; found {
		close(sub.errChan)
	}
	delete(registered, ID)
	corelog.Infof(nil, "%s %s detached", app, ID) //nolint

	// Other subscribers are still attached: keep the app's entry.
	if len(registered) > 0 {
		return
	}
	l.deleteSubscribers(app)
}
// broadcast serializes log as JSON and writes it to every subscriber
// registered under log.Name, holding the read lock for the whole call.
//
// Each subscriber is served by its own task on utils.Pool, and broadcast
// waits for all tasks before returning so that consecutive logs reach each
// subscriber in order. A subscriber whose write or flush fails is reported
// on its error channel and cancelled; subscribers that are already done are
// skipped. Marshal failures are logged and the log is dropped.
func (l *logBroadcaster) broadcast(log *types.Log) {
	l.RLock()
	defer l.RUnlock()

	// Read-only lookup: getSubscribers would insert an empty entry for every
	// app that logs without subscribers, and nothing would ever remove it.
	subscribers, ok := l.subscribersMap.Get(log.Name)
	if !ok || len(subscribers) == 0 {
		return
	}

	data, err := json.Marshal(log)
	if err != nil {
		corelog.Error(nil, err) //nolint
		return
	}
	// Frame: hex length of payload+CRLF, CRLF, payload, CRLF, CRLF. This
	// looks like HTTP chunked-transfer framing (assumption, confirm with the
	// caller). The bytes are built once and shared read-only by all tasks.
	line := []byte(fmt.Sprintf("%X\r\n%s\r\n\r\n", len(data)+2, data))

	// fail reports err to the subscriber and cancels it. The send gives up
	// as soon as the subscriber's context is done: the consumer may already
	// have left (a client disconnect triggers both the write error and the
	// consumer's exit), and a blocking send here would stall wg.Wait with the
	// read lock held, deadlocking unsubscribe and every later broadcast.
	fail := func(ID string, sub *subscriber, err error) {
		corelog.Debugf(nil, "[broadcast] failed to write into %v, err: %v", ID, err) //nolint
		select {
		case sub.errChan <- err:
		case <-sub.ctx.Done():
		}
		sub.cancel()
	}

	// use wait group to make sure the logs are ordered
	wg := &sync.WaitGroup{}
	for ID, sub := range subscribers {
		ID, sub := ID, sub
		deliver := func() {
			defer wg.Done()
			if sub.isDone() {
				return
			}
			if _, err := sub.buf.Write(line); err != nil {
				fail(ID, sub, err)
				return
			}
			if err := sub.buf.Flush(); err != nil {
				fail(ID, sub, err)
			}
		}

		wg.Add(1)
		if err := utils.Pool.Submit(deliver); err != nil {
			// The pool refused the task. Run it inline so the log is still
			// delivered and wg.Wait below cannot hang on a task that never ran.
			deliver()
		}
	}
	wg.Wait()
}
// run pumps logs from logC into broadcast, one at a time, until ctx is done.
// It then logs that the broadcaster stops and returns.
func (l *logBroadcaster) run(ctx context.Context) {
	done := ctx.Done()
	for {
		select {
		case entry := <-l.logC:
			l.broadcast(entry)
		case <-done:
			corelog.Info(ctx, "[logBroadcaster] stops")
			return
		}
	}
}