/
CompressedMothChunkLoader.go
160 lines (140 loc) · 5.82 KB
/
CompressedMothChunkLoader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package store
import (
"fmt"
"github.com/mothdb-bd/orc-go/pkg/maths"
"github.com/mothdb-bd/orc-go/pkg/memory"
"github.com/mothdb-bd/orc-go/pkg/slice"
"github.com/mothdb-bd/orc-go/pkg/store/common"
"github.com/mothdb-bd/orc-go/pkg/util"
)
// CompressedMothChunkLoader reads chunks from a compressed stream served by a
// MothDataReader and hands them to callers, decompressing with the configured
// MothDecompressor when a chunk is flagged as compressed.
type CompressedMothChunkLoader struct {
// Embedded MothChunkLoader (original comment: "inherit").
MothChunkLoader
// dataReader supplies buffers of compressed bytes and reports the stream size.
dataReader MothDataReader
// dataReaderMemoryUsage is set to dataReader.GetRetainedSize() at construction
// and after each buffer load.
dataReaderMemoryUsage memory.LocalMemoryContext
// decompressor inflates chunks whose header does not mark them as uncompressed.
decompressor MothDecompressor
// decompressionBufferMemoryUsage is set to the size of decompressorOutputBuffer
// whenever that buffer is (re)allocated.
decompressionBufferMemoryUsage memory.LocalMemoryContext
// compressedBufferStream is the input over the currently loaded compressed buffer.
compressedBufferStream slice.FixedLengthSliceInput
// compressedBufferStart is the stream offset at which the current buffer begins.
compressedBufferStart int32
// nextUncompressedOffset is the number of leading bytes NextChunk drops from
// the next chunk; it is set by SeekToCheckpoint and reset to 0 once applied.
nextUncompressedOffset int32
// lastCheckpoint is the checkpoint recorded by the latest NextChunk or
// SeekToCheckpoint call.
lastCheckpoint int64
// decompressorOutputBuffer is the reusable destination for decompressed bytes.
decompressorOutputBuffer []byte
}
// NewCompressedMothChunkLoader builds a chunk loader that pulls compressed
// bytes from dataReader and inflates them with decompressor.
//
// Two local memory contexts are created from memoryContext: the first is
// immediately set to the reader's retained size, the second is kept for
// accounting of the decompression output buffer. The loader starts with an
// empty compressed buffer.
func NewCompressedMothChunkLoader(dataReader MothDataReader, decompressor MothDecompressor, memoryContext memory.AggregatedMemoryContext) *CompressedMothChunkLoader {
	emptyInput := slice.EMPTY_SLICE.GetInput()

	readerUsage := memoryContext.NewLocalMemoryContext("CompressedMothChunkLoader")
	readerUsage.SetBytes(dataReader.GetRetainedSize())

	bufferUsage := memoryContext.NewLocalMemoryContext("CompressedMothChunkLoader")

	return &CompressedMothChunkLoader{
		dataReader:                     dataReader,
		dataReaderMemoryUsage:          readerUsage,
		decompressor:                   decompressor,
		decompressionBufferMemoryUsage: bufferUsage,
		compressedBufferStream:         emptyInput,
	}
}
// GetMothDataSourceId reports the data source identifier of the underlying
// data reader.
func (cr *CompressedMothChunkLoader) GetMothDataSourceId() *common.MothDataSourceId {
	sourceID := cr.dataReader.GetMothDataSourceId()
	return sourceID
}
// getCurrentCompressedOffset returns the current read position within the
// compressed stream: the start offset of the loaded buffer plus the position
// inside that buffer, narrowed to int32 through util.Int32Exact.
func (cr *CompressedMothChunkLoader) getCurrentCompressedOffset() int32 {
	absolute := cr.compressedBufferStream.Position()
	absolute += int64(cr.compressedBufferStart)
	return util.Int32Exact(absolute)
}
// HasNextChunk reports whether the current compressed offset is still before
// the end of the data reader's stream.
func (cr *CompressedMothChunkLoader) HasNextChunk() bool {
	consumed := cr.getCurrentCompressedOffset()
	total := cr.dataReader.GetSize()
	return consumed < total
}
// GetLastCheckpoint returns the checkpoint most recently recorded by
// NextChunk or SeekToCheckpoint.
func (cr *CompressedMothChunkLoader) GetLastCheckpoint() int64 {
	checkpoint := cr.lastCheckpoint
	return checkpoint
}
// SeekToCheckpoint repositions the loader at checkpoint.
//
// The checkpoint is decoded into a compressed offset and a decompressed
// offset. If the compressed offset falls inside the currently loaded buffer,
// only the buffer position is moved; otherwise the buffer is dropped and will
// be reloaded from the new offset on the next read. The decompressed offset is
// remembered so that NextChunk can drop that many leading bytes from the next
// chunk.
//
// It panics if the compressed offset is at or beyond the end of the stream.
func (cr *CompressedMothChunkLoader) SeekToCheckpoint(checkpoint int64) {
	compressedOffset := DecodeCompressedBlockOffset(checkpoint)
	if compressedOffset >= cr.dataReader.GetSize() {
		panic(fmt.Sprintf("%s Seek past end of stream", cr.dataReader.GetMothDataSourceId()))
	}

	// Compare in int64: adding the buffer length to the buffer start in int32
	// can wrap around (and truncates the length), which would make the window
	// test give the wrong answer.
	target := int64(compressedOffset)
	bufferStart := int64(cr.compressedBufferStart)
	bufferEnd := bufferStart + int64(cr.compressedBufferStream.Length())

	if bufferStart <= target && target < bufferEnd {
		// Target is already buffered; just move within the buffer.
		cr.compressedBufferStream.SetPosition(target - bufferStart)
	} else {
		// Target is outside the buffer; discard it so the next read reloads.
		cr.compressedBufferStart = compressedOffset
		cr.compressedBufferStream = slice.EMPTY_SLICE.GetInput()
	}

	cr.nextUncompressedOffset = DecodeDecompressedOffset(checkpoint)
	cr.lastCheckpoint = checkpoint
}
// NextChunk reads the next chunk from the compressed stream and returns its
// (decompressed) bytes.
//
// Each chunk is preceded by a 3-byte header: the lowest bit of the first byte
// flags the chunk as stored uncompressed, and the remaining 23 bits hold the
// chunk length (least-significant byte first). A checkpoint pointing at the
// header is recorded before the header is consumed.
//
// When a preceding SeekToCheckpoint left a non-zero uncompressed offset, that
// many leading bytes are dropped from the chunk; if nothing remains, the
// following chunk is returned instead.
//
// A decompressed chunk is backed by the loader's reusable output buffer.
// It panics if the pending uncompressed offset cannot be applied to the chunk.
func (cr *CompressedMothChunkLoader) NextChunk() *slice.Slice {
	cr.ensureCompressedBytesAvailable(3)
	// Record the checkpoint before reading so it refers to the chunk header.
	cr.lastCheckpoint = CreateInputStreamCheckpoint2(cr.getCurrentCompressedOffset(), cr.nextUncompressedOffset)

	b0 := cr.compressedBufferStream.ReadUnsignedByte()
	b1 := cr.compressedBufferStream.ReadUnsignedByte()
	b2 := cr.compressedBufferStream.ReadUnsignedByte()
	isUncompressed := (b0 & 0x01) == 1
	chunkLength := int32(b2)<<15 | int32(b1)<<7 | (maths.UnsignedRightShiftInt32(int32(b0), 1))

	cr.ensureCompressedBytesAvailable(chunkLength)
	chunk := cr.compressedBufferStream.ReadSlice(chunkLength)
	if !isUncompressed {
		uncompressedSize := cr.decompressor.Decompress(chunk.AvailableBytes(), 0, chunk.Length(), cr.createOutputBuffer())
		chunk = slice.NewWithBuf(cr.decompressorOutputBuffer[0:uncompressedSize])
	}

	if cr.nextUncompressedOffset != 0 {
		skip := cr.nextUncompressedOffset
		remainder, err := chunk.MakeSlice(int(skip), int(chunk.Length()-skip))
		if err != nil {
			// Previously ignored: an offset beyond the chunk would surface
			// later as a nil dereference or as silently wrong data.
			panic(fmt.Sprintf("%s Invalid uncompressed offset %d for chunk of %d bytes: %v", cr.dataReader.GetMothDataSourceId(), skip, chunk.Length(), err))
		}
		chunk = remainder
		cr.nextUncompressedOffset = 0
		if chunk.Length() == 0 {
			// The whole chunk was skipped; continue with the following one.
			chunk = cr.NextChunk()
		}
	}
	return chunk
}
// ensureCompressedBytesAvailable makes sure at least size unread bytes are
// present in compressedBufferStream. If they are not, a new buffer starting at
// the current compressed offset is requested from the data reader and the
// reader's memory accounting is refreshed.
//
// It panics when size exceeds the reader's maximum buffer size, when the read
// would extend past the end of the stream, or when the reader returns a buffer
// shorter than size.
func (cr *CompressedMothChunkLoader) ensureCompressedBytesAvailable(size int32) {
	if int64(size) <= cr.compressedBufferStream.Remaining() {
		return
	}

	maxBufferSize := cr.dataReader.GetMaxBufferSize()
	if size > maxBufferSize {
		panic(fmt.Sprintf("Requested read size (%d bytes) is greater than max buffer size (%d bytes)", size, maxBufferSize))
	}

	// Do the bounds arithmetic in int64: summing start, position and size in
	// int32 can wrap around and let an out-of-range read slip through.
	offset := int64(cr.compressedBufferStart) + int64(cr.compressedBufferStream.Position())
	if offset+int64(size) > int64(cr.dataReader.GetSize()) {
		panic("Read past end of stream")
	}

	// The new buffer begins where the old one was read up to.
	cr.compressedBufferStart = util.Int32Exact(offset)
	compressedBuffer := cr.dataReader.SeekBuffer(cr.compressedBufferStart)
	cr.dataReaderMemoryUsage.SetBytes(cr.dataReader.GetRetainedSize())
	if compressedBuffer.Length() < size {
		panic(fmt.Sprintf("Requested read of %d bytes but only %d bytes were available", size, compressedBuffer.SizeInt32()))
	}
	cr.compressedBufferStream = compressedBuffer.GetInput()
}
// createOutputBuffer returns an OutputBuffer bound to this loader, for use as
// the decompressor's destination.
func (cr *CompressedMothChunkLoader) createOutputBuffer() OutputBuffer {
	var out OutputBuffer = NewOutputBuffer(cr)
	return out
}
// ToString renders a description of the loader containing the data reader,
// the current compressed offset and the decompressor.
func (cr *CompressedMothChunkLoader) ToString() string {
	description := util.NewSB().
		AddString("loader", cr.dataReader.String()).
		AddInt32("compressedOffset", cr.getCurrentCompressedOffset()).
		AddString("decompressor", cr.decompressor.String())
	return description.String()
}
// OutputBufferImpl is the OutputBuffer implementation created by
// NewOutputBuffer; its Initialize and Grow methods operate on the owning
// loader's decompressorOutputBuffer.
type OutputBufferImpl struct {
// Embedded OutputBuffer (original comment: "inherit").
OutputBuffer
// cr is the loader whose output buffer and memory accounting are managed.
cr *CompressedMothChunkLoader
}
// NewOutputBuffer wraps cr in an OutputBufferImpl and returns it as an
// OutputBuffer.
func NewOutputBuffer(cr *CompressedMothChunkLoader) OutputBuffer {
	return &OutputBufferImpl{cr: cr}
}
// Initialize returns the loader's decompression output buffer, allocating a
// fresh one of exactly size bytes (and updating the memory accounting) when no
// buffer exists yet or the existing one is shorter than size. An existing
// buffer that is large enough is returned as-is.
func (ol *OutputBufferImpl) Initialize(size int32) []byte {
	loader := ol.cr
	if loader.decompressorOutputBuffer != nil && size <= util.Lens(loader.decompressorOutputBuffer) {
		return loader.decompressorOutputBuffer
	}

	loader.decompressorOutputBuffer = make([]byte, size)
	loader.decompressionBufferMemoryUsage.SetBytes(int64(size))
	return loader.decompressorOutputBuffer
}
// Grow returns the loader's decompression output buffer, first enlarging it to
// size bytes when it is currently shorter. The existing contents are copied
// into the enlarged buffer and the memory accounting is updated.
func (ol *OutputBufferImpl) Grow(size int32) []byte {
	loader := ol.cr
	current := loader.decompressorOutputBuffer
	if size <= util.Lens(current) {
		return current
	}

	grown := make([]byte, size)
	copy(grown, current)
	loader.decompressorOutputBuffer = grown
	loader.decompressionBufferMemoryUsage.SetBytes(int64(size))
	return grown
}