-
Notifications
You must be signed in to change notification settings - Fork 42
/
async.go
122 lines (110 loc) · 2.67 KB
/
async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package log
import (
"io"
"sync"
"time"
)
// AsyncWriter is an io.WriteCloser that writes asynchronously.
//
// Data passed to Write is copied and handed to a background goroutine, which
// accumulates it in a buffer and forwards it to Writer when the buffer reaches
// BufferSize, every SyncDuration, on Sync and on Close. The goroutine is
// started lazily by the first call to Write, Sync or Close.
//
// Close must be called at most once, and Write, Sync and Close must not be
// called after it: the goroutine has exited by then, so such calls either
// lose their data or block forever.
type AsyncWriter struct {
	// BufferSize is the size in bytes of the buffer, the default size is 32KB.
	BufferSize int

	// ChannelSize is the size of the data channel, the default size is 100.
	ChannelSize int

	// SyncDuration is the duration of the writer syncs, the default duration is 5s.
	SyncDuration time.Duration

	// Writer specifies the writer of output.
	Writer io.Writer

	// once guards the one-time start of the background goroutine.
	once sync.Once
	// ch carries copies of written data; a nil slice is the close request.
	ch chan []byte
	// chDone carries the result of Close back to its caller.
	chDone chan error
	// sync carries flush requests issued by Sync.
	sync chan struct{}
	// syncDone carries the result of a Sync flush back to its caller.
	syncDone chan error
}

// Sync writes everything that was passed to Write before this call to the
// underlying Writer. It returns the error of that write, or nil when there
// was nothing to write.
func (w *AsyncWriter) Sync() (err error) {
	w.once.Do(w.start)
	w.sync <- struct{}{}
	return <-w.syncDone
}

// Close implements io.Closer. It flushes the buffered data, closes the
// underlying Writer if it implements io.Closer, and stops the background
// goroutine. It returns the error of the most recent flush or, if that
// succeeded, the error from closing the Writer.
func (w *AsyncWriter) Close() (err error) {
	w.once.Do(w.start)
	w.ch <- nil // instead of close(w.ch), so a concurrent Write cannot panic on a closed channel
	return <-w.chDone
}

// a2kpool recycles the per-Write copies of the data; fresh buffers start with
// a capacity of 2KB.
var a2kpool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 0, 2048)
	},
}

// Write implements io.Writer. It copies p and queues the copy for the
// background goroutine, blocking only while the data channel is full. The
// underlying Writer is called later, once the buffer reaches BufferSize, on
// the next SyncDuration tick, or on Sync/Close, so Write itself never reports
// an error from the underlying Writer.
func (w *AsyncWriter) Write(p []byte) (n int, err error) {
	w.once.Do(w.start)
	if len(p) == 0 {
		// Nothing to queue; avoids taking a pooled buffer for no data.
		return 0, nil
	}
	// copy and sends data
	w.ch <- append(a2kpool.Get().([]byte)[:0], p...)
	// io.Writer requires a non-nil error whenever n < len(p), so report the
	// whole slice as accepted.
	return len(p), nil
}

// start applies the default settings, creates the channels and launches the
// background goroutine. It must only be run through w.once.
func (w *AsyncWriter) start() {
	if w.BufferSize == 0 {
		w.BufferSize = 32 * 1024
	}
	// Non-positive values would make the channel allocation or the ticker
	// panic, so they fall back to the defaults as well.
	if w.ChannelSize <= 0 {
		w.ChannelSize = 100
	}
	if w.SyncDuration <= 0 {
		w.SyncDuration = 5 * time.Second
	}

	w.ch = make(chan []byte, w.ChannelSize)
	w.chDone = make(chan error)
	w.sync = make(chan struct{})
	w.syncDone = make(chan error)

	go w.loop()
}

// loop is the background goroutine: it buffers queued data and writes it to
// the underlying Writer until a close request arrives.
func (w *AsyncWriter) loop() {
	// err holds the result of the most recent flush attempt.
	var err error
	buf := make([]byte, 0, w.BufferSize+4096)
	ticker := time.NewTicker(w.SyncDuration)
	defer ticker.Stop()

	// flush writes the buffered data out; with nothing buffered it records
	// success.
	flush := func() {
		if len(buf) == 0 {
			err = nil
			return
		}
		_, err = w.Writer.Write(buf)
		buf = buf[:0]
	}

	// shutdown closes the underlying writer when it is an io.Closer and
	// reports the outcome to Close; a flush error takes precedence over the
	// close error.
	shutdown := func() {
		if closer, ok := w.Writer.(io.Closer); ok {
			if cerr := closer.Close(); cerr != nil && err == nil {
				err = cerr
			}
		}
		w.chDone <- err
	}

	for {
		select {
		case b := <-w.ch:
			if b == nil {
				// close request: write what is left, then quit.
				if len(buf) != 0 {
					flush()
				}
				shutdown()
				return
			}
			buf = append(buf, b...)
			a2kpool.Put(b)
			if len(buf) >= w.BufferSize {
				flush()
			}
		case <-w.sync:
			// Data written before Sync was called is already queued, but
			// select may have picked this case ahead of it, so collect the
			// queue before flushing.
			closing := false
		drain:
			for {
				select {
				case b := <-w.ch:
					if b == nil {
						closing = true
						break drain
					}
					buf = append(buf, b...)
					a2kpool.Put(b)
				default:
					break drain
				}
			}
			flush()
			w.syncDone <- err
			if closing {
				// A close request was queued behind the data.
				shutdown()
				return
			}
		case <-ticker.C:
			flush()
		}
	}
}