-
Notifications
You must be signed in to change notification settings - Fork 0
/
writer.go
66 lines (57 loc) · 1.15 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package logs
import (
"errors"
"golang.org/x/sync/errgroup"
"io"
"sync/atomic"
)
// Writer is an asynchronous writer: Write places data on a buffered channel
// and a background goroutine (handle) forwards it to the wrapped
// io.WriteCloser.
type Writer struct {
ch chan []byte // buffered queue of data waiting to be written
cl atomic.Bool // set to true by Close; Write rejects data once it is set
g errgroup.Group // runs the background goroutines and collects their first error
w io.WriteCloser // destination the queued data is written to
}
// NewWriter returns a Writer that queues up to bufferSize pending writes and
// forwards them to w from a background goroutine. The goroutine is launched
// here and runs until the queue is closed or a write to w fails.
func NewWriter(w io.WriteCloser, bufferSize int) *Writer {
	// The closed flag and the errgroup are ready to use as zero values, so
	// only the queue and the destination need explicit initialisation.
	writer := &Writer{
		ch: make(chan []byte, bufferSize),
		w:  w,
	}
	writer.g.Go(writer.handle)
	return writer
}
// handle drains the queue, writing each item to the underlying writer in the
// order it was queued. It returns nil once the queue has been closed and
// emptied, or the first error reported by the underlying writer (in which
// case any remaining queued items are left unwritten).
func (w *Writer) handle() (err error) {
	// Ranging over the channel ends exactly when it is closed and empty.
	for chunk := range w.ch {
		if _, werr := w.w.Write(chunk); werr != nil {
			return werr
		}
	}
	return nil
}
// Write queues a copy of p for asynchronous delivery to the underlying writer.
// It never blocks.
//
// Results:
//   - (0, nil) when p is nil; nothing is queued.
//   - (len(p), nil) when the copy was queued. This only means the data was
//     accepted, not that the underlying writer has written it yet.
//   - (0, "channel is closed") when Close has already been called.
//   - (0, "channel is full") when the queue has no free slot; the data is
//     dropped.
//
// NOTE: the closed check and the channel send are two separate steps. If Close
// runs between them the send hits a closed channel and panics, so callers must
// not invoke Close concurrently with Write.
func (w *Writer) Write(p []byte) (n int, err error) {
	if p == nil {
		return 0, nil
	}
	if w.cl.Load() {
		return 0, errors.New("channel is closed")
	}

	// The caller may reuse p as soon as Write returns, so queue a private
	// copy. A single make+copy replaces the former []byte -> string -> []byte
	// round trip, which allocated and copied the payload twice.
	buf := make([]byte, len(p))
	copy(buf, p)

	select {
	case w.ch <- buf:
		return len(p), nil
	default:
		return 0, errors.New("channel is full")
	}
}
// Close stops accepting new data, waits for the background goroutine to write
// everything already queued, and then closes the underlying writer.
//
// It returns the first error encountered: a write error from the background
// goroutine takes precedence, otherwise the error from closing the underlying
// writer. The underlying writer is closed even when a write error occurred.
//
// Calling Close more than once is safe; every call after the first returns a
// "channel is closed" error without touching the underlying writer again.
// Close must not be called concurrently with Write (see Write).
func (w *Writer) Close() error {
	// Swap reports the previous value, so exactly one caller proceeds to
	// close the channel; closing it twice would panic.
	if w.cl.Swap(true) {
		return errors.New("channel is closed")
	}

	// Closing the queue lets handle finish after draining what is buffered.
	close(w.ch)
	err := w.g.Wait()

	// Close the destination only after the queue is drained; closing it
	// earlier (or concurrently) would make the pending writes fail.
	if cerr := w.w.Close(); err == nil {
		err = cerr
	}
	return err
}