From 1380637f37133f90e6c0fbe51bae83c2bc325cca Mon Sep 17 00:00:00 2001 From: Zhenxiao Luo Date: Mon, 25 Apr 2016 14:55:57 -0700 Subject: [PATCH] Use actual read bytes for ParquetDataSource --- .../hive/parquet/HdfsParquetDataSource.java | 8 ++++++++ .../presto/hive/parquet/ParquetDataSource.java | 2 ++ .../presto/hive/parquet/ParquetPageSource.java | 17 +++++------------ .../hive/parquet/ParquetPageSourceFactory.java | 2 +- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/HdfsParquetDataSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/HdfsParquetDataSource.java index 35c44ff3420e..681dc385e638 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/HdfsParquetDataSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/HdfsParquetDataSource.java @@ -33,6 +33,7 @@ public class HdfsParquetDataSource private final String name; private final long size; private final FSDataInputStream inputStream; + private long readBytes; public HdfsParquetDataSource(Path path, long size, FSDataInputStream inputStream) { @@ -41,6 +42,12 @@ public HdfsParquetDataSource(Path path, long size, FSDataInputStream inputStream this.inputStream = inputStream; } + @Override + public final long getReadBytes() + { + return readBytes; + } + @Override public final long getSize() { @@ -66,6 +73,7 @@ public final void readFully(long position, byte[] buffer, int bufferOffset, int throws IOException { readInternal(position, buffer, bufferOffset, bufferLength); + readBytes += bufferLength; } private void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetDataSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetDataSource.java index 6c1c0394b1a9..022950f31005 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetDataSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetDataSource.java @@ -19,6 +19,8 @@ public interface ParquetDataSource extends Closeable { + long getReadBytes(); + long getSize(); void readFully(long position, byte[] buffer) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java index de58e2040d47..c9b42ec7a672 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java @@ -34,7 +34,6 @@ import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; import io.airlift.units.DataSize; -import org.apache.hadoop.fs.Path; import org.joda.time.DateTimeZone; import parquet.column.ColumnDescriptor; import parquet.schema.MessageType; @@ -68,8 +67,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Maps.uniqueIndex; -import static java.lang.Math.max; -import static java.lang.Math.min; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; @@ -81,6 +78,7 @@ class ParquetPageSource private static final long GUESSED_MEMORY_USAGE = new DataSize(16, DataSize.Unit.MEGABYTE).toBytes(); private final ParquetReader parquetReader; + private final ParquetDataSource dataSource; private final MessageType requestedSchema; // for debugging heap dump private final List columnNames; @@ -90,16 +88,15 @@ class ParquetPageSource private final int[] hiveColumnIndexes; private final long totalBytes; - private long completedBytes; private int batchId; private boolean closed; private long readTimeNanos; public ParquetPageSource( ParquetReader parquetReader, + ParquetDataSource dataSource, MessageType fileSchema, MessageType requestedSchema, - Path path, long totalBytes, Properties splitSchema, List columns, @@ -109,7 +106,6 @@ public ParquetPageSource( TypeManager typeManager, boolean useParquetColumnNames) { - requireNonNull(path, "path is null"); checkArgument(totalBytes >= 0, "totalBytes is negative"); requireNonNull(splitSchema, "splitSchema is null"); requireNonNull(columns, "columns is null"); @@ -117,6 +113,7 @@ public ParquetPageSource( requireNonNull(effectivePredicate, "effectivePredicate is null"); this.parquetReader = parquetReader; + this.dataSource = dataSource; this.requestedSchema = requestedSchema; this.totalBytes = totalBytes; @@ -240,7 +237,7 @@ public long getTotalBytes() @Override public long getCompletedBytes() { - return completedBytes; + return dataSource.getReadBytes(); } @Override @@ -289,11 +286,7 @@ public Page getNextPage() blocks[fieldId] = new LazyBlock(batchSize, new ParquetBlockLoader(columnDescriptor, type)); } } - Page page = new Page(batchSize, blocks); - - long newCompletedBytes = (long) (totalBytes * parquetReader.getProgress()); - completedBytes = min(totalBytes, max(completedBytes, newCompletedBytes)); - return page; + return new Page(batchSize, blocks); } catch (PrestoException e) { closeWithSuppression(e); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java index 386b0e379b52..b35b4df2ff3a 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java @@ -184,9 +184,9 @@ public static ParquetPageSource createParquetPageSource( return new ParquetPageSource( parquetReader, + dataSource, fileSchema, requestedSchema, - path, length, schema, columns,