/
cursor.go
83 lines (74 loc) · 1.71 KB
/
cursor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package commitlog
import (
"io"
"sync"
"sync/atomic"
"github.com/pkg/errors"
)
type Cursor interface {
io.ReadSeeker
}
type cursor struct {
mtx sync.Mutex
currentSegment Segment
pos int64
log *commitLog
}
func (c *cursor) Seek(offset int64, whence int) (int64, error) {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.seek(offset, whence)
}
func (c *cursor) seek(offset int64, whence int) (int64, error) {
var err error
var target uint64
lastOffset := int64(atomic.LoadUint64(&c.log.currentOffset))
switch whence {
case io.SeekEnd:
if offset > lastOffset {
target = uint64(lastOffset)
} else {
target = uint64(lastOffset + offset)
}
case io.SeekStart:
if offset < 0 {
return 0, errors.New("invalid offset")
}
target = uint64(offset)
case io.SeekCurrent:
return 0, errors.New("SeekCurrent unsupported when reading the log")
default:
return 0, errors.New("invalid whence")
}
if target > uint64(lastOffset) {
target = uint64(lastOffset)
}
c.currentSegment = c.log.lookupOffset(target)
c.pos, err = c.currentSegment.LookupPosition(target)
return int64(target), err
}
func (c *cursor) Read(p []byte) (int, error) {
c.mtx.Lock()
defer c.mtx.Unlock()
var total int
for {
n, err := c.currentSegment.ReadAt(p[total:], c.pos)
c.pos += int64(n)
total += n
if err == io.EOF {
if total > 0 {
return total, nil
}
currentLogOffset := c.currentSegment.BaseOffset() + c.currentSegment.CurrentOffset()
nextSegment := c.log.lookupOffset(currentLogOffset)
if nextSegment.BaseOffset() != c.currentSegment.BaseOffset() {
c.currentSegment = nextSegment
c.pos = 0
if len(p) > total {
continue
}
}
}
return total, err
}
}