forked from apache/hbase
-
Notifications
You must be signed in to change notification settings - Fork 0
/
HFileBlock.java
2097 lines (1908 loc) · 86 KB
/
HFileBlock.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.EncodingState;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
* Cacheable Blocks of an {@link HFile} version 2 file.
* Version 2 was introduced in hbase-0.92.0.
*
* <p>Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
* format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support
* for Version 1 was removed in hbase-1.3.0.
*
* <h3>HFileBlock: Version 2</h3>
* In version 2, a block is structured as follows:
* <ul>
* <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is
* HFILEBLOCK_HEADER_SIZE
* <ul>
* <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes):
* e.g. <code>DATABLK*</code>
* <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
* but including tailing checksum bytes (4 bytes)
* <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
* checksum bytes (4 bytes)
* <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is
* used to navigate to the previous block without having to go to the block index
* <li>4: For minorVersions >=1, the ordinal describing checksum type (1 byte)
* <li>5: For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes)
* <li>6: onDiskDataSizeWithHeader: For minorVersions >=1, the size of data 'on disk', including
* header, excluding checksums (4 bytes)
* </ul>
* </li>
* <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression
* algorithm is the same for all the blocks in an {@link HFile}. If compression is NONE, this is
* just raw, serialized Cells.
* <li><b>Tail:</b> For minorVersions >=1, a series of 4 byte checksums, one each for
* the number of bytes specified by bytesPerChecksum.
* </ul>
*
* <h3>Caching</h3>
* Caches cache whole blocks with trailing checksums if any. We then tag on some metadata, the
* content of BLOCK_METADATA_SPACE: a flag that is on if we are doing 'hbase' checksums, then
* the offset into the file, which is needed when we re-make a cache key when we return the
* block to the cache as 'done', and finally the on-disk size of the next block.
* See {@link Cacheable#serialize(ByteBuffer, boolean)} and {@link Cacheable#getDeserializer()}.
*
* <p>TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where
* we make a block to cache-on-write, there is an attempt at turning off checksums. This is not the
* only place we get blocks to cache. We also will cache the raw return from an hdfs read. In this
* case, the checksums may be present. If the cache is backed by something that doesn't do ECC,
* say an SSD, we might want to preserve checksums. For now this is open question.
* <p>TODO: Over in BucketCache, we save a block allocation by doing a custom serialization.
* Be sure to change it if serialization changes in here. Could we add a method here that takes an
* IOEngine and that then serializes to it rather than expose our internals over in BucketCache?
* IOEngine is in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
*/
@InterfaceAudience.Private
public class HFileBlock implements Cacheable {
private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);
public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HFileBlock.class, false);
// Block Header fields.
// TODO: encapsulate Header related logic in this inner class.
static class Header {
  // Byte offsets of the individual fields inside a serialized block header.
  // Format of header is:
  //   8 bytes       - block magic
  //   4 bytes int   - onDiskSizeWithoutHeader
  //   4 bytes int   - uncompressedSizeWithoutHeader
  //   8 bytes long  - prevBlockOffset
  // The following 3 are only present if header contains checksum information
  //   1 byte        - checksum type
  //   4 byte int    - bytes per checksum
  //   4 byte int    - onDiskDataSizeWithHeader
  //
  // These are layout constants, so they are declared final: nothing may reassign them.
  static final int BLOCK_MAGIC_INDEX = 0;
  static final int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
  static final int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
  static final int PREV_BLOCK_OFFSET_INDEX = 16;
  static final int CHECKSUM_TYPE_INDEX = 24;
  static final int BYTES_PER_CHECKSUM_INDEX = 25;
  static final int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
}
/** Type of block. Header field 0. */
private BlockType blockType;
/**
* Size on disk excluding header, including checksum. Header field 1.
* @see Writer#putHeader(byte[], int, int, int, int)
*/
private int onDiskSizeWithoutHeader;
/**
* Size of pure data. Does not include header or checksums. Header field 2.
* @see Writer#putHeader(byte[], int, int, int, int)
*/
private int uncompressedSizeWithoutHeader;
/**
* The offset of the previous block on disk. Header field 3.
* @see Writer#putHeader(byte[], int, int, int, int)
*/
private long prevBlockOffset;
/**
* Size on disk of header + data. Excludes checksum. Header field 6,
* OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
* @see Writer#putHeader(byte[], int, int, int, int)
*/
private int onDiskDataSizeWithHeader;
// End of Block Header fields.
/**
* The in-memory representation of the hfile block. Can be on or offheap. Can be backed by
* a single ByteBuffer or by many. Make no assumptions.
*
* <p>Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if
* not, be sure to reset position and limit else trouble down the road.
*
* <p>TODO: Make this read-only once made.
*
* <p>We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have
* a ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache.
* So, we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be
* good if could be confined to cache-use only but hard-to-do.
*/
private ByteBuff buf;
/** Meta data that holds meta information on the hfileblock.
*/
private HFileContext fileContext;
/**
* The offset of this block in the file. Populated by the reader for
* convenience of access. This offset is not part of the block header.
*/
private long offset = UNSET;
/**
* The on-disk size of the next block, including the header and checksums if present.
* UNSET if unknown.
*
* Blocks try to carry the size of the next block to read in this data member. Usually
* we get block sizes from the hfile index but sometimes the index is not available:
* e.g. when we read the indexes themselves (indexes are stored in blocks, we do not
* have an index for the indexes). Saves seeks especially around file open when
* there is a flurry of reading in hfile metadata.
*/
private int nextBlockOnDiskSize = UNSET;
private ByteBuffAllocator allocator;
/**
* On a checksum failure, do these many succeeding read requests using hdfs checksums before
* auto-reenabling hbase checksum verification.
*/
static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
/**
 * Sentinel meaning "not known"; the initial value of {@link #offset} and
 * {@link #nextBlockOnDiskSize}. Declared final so the sentinel cannot be reassigned.
 */
private static final int UNSET = -1;
public static final boolean FILL_HEADER = true;
public static final boolean DONT_FILL_HEADER = false;
// How to get the estimate correctly? if it is a singleBB?
public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
(int)ClassSize.estimateBase(MultiByteBuff.class, false);
/**
* Space for metadata on a block that gets stored along with the block when we cache it.
* There are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS.
* 8 bytes are for the offset of this block (long) in the file. Offset is important because it is
* used when we remake the CacheKey when we return block to the cache when done. There is also
* a flag on whether checksumming is being done by hbase or not. See class comment for note on
* uncertain state of checksumming of blocks that come out of cache (should we or should we not?).
* Finally there are 4 bytes to hold the length of the next block which can save a seek on
* occasion if available.
* (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
* formerly known as EXTRA_SERIALIZATION_SPACE).
*/
static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
/**
* Each checksum value is an integer that can be stored in 4 bytes.
*/
static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
static final byte[] DUMMY_HEADER_NO_CHECKSUM =
new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
/**
* Used deserializing blocks from Cache.
*
* <code>
* ++++++++++++++
* + HFileBlock +
* ++++++++++++++
* + Checksums + <= Optional
* ++++++++++++++
* + Metadata! + <= See note on BLOCK_METADATA_SPACE above.
* ++++++++++++++
* </code>
* @see #serialize(ByteBuffer, boolean)
*/
public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();
public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
  /** Not instantiable from outside; the enclosing class publishes a single shared instance. */
  private BlockDeserializer() {
  }

  /**
   * Rebuilds an {@link HFileBlock} from a cached buffer laid out as block bytes followed by
   * {@link HFileBlock#BLOCK_METADATA_SPACE} bytes of metadata.
   * @param buf cached bytes; its position and limit are modified by this call
   * @param alloc allocator handed on to the created block
   * @return the block described by {@code buf}
   */
  @Override
  public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc)
    throws IOException {
    // Everything before this index is the block itself; the metadata trailer follows it.
    final int blockEnd = buf.limit() - BLOCK_METADATA_SPACE;
    buf.limit(blockEnd).rewind();
    // The slice is what the new HFileBlock gets to 'own'.
    final ByteBuff blockBytes = buf.slice();
    // Re-open the window over the trailer and read it in serialization order.
    buf.position(blockEnd);
    buf.limit(blockEnd + HFileBlock.BLOCK_METADATA_SPACE);
    final byte checksumFlag = buf.get();
    final long blockOffset = buf.getLong();
    final int nextSize = buf.getInt();
    final boolean usesChecksum = checksumFlag == (byte) 1;
    return createFromBuff(blockBytes, usesChecksum, blockOffset, nextSize, null, alloc);
  }

  /** @return the identifier this deserializer was registered under */
  @Override
  public int getDeserializerIdentifier() {
    return DESERIALIZER_IDENTIFIER;
  }
}
private static final int DESERIALIZER_IDENTIFIER;
static {
DESERIALIZER_IDENTIFIER =
CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
}
/**
 * Creates a new {@link HFile} block from the given fields. This constructor is used only while
 * writing blocks and caching, when the block is sitting in a byte buffer and we want to stuff
 * it into cache.
 *
 * <p>TODO: The caller presumes no checksumming required of this block instance since going into
 * cache; checksum already verified on underlying block data pulled in from filesystem. Is that
 * correct? What if cache is SSD?
 * <p>TODO: HFile block writer can also off-heap ?
 *
 * @param blockType the type of this block, see {@link BlockType}
 * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
 * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
 * @param prevBlockOffset see {@link #prevBlockOffset}
 * @param buf block buffer with header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes); it is
 *          rewound before the constructor returns
 * @param fillHeader when true, the header fields are written into {@code buf} from the values
 *          passed here
 * @param offset the file offset the block was read from
 * @param nextBlockOnDiskSize see {@link #nextBlockOnDiskSize}
 * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
 * @param fileContext HFile meta data
 * @param allocator stored on the block and exposed through {@link #getByteBuffAllocator()}
 */
@VisibleForTesting
public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
  int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
  long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
  ByteBuffAllocator allocator) {
  // Backing storage and the context describing it.
  this.buf = buf;
  this.allocator = allocator;
  this.fileContext = fileContext;

  // Values mirrored in the on-disk header.
  this.blockType = blockType;
  this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
  this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
  this.prevBlockOffset = prevBlockOffset;
  this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;

  // Values that are not part of the header.
  this.offset = offset;
  this.nextBlockOnDiskSize = nextBlockOnDiskSize;

  // All fields must be set before this point: overwriteHeader() reads them.
  if (fillHeader) {
    overwriteHeader();
  }
  this.buf.rewind();
}
/**
 * Builds a block out of an existing buffer that starts with a block header. The buffer is
 * rewound and handed to the new block, so the incoming position is ignored; if the buffer was
 * sliced beforehand, rewinding goes back to the start of that slice.
 * @param buf Has header, content, and trailing checksums if present.
 * @param usesHBaseChecksum whether the header carries the three checksum-related fields
 * @param offset file offset to record on the block
 * @param nextBlockOnDiskSize next block's on-disk size to record on the block
 * @param fileContext context to start from; may be null, in which case a fresh one is built
 * @param allocator allocator to record on the block
 * @return the block assembled by {@link HFileBlockBuilder}
 */
static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
  final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
  throws IOException {
  buf.rewind();
  // The block type is read relative to the position; the other header fields by absolute index.
  final BlockType type = BlockType.read(buf);
  final int sizeOnDiskNoHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
  final int uncompressedSizeNoHeader =
    buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
  final long previousOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);

  // This method is called when we deserialize a block from cache and when we read a block in
  // from the fs. fileContext is null when deserialized from cache so need to make up one.
  final HFileContextBuilder contextBuilder;
  if (fileContext == null) {
    contextBuilder = new HFileContextBuilder();
  } else {
    contextBuilder = new HFileContextBuilder(fileContext);
  }
  contextBuilder.withHBaseCheckSum(usesHBaseChecksum);

  final int dataSizeOnDiskWithHeader;
  if (!usesHBaseChecksum) {
    contextBuilder.withChecksumType(ChecksumType.NULL);
    contextBuilder.withBytesPerCheckSum(0);
    // No checksums follow the block data, so derive the size from the header-less size.
    dataSizeOnDiskWithHeader = sizeOnDiskNoHeader + headerSize(usesHBaseChecksum);
  } else {
    final byte checksumCode = buf.get(Header.CHECKSUM_TYPE_INDEX);
    final int chunkSize = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
    dataSizeOnDiskWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
    // The header, not the supplied fileContext, is authoritative for these two settings.
    contextBuilder.withChecksumType(ChecksumType.codeToType(checksumCode));
    contextBuilder.withBytesPerCheckSum(chunkSize);
  }

  final HFileContext blockContext = contextBuilder.build();
  assert usesHBaseChecksum == blockContext.isUseHBaseChecksum();

  return new HFileBlockBuilder()
    .withBlockType(type)
    .withOnDiskSizeWithoutHeader(sizeOnDiskNoHeader)
    .withUncompressedSizeWithoutHeader(uncompressedSizeNoHeader)
    .withPrevBlockOffset(previousOffset)
    .withOffset(offset)
    .withOnDiskDataSizeWithHeader(dataSizeOnDiskWithHeader)
    .withNextBlockOnDiskSize(nextBlockOnDiskSize)
    .withHFileContext(blockContext)
    .withByteBuffAllocator(allocator)
    .withByteBuff(buf.rewind())
    .withShared(!buf.hasArray())
    .build();
}
/**
 * Computes the total on-disk size of a block from its header: the stored
 * onDiskSizeWithoutHeader field plus the header size for the given checksum mode.
 * @param headerBuf Header ByteBuffer. Presumed exact size of header.
 * @param verifyChecksum true if checksum verification is in use.
 * @return Size of the block with header included.
 */
private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf,
  boolean verifyChecksum) {
  final int sizeWithoutHeader = headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
  final int headerLength = headerSize(verifyChecksum);
  return sizeWithoutHeader + headerLength;
}
/**
 * Returns the recorded on-disk size of the block that follows this one. Intended as a hint
 * for reading the next block when scanning or running over a file.
 * @return the next block's on-disk size (including the header size and any checksums if
 *         present), as obtained by peeking into the next block's header
 */
int getNextBlockOnDiskSize() {
  return this.nextBlockOnDiskSize;
}
/** @return the {@link BlockType} held in this block's {@code blockType} field */
@Override
public BlockType getBlockType() {
  return this.blockType;
}
/** @return the reference count reported by the backing {@link ByteBuff} */
@Override
public int refCnt() {
  return this.buf.refCnt();
}
/**
 * Calls {@link ByteBuff#retain()} on the backing buffer.
 * @return this block, to allow call chaining
 */
@Override
public HFileBlock retain() {
  this.buf.retain();
  return this;
}
/**
 * Delegates to {@link ByteBuff#release()} to decrease the reference count; if there is no other
 * reference, the {@link ByteBuffer} goes back to
 * {@link org.apache.hadoop.hbase.io.ByteBuffAllocator}.
 * @return whatever {@link ByteBuff#release()} returns
 */
@Override
public boolean release() {
  return this.buf.release();
}
/**
 * Reads the data block encoding id, a short located at the first byte after the header.
 * @return the data block encoding id that was used to encode this block
 * @throws IllegalArgumentException if this block is not of type {@link BlockType#ENCODED_DATA}
 */
short getDataBlockEncodingId() {
  if (blockType == BlockType.ENCODED_DATA) {
    return buf.getShort(headerSize());
  }
  throw new IllegalArgumentException("Querying encoder ID of a block " +
    "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
}
/**
 * @return the on-disk size of header + data part + checksum, i.e.
 *         {@link #onDiskSizeWithoutHeader} plus this block's header size.
 */
public int getOnDiskSizeWithHeader() {
  final int headerLength = headerSize();
  return this.onDiskSizeWithoutHeader + headerLength;
}
/**
 * @return the on-disk size of the data part + checksum (header excluded).
 */
int getOnDiskSizeWithoutHeader() {
  return this.onDiskSizeWithoutHeader;
}
/**
 * @return the uncompressed size of data part (header and checksum excluded).
 */
int getUncompressedSizeWithoutHeader() {
  return this.uncompressedSizeWithoutHeader;
}
/**
 * @return the offset of the previous block of the same type in the file, or
 *         -1 if unknown
 */
long getPrevBlockOffset() {
  return this.prevBlockOffset;
}
/**
 * Rewinds {@code buf} and writes the header from this block's fields: block type,
 * onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader and prevBlockOffset, followed, when
 * the file context uses HBase checksums, by checksum type code, bytes per checksum and
 * onDiskDataSizeWithHeader. {@code buf} position is modified as side-effect.
 */
private void overwriteHeader() {
  buf.rewind();
  // Fields present in every header.
  blockType.write(buf);
  buf.putInt(onDiskSizeWithoutHeader);
  buf.putInt(uncompressedSizeWithoutHeader);
  buf.putLong(prevBlockOffset);
  if (!this.fileContext.isUseHBaseChecksum()) {
    return;
  }
  // Fields present only when HBase checksums are in use.
  buf.put(fileContext.getChecksumType().getCode());
  buf.putInt(fileContext.getBytesPerChecksum());
  buf.putInt(onDiskDataSizeWithHeader);
}
/**
 * Convenience form of {@link #getBufferWithoutHeader(boolean)} that leaves the checksum out.
 * @return the buffer with header skipped and checksum omitted.
 */
public ByteBuff getBufferWithoutHeader() {
  final boolean withChecksum = false;
  return this.getBufferWithoutHeader(withChecksum);
}
/**
 * Returns a slice of a duplicate of this block's buffer that starts right after the header.
 * @param withChecksum when true the slice runs to the buffer limit and so keeps any trailing
 *          checksum bytes; when false the last {@code totalChecksumBytes()} bytes are cut off.
 * @return the buffer with header skipped, and checksum omitted unless requested.
 */
public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
  final ByteBuff view = getBufferReadOnly();
  final int trimmedTail;
  if (withChecksum) {
    trimmedTail = 0;
  } else {
    trimmedTail = totalChecksumBytes();
  }
  view.position(headerSize());
  view.limit(buf.limit() - trimmedTail);
  return view.slice();
}
/**
 * Returns a duplicate of the buffer this block stores internally, ready to be read. Callers
 * must not modify the buffer content, though they may set position and limit on the returned
 * object because it is a duplicate. This method has to be public because it is used in
 * {@link CompoundBloomFilter} to avoid object creation on every Bloom filter lookup, but has to
 * be used with caution. Buffer holds header, block content, and any follow-on checksums if
 * present.
 *
 * @return the buffer of this block for read-only operations
 */
public ByteBuff getBufferReadOnly() {
  // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix.
  final ByteBuff readView = this.buf.duplicate();
  assert readView.position() == 0;
  return readView;
}
/** @return the {@link ByteBuffAllocator} this block was constructed with */
public ByteBuffAllocator getByteBuffAllocator() {
  final ByteBuffAllocator blockAllocator = this.allocator;
  return blockAllocator;
}
/**
 * Verifies that a numeric header value read from the buffer equals the matching field.
 * @param valueFromBuf value decoded from the buffer
 * @param valueFromField value held in the block's field
 * @param fieldName name used in the failure message
 * @throws AssertionError if the two values differ
 */
@VisibleForTesting
private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
  String fieldName) throws IOException {
  if (valueFromBuf == valueFromField) {
    return;
  }
  throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
    + ") is different from that in the field (" + valueFromField + ")");
}
/**
 * Verifies that the block type read from the buffer is the same instance as the field.
 * @param valueFromBuf block type decoded from the buffer
 * @param valueFromField block type held in the block's field
 * @throws IOException if the two block types differ
 */
@VisibleForTesting
private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
  throws IOException {
  if (valueFromBuf == valueFromField) {
    return;
  }
  throw new IOException("Block type stored in the buffer: " +
    valueFromBuf + ", block type field: " + valueFromField);
}
/**
 * Checks if the block is internally consistent, i.e. the first
 * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
 * valid header consistent with the fields. Assumes a packed block structure.
 * The check reads from a duplicate of the buffer, so this block's own buffer position and
 * limit are left as they were. This function is primarily for testing and debugging.
 * Used by tests only.
 * @throws IOException if the block type in the buffer differs from the field
 * @throws AssertionError if any other header value, the limit, or the remaining size is not
 *           what the fields imply
 */
@VisibleForTesting
void sanityCheck() throws IOException {
  // Duplicate so no side-effects
  final ByteBuff header = this.buf.duplicate().rewind();

  // Header fields are consumed sequentially, in the order they are laid out.
  sanityCheckAssertion(BlockType.read(header), blockType);
  sanityCheckAssertion(header.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");
  sanityCheckAssertion(header.getInt(), uncompressedSizeWithoutHeader,
    "uncompressedSizeWithoutHeader");
  sanityCheckAssertion(header.getLong(), prevBlockOffset, "prevBlockOffset");
  if (this.fileContext.isUseHBaseChecksum()) {
    sanityCheckAssertion(header.get(), this.fileContext.getChecksumType().getCode(),
      "checksumType");
    sanityCheckAssertion(header.getInt(), this.fileContext.getBytesPerChecksum(),
      "bytesPerChecksum");
    sanityCheckAssertion(header.getInt(), onDiskDataSizeWithHeader,
      "onDiskDataSizeWithHeader");
  }

  final int expectedBufLimit = onDiskDataSizeWithHeader + totalChecksumBytes();
  final int actualLimit = header.limit();
  if (actualLimit != expectedBufLimit) {
    throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + actualLimit);
  }

  // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
  // block's header, so there are two sensible values for buffer capacity.
  final int hdrSize = headerSize();
  header.rewind();
  final int remaining = header.remaining();
  final boolean exactFit = remaining == expectedBufLimit;
  final boolean fitWithNextHeader = remaining == expectedBufLimit + hdrSize;
  if (!exactFit && !fitWithNextHeader) {
    throw new AssertionError("Invalid buffer capacity: " + remaining +
      ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
  }
}
/**
 * Renders the block's header fields, derived sizes, buffer description, up to the first 32
 * bytes that follow the header, the file context and the next block's size.
 * @return a bracketed, comma-separated description of this block
 */
@Override
public String toString() {
  final int hdrSize = headerSize();
  StringBuilder sb = new StringBuilder()
    .append("[")
    .append("blockType=").append(blockType)
    .append(", fileOffset=").append(offset)
    .append(", headerSize=").append(hdrSize)
    .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
    .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
    .append(", prevBlockOffset=").append(prevBlockOffset)
    .append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum());
  if (fileContext.isUseHBaseChecksum()) {
    // Read straight from the buffer, using the shared header layout constants rather than
    // repeating the raw offsets here.
    sb.append(", checksumType=")
      .append(ChecksumType.codeToType(this.buf.get(Header.CHECKSUM_TYPE_INDEX)))
      .append(", bytesPerChecksum=").append(this.buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX))
      .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
  } else {
    sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
      .append("(").append(onDiskSizeWithoutHeader)
      .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
  }
  String dataBegin;
  if (buf.hasArray()) {
    // limit() is relative to the start of the buffer, not of the backing array, so the number
    // of bytes after the header is limit - headerSize. arrayOffset only shifts where those
    // bytes start inside the array; it must not be subtracted from the length.
    final int previewLength = Math.max(0, Math.min(32, buf.limit() - hdrSize));
    dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + hdrSize, previewLength);
  } else {
    ByteBuff bufWithoutHeader = getBufferWithoutHeader();
    byte[] dataBeginBytes = new byte[Math.min(32,
      bufWithoutHeader.limit() - bufWithoutHeader.position())];
    bufWithoutHeader.get(dataBeginBytes);
    dataBegin = Bytes.toStringBinary(dataBeginBytes);
  }
  sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
    .append(", totalChecksumBytes=").append(totalChecksumBytes())
    .append(", isUnpacked=").append(isUnpacked())
    .append(", buf=[").append(buf).append("]")
    .append(", dataBeginsWith=").append(dataBegin)
    .append(", fileContext=").append(fileContext)
    .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize)
    .append("]");
  return sb.toString();
}
/**
 * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
 * encoded structure. Internal structures are shared between instances where applicable.
 * @param fileContext decides whether any unpacking is needed at all
 * @param reader source of the decoding context used to unpack
 * @return this block when {@code fileContext} is neither compressed nor encrypted, otherwise a
 *         clone of this block whose buffer holds the unpacked content
 * @throws IOException if decoding fails; the clone is released before the exception
 *           propagates
 */
HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
  if (!fileContext.isCompressedOrEncrypted()) {
    // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
    // which is used for block serialization to L2 cache, does not preserve encoding and
    // encryption details.
    return this;
  }

  final HFileBlock result = shallowClone(this);
  // Give the clone its own buffer with space for the decompressed block.
  result.allocateBuffer();

  boolean decoded = false;
  try {
    final HFileBlockDecodingContext decodingCtx;
    if (blockType == BlockType.ENCODED_DATA) {
      decodingCtx = reader.getBlockDecodingContext();
    } else {
      decodingCtx = reader.getDefaultBlockDecodingContext();
    }

    // Source: a view of this block's buffer that starts after the header.
    final ByteBuff packed = this.buf.duplicate();
    packed.position(this.headerSize());
    final ByteBuff packedPayload = packed.slice();

    // Decode the packed payload into the clone's buffer.
    decodingCtx.prepareDecoding(result.getOnDiskSizeWithoutHeader(),
      result.getUncompressedSizeWithoutHeader(), result.getBufferWithoutHeader(true),
      packedPayload);
    decoded = true;
    return result;
  } finally {
    // On any failure, drop the reference held on the freshly allocated buffer.
    if (!decoded) {
      result.release();
    }
  }
}
/**
 * Always allocates a new buffer sized header + uncompressed data + checksum bytes, copies the
 * header bytes over from the current buffer, and makes the new buffer this block's buffer.
 * Header fields are not changed. The previous buffer's position is reset to 0 as a
 * side-effect.
 */
private void allocateBuffer() {
  final int headerLength = headerSize();
  final int requiredCapacity =
    headerLength + uncompressedSizeWithoutHeader + totalChecksumBytes();
  final ByteBuff replacement = allocator.allocate(requiredCapacity);
  // Copy header bytes into the replacement.
  buf.position(0);
  replacement.put(0, buf, 0, headerLength);
  // set limit to exclude next block's header
  replacement.limit(requiredCapacity);
  buf = replacement;
}
/**
 * Reports whether this block's buffer looks unpacked: its remaining size equals header +
 * uncompressed data + checksum bytes, optionally plus one more header. Note this is a
 * calculated heuristic, not tracked attribute of the block.
 * @return true when the buffer size matches an unpacked block, false otherwise
 */
public boolean isUnpacked() {
  final int headerLength = headerSize();
  final int unpackedSize = headerLength + uncompressedSizeWithoutHeader + totalChecksumBytes();
  final int available = buf.remaining();
  if (available == unpackedSize) {
    return true;
  }
  return available == unpackedSize + headerLength;
}
/**
 * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey}
 * when block is returned to the cache.
 * @return the offset of this block in the file it was read from
 * @throws IllegalStateException if the stored offset is negative
 */
long getOffset() {
  if (offset >= 0) {
    return offset;
  }
  throw new IllegalStateException("HFile block offset not initialized properly");
}
/**
 * Opens a stream over a duplicate of this block's buffer, positioned just past the header.
 * @return a byte stream reading the data + checksum of this block
 */
DataInputStream getByteStream() {
  final ByteBuff payload = this.buf.duplicate();
  payload.position(this.headerSize());
  final ByteBuffInputStream source = new ByteBuffInputStream(payload);
  return new DataInputStream(source);
}
/**
 * Computes the heap footprint of this block: the fixed overhead, the file context and, when a
 * buffer is attached, the buffer itself.
 * @return the aligned heap size
 */
@Override
public long heapSize() {
  final long baseSize = FIXED_OVERHEAD + fileContext.heapSize();
  if (buf == null) {
    return ClassSize.align(baseSize);
  }
  // Deep overhead of the byte buffer; it is aligned on its own before being added in.
  final long bufferSize = ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
  return ClassSize.align(baseSize + bufferSize);
}
/**
 * Tells whether this block is backed by shared memory. Meant to be overridden by
 * {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}.
 * @return true for a {@link SharedMemHFileBlock}, false for an {@link ExclusiveMemHFileBlock},
 *         and true by default for any other block
 */
public boolean isSharedMem() {
  return this instanceof SharedMemHFileBlock || !(this instanceof ExclusiveMemHFileBlock);
}
/**
* Unified version 2 {@link HFile} block writer. The intended usage pattern
* is as follows:
* <ol>
* <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
* <li>Call {@link Writer#startWriting} and get a data stream to write to.
* <li>Write your data into the stream.
* <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to
* store the serialized block into an external stream.
* <li>Repeat to write more blocks.
* </ol>
* <p>
*/
static class Writer implements ShipperListener {
/** Stages of the writer's usage protocol, tracked in the writer's {@code state} field. */
private enum State {
/** Value the writer's {@code state} field is initialized with. */
INIT,
/** Set by {@code startWriting}: a block is open and accepting data. */
WRITING,
/** Set by {@code finishBlock}: the current block has been finished. */
BLOCK_READY
}
/** Writer state. Used to ensure the correct usage protocol. Starts out as {@link State#INIT}. */
private State state = State.INIT;
/** Data block encoder used for data blocks. Never null once the constructor has run. */
private final HFileDataBlockEncoder dataBlockEncoder;
/** Block encoding context for data blocks, created from {@link #dataBlockEncoder}. */
private HFileBlockEncodingContext dataBlockEncodingCtx;
/** Block encoding context for non-data blocks. */
private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
/**
 * The stream we use to accumulate data into a block in an uncompressed format.
 * The stream is reused across blocks: {@link #startWriting(BlockType)} resets it and then
 * writes a dummy header as its first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes.
 */
private ByteArrayOutputStream baosInMemory;
/**
 * Current block type. Set in {@link #startWriting(BlockType)}. Could be
 * changed in {@link #finishBlock()} from {@link BlockType#DATA}
 * to {@link BlockType#ENCODED_DATA}.
 */
private BlockType blockType;
/**
 * The stream handed to the caller by {@link #startWriting(BlockType)}. It wraps
 * {@link #baosInMemory}; bytes are accumulated uncompressed and any compression is applied
 * later, in {@link #finishBlock()}.
 */
private DataOutputStream userDataStream;
/**
 * Bytes to be written to the file system, including the header. Compressed
 * if compression is turned on. Allocated lazily by {@link #finishBlock()} and reused for
 * subsequent blocks.
 * NOTE(review): finishBlock() generates the checksums into {@link #onDiskChecksum} rather than
 * into this stream, so an earlier claim that this holds header + data + checksums looks
 * stale -- confirm against the code that writes the block out.
 */
private ByteArrayOutputStream onDiskBlockBytesWithHeader;
/**
 * The checksum bytes of the current block. {@link #finishBlock()} resizes this array when the
 * required length changes and regenerates its content for every block.
 * NOTE(review): an earlier comment said this is used only when the data is not compressed;
 * finishBlock() fills it unconditionally -- confirm how the consumers treat it.
 */
private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
/**
 * Current block's start offset in the {@link HFile}. Set in
 * {@link #writeHeaderAndData(FSDataOutputStream)}; reset to -1 by
 * {@link #startWriting(BlockType)}.
 */
private long startOffset;
/**
 * Offset of previous block by block type, indexed by {@link BlockType#getId()}. Entries start
 * out as {@code UNSET} and are updated when the next block is started.
 */
private long[] prevOffsetByType;
/**
 * The offset of the previous block of the same type. Looked up in {@link #finishBlock()} and
 * written into the block header.
 */
private long prevOffset;
/** Meta data that holds information about the hfileblock. */
private HFileContext fileContext;
/** Allocator handed to the constructor. */
private final ByteBuffAllocator allocator;
/**
 * Passes the before-shipped notification on to the current encoding state, when there is one.
 */
@Override
public void beforeShipped() {
  if (getEncodingState() == null) {
    // No encoding state to notify.
    return;
  }
  getEncodingState().beforeShipped();
}
/**
 * @return the encoding state held by the data block encoding context; callers in this class
 *         check the result for null before using it
 */
EncodingState getEncodingState() {
  final HFileBlockEncodingContext dataCtx = dataBlockEncodingCtx;
  return dataCtx.getEncodingState();
}
/**
 * Creates a writer that uses {@link ByteBuffAllocator#HEAP} as its allocator. Delegates to
 * the three-argument constructor.
 * @param dataBlockEncoder data block encoding algorithm to use
 * @param fileContext meta data of the file the blocks are written for; its bytesPerChecksum
 *          setting is validated by the constructor this one delegates to
 */
@VisibleForTesting
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
this(dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
}
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext,
ByteBuffAllocator allocator) {
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
" Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
fileContext.getBytesPerChecksum());
}
this.allocator = allocator;
this.dataBlockEncoder = dataBlockEncoder != null?
dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
this.dataBlockEncodingCtx = this.dataBlockEncoder.
newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
// TODO: This should be lazily instantiated since we usually do NOT need this default encoder
this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
// TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
baosInMemory = new ByteArrayOutputStream();
prevOffsetByType = new long[BlockType.values().length];
for (int i = 0; i < prevOffsetByType.length; ++i) {
prevOffsetByType[i] = UNSET;
}
// TODO: Why fileContext saved away when we have dataBlockEncoder and/or
// defaultDataBlockEncoder?
this.fileContext = fileContext;
}
/**
 * Begins a new block of the given type. Whatever the previous block left in the in-memory
 * stream is discarded and a dummy header is written in its place.
 *
 * @param newBlockType type of the block about to be written
 * @return the stream the user can write their data into
 * @throws IOException if writing the dummy header or starting the block encoding fails
 */
DataOutputStream startWriting(BlockType newBlockType) throws IOException {
  final boolean previousBlockWasWritten = state == State.BLOCK_READY && startOffset != -1;
  if (previousBlockWasWritten) {
    // The previous block went to a stream at a known offset; remember that offset as the
    // last offset of a block of that type.
    prevOffsetByType[blockType.getId()] = startOffset;
  }
  startOffset = -1;
  blockType = newBlockType;

  baosInMemory.reset();
  baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);

  state = State.WRITING;

  // Compression is deferred until finishBlock().
  userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
  if (BlockType.DATA == newBlockType) {
    dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
  }
  return userDataStream;
}
/**
 * Encodes the given Cell into the block currently being written. The writer state is checked
 * against {@link State#WRITING} first.
 * @param cell the cell to append to this block
 * @throws IOException if the encoder fails to write the cell
 */
void write(Cell cell) throws IOException {
  expectState(State.WRITING);
  dataBlockEncoder.encode(cell, dataBlockEncodingCtx, userDataStream);
}
/**
 * Moves the block writer from the "writing" state to the "block ready" state. A block that is
 * already finished is left untouched.
 * @throws IllegalStateException if the writer is still in its initial state
 * @throws IOException if finishing the block fails
 */
void ensureBlockReady() throws IOException {
  Preconditions.checkState(state != State.INIT,
      "Unexpected state: " + state);
  if (state != State.BLOCK_READY) {
    // finishBlock() switches the state to BLOCK_READY.
    finishBlock();
  }
}
/**
 * Finishes the block that is currently being written. For data blocks the block encoding is
 * ended first, which may change the block type. The user stream is then flushed, the
 * accumulated bytes are run through compression/encryption, the header is filled in and the
 * checksums are generated. Sets the writer state to "block ready".
 * @throws IOException if ending the encoding, flushing or compressing/encrypting fails
 */
private void finishBlock() throws IOException {
  if (blockType == BlockType.DATA) {
    dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
        baosInMemory.getBuffer(), blockType);
    // The encoding context decides the final type of the block.
    blockType = dataBlockEncodingCtx.getBlockType();
  }
  userDataStream.flush();
  prevOffset = prevOffsetByType[blockType.getId()];

  // We need to set state before we can package the block up for cache-on-write. In a way, the
  // block is ready, but not yet encoded or compressed.
  state = State.BLOCK_READY;

  final byte[] uncompressed = baosInMemory.getBuffer();
  final int uncompressedLen = baosInMemory.size();
  final boolean isDataBlock =
      blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA;
  Bytes onDiskData = isDataBlock
      ? dataBlockEncodingCtx.compressAndEncrypt(uncompressed, 0, uncompressedLen)
      : defaultBlockEncodingCtx.compressAndEncrypt(uncompressed, 0, uncompressedLen);
  if (onDiskData == null) {
    // The context produced nothing; use the accumulated bytes as they are.
    onDiskData = new Bytes(uncompressed, 0, uncompressedLen);
  }

  if (onDiskBlockBytesWithHeader == null) {
    onDiskBlockBytesWithHeader = new ByteArrayOutputStream(onDiskData.getLength());
  }
  onDiskBlockBytesWithHeader.reset();
  onDiskBlockBytesWithHeader.write(onDiskData.get(), onDiskData.getOffset(),
      onDiskData.getLength());

  // Work out how many checksum bytes go on the tail of the block.
  final int onDiskDataLen = onDiskBlockBytesWithHeader.size();
  final int checksumLen =
      (int) ChecksumUtil.numBytes(onDiskDataLen, fileContext.getBytesPerChecksum());

  // Fill in the header of the on-disk bytes; until now it has not been filled out.
  putHeader(onDiskBlockBytesWithHeader, onDiskDataLen + checksumLen, uncompressedLen,
      onDiskDataLen);

  // Reuse the checksum array unless the required length changed.
  if (onDiskChecksum.length != checksumLen) {
    onDiskChecksum = new byte[checksumLen];
  }
  ChecksumUtil.generateChecksums(onDiskBlockBytesWithHeader.getBuffer(), 0,
      onDiskBlockBytesWithHeader.size(), onDiskChecksum, 0, fileContext.getChecksumType(),
      fileContext.getBytesPerChecksum());
}
/**
 * Writes the block header into the given byte array, starting at the given offset. The fields
 * are written in this order: block type, on-disk size without header (int), uncompressed size
 * without header (int), previous block offset (long), checksum type code (byte), bytes per
 * checksum (int) and on-disk data size (int).
 * @param dest array receiving the header
 * @param offset position in {@code dest} where the header starts
 * @param onDiskSize size of the block on disk header + data + checksum
 * @param uncompressedSize size of the block after decompression (but
 *          before optional data block decoding) including header
 * @param onDiskDataSize size of the block on disk with header
 *          and data but not including the checksums
 */
private void putHeader(byte[] dest, int offset, int onDiskSize,
    int uncompressedSize, int onDiskDataSize) {
  final int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
  int cursor = blockType.put(dest, offset);
  // Both sizes are stored without the header.
  cursor = Bytes.putInt(dest, cursor, onDiskSize - headerLen);
  cursor = Bytes.putInt(dest, cursor, uncompressedSize - headerLen);
  cursor = Bytes.putLong(dest, cursor, prevOffset);
  cursor = Bytes.putByte(dest, cursor, fileContext.getChecksumType().getCode());
  cursor = Bytes.putInt(dest, cursor, fileContext.getBytesPerChecksum());
  Bytes.putInt(dest, cursor, onDiskDataSize);
}
private void putHeader(ByteBuff buff, int onDiskSize,
int uncompressedSize, int onDiskDataSize) {
buff.rewind();
blockType.write(buff);