/
block_parser.go
145 lines (112 loc) · 2.86 KB
/
block_parser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package chain_block
import (
"encoding/binary"
"github.com/pkg/errors"
)
// Record type tags. Each record in the block stream carries one of these
// values in the single byte that follows its 4-byte length prefix.
const (
	// BlockTypeUnknown is the zero tag. The parser also uses it internally to
	// mean "the type byte of the current record has not been read yet".
	BlockTypeUnknown byte = 0
	// BlockTypeAccountBlock is the tag value 1, named for account-block records.
	BlockTypeAccountBlock byte = 1
	// BlockTypeSnapshotBlock is the tag value 2, named for snapshot-block records.
	BlockTypeSnapshotBlock byte = 2
)
// ClosedErr is returned by Close and Write when the parser has already been
// closed.
var ClosedErr = errors.New("blockFileParser is closed")
// byteBuffer is one parsed record as delivered on the parser's output channel.
type byteBuffer struct {
// BlockType is the record's type byte, copied from the stream as-is.
BlockType byte
// Buffer holds the record payload, i.e. the bytes that follow the type byte.
Buffer []byte
// Size is the payload length plus 5, as set by Write; the extra 5 matches the
// 4-byte length prefix and the 1-byte type that precede the payload.
Size int64
}
// blockFileParser incrementally splits a byte stream into records. Data is fed
// in through Write in chunks of any size, and each completed record is sent on
// the bytesBuffer channel. The fields below hold the parse state that must
// survive between Write calls.
type blockFileParser struct {
// blockSize is the payload length of the record being parsed: the value of
// the length prefix minus one (the prefix also counts the type byte).
blockSize int64
// blockSizeBuffer collects the 4-byte big-endian length prefix.
blockSizeBuffer []byte
// blockSizeBufferPointer is how many prefix bytes have been collected so far;
// 4 means the prefix is complete and blockSize is valid.
blockSizeBufferPointer int
// blockType is the type byte of the record being parsed, or BlockTypeUnknown
// while it has not been read yet.
blockType byte
// blockBufferPointer is how many payload bytes of the current record have
// been stored in blockBuffer by earlier Write calls.
blockBufferPointer int64
// blockBuffer accumulates the payload of a record that spans several Write
// calls; it is nil when no partial payload is pending.
blockBuffer []byte
// bytesBuffer carries completed records to the consumer; Close closes it.
bytesBuffer chan *byteBuffer
// closed is set by Close; once set, Write and Close return ClosedErr.
closed bool
// err is the error recorded by WriteError and reported by Error.
err error
}
// newBlockFileParser returns an open parser that is ready for Write.
//
// It allocates the 4-byte scratch space used to assemble each record's length
// prefix and an output channel that can queue up to 1000 parsed records.
func newBlockFileParser() *blockFileParser {
	parser := new(blockFileParser) // closed starts out false (zero value)
	parser.blockSizeBuffer = make([]byte, 4)
	parser.bytesBuffer = make(chan *byteBuffer, 1000)
	return parser
}
// Close marks the parser as closed and closes the output channel, which ends
// any range loop over the channel returned by Iterator.
//
// The first call returns nil. Every later call returns ClosedErr and does
// nothing else, so the channel is never closed twice.
func (bfp *blockFileParser) Close() error {
	if alreadyClosed := bfp.closed; alreadyClosed {
		return ClosedErr
	}

	bfp.closed = true
	close(bfp.bytesBuffer)

	return nil
}
// WriteError records err so that it can later be retrieved with Error, then
// closes the parser if it is still open. The recorded error is overwritten on
// every call, including calls made after the parser was closed.
func (bfp *blockFileParser) WriteError(err error) {
	bfp.err = err

	if bfp.closed {
		return
	}

	// Close only fails when the parser is already closed, which was ruled out
	// just above, so its result carries no information here.
	_ = bfp.Close()
}
// Write feeds the next chunk of the record stream into the parser.
//
// The stream is a sequence of records, each laid out as
//
//	4 bytes  big-endian length L (counts the type byte and the payload)
//	1 byte   record type
//	L-1      payload bytes
//
// A record may be split across Write calls at any offset; the parser keeps the
// partial state between calls. Every completed record is sent on the channel
// returned by Iterator, so Write blocks while that channel is full.
//
// The Buffer of an emitted record never aliases buf: the caller is free to
// reuse buf as soon as Write returns, even though records may still be queued
// in the channel.
//
// Write returns ClosedErr if the parser has been closed, and an error if a
// record declares a length of zero (the length must at least cover the type
// byte). Records emitted before the error stay in the channel.
//
// Known limitations: a type byte equal to BlockTypeUnknown cannot be told
// apart from "type not read yet", so it is skipped and the following byte is
// taken as the type. A record with an empty payload whose type byte is the
// last byte of buf is only emitted by the next non-empty Write.
func (bfp *blockFileParser) Write(buf []byte) error {
	if bfp.closed {
		return ClosedErr
	}

	for pos := 0; pos < len(buf); {
		remaining := len(buf) - pos

		switch {
		case bfp.blockSizeBufferPointer < 4:
			// Still collecting the length prefix; take as many of its missing
			// bytes as this chunk can provide.
			n := 4 - bfp.blockSizeBufferPointer
			if n > remaining {
				n = remaining
			}
			copy(bfp.blockSizeBuffer[bfp.blockSizeBufferPointer:], buf[pos:pos+n])
			pos += n
			bfp.blockSizeBufferPointer += n

			if bfp.blockSizeBufferPointer >= 4 {
				size := binary.BigEndian.Uint32(bfp.blockSizeBuffer)
				if size == 0 {
					// Subtracting the type byte from 0 would wrap around to
					// 0xFFFFFFFF and make the parser expect a ~4 GiB payload.
					// Drop the bad prefix so no stale size is left behind.
					bfp.blockSizeBufferPointer = 0
					return errors.New("blockFileParser: invalid block size 0")
				}
				bfp.blockSize = int64(size) - 1
			}

		case bfp.blockType == BlockTypeUnknown:
			bfp.blockType = buf[pos]
			pos++

		default:
			missing := int(bfp.blockSize - bfp.blockBufferPointer)

			if missing > remaining {
				// The payload continues in a later chunk: stash what we have
				// in a parser-owned buffer sized for the whole payload.
				if len(bfp.blockBuffer) == 0 {
					bfp.blockBuffer = make([]byte, 0, bfp.blockSize)
				}
				bfp.blockBuffer = append(bfp.blockBuffer, buf[pos:]...)
				bfp.blockBufferPointer += int64(remaining)
				pos = len(buf)
				continue
			}

			end := pos + missing

			var payload []byte
			if len(bfp.blockBuffer) == 0 {
				// The whole payload sits inside buf. Copy it, because the
				// record outlives this call in the channel while the caller
				// may overwrite buf right after Write returns.
				payload = make([]byte, missing)
				copy(payload, buf[pos:end])
			} else {
				// Complete the stashed payload; blockBuffer is parser-owned.
				payload = append(bfp.blockBuffer, buf[pos:end]...)
			}

			bfp.bytesBuffer <- &byteBuffer{
				BlockType: bfp.blockType,
				Buffer:    payload,
				Size:      bfp.blockSize + 5, // payload + 4-byte length + 1-byte type
			}
			pos = end

			// Reset the per-record state so the next byte starts a new prefix.
			bfp.blockSizeBufferPointer = 0
			bfp.blockType = BlockTypeUnknown
			bfp.blockBufferPointer = 0
			bfp.blockBuffer = nil
		}
	}

	return nil
}
// Iterator returns the receive-only view of the channel on which Write
// delivers parsed records. The channel is closed by Close, so a range loop
// over it ends once the parser is closed and the queued records are drained.
func (bfp *blockFileParser) Iterator() <-chan *byteBuffer {
return bfp.bytesBuffer
}
// Error returns the error most recently recorded with WriteError, or nil if
// WriteError has not been called.
func (bfp *blockFileParser) Error() error {
return bfp.err
}