/
io.go
248 lines (212 loc) · 6.1 KB
/
io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
package progress
// Acknowledgement: The reader & writer implementations were originally
// adapted from github.com/dolmen-go/contextio.
import (
"context"
"io"
"github.com/neilotoole/sq/libsq/core/errz"
"github.com/neilotoole/sq/libsq/core/ioz/contextio"
)
// NewWriter wraps w in a context-aware progress.Writer that advances a
// progress bar as bytes are written to w. The bar is obtained from the
// *progress.Progress carried by ctx (see progress.FromContext). When ctx
// carries none, no bar is shown: the function falls back to
// contextio.NewWriter, so the returned writer is still context-aware. See
// the contextio package for more details.
//
// If w is already a progress writer bound to the same ctx, it is returned
// unchanged rather than being wrapped a second time; msg and size are
// ignored in that case.
//
// Context state is checked BEFORE every Write.
//
// When a progress bar is in use, the returned writer also implements
// io.ReaderFrom, which lets io.Copy select the best strategy while the
// context state is still checked before every chunk transfer.
//
// The returned progress.Writer always implements io.Closer, even if w does
// not, because closing is the means of stopping the progress bar once
// writing is complete. If w does implement io.Closer, it is closed when the
// returned writer is closed.
//
// The caller is expected to close the returned writer, which removes the
// progress bar. The bar can also be removed without closing the writer by
// invoking Writer.Stop.
//
// If size is unknown, pass -1; this results in an indeterminate progress
// spinner instead of a bar.
func NewWriter(ctx context.Context, msg string, size int64, w io.Writer) Writer {
	// Don't stack a second wrapper on a writer we already produced for ctx.
	if existing, isCopier := w.(*progCopier); isCopier && existing.ctx == ctx {
		return existing
	}

	prog := FromContext(ctx)
	if prog == nil {
		// No progress bar in context, so we delegate to contextio.
		return writerAdapter{Writer: contextio.NewWriter(ctx, w)}
	}

	return &progCopier{
		progWriter: progWriter{
			ctx: ctx,
			w:   w,
			b:   prog.NewByteCounter(msg, size),
		},
	}
}
// Compile-time check that *progWriter satisfies io.WriteCloser.
var _ io.WriteCloser = (*progWriter)(nil)
// progWriter is an io.WriteCloser that checks ctx before each Write and
// reports the number of bytes written to a progress Bar.
type progWriter struct {
ctx context.Context // checked before each Write and at the end of Close
w io.Writer // destination; closed by Close if it implements io.Closer
// delayCh <-chan struct{}
b Bar // incremented by bytes written; stopped on cancellation, write error, or Close
}
// Write implements io.Writer, but with context and progress interaction.
//
// If the context is already done, the progress bar is stopped and the
// context's error is returned without touching the underlying writer.
// Otherwise p is passed to the underlying writer, the bar is advanced by the
// number of bytes actually written, and the bar is stopped if that write
// reported an error.
func (w *progWriter) Write(p []byte) (n int, err error) {
	select {
	case <-w.ctx.Done():
		w.b.Stop()
		return 0, w.ctx.Err()
	default:
		// Context is still live: proceed with the write.
	}

	written, writeErr := w.w.Write(p)
	// Bytes from a partial write are counted even when an error follows.
	w.b.Incr(written)
	if writeErr == nil {
		return written, nil
	}

	w.b.Stop()
	return written, writeErr
}
// Close implements io.WriteCloser, but with context and progress
// interaction.
//
// The progress bar is stopped first. If the underlying writer implements
// io.Closer it is then closed, with any error passed through errz.Err. If
// the context is done by the time Close finishes, the context's error is
// returned in preference to the close error.
func (w *progWriter) Close() error {
	w.b.Stop()

	var closeErr error
	if closer, isCloser := w.w.(io.Closer); isCloser {
		closeErr = errz.Err(closer.Close())
	}

	select {
	case <-w.ctx.Done():
		// Cancellation takes precedence over whatever Close reported.
		return w.ctx.Err()
	default:
	}

	return closeErr
}
// NewReader wraps r in a context-aware io.Reader that advances a progress
// bar as bytes are read from r. The bar is obtained from the
// *progress.Progress carried by ctx (see progress.FromContext). When ctx
// carries none, no bar is shown: the function falls back to
// contextio.NewReader, so the returned reader is still context-aware. See
// the contextio package for more details.
//
// If r is already a progress reader bound to the same ctx, it is returned
// unchanged rather than being wrapped a second time; msg and size are
// ignored in that case.
//
// Context state is checked BEFORE every Read.
//
// When a progress bar is in use, the returned io.Reader also implements
// io.Closer, even if r does not, because closing is the means of stopping
// the progress bar once reading is complete. If r does implement io.Closer,
// it is closed when the returned reader is closed.
func NewReader(ctx context.Context, msg string, size int64, r io.Reader) io.Reader {
	// Don't stack a second wrapper on a reader already bound to ctx.
	if existing, isProg := r.(*progReader); isProg && existing.ctx == ctx {
		return existing
	}

	prog := FromContext(ctx)
	if prog == nil {
		// No progress bar in context, so we delegate to contextio.
		return contextio.NewReader(ctx, r)
	}

	return &progReader{
		ctx: ctx,
		r:   r,
		b:   prog.NewByteCounter(msg, size),
	}
}
// Compile-time check that *progReader satisfies io.ReadCloser.
var _ io.ReadCloser = (*progReader)(nil)
// progReader is an io.ReadCloser that checks ctx before each Read and
// reports the number of bytes read to a progress Bar.
type progReader struct {
ctx context.Context // checked before each Read and at the end of Close
r io.Reader // source; closed by Close if it is also an io.ReadCloser
b Bar // incremented by bytes read; stopped on cancellation, any non-nil Read error (io.EOF included), or Close
}
// Close implements io.ReadCloser, but with context and progress
// interaction.
//
// The progress bar is stopped first. If the underlying reader can be closed
// it is then closed, with any error passed through errz.Err. If the context
// is done by the time Close finishes, the context's error is returned in
// preference to the close error.
func (r *progReader) Close() error {
	r.b.Stop()

	var closeErr error
	// r.r is always an io.Reader, so testing for io.Closer is equivalent to
	// testing for io.ReadCloser.
	if closer, isCloser := r.r.(io.Closer); isCloser {
		closeErr = errz.Err(closer.Close())
	}

	select {
	case <-r.ctx.Done():
		// Cancellation takes precedence over whatever Close reported.
		return r.ctx.Err()
	default:
	}

	return closeErr
}
// Read implements io.Reader, but with context and progress interaction.
//
// If the context is already done, the progress bar is stopped and the
// context's error is returned without touching the underlying reader.
// Otherwise the underlying reader fills p, the bar is advanced by the number
// of bytes read, and the bar is stopped if the read returned any non-nil
// error (which includes io.EOF).
func (r *progReader) Read(p []byte) (n int, err error) {
	select {
	case <-r.ctx.Done():
		r.b.Stop()
		return 0, r.ctx.Err()
	default:
		// Context is still live: proceed with the read.
	}

	got, readErr := r.r.Read(p)
	// Bytes delivered alongside an error are still counted.
	r.b.Incr(got)
	if readErr == nil {
		return got, nil
	}

	r.b.Stop()
	return got, readErr
}
// Compile-time check that *progCopier satisfies io.ReaderFrom.
var _ io.ReaderFrom = (*progCopier)(nil)
// Writer is the io.WriteCloser returned by NewWriter. In addition to Write
// and Close, it exposes Stop so that the progress bar can be removed
// without closing the underlying writer.
type Writer interface {
io.WriteCloser
// Stop stops and removes the progress bar. Typically this is accomplished
// by invoking Writer.Close, but there are circumstances where it may
// be desirable to stop the progress bar without closing the underlying
// writer.
Stop()
}
// Compile-time check that *writerAdapter satisfies Writer.
var _ Writer = (*writerAdapter)(nil)
// writerAdapter lifts a plain io.Writer to the progress.Writer interface by
// supplying Close and Stop. NewWriter uses it only when the context holds no
// progress bar and the work is delegated to contextio.NewWriter, yet a
// progress.Writer must still be returned.
type writerAdapter struct {
	io.Writer
}

// Close implements io.WriteCloser. The embedded writer is closed if it
// implements io.Closer, and its error is returned as-is; otherwise Close
// does nothing and returns nil.
func (w writerAdapter) Close() error {
	closer, isCloser := w.Writer.(io.Closer)
	if !isCloser {
		return nil
	}
	return closer.Close()
}

// Stop implements Writer. There is no progress bar to stop, so it is a
// no-op.
func (w writerAdapter) Stop() {}
// Compile-time check that *progCopier satisfies Writer.
var _ Writer = (*progCopier)(nil)
// progCopier is the Writer implementation returned by NewWriter when the
// context holds a progress bar. It embeds progWriter for Write and Close,
// and adds Stop and ReadFrom.
type progCopier struct {
progWriter
}
// Stop implements progress.Writer. It stops the progress bar without
// closing the underlying writer.
func (w *progCopier) Stop() {
w.b.Stop()
}
// ReadFrom implements io.ReaderFrom, but with context and progress
// interaction. It copies from r to the underlying writer via io.Copy and
// returns the number of bytes copied along with any error from the copy.
//
// Two strategies are used:
//   - If the underlying writer implements io.ReaderFrom, r is wrapped in a
//     progReader sharing this writer's context and bar, and the copy goes
//     straight to the underlying writer. That writer decides the chunk
//     size, while every Read still checks the context and advances the bar.
//   - Otherwise the context is checked once up front and the copy is driven
//     through progWriter.Write, letting the reader decide the chunk size.
//
// On both paths the progress bar is stopped if the copy returns an error.
func (w *progCopier) ReadFrom(r io.Reader) (n int64, err error) {
	if _, ok := w.w.(io.ReaderFrom); ok {
		// Let the original Writer decide the chunk size.
		rdr := &progReader{
			ctx: w.ctx,
			r:   r,
			b:   w.b,
		}

		n, err = io.Copy(w.progWriter.w, rdr)
		if err != nil {
			// rdr stops the bar only for cancellation and read-side errors.
			// A failure on the write side would otherwise leave the bar
			// running, unlike the fallback path below.
			w.b.Stop()
		}
		return n, err
	}

	select {
	case <-w.ctx.Done():
		w.b.Stop()
		return 0, w.ctx.Err()
	default:
		// The original Writer is not a ReaderFrom.
		// Let the Reader decide the chunk size.
		n, err = io.Copy(&w.progWriter, r)
		if err != nil {
			w.b.Stop()
		}
		return n, err
	}
}