-
Notifications
You must be signed in to change notification settings - Fork 39
/
rpc_log_service.go
68 lines (55 loc) · 1.32 KB
/
rpc_log_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package namespace
import (
"context"
"github.com/stratosnet/sds/rpc"
"github.com/stratosnet/sds/utils"
)
// RpcLogService returns a pointer to a freshly allocated, empty
// rpcLogService.
func RpcLogService() *rpcLogService {
	svc := new(rpcLogService)
	return svc
}
// rpcLogService is a field-less receiver type; its LogSubscription method
// forwards log messages to an RPC subscriber.
type rpcLogService struct{}
//func (s *rpcLogService) CleanUp() {
// utils.ClearRpcLogger()
//}
// LogSubscription creates an RPC subscription that forwards log messages for
// the given terminal to the subscriber.
//
// It returns rpc.ErrNotificationsUnsupported when ctx carries no notifier.
// Otherwise it registers a new rpcWriter for terminalId with
// utils.AddRpcLogger and starts a goroutine that sends every message received
// from that writer's channel to the client via notifier.Notify.
//
// The goroutine ends, and utils.DisableRpcLogger(terminalId) is called, when
// either the subscription's error channel delivers (the client sent an
// unsubscribe request) or a notification fails to be sent.
func (s *rpcLogService) LogSubscription(ctx context.Context, terminalId string) (*rpc.Subscription, error) {
	notifier, supported := rpc.NotifierFromContext(ctx)
	if !supported {
		return nil, rpc.ErrNotificationsUnsupported
	}

	subscription := notifier.CreateSubscription()
	writer := newRpcWriter(terminalId)
	logCh := writer.getLogCh()
	utils.AddRpcLogger(writer, terminalId)

	go func() {
		// Disable the logger on every exit path, so nothing is left feeding a
		// channel that no longer has a reader.
		defer utils.DisableRpcLogger(terminalId)

		for {
			select {
			case msg := <-logCh:
				// A bare `break` here would only leave the select, not the
				// loop; return explicitly so a failed notification actually
				// stops the forwarding goroutine.
				if err := notifier.Notify(subscription.ID, msg); err != nil {
					return
				}
			case <-subscription.Err(): // client sent an unsubscribe request
				return
			}
		}
	}()

	return subscription, nil
}
// newRpcWriter builds an rpcWriter tagged with the given terminal id and
// backed by a newly created, unbuffered utils.LogMsg channel.
func newRpcWriter(id string) *rpcWriter {
	w := new(rpcWriter)
	w.terminalId = id
	w.logCh = make(chan utils.LogMsg)
	return w
}
// rpcWriter is a writer whose Write method wraps each payload in a
// utils.LogMsg and sends it on logCh.
type rpcWriter struct {
// terminalId is the identifier supplied to newRpcWriter.
terminalId string
// logCh receives one utils.LogMsg per Write call.
logCh chan utils.LogMsg
}
// getLogCh exposes the channel on which Write publishes log messages.
func (l *rpcWriter) getLogCh() chan utils.LogMsg {
	ch := l.logCh
	return ch
}
// Write converts p to a string, wraps it in a utils.LogMsg and sends it on
// the writer's log channel. It always reports len(p) bytes written and a nil
// error.
//
// The send is a plain channel send, so it blocks until a receiver takes the
// message (newRpcWriter creates the channel unbuffered).
func (l *rpcWriter) Write(p []byte) (n int, err error) {
	msg := utils.LogMsg{Msg: string(p)}
	l.logCh <- msg
	n = len(p)
	return n, nil
}