This repository has been archived by the owner on Apr 7, 2019. It is now read-only.
/
y.go
286 lines (250 loc) · 7.26 KB
/
y.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
/*
* Copyright 2017 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package y
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"math"
"os"
"sync"
"time"
"github.com/pkg/errors"
)
// ErrEOF indicates an end of file when trying to read from a memory mapped file
// and encountering the end of slice.
var ErrEOF = errors.New("End of mapped region")
const (
// Sync indicates that O_DSYNC should be set on the underlying file,
// ensuring that data writes do not return until the data is flushed
// to disk.
Sync = 1 << iota
// ReadOnly opens the underlying file on a read-only basis.
ReadOnly
)
var (
// This is O_DSYNC (datasync) on platforms that support it -- see file_unix.go
datasyncFileFlag = 0x0
// CastagnoliCrcTable is a CRC32 polynomial table
CastagnoliCrcTable = crc32.MakeTable(crc32.Castagnoli)
)
// OpenExistingFile opens an existing file, errors if it doesn't exist.
func OpenExistingFile(filename string, flags uint32) (*os.File, error) {
openFlags := os.O_RDWR
if flags&ReadOnly != 0 {
openFlags = os.O_RDONLY
}
if flags&Sync != 0 {
openFlags |= datasyncFileFlag
}
return os.OpenFile(filename, openFlags, 0)
}
// CreateSyncedFile creates a new file (using O_EXCL), errors if it already existed.
func CreateSyncedFile(filename string, sync bool) (*os.File, error) {
flags := os.O_RDWR | os.O_CREATE | os.O_EXCL
if sync {
flags |= datasyncFileFlag
}
return os.OpenFile(filename, flags, 0666)
}
// OpenSyncedFile creates the file if one doesn't exist.
func OpenSyncedFile(filename string, sync bool) (*os.File, error) {
flags := os.O_RDWR | os.O_CREATE
if sync {
flags |= datasyncFileFlag
}
return os.OpenFile(filename, flags, 0666)
}
// OpenTruncFile opens the file with O_RDWR | O_CREATE | O_TRUNC
func OpenTruncFile(filename string, sync bool) (*os.File, error) {
flags := os.O_RDWR | os.O_CREATE | os.O_TRUNC
if sync {
flags |= datasyncFileFlag
}
return os.OpenFile(filename, flags, 0666)
}
// SafeCopy does append(a[:0], src...).
func SafeCopy(a []byte, src []byte) []byte {
return append(a[:0], src...)
}
// Copy copies a byte slice and returns the copied slice.
func Copy(a []byte) []byte {
b := make([]byte, len(a))
copy(b, a)
return b
}
// KeyWithTs generates a new key by appending ts to key.
func KeyWithTs(key []byte, ts uint64) []byte {
out := make([]byte, len(key)+8)
copy(out, key)
binary.BigEndian.PutUint64(out[len(key):], math.MaxUint64-ts)
return out
}
// ParseTs parses the timestamp from the key bytes.
func ParseTs(key []byte) uint64 {
if len(key) <= 8 {
return 0
}
return math.MaxUint64 - binary.BigEndian.Uint64(key[len(key)-8:])
}
// CompareKeys checks the key without timestamp and checks the timestamp if keyNoTs
// is same.
// a<timestamp> would be sorted higher than aa<timestamp> if we use bytes.compare
// All keys should have timestamp.
func CompareKeys(key1 []byte, key2 []byte) int {
AssertTrue(len(key1) > 8 && len(key2) > 8)
if cmp := bytes.Compare(key1[:len(key1)-8], key2[:len(key2)-8]); cmp != 0 {
return cmp
}
return bytes.Compare(key1[len(key1)-8:], key2[len(key2)-8:])
}
// ParseKey parses the actual key from the key bytes.
func ParseKey(key []byte) []byte {
if key == nil {
return nil
}
AssertTrue(len(key) > 8)
return key[:len(key)-8]
}
// SameKey checks for key equality ignoring the version timestamp suffix.
func SameKey(src, dst []byte) bool {
if len(src) != len(dst) {
return false
}
return bytes.Equal(ParseKey(src), ParseKey(dst))
}
// Slice holds a reusable buf, will reallocate if you request a larger size than ever before.
// One problem is with n distinct sizes in random order it'll reallocate log(n) times.
type Slice struct {
buf []byte
}
// Resize reuses the Slice's buffer (or makes a new one) and returns a slice in that buffer of
// length sz.
func (s *Slice) Resize(sz int) []byte {
if cap(s.buf) < sz {
s.buf = make([]byte, sz)
}
return s.buf[0:sz]
}
// FixedDuration returns a string representation of the given duration with the
// hours, minutes, and seconds.
func FixedDuration(d time.Duration) string {
str := fmt.Sprintf("%02ds", int(d.Seconds())%60)
if d >= time.Minute {
str = fmt.Sprintf("%02dm", int(d.Minutes())%60) + str
}
if d >= time.Hour {
str = fmt.Sprintf("%02dh", int(d.Hours())) + str
}
return str
}
// Closer holds the two things we need to close a goroutine and wait for it to finish: a chan
// to tell the goroutine to shut down, and a WaitGroup with which to wait for it to finish shutting
// down.
type Closer struct {
closed chan struct{}
waiting sync.WaitGroup
}
// NewCloser constructs a new Closer, with an initial count on the WaitGroup.
func NewCloser(initial int) *Closer {
ret := &Closer{closed: make(chan struct{})}
ret.waiting.Add(initial)
return ret
}
// AddRunning Add()'s delta to the WaitGroup.
func (lc *Closer) AddRunning(delta int) {
lc.waiting.Add(delta)
}
// Signal signals the HasBeenClosed signal.
func (lc *Closer) Signal() {
close(lc.closed)
}
// HasBeenClosed gets signaled when Signal() is called.
func (lc *Closer) HasBeenClosed() <-chan struct{} {
return lc.closed
}
// Done calls Done() on the WaitGroup.
func (lc *Closer) Done() {
lc.waiting.Done()
}
// Wait waits on the WaitGroup. (It waits for NewCloser's initial value, AddRunning, and Done
// calls to balance out.)
func (lc *Closer) Wait() {
lc.waiting.Wait()
}
// SignalAndWait calls Signal(), then Wait().
func (lc *Closer) SignalAndWait() {
lc.Signal()
lc.Wait()
}
// Throttle allows a limited number of workers to run at a time. It also
// provides a mechanism to check for errors encountered by workers and wait for
// them to finish.
type Throttle struct {
wg sync.WaitGroup
ch chan struct{}
errCh chan error
}
// NewThrottle creates a new throttle with a max number of workers.
func NewThrottle(max int) *Throttle {
return &Throttle{
ch: make(chan struct{}, max),
errCh: make(chan error, max),
}
}
// Do should be called by workers before they start working. It blocks if there
// are already maximum number of workers working. If it detects an error from
// previously Done workers, it would return it.
func (t *Throttle) Do() error {
for {
select {
case t.ch <- struct{}{}:
t.wg.Add(1)
return nil
case err := <-t.errCh:
if err != nil {
return err
}
}
}
}
// Done should be called by workers when they finish working. They can also
// pass the error status of work done.
func (t *Throttle) Done(err error) {
if err != nil {
t.errCh <- err
}
select {
case <-t.ch:
default:
panic("Throttle Do Done mismatch")
}
t.wg.Done()
}
// Finish waits until all workers have finished working. It would return any
// error passed by Done.
func (t *Throttle) Finish() error {
t.wg.Wait()
close(t.ch)
close(t.errCh)
for err := range t.errCh {
if err != nil {
return err
}
}
return nil
}