// compacting_scanner.go

package wal
import (
"errors"
"io"
"sort"
)
// ErrOpenTransaction reports that the WAL file ends part-way through a
// transaction, i.e. its last frame is not a committing frame.
var ErrOpenTransaction = errors.New("open transaction at end of WAL file")
// cFrame is the bookkeeping record kept for a single WAL frame during
// compaction. It holds no page data, only what is needed to find the frame
// again later.
type cFrame struct {
	// Pgno is the page number reported for the frame.
	Pgno uint32
	// Commit is the commit value reported for the frame; zero means the frame
	// does not commit a transaction.
	Commit uint32
	// Offset is the position of the frame within the WAL file.
	Offset int64
}

// cFrames is a list of frame records that implements sort.Interface, ordering
// the records by ascending file offset.
type cFrames []*cFrame

// Len returns the number of frame records in the list.
func (c cFrames) Len() int {
	return len(c)
}

// Less reports whether the record at index i sits at a lower file offset than
// the record at index j.
func (c cFrames) Less(i, j int) bool {
	lhs, rhs := c[i], c[j]
	return lhs.Offset < rhs.Offset
}

// Swap exchanges the records at indexes i and j.
func (c cFrames) Swap(i, j int) {
	c[j], c[i] = c[i], c[j]
}
// CompactingScanner implements WALIterator to iterate over frames in a WAL file.
// It also compacts the WAL file, with Next() returning the last valid frame for each
// page in the right order such that they can be written to a new WAL file. This Scanner
// requires that the final frame in the WAL file is a committing frame. It will return an
// error at creation time if this is not the case.
type CompactingScanner struct {
// readSeeker is the WAL data source; Next seeks within it to read page data.
readSeeker io.ReadSeeker
// walReader reads the WAL header and the frames during the initial scan.
walReader *Reader
// header is the WAL header built at creation time and returned by Header.
header *WALHeader
// fullScan controls whether the scan hands a page-sized buffer to the reader
// (true) or a nil buffer (false).
fullScan bool
// cIdx is the index into frames of the frame the next call to Next returns.
cIdx int
// frames holds the latest committed frame for each page, sorted by file offset.
frames cFrames
}
// NewFastCompactingScanner returns a CompactingScanner over r that is built
// with full scanning disabled. It is shorthand for NewCompactingScanner(r, false)
// and is intended for callers that already trust the WAL file to be valid and
// therefore do not need it checked.
func NewFastCompactingScanner(r io.ReadSeeker) (*CompactingScanner, error) {
	const fullScan = false
	return NewCompactingScanner(r, fullScan)
}
// NewCompactingScanner builds a CompactingScanner that reads the WAL file from r.
//
// The WAL header is read first, then every frame is scanned so that the set of
// frames to return from Next is known before this function returns. When
// fullScan is true the scan is a full one that checksums each frame; when it is
// false the file is scanned only as far as needed to locate the last valid
// frame for each page. The fast mode is appropriate when the caller knows the
// whole WAL file is valid and holds no pages left over from an earlier
// checkpointing operation.
//
// An error is returned if the header cannot be read or if the scan fails, which
// includes ErrOpenTransaction when the final frame is not a committing frame.
func NewCompactingScanner(r io.ReadSeeker, fullScan bool) (*CompactingScanner, error) {
	rd := NewReader(r)
	if err := rd.ReadHeader(); err != nil {
		return nil, err
	}

	scanner := &CompactingScanner{
		readSeeker: r,
		walReader:  rd,
		fullScan:   fullScan,
		// Rebuild the header from the values the reader parsed.
		header: &WALHeader{
			Magic:     rd.magic,
			Version:   WALSupportedVersion,
			PageSize:  rd.PageSize(),
			Seq:       rd.seq,
			Salt1:     rd.salt1,
			Salt2:     rd.salt2,
			Checksum1: rd.chksum1,
			Checksum2: rd.chksum2,
		},
	}

	// Index the frames up front so that problems surface at creation time.
	if err := scanner.scan(); err != nil {
		return nil, err
	}
	return scanner, nil
}
// Header returns the WAL header that was built when the scanner was created.
// The returned error is always nil.
func (c *CompactingScanner) Header() (*WALHeader, error) {
	hdr := c.header
	return hdr, nil
}
// Next returns the next logical frame from the WAL file, reading its page data
// from the underlying io.ReadSeeker.
//
// Frames are returned in the order fixed by the scan performed at creation
// time (ascending file offset). Once every frame has been returned, Next
// returns a nil frame and io.EOF.
//
// Any other error is returned with a nil frame and leaves the iteration
// position unchanged, so the same frame is attempted again on the next call.
// If the underlying data ends before any of a frame's page bytes could be read,
// Next returns io.ErrUnexpectedEOF rather than io.EOF, so that callers cannot
// mistake a truncated file for the normal end of iteration.
func (c *CompactingScanner) Next() (*Frame, error) {
	if c.cIdx >= len(c.frames) {
		return nil, io.EOF
	}
	cf := c.frames[c.cIdx]

	// Skip past the frame header to reach the page data.
	if _, err := c.readSeeker.Seek(cf.Offset+WALFrameHeaderSize, io.SeekStart); err != nil {
		return nil, err
	}

	data := make([]byte, c.header.PageSize)
	if _, err := io.ReadFull(c.readSeeker, data); err != nil {
		// io.ReadFull reports a bare io.EOF when it read zero bytes. Passing
		// that through would look exactly like the end-of-iteration signal
		// above and silently drop this frame and all that follow it.
		if errors.Is(err, io.EOF) {
			err = io.ErrUnexpectedEOF
		}
		return nil, err
	}

	// Advance only after a successful read.
	c.cIdx++
	return &Frame{
		Pgno:   cf.Pgno,
		Commit: cf.Commit,
		Data:   data,
	}, nil
}
// scan reads every frame from the WAL reader and records, for each page, the
// most recent frame that belongs to a committed transaction. The resulting
// records are stored in c.frames sorted by file offset, which is the order in
// which Next returns them.
//
// When c.fullScan is set, a page-sized buffer is passed to the reader for each
// frame; otherwise a nil buffer is passed.
//
// It returns ErrOpenTransaction if the last frame read is not a committing
// frame, and otherwise any non-EOF error reported by the reader. c.frames is
// only assigned on success.
func (c *CompactingScanner) scan() error {
	var pageBuf []byte
	if c.fullScan {
		pageBuf = make([]byte, c.header.PageSize)
	}

	// committed maps page number to its latest frame from a finished
	// transaction; pending does the same for the transaction in progress.
	committed := make(map[uint32]*cFrame)
	pending := make(map[uint32]*cFrame)
	inTx := false

	for {
		pgno, commit, err := c.walReader.ReadFrame(pageBuf)
		if err != nil {
			if err == io.EOF {
				break
			}
			return err
		}

		// A later frame for the same page replaces the earlier one.
		pending[pgno] = &cFrame{
			Pgno:   pgno,
			Commit: commit,
			Offset: c.walReader.Offset(),
		}

		// A zero commit value means the transaction is still open, so keep
		// collecting frames until the committing frame arrives.
		inTx = commit == 0
		if inTx {
			continue
		}

		// The transaction committed: promote its frames and start afresh.
		for p, f := range pending {
			committed[p] = f
		}
		pending = make(map[uint32]*cFrame)
	}

	if inTx {
		return ErrOpenTransaction
	}

	// Map iteration order is random, so order the survivors by file offset to
	// get a stable, correct replay order.
	ordered := make(cFrames, 0, len(committed))
	for _, f := range committed {
		ordered = append(ordered, f)
	}
	sort.Sort(ordered)
	c.frames = ordered
	return nil
}