/
spooledWriteCloser.go
142 lines (125 loc) · 3.43 KB
/
spooledWriteCloser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package gorill
import (
"bufio"
"fmt"
"io"
"sync"
"time"
)
// Defaults used by NewSpooledWriteCloser when the caller supplies no overriding setter.
const (
	// DefaultBufSize is the default size of the underlying bufio.Writer buffer.
	DefaultBufSize = 4096

	// DefaultFlushPeriod is the default frequency of buffer flushes.
	DefaultFlushPeriod = 15 * time.Second
)
// SpooledWriteCloser spools bytes written to it through a bufio.Writer, periodically flushing data
// written to underlying io.WriteCloser.
//
// Writes and explicit flushes are handed to a single background goroutine over the jobs channel;
// that goroutine is the only one that touches the bufio.Writer until Close has stopped it.
type SpooledWriteCloser struct {
	// bufSize is the size passed to bufio.NewWriterSize when the writer is constructed.
	bufSize int
	// bw buffers data in front of iowc.
	bw *bufio.Writer
	// flushPeriod is the interval of the ticker that triggers periodic flushes.
	flushPeriod time.Duration
	// halted is set by Close; Write and Flush refuse to operate once it is true.
	halted bool
	// iowc is the underlying destination, closed by Close.
	iowc io.WriteCloser
	// jobs carries write and flush requests to the background goroutine; closing it stops the goroutine.
	jobs chan *rillJob
	// jobsDone lets Close wait for the background goroutine to exit.
	jobsDone sync.WaitGroup
	// lock is held for reading by Write and Flush and for writing by Close.
	lock sync.RWMutex
}
// SpooledWriteCloserSetter is any function that modifies a SpooledWriteCloser being instantiated.
// NewSpooledWriteCloser invokes the setters it is given in order and aborts construction with the
// first non-nil error one of them returns.
type SpooledWriteCloserSetter func(*SpooledWriteCloser) error
// Flush returns a setter that configures how often a new SpooledWriteCloser flushes its buffer.
// The returned setter reports an error, and leaves the SpooledWriteCloser untouched, when
// periodicity is not strictly positive.
func Flush(periodicity time.Duration) SpooledWriteCloserSetter {
	applyPeriod := func(target *SpooledWriteCloser) error {
		if periodicity <= 0 {
			return fmt.Errorf("periodicity must be greater than 0: %s", periodicity)
		}
		target.flushPeriod = periodicity
		return nil
	}
	return applyPeriod
}
// BufSize returns a setter that configures the buffer size of a new SpooledWriteCloser.
// The returned setter reports an error, and leaves the SpooledWriteCloser untouched, when size is
// not strictly positive.
func BufSize(size int) SpooledWriteCloserSetter {
	return func(sw *SpooledWriteCloser) error {
		if size <= 0 {
			// size is an int, so it must be formatted with %d; %s would render as "%!s(int=N)".
			return fmt.Errorf("buffer size must be greater than 0: %d", size)
		}
		sw.bufSize = size
		return nil
	}
}
// NewSpooledWriteCloser wraps iowc in a bufio.Writer and starts a background goroutine that
// services write and flush requests and additionally flushes the buffer on every tick of the
// configured flush period.
//
// DefaultBufSize and DefaultFlushPeriod are used unless a setter overrides them. Setters run in
// the order given; the first one that fails aborts construction and its error is returned.
func NewSpooledWriteCloser(iowc io.WriteCloser, setters ...SpooledWriteCloserSetter) (*SpooledWriteCloser, error) {
	swc := &SpooledWriteCloser{
		iowc:        iowc,
		jobs:        make(chan *rillJob, 1),
		bufSize:     DefaultBufSize,
		flushPeriod: DefaultFlushPeriod,
	}

	for i := range setters {
		err := setters[i](swc)
		if err != nil {
			return nil, err
		}
	}

	// The buffer is created only after the setters have run so a custom bufSize takes effect.
	swc.bw = bufio.NewWriterSize(iowc, swc.bufSize)

	// worker owns the bufio.Writer until the jobs channel is closed.
	worker := func() {
		tick := time.NewTicker(swc.flushPeriod)
		defer tick.Stop()
		defer swc.jobsDone.Done()

		for {
			select {
			case <-tick.C:
				// Periodic flush; the result is not reported to anyone.
				swc.bw.Flush()
			case req, open := <-swc.jobs:
				if !open {
					// Channel closed: stop servicing requests.
					return
				}
				switch req.op {
				case _flush:
					flushErr := swc.bw.Flush()
					req.results <- rillResult{0, flushErr}
				case _write:
					written, writeErr := swc.bw.Write(req.data)
					req.results <- rillResult{written, writeErr}
				}
			}
		}
	}

	swc.jobsDone.Add(1)
	go worker()

	return swc, nil
}
// Write submits data to the background goroutine, which writes it into the bufio.Writer, and
// blocks until that goroutine reports the byte count and error of the write.
// It returns ErrWriteAfterClose when the SpooledWriteCloser has already been closed.
func (w *SpooledWriteCloser) Write(data []byte) (int, error) {
	// The read lock keeps Close from shutting the worker down while a request is in flight.
	w.lock.RLock()
	defer w.lock.RUnlock()

	if w.halted {
		return 0, ErrWriteAfterClose{}
	}

	req := newRillJob(_write, data)
	w.jobs <- req

	// Block until the worker has handled the request.
	outcome := <-req.results
	return outcome.n, outcome.err
}
// Flush asks the background goroutine to flush the bufio.Writer to the output stream and blocks
// until it reports the outcome.
// It returns ErrWriteAfterClose when the SpooledWriteCloser has already been closed.
func (w *SpooledWriteCloser) Flush() error {
	// The read lock keeps Close from shutting the worker down while a request is in flight.
	w.lock.RLock()
	defer w.lock.RUnlock()

	if w.halted {
		return ErrWriteAfterClose{}
	}

	req := newRillJob(_flush, nil)
	w.jobs <- req

	// Block until the worker has handled the request, then hand back only its error.
	return (<-req.results).err
}
// Close frees resources when a SpooledWriteCloser is no longer needed.
//
// It stops the background goroutine, flushes whatever is still buffered, and closes the underlying
// io.WriteCloser. Errors from the final flush and from closing the underlying writer are collected
// and returned together. Calling Close again after it has completed is a no-op that returns nil.
func (w *SpooledWriteCloser) Close() error {
	// The exclusive lock waits for in-flight Write and Flush calls and blocks new ones.
	w.lock.Lock()
	defer w.lock.Unlock()

	// Closing an already-closed channel panics, so a repeated Close must stop here.
	if w.halted {
		return nil
	}
	w.halted = true

	// Closing the channel tells the worker to exit; wait for it so nothing else touches the
	// bufio.Writer during the final flush below.
	close(w.jobs)
	w.jobsDone.Wait()

	var errs ErrList
	errs.Append(w.bw.Flush())
	errs.Append(w.iowc.Close())
	return errs.Err()
}