forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathio.go
133 lines (115 loc) · 3.83 KB
/
io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
//
// Written by Maxim Khitrov (November 2012)
//
package flowrate
import (
"errors"
"io"
)
// ErrLimit is returned by the Writer when a non-blocking write is short due to
// the transfer rate limit. Writer.Write returns it together with the number of
// bytes that were written before the limit was hit, so callers can resume
// with the unwritten remainder.
var ErrLimit = errors.New("flowrate: flow rate limit exceeded")
// Limiter is implemented by the Reader and Writer to provide a consistent
// interface for monitoring and controlling data transfer.
type Limiter interface {
	// Done, Status and SetTransferSize have no implementation in this file.
	// NOTE(review): presumably they are promoted from the *Monitor embedded in
	// Reader and Writer — confirm against the Monitor definition.
	Done() int64
	Status() Status
	SetTransferSize(bytes int64)
	// SetLimit replaces the rate limit (bytes per second) and returns the
	// value that was in effect before the call.
	SetLimit(new int64) (old int64)
	// SetBlocking replaces the blocking flag and returns the value that was
	// in effect before the call.
	SetBlocking(new bool) (old bool)
}
// Reader implements io.ReadCloser with a restriction on the rate of data
// transfer. The Read and Close methods declared on *Reader in this file take
// precedence over anything promoted from the embedded fields.
type Reader struct {
	// Data source. Read forwards to it with a possibly shortened buffer, and
	// Close closes it when it also implements io.Closer.
	io.Reader
	// Flow control monitor.
	// NOTE(review): the Limit, IO and Done calls made on the Reader in this
	// file presumably resolve to methods promoted from this field — confirm
	// against the Monitor definition.
	*Monitor
	// Rate limit in bytes per second (unlimited when <= 0). Changed with
	// SetLimit.
	limit int64
	// Blocking flag handed to Limit on every Read; NewReader sets it to true.
	// When false, a Read that cannot proceed because of the limit returns
	// immediately. Changed with SetBlocking.
	block bool
}
// NewReader wraps r in a Reader whose Read operations are restricted to limit
// bytes per second. The returned Reader gets its own monitor, created with
// New(0, 0), and starts out in blocking mode; SetLimit and SetBlocking change
// the rate and the blocking behavior afterwards.
func NewReader(r io.Reader, limit int64) *Reader {
	return &Reader{
		Reader:  r,
		Monitor: New(0, 0),
		limit:   limit,
		block:   true,
	}
}
// Read reads up to len(p) bytes into p without exceeding the current transfer
// rate limit. The buffer is first trimmed to the size granted by Limit, and
// the outcome of the underlying read is passed through IO before being
// returned. When no bytes are granted — as happens on a non-blocking reader
// that may not read anything more right now — Read returns (0, nil)
// immediately without calling the underlying reader.
func (r *Reader) Read(p []byte) (n int, err error) {
	window := p[:r.Limit(len(p), r.limit, r.block)]
	if len(window) == 0 {
		return 0, nil
	}
	return r.IO(r.Reader.Read(window))
}
// SetLimit installs new as the transfer rate limit in bytes per second and
// returns the limit that was in effect before the call. A value <= 0 means
// unlimited.
func (r *Reader) SetLimit(new int64) (old int64) {
	old = r.limit
	r.limit = new
	return old
}
// SetBlocking installs new as the blocking behavior and returns the setting
// that was in effect before the call. A Read call on a non-blocking reader
// returns immediately if no additional bytes may be read at this time due to
// the rate limit.
func (r *Reader) SetBlocking(new bool) (old bool) {
	old = r.block
	r.block = new
	return old
}
// Close closes the underlying reader if it implements the io.Closer interface
// and returns that Close call's error; otherwise it returns nil. In both cases
// Done is called on the way out, after any underlying Close has returned.
func (r *Reader) Close() error {
	defer r.Done()
	closer, closable := r.Reader.(io.Closer)
	if !closable {
		return nil
	}
	return closer.Close()
}
// Writer implements io.WriteCloser with a restriction on the rate of data
// transfer. The Write and Close methods declared on *Writer in this file take
// precedence over anything promoted from the embedded fields.
type Writer struct {
	// Data destination. Write forwards to it in rate-limited chunks, and
	// Close closes it when it also implements io.Closer.
	io.Writer
	// Flow control monitor.
	// NOTE(review): the Limit, IO and Done calls made on the Writer in this
	// file presumably resolve to methods promoted from this field — confirm
	// against the Monitor definition.
	*Monitor
	// Rate limit in bytes per second (unlimited when <= 0). Changed with
	// SetLimit.
	limit int64
	// Blocking flag handed to Limit for every chunk; NewWriter sets it to
	// true. When false, a Write that cannot proceed because of the limit
	// returns ErrLimit. Changed with SetBlocking.
	block bool
}
// NewWriter wraps w in a Writer whose Write operations are restricted to limit
// bytes per second. The returned Writer gets its own monitor, created with
// New(0, 0), and blocks by default; the transfer rate and the blocking
// behavior can be changed directly on the returned *Writer.
func NewWriter(w io.Writer, limit int64) *Writer {
	return &Writer{
		Writer:  w,
		Monitor: New(0, 0),
		limit:   limit,
		block:   true,
	}
}
// Write writes len(p) bytes from p to the underlying data stream without
// exceeding the current transfer rate limit. p is sent in chunks whose size is
// granted by Limit, and the outcome of every underlying write is passed
// through IO before the next chunk is requested.
//
// It returns (n, ErrLimit) if w is non-blocking and no additional bytes can be
// written at this time; n is the number of bytes written before that point.
// Any error reported for a chunk ends the call and is returned together with
// the count written so far. A chunk that is only partially accepted without
// an error is followed by a write of the remainder. If a non-empty chunk makes
// no progress at all and no error is reported, Write returns io.ErrShortWrite
// rather than retrying the same bytes forever.
func (w *Writer) Write(p []byte) (n int, err error) {
	for len(p) > 0 && err == nil {
		chunk := p[:w.Limit(len(p), w.limit, w.block)]
		if len(chunk) == 0 {
			return n, ErrLimit
		}

		var c int
		c, err = w.IO(w.Writer.Write(chunk))
		if c == 0 && err == nil {
			// Zero bytes accepted and no error: p would never shrink, so the
			// loop could not terminate. Report the stall to the caller.
			err = io.ErrShortWrite
		}
		p = p[c:]
		n += c
	}
	return n, err
}
// SetLimit installs new as the transfer rate limit in bytes per second and
// returns the limit that was in effect before the call. A value <= 0 means
// unlimited.
func (w *Writer) SetLimit(new int64) (old int64) {
	old = w.limit
	w.limit = new
	return old
}
// SetBlocking installs new as the blocking behavior and returns the setting
// that was in effect before the call. A Write call on a non-blocking writer
// returns as soon as no additional bytes may be written at this time due to
// the rate limit.
func (w *Writer) SetBlocking(new bool) (old bool) {
	old = w.block
	w.block = new
	return old
}
// Close closes the underlying writer if it implements the io.Closer interface
// and returns that Close call's error; otherwise it returns nil. In both cases
// Done is called on the way out, after any underlying Close has returned.
func (w *Writer) Close() error {
	defer w.Done()
	closer, closable := w.Writer.(io.Closer)
	if !closable {
		return nil
	}
	return closer.Close()
}