-
Notifications
You must be signed in to change notification settings - Fork 0
/
logreader.go
157 lines (135 loc) · 3.95 KB
/
logreader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright (c) 2020 Bert Young. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package leveldb
import (
//"bytes"
"encoding/binary"
"fmt"
"log"
)
type LogReader struct {
src SequentialFile
blockOffset int
checkCrc bool
eof bool
record []byte
backingStore [blockSize]byte // A physical record
// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the
// record type stored in the header.
typeCrc [maxRecordType + 1]uint32
}
// Create a reader that will return log records from "*file".
// "*file" must remain live while this Reader is in use.
//
// If "reporter" is non-NULL, it is notified whenever some data is
// dropped due to a detected corruption. "*reporter" must remain
// live while this Reader is in use.
//
// If "checksum" is true, verify checksums if available.
//
// The Reader will start reading at the first record located at physical
// position >= initial_offset within the file.
func NewLogReader(src SequentialFile) *LogReader {
lr := &LogReader{src: src}
lr.blockOffset = 0
lr.checkCrc = true
lr.eof = false
lr.record = lr.backingStore[0:0]
var i byte = 0
for ; i < byte(maxRecordType); i++ {
lr.typeCrc[i] = uint32(CrcValue([]byte{i}))
}
return lr
}
func (lr *LogReader) EnableCheckCrc(enable bool) {
lr.checkCrc = enable
}
// Read the next record into *record. Returns true if read
// successfully, false if we hit end of the input. May use
// "*scratch" as temporary storage. The contents filled in *record
// will only be valid until the next mutating operation on this
// reader or the next mutation to *scratch.
func (lr *LogReader) ReadRecord(record *[]byte) bool {
for {
rec, tp := lr.readPhysicalRecord()
if rec == nil {
return false
}
switch tp {
case fullType:
*record = rec // shallow copy
return true
case firstType:
*record = make([]byte, 0, blockSize) // reset record
*record = append(*record, rec...)
case middleType:
*record = append(*record, rec...)
case lastType:
*record = append(*record, rec...)
return true
default:
panic(fmt.Sprintf("unknown type %v", tp))
}
}
return false
}
func (lr *LogReader) Close() Status {
if err := lr.src.Close(); err != nil {
return NewStatus(IOError, err.Error())
}
return NewStatus(OK)
}
func (lr *LogReader) readPhysicalRecord() ([]byte, RecordType) {
if len(lr.record) < logHeaderSize {
if lr.eof {
return nil, eofType
}
if len(lr.record) > 0 {
// Last read was a full read, so this is a trailer to skip
log.Printf("skip trailer %v", len(lr.record))
}
lr.record = lr.backingStore[0:0]
if data, st := lr.src.Read(blockSize, lr.backingStore[:]); !st.IsOK() {
log.Println("eof...")
lr.eof = true
return nil, eofType
} else {
if len(data) < blockSize {
log.Println("eof... len(data) ", len(data))
lr.eof = true
}
lr.record = lr.backingStore[0:len(data)]
}
}
// read header
var dataLen int = int(binary.LittleEndian.Uint16(lr.record[4:6]))
if dataLen < 0 {
panic(fmt.Sprintf("Wrong dataLen %d", dataLen))
}
if len(lr.record) < dataLen+logHeaderSize {
// something abnormal, not enough data
if lr.eof {
// If the end of the file has been reached without reading |length| bytes
// of payload, assume the writer died in the middle of writing the record.
// Don't report a corruption.
log.Println("writer crash, skip this record")
return nil, eofType
}
return nil, badRecordType
}
tp := RecordType(lr.record[6])
if lr.checkCrc {
crc := binary.LittleEndian.Uint32(lr.record[:4])
expectCrc := levelDbCrc(lr.typeCrc[tp])
expectCrc = expectCrc.Extend(lr.record[logHeaderSize : logHeaderSize+dataLen]).Mask()
if crc != uint32(expectCrc) {
return nil, badRecordType
}
}
rec := lr.record[logHeaderSize : logHeaderSize+dataLen]
// skip rec
lr.record = lr.record[logHeaderSize+dataLen:]
return rec, tp
}