Skip to content

Commit

Permalink
Correct read time stats in ParquetPageSource
Browse files Browse the repository at this point in the history
The original code path measured the time spent in parquetReader.nextBatch(), which only prepares the reader for the next batch rather than actually reading any data.
  • Loading branch information
qqibrow authored and martint committed Dec 16, 2018
1 parent 28ec8f4 commit 834b01c
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
Expand Up @@ -34,6 +34,7 @@ public class HdfsParquetDataSource
private final String name;
private final long size;
private final FSDataInputStream inputStream;
private long readTimeNanos;
private long readBytes;
private final FileFormatDataSourceStats stats;

Expand All @@ -51,6 +52,12 @@ public final long getReadBytes()
return readBytes;
}

/**
 * Returns the current value of the {@code readTimeNanos} counter, to which
 * {@code readFully} adds the elapsed nanoseconds of each read it performs.
 *
 * @return accumulated read time in nanoseconds
 */
@Override
public long getReadTimeNanos()
{
    return this.readTimeNanos;
}

@Override
public final long getSize()
{
Expand All @@ -73,16 +80,20 @@ public final void readFully(long position, byte[] buffer)
/**
 * Reads {@code bufferLength} bytes starting at file offset {@code position}
 * into {@code buffer} beginning at {@code bufferOffset}, delegating the
 * actual I/O to {@code readInternal}, and updates the read statistics.
 *
 * @param position offset in the underlying stream to start reading from
 * @param buffer destination array
 * @param bufferOffset index in {@code buffer} at which to start writing
 * @param bufferLength number of bytes to read
 */
@Override
public final void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
{
    readBytes += bufferLength;

    // Perform the read exactly once and time only that call, so the
    // accumulated read time and the throughput stat reflect real I/O.
    long start = System.nanoTime();
    readInternal(position, buffer, bufferOffset, bufferLength);
    long currentReadTimeNanos = System.nanoTime() - start;

    readTimeNanos += currentReadTimeNanos;
    stats.readDataBytesPerSecond(bufferLength, currentReadTimeNanos);
}

private void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength)
{
try {
long readStart = System.nanoTime();
inputStream.readFully(position, buffer, bufferOffset, bufferLength);
stats.readDataBytesPerSecond(bufferLength, System.nanoTime() - readStart);
}
catch (PrestoException e) {
// just in case there is a Presto wrapper or hook
Expand Down
Expand Up @@ -64,7 +64,6 @@ public class ParquetPageSource

private int batchId;
private boolean closed;
private long readTimeNanos;
private final boolean useParquetColumnNames;

public ParquetPageSource(
Expand Down Expand Up @@ -125,7 +124,7 @@ public long getCompletedBytes()
/**
 * Returns the read time reported by the data source behind
 * {@code parquetReader}, rather than a counter kept by this page source.
 *
 * @return read time in nanoseconds as reported by the data source
 */
@Override
public long getReadTimeNanos()
{
    return parquetReader.getDataSource().getReadTimeNanos();
}

@Override
Expand All @@ -145,12 +144,8 @@ public Page getNextPage()
{
try {
batchId++;
long start = System.nanoTime();

int batchSize = parquetReader.nextBatch();

readTimeNanos += System.nanoTime() - start;

if (closed || batchSize <= 0) {
close();
return null;
Expand Down
Expand Up @@ -21,6 +21,8 @@ public interface ParquetDataSource
{
long getReadBytes();

long getReadTimeNanos();

long getSize();

void readFully(long position, byte[] buffer);
Expand Down

0 comments on commit 834b01c

Please sign in to comment.