Skip to content

Commit

Permalink
[apache#596][FOLLOWUP] Index data support offheap read
Browse files Browse the repository at this point in the history
  • Loading branch information
jerqi committed May 8, 2023
1 parent 6bd8d13 commit 6357775
Showing 1 changed file with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,22 @@ public HdfsShuffleReadHandler(
protected ShuffleIndexResult readShuffleIndex() {
long start = System.currentTimeMillis();
try {
byte[] indexData = indexReader.read();
int segmentNumber = indexData.length / FileBasedShuffleSegment.SEGMENT_SIZE;
ByteBuffer indexData = null;
if (offHeapEnabled) {
indexData = indexReader.readAsByteBuffer();
} else {
indexData = ByteBuffer.wrap(indexReader.read());
}
int indexDataLength = indexData.limit() - indexData.position();
int segmentNumber = indexDataLength / FileBasedShuffleSegment.SEGMENT_SIZE;
int expectedLen = segmentNumber * FileBasedShuffleSegment.SEGMENT_SIZE;
if (indexData.length != expectedLen) {
if (indexDataLength != expectedLen) {
LOG.warn("Maybe the index file: {} is being written due to the shuffle-buffer flushing.", filePrefix);
byte[] indexNewData = new byte[expectedLen];
System.arraycopy(indexData, 0, indexNewData, 0, expectedLen);
indexData = indexNewData;
indexData.limit(expectedLen);
}
long dateFileLen = getDataFileLen();
LOG.info("Read index files {}.index for {} ms", filePrefix, System.currentTimeMillis() - start);
return new ShuffleIndexResult(ByteBuffer.wrap(indexData), dateFileLen);
return new ShuffleIndexResult(indexData, dateFileLen);
} catch (Exception e) {
LOG.info("Fail to read index files {}.index", filePrefix, e);
}
Expand Down

0 comments on commit 6357775

Please sign in to comment.