-
Notifications
You must be signed in to change notification settings - Fork 6
/
tail_ooo.go
134 lines (118 loc) · 2.85 KB
/
tail_ooo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package xlog
import (
"bytes"
"encoding/json"
"fmt"
"io"
"sort"
"sync"
"time"
"get.pme.sh/pmesh/ray"
"github.com/valyala/fastjson"
)
type oooLog struct {
Line []byte
Time time.Time
}
// Mux writer is a writer that buffers lines and writes them in order
// of their timestamps. It is used to write logs from multiple sources
// to a single output, while preserving the order of the logs.
// Additionally, it inserts sending host information into the caller data.
type MuxWriter struct {
mu sync.Mutex
out io.Writer // The underlying writer
lnbuf bytes.Buffer // Buffer for incomplete lines
buf []oooLog // Buffer for complete lines, preceding the flush time
flushTime time.Time // The time when the logger was created
}
func ToMuxWriter(out io.Writer) *MuxWriter {
if mw, ok := out.(*MuxWriter); ok {
return mw
}
lo := &MuxWriter{
out: out,
buf: make([]oooLog, 0, 100),
flushTime: time.Now().Add(500 * time.Millisecond),
}
time.AfterFunc(500*time.Millisecond, lo.Flush)
return lo
}
func (o *MuxWriter) flushLocked() {
sort.Slice(o.buf, func(i, j int) bool {
return o.buf[i].Time.Before(o.buf[j].Time)
})
for _, l := range o.buf {
o.out.Write(l.Line)
}
o.buf = nil
}
func (o *MuxWriter) Flush() {
o.mu.Lock()
defer o.mu.Unlock()
o.flushLocked()
}
func (o *MuxWriter) WriteAs(p []byte, host string) (n int, err error) {
o.mu.Lock()
defer o.mu.Unlock()
n = len(p)
// Write the input to the line buffer
o.lnbuf.Write(p)
maxTime := time.Time{}
for {
// Read the next line from the buffer
p, e := o.lnbuf.ReadBytes('\n')
if e == io.EOF {
break
} else if e != nil {
return n, e
}
// Parse the line
ln, err := ParseLine(p)
if err != nil {
// Invalid lines are written directly to the output
_, err := o.out.Write(p)
if err != nil {
return 0, err
}
continue
}
// Insert the host into the caller data
if host != "" {
dom := ln.GetStringBytes(DomainFieldName)
newDom, _ := json.Marshal(fmt.Sprintf("%s/%s", host, dom))
ln.Set(DomainFieldName, fastjson.MustParseBytes(newDom))
p = ln.MarshalTo(p[:0])
}
// If we're buffering, append the line, else write it directly
if o.buf != nil {
t := ln.Time()
o.buf = append(o.buf, oooLog{Line: p, Time: t})
if t.After(maxTime) {
maxTime = t
}
} else {
_, err := o.out.Write(p)
if err != nil {
return 0, err
}
}
}
// If we are past the flush time, flush the buffer
if maxTime.After(o.flushTime) {
o.flushLocked()
}
return n, nil
}
func (o *MuxWriter) Write(p []byte) (n int, err error) {
return o.WriteAs(p, "")
}
type subWriter struct {
mux *MuxWriter
host string
}
func (o *subWriter) Write(p []byte) (n int, err error) {
return o.mux.WriteAs(p, o.host)
}
func (o *MuxWriter) SubWriter(host string) io.Writer {
return &subWriter{mux: o, host: ray.ToHostString(host)}
}