-
Notifications
You must be signed in to change notification settings - Fork 105
/
tracer.go
127 lines (108 loc) · 3.98 KB
/
tracer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package nsqd
import (
"fmt"
"github.com/absolute8511/nsq/internal/flume_log"
"github.com/absolute8511/nsq/internal/levellogger"
"time"
)
const (
traceModule = "msgtracer"
)
type IMsgTracer interface {
Start()
TracePub(topic string, traceID uint64, msg *Message, diskOffset BackendOffset, currentCnt int64)
// state will be READ_QUEUE, Start, Req, Fin, Timeout
TraceSub(topic string, channel string, state string, traceID uint64, msg *Message, clientID string)
}
var nsqMsgTracer IMsgTracer
type TraceLogItemInfo struct {
MsgID uint64 `json:"msgid"`
TraceID uint64 `json:"traceid"`
Topic string `json:"topic"`
Channel string `json:"channel"`
Timestamp int64 `json:"timestamp"`
Action string `json:"action"`
}
func SetRemoteMsgTracer(remote string) {
if remote != "" {
nsqMsgTracer = NewRemoteMsgTracer(remote)
}
}
// just print the trace log
type LogMsgTracer struct {
MID string
}
func (self *LogMsgTracer) Start() {
}
func (self *LogMsgTracer) TracePub(topic string, traceID uint64, msg *Message, diskOffset BackendOffset, currentCnt int64) {
nsqLog.Logf("[TRACE] topic %v trace id %v: message %v put at offset: %v, current count: %v at time %v", topic, msg.TraceID,
msg.ID, diskOffset, currentCnt, time.Now().UnixNano())
}
func (self *LogMsgTracer) TraceSub(topic string, channel string, state string, traceID uint64, msg *Message, clientID string) {
nsqLog.Logf("[TRACE] topic %v channel %v trace id %v: message %v (offset: %v) consume state %v from client %v at time: %v, attempt: %v",
topic, channel, msg.TraceID,
msg.ID, msg.offset, state, clientID, time.Now().UnixNano(), msg.Attempts)
}
// this tracer will send the trace info to remote server for each seconds
type RemoteMsgTracer struct {
remoteAddr string
remoteLogger *flume_log.FlumeLogger
localTracer *LogMsgTracer
}
func NewRemoteMsgTracer(remote string) IMsgTracer {
return &RemoteMsgTracer{
remoteAddr: remote,
remoteLogger: flume_log.NewFlumeLoggerWithAddr(remote),
localTracer: &LogMsgTracer{},
}
}
func (self *RemoteMsgTracer) Start() {
self.localTracer.Start()
}
func (self *RemoteMsgTracer) Stop() {
self.remoteLogger.Stop()
}
func (self *RemoteMsgTracer) TracePub(topic string, traceID uint64, msg *Message, diskOffset BackendOffset, currentCnt int64) {
now := time.Now().UnixNano()
detail := flume_log.NewDetailInfo(traceModule)
var traceItem [1]TraceLogItemInfo
traceItem[0].MsgID = uint64(msg.ID)
traceItem[0].TraceID = msg.TraceID
traceItem[0].Topic = topic
traceItem[0].Timestamp = now
traceItem[0].Action = "PUB"
detail.SetExtraInfo(traceItem[:])
l := fmt.Sprintf("[TRACE] topic %v trace id %v: message %v put at offset: %v, current count: %v at time %v", topic, msg.TraceID,
msg.ID, diskOffset, currentCnt, now)
err := self.remoteLogger.Info(l, detail)
if err != nil || nsqLog.Level() >= levellogger.LOG_DEBUG {
if err != nil {
nsqLog.Warningf("send log to remote error: %v", err)
}
self.localTracer.TracePub(topic, traceID, msg, diskOffset, currentCnt)
}
}
func (self *RemoteMsgTracer) TraceSub(topic string, channel string, state string, traceID uint64, msg *Message, clientID string) {
now := time.Now().UnixNano()
var traceItem [1]TraceLogItemInfo
traceItem[0].MsgID = uint64(msg.ID)
traceItem[0].TraceID = msg.TraceID
traceItem[0].Topic = topic
traceItem[0].Channel = channel
traceItem[0].Timestamp = now
traceItem[0].Action = state
detail := flume_log.NewDetailInfo(traceModule)
detail.SetExtraInfo(traceItem[:])
l := fmt.Sprintf("[TRACE] topic %v channel %v trace id %v: message %v (offset: %v) consume state %v from client %v at time: %v, attempt: %v",
topic, channel, msg.TraceID, msg.ID, msg.offset, state, clientID, time.Now().UnixNano(), msg.Attempts)
err := self.remoteLogger.Info(l, detail)
if err != nil || nsqLog.Level() >= levellogger.LOG_DEBUG {
if err != nil {
nsqLog.Warningf("send log to remote error: %v", err)
}
self.localTracer.TraceSub(topic, channel, state, traceID, msg, clientID)
}
}
func init() {
nsqMsgTracer = &LogMsgTracer{}
}