-
Notifications
You must be signed in to change notification settings - Fork 197
/
logSender.go
167 lines (136 loc) · 3.39 KB
/
logSender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package logs
import (
"bytes"
"strings"
"github.com/gorilla/websocket"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/common"
logger "github.com/multiversx/mx-chain-logger-go"
)
// disconnectMessage is the message type value that monitorConnection treats, alongside
// websocket.CloseMessage, as the signal that the peer is gone.
// NOTE(review): presumably this mirrors the -1 message type the websocket library reports
// when a read fails - confirm against the wsConn implementation in use.
const disconnectMessage = -1
type logSender struct {
marshalizer marshal.Marshalizer
conn wsConn
writer *logWriter
log logger.Logger
lastProfile logger.Profile
}
// NewLogSender creates a component able to communicate with the log viewer application over
// the provided websocket connection. After the handshake performed by StartSendingBlocking it
// sends the logs that come through the logger subsystem.
//
// It returns ErrNilMarshalizer, ErrNilLogger or ErrNilWsConn when the corresponding argument is
// nil, or the error produced while registering the internal log writer.
func NewLogSender(marshalizer marshal.Marshalizer, conn wsConn, log logger.Logger) (*logSender, error) {
	// The arguments are validated in the same order as the error precedence callers observe.
	switch {
	case check.IfNil(marshalizer):
		return nil, ErrNilMarshalizer
	case check.IfNil(log):
		return nil, ErrNilLogger
	case conn == nil:
		return nil, ErrNilWsConn
	}

	sender := &logSender{
		conn:        conn,
		log:         log,
		marshalizer: marshalizer,
	}
	if err := sender.registerLogWriter(); err != nil {
		return nil, err
	}

	return sender, nil
}
// registerLogWriter creates a new log writer, builds a log line wrapper formatter on top of the
// sender's marshalizer and adds the pair as a log observer. The writer is stored on the sender
// only when both steps succeed; otherwise the encountered error is returned.
func (ls *logSender) registerLogWriter() error {
	writer := NewLogWriter()

	lineFormatter, err := logger.NewLogLineWrapperFormatter(ls.marshalizer)
	if err != nil {
		return err
	}

	if err = logger.AddLogObserver(writer, lineFormatter); err != nil {
		return err
	}

	ls.writer = writer
	return nil
}
// StartSendingBlocking performs the handshake by waiting for the log profile message and, once
// that succeeds, starts sending log information while monitoring the connection in a separate
// goroutine. The call blocks until sending stops.
//
// On return - including when the handshake fails - the connection and the writer are closed,
// the writer is removed from the log observers and the log profile that was current when the
// call started is re-applied.
func (ls *logSender) StartSendingBlocking() {
	// Remember the active profile so it can be restored once this session ends.
	ls.lastProfile = logger.GetCurrentProfile()

	defer func() {
		// Teardown is best effort: nothing useful can be done if any of these steps fail,
		// and the remaining cleanup must still run.
		_ = ls.conn.Close()
		_ = ls.writer.Close()
		_ = logger.RemoveLogObserver(ls.writer)

		// NOTE(review): waitForProfile calls logger.NotifyProfileChange after applying a
		// profile, while this revert path does not - confirm whether that is intended.
		err := ls.lastProfile.Apply()
		if err != nil {
			// Do not claim the profile was reverted when applying it failed.
			ls.log.Error("could not revert log profile",
				"profile", ls.lastProfile.String(),
				"error", err.Error(),
			)
			return
		}

		ls.log.Info("reverted log profile", "profile", ls.lastProfile.String())
	}()

	err := ls.waitForProfile()
	if err != nil {
		ls.log.Error(err.Error())
		return
	}

	// monitorConnection closes the writer when the peer goes away, which is what
	// unblocks the sending loop below.
	go ls.monitorConnection()
	ls.doSendContinuously()
}
// waitForProfile reads one message from the connection and interprets it as the handshake.
// A message equal to common.DefaultLogProfileIdentifier leaves the current profile untouched.
// Any other message is unmarshalled as a log profile, applied, and followed by a profile change
// notification. Read, unmarshal and apply errors are returned as they are.
func (ls *logSender) waitForProfile() error {
	_, payload, err := ls.conn.ReadMessage()
	if err != nil {
		return err
	}

	defaultIdentifier := []byte(common.DefaultLogProfileIdentifier)
	if bytes.Equal(payload, defaultIdentifier) {
		// The peer asked to keep whatever profile is currently active.
		return nil
	}

	requested, err := logger.UnmarshalProfile(payload)
	if err != nil {
		return err
	}
	ls.log.Info("websocket log profile received", "profile", requested.String())

	if err = requested.Apply(); err != nil {
		return err
	}

	logger.NotifyProfileChange()
	return nil
}
// monitorConnection keeps reading from the connection until it sees a close message, a
// disconnect message type or a read error. When it returns, the writer is closed.
func (ls *logSender) monitorConnection() {
	defer func() {
		_ = ls.writer.Close()
	}()

	for {
		msgType, _, err := ls.conn.ReadMessage()
		// The message type is traced before any check, so it is logged for failed reads too.
		ls.log.Trace("message type", "value", msgType)

		peerGone := msgType == websocket.CloseMessage || msgType == disconnectMessage
		if peerGone || err != nil {
			return
		}
	}
}
// doSendContinuously sends messages one after another until sendMessage reports that
// sending should stop.
func (ls *logSender) doSendContinuously() {
	for !ls.sendMessage() {
		// All the work happens in the loop condition.
	}
}
// sendMessage blocks until the writer yields data and writes it to the connection as a text
// message. It returns true when sending should stop: either the writer reported that no data
// is available (presumably because it was closed - confirm against logWriter.ReadBlocking)
// or the write failed.
func (ls *logSender) sendMessage() (shouldStop bool) {
	payload, hasData := ls.writer.ReadBlocking()
	if !hasData {
		return true
	}

	writeErr := ls.conn.WriteMessage(websocket.TextMessage, payload)
	if writeErr == nil {
		return false
	}

	// A write attempted after the close was sent is reported at info level, anything else is an error.
	if strings.Contains(writeErr.Error(), "websocket: close sent") {
		ls.log.Info("web socket", "connection", "closed")
	} else {
		ls.log.Error("web socket error", "error", writeErr.Error())
	}

	return true
}