Use actual read bytes for ParquetDataSource
zhenxiao authored and martint committed Apr 27, 2016
1 parent e7e473b commit 1380637
Showing 4 changed files with 16 additions and 13 deletions.
HdfsParquetDataSource.java

@@ -33,6 +33,7 @@ public class HdfsParquetDataSource
     private final String name;
     private final long size;
     private final FSDataInputStream inputStream;
+    private long readBytes;
 
     public HdfsParquetDataSource(Path path, long size, FSDataInputStream inputStream)
     {
@@ -41,6 +42,12 @@ public HdfsParquetDataSource(Path path, long size, FSDataInputStream inputStream)
         this.inputStream = inputStream;
     }
 
+    @Override
+    public final long getReadBytes()
+    {
+        return readBytes;
+    }
+
     @Override
     public final long getSize()
     {
@@ -66,6 +73,7 @@ public final void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
             throws IOException
     {
         readInternal(position, buffer, bufferOffset, bufferLength);
+        readBytes += bufferLength;
     }
 
     private void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength)
ParquetDataSource.java

@@ -19,6 +19,8 @@
 public interface ParquetDataSource
         extends Closeable
 {
+    long getReadBytes();
+
     long getSize();
 
     void readFully(long position, byte[] buffer)
ParquetPageSource.java

@@ -34,7 +34,6 @@
 import com.google.common.collect.ImmutableList;
 import io.airlift.slice.Slice;
 import io.airlift.units.DataSize;
-import org.apache.hadoop.fs.Path;
 import org.joda.time.DateTimeZone;
 import parquet.column.ColumnDescriptor;
 import parquet.schema.MessageType;
@@ -68,8 +67,6 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Maps.uniqueIndex;
-import static java.lang.Math.max;
-import static java.lang.Math.min;
 import static java.lang.String.format;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.requireNonNull;
@@ -81,6 +78,7 @@ class ParquetPageSource
     private static final long GUESSED_MEMORY_USAGE = new DataSize(16, DataSize.Unit.MEGABYTE).toBytes();
 
     private final ParquetReader parquetReader;
+    private final ParquetDataSource dataSource;
     private final MessageType requestedSchema;
     // for debugging heap dump
     private final List<String> columnNames;
@@ -90,16 +88,15 @@ class ParquetPageSource
     private final int[] hiveColumnIndexes;
 
     private final long totalBytes;
-    private long completedBytes;
     private int batchId;
     private boolean closed;
     private long readTimeNanos;
 
     public ParquetPageSource(
             ParquetReader parquetReader,
+            ParquetDataSource dataSource,
             MessageType fileSchema,
             MessageType requestedSchema,
-            Path path,
             long totalBytes,
             Properties splitSchema,
             List<HiveColumnHandle> columns,
@@ -109,14 +106,14 @@ public ParquetPageSource(
             TypeManager typeManager,
             boolean useParquetColumnNames)
     {
-        requireNonNull(path, "path is null");
         checkArgument(totalBytes >= 0, "totalBytes is negative");
         requireNonNull(splitSchema, "splitSchema is null");
         requireNonNull(columns, "columns is null");
         requireNonNull(partitionKeys, "partitionKeys is null");
         requireNonNull(effectivePredicate, "effectivePredicate is null");
 
         this.parquetReader = parquetReader;
+        this.dataSource = dataSource;
         this.requestedSchema = requestedSchema;
         this.totalBytes = totalBytes;
@@ -240,7 +237,7 @@ public long getTotalBytes()
     @Override
     public long getCompletedBytes()
     {
-        return completedBytes;
+        return dataSource.getReadBytes();
     }
 
     @Override
@@ -289,11 +286,7 @@ public Page getNextPage()
                     blocks[fieldId] = new LazyBlock(batchSize, new ParquetBlockLoader(columnDescriptor, type));
                 }
             }
-            Page page = new Page(batchSize, blocks);
-
-            long newCompletedBytes = (long) (totalBytes * parquetReader.getProgress());
-            completedBytes = min(totalBytes, max(completedBytes, newCompletedBytes));
-            return page;
+            return new Page(batchSize, blocks);
         }
         catch (PrestoException e) {
             closeWithSuppression(e);
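For contrast, here is the completed-bytes accounting before and after this change, reduced to the relevant statements from the ParquetPageSource hunks above (completedBytes, totalBytes, parquetReader, and dataSource are the fields shown in the diff):

    // Before: completed bytes were estimated from the reader's progress
    // fraction, clamped so the value stayed monotonic and never exceeded
    // totalBytes.
    long newCompletedBytes = (long) (totalBytes * parquetReader.getProgress());
    completedBytes = min(totalBytes, max(completedBytes, newCompletedBytes));

    // After: getCompletedBytes() reports the exact count maintained by the
    // data source at the I/O layer.
    return dataSource.getReadBytes();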
ParquetPageSourceFactory.java

@@ -184,9 +184,9 @@ public static ParquetPageSource createParquetPageSource(
 
         return new ParquetPageSource(
                 parquetReader,
+                dataSource,
                 fileSchema,
                 requestedSchema,
-                path,
                 length,
                 schema,
                 columns,
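As an illustration of the pattern this commit introduces, the sketch below shows a self-contained counting data source: every successful positioned read adds the requested length to a running counter, which a page source can then report as completed bytes. This is a minimal sketch, not Presto code: CountingDataSource and its RandomAccessFile backing are hypothetical stand-ins; only the getReadBytes()/readFully() shape mirrors the diff above.

    import java.io.IOException;
    import java.io.RandomAccessFile;

    class CountingDataSource implements AutoCloseable
    {
        private final RandomAccessFile file;
        // Accumulated on every successful read, as HdfsParquetDataSource
        // does with its readBytes field.
        private long readBytes;

        CountingDataSource(RandomAccessFile file)
        {
            this.file = file;
        }

        public long getReadBytes()
        {
            return readBytes;
        }

        public void readFully(long position, byte[] buffer, int offset, int length)
                throws IOException
        {
            file.seek(position);
            file.readFully(buffer, offset, length);
            readBytes += length; // actual bytes read, not an estimate
        }

        @Override
        public void close()
                throws IOException
        {
            file.close();
        }
    }

A consumer in the style of ParquetPageSource would then implement getCompletedBytes() as a plain return dataSource.getReadBytes(), exactly as the diff does.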
