-
Notifications
You must be signed in to change notification settings - Fork 48
/
ring_buffer.go
237 lines (199 loc) · 4.8 KB
/
ring_buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
// Copyright 2019 smallnest. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package ringbuffer
import (
"errors"
"sync"
)
// Sentinel errors returned by RingBuffer's Read and Write methods.
var (
// ErrTooManyDataToWrite is returned by Write when p holds more bytes than the
// buffer currently has free. Write still stores the leading bytes that fit and
// reports how many that was.
ErrTooManyDataToWrite = errors.New("too many data to write")
// ErrIsFull is returned by Write when the buffer has no free space at all.
ErrIsFull = errors.New("ringbuffer is full")
// ErrIsEmpty is returned by Read when the buffer holds no data to read.
ErrIsEmpty = errors.New("ringbuffer is empty")
)
// RingBuffer is a fixed-size circular byte buffer. Its Read and Write methods
// have the signatures of io.Reader and io.Writer, so a *RingBuffer can be used
// as an io.ReadWriter. Note that Read reports an empty buffer with ErrIsEmpty
// rather than io.EOF, and neither Read nor Write blocks waiting for the other.
//
// Create one with New; the methods that inspect or move the read/write
// positions lock mu while doing so.
type RingBuffer struct {
buf []byte // backing storage; New allocates it with length size
size int // capacity in bytes, as passed to New
r int // next position to read
w int // next position to write
isFull bool // tells a full buffer from an empty one when r == w
mu sync.Mutex // held by the methods that read or modify r, w and isFull
}
// New allocates a RingBuffer that can hold up to size bytes. The buffer starts
// out empty, with both the read and the write position at zero.
func New(size int) *RingBuffer {
	rb := new(RingBuffer)
	rb.size = size
	rb.buf = make([]byte, size)
	return rb
}
// Read copies up to len(p) buffered bytes into p, oldest first, and advances
// the read position past them. It never blocks waiting for more data.
//
// It returns the number of bytes copied and a nil error when at least one byte
// was read. If len(p) == 0 it returns (0, nil) without touching the buffer. If
// no data is buffered it returns (0, ErrIsEmpty); this includes a buffer
// created with capacity zero, which can never hold data.
func (r *RingBuffer) Read(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	// The size check keeps a zero-capacity buffer (which Write marks as full)
	// away from the modulo below, where it would divide by zero.
	if r.size == 0 || (r.w == r.r && !r.isFull) {
		return 0, ErrIsEmpty
	}

	// Readable byte count: w-r when the data is contiguous, otherwise the data
	// wraps past the end of storage (or fills it entirely when w == r).
	n = r.w - r.r
	if n <= 0 {
		n += r.size
	}
	if n > len(p) {
		n = len(p)
	}

	// tail is how many bytes sit between the read position and the end of
	// storage; anything beyond that continues from index 0.
	tail := r.size - r.r
	if n <= tail {
		copy(p, r.buf[r.r:r.r+n])
	} else {
		copy(p, r.buf[r.r:r.size])
		copy(p[tail:], r.buf[:n-tail])
	}

	r.r = (r.r + n) % r.size
	// At least one byte was consumed, so the buffer cannot be full any more.
	r.isFull = false
	return n, nil
}
// Write appends the bytes of p to the buffer and advances the write position.
// It never blocks and never overwrites unread data.
//
// It returns the number of bytes stored and:
//   - nil when all of p was stored (including the len(p) == 0 case);
//   - ErrIsFull when the buffer had no free space, in which case n == 0;
//   - ErrTooManyDataToWrite when p was larger than the free space, in which
//     case only the leading n bytes of p that fit were stored.
//
// p itself is never modified.
func (r *RingBuffer) Write(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	if r.w == r.r && r.isFull {
		return 0, ErrIsFull
	}

	// Free space: r-w when the free region is contiguous (w < r), otherwise it
	// runs from w to the end of storage and continues from index 0 up to r.
	avail := r.r - r.w
	if avail <= 0 {
		avail += r.size
	}
	if len(p) > avail {
		err = ErrTooManyDataToWrite
		p = p[:avail]
	}
	n = len(p)

	// Fill from the write position towards the end of storage, then wrap and
	// continue at index 0 with exactly the bytes that did not fit. The second
	// copy is a no-op when everything fit in the first.
	first := copy(r.buf[r.w:], p)
	copy(r.buf, p[first:])

	r.w += n
	if r.w >= r.size {
		r.w -= r.size
	}
	// The write position catching up with the read position means the write
	// consumed all remaining free space.
	if r.w == r.r {
		r.isFull = true
	}
	return n, err
}
// Length returns the number of bytes currently buffered and available to Read.
func (r *RingBuffer) Length() int {
	r.mu.Lock()
	defer r.mu.Unlock()

	switch {
	case r.w > r.r:
		// Data is contiguous between the two positions.
		return r.w - r.r
	case r.w < r.r:
		// Data wraps past the end of the storage.
		return r.size - r.r + r.w
	case r.isFull:
		return r.size
	default:
		return 0
	}
}
// Capacity returns the size the buffer was created with, i.e. the maximum
// number of bytes it can hold at once. It does not lock the mutex; the methods
// visible alongside it never reassign size after New sets it.
func (r *RingBuffer) Capacity() int {
return r.size
}
// Free returns how many more bytes can be written before the buffer is full.
func (r *RingBuffer) Free() int {
	r.mu.Lock()
	defer r.mu.Unlock()

	switch {
	case r.w < r.r:
		// Free space is the single gap between write and read positions.
		return r.r - r.w
	case r.w > r.r:
		// Free space runs to the end of storage and wraps around to r.
		return r.size - r.w + r.r
	case r.isFull:
		return 0
	}
	// Positions coincide and the buffer is not full, so it is empty.
	return r.size
}
// Bytes returns a copy of all bytes currently available to Read, in the order
// Read would deliver them. It does not move the read position, and the
// returned slice does not alias the buffer's storage. It returns nil when the
// buffer is empty.
func (r *RingBuffer) Bytes() []byte {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Readable byte count: w-r when the data is contiguous; otherwise the data
	// wraps past the end of storage (w < r) or fills it entirely (w == r with
	// isFull set).
	n := r.w - r.r
	switch {
	case n > 0:
		// Contiguous; n is already correct.
	case n < 0 || r.isFull:
		n += r.size
	default:
		return nil
	}

	out := make([]byte, n)
	// Copy from the read position towards the end of storage first. copy stops
	// at len(out), so contiguous data is fully handled here.
	head := copy(out, r.buf[r.r:r.size])
	// Whatever is still missing wrapped around and lives at the start of
	// storage, up to the write position. This is a no-op when nothing wrapped.
	copy(out[head:], r.buf[:r.w])
	return out
}
// IsFull reports whether the buffer has no free space left for writing.
func (r *RingBuffer) IsFull() bool {
	r.mu.Lock()
	full := r.isFull
	r.mu.Unlock()
	return full
}
// IsEmpty reports whether the buffer holds no bytes available for reading.
func (r *RingBuffer) IsEmpty() bool {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Coinciding positions mean "empty" only when the buffer is not full.
	if r.isFull {
		return false
	}
	return r.r == r.w
}
// Reset discards any buffered data by moving the read and write positions back
// to zero and clearing the full flag. The underlying storage is kept and is
// not zeroed.
func (r *RingBuffer) Reset() {
	r.mu.Lock()
	r.r, r.w, r.isFull = 0, 0, false
	r.mu.Unlock()
}