Merge pull request apache#42 from palantir/pw/parquetUpgrades
Upgrade to new parquet
pwoody authored Sep 27, 2016
2 parents 271c85e + 3449f84 commit 7f02e95
Showing 5 changed files with 47 additions and 23 deletions.

pom.xml (14 changes: 13 additions & 1 deletion)
@@ -133,7 +133,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>1.2.1</hive.version.short>
     <derby.version>10.12.1.1</derby.version>
-    <parquet.version>1.8.1</parquet.version>
+    <parquet.version>1.8.1-palantir7</parquet.version>
     <hive.parquet.version>1.6.0</hive.parquet.version>
     <jetty.version>9.2.16.v20160414</jetty.version>
     <javaxservlet.version>3.1.0</javaxservlet.version>
@@ -223,6 +223,18 @@
       <CodeCacheSize>512m</CodeCacheSize>
   </properties>
   <repositories>
+    <repository>
+      <id>palantir</id>
+      <!-- This should be at top, it makes maven try the central repo first and then others and hence faster dep resolution -->
+      <name>Palantir Maven Repository</name>
+      <url>http://dl.bintray.com/palantir/releases</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
     <repository>
       <id>central</id>
       <!-- This should be at top, it makes maven try the central repo first and then others and hence faster dep resolution -->
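
The version bump only takes effect if the patched artifact actually resolves, which is what the Bintray repository entry above provides. As a quick sanity check that the fork (rather than stock 1.8.1 from Maven Central) ended up on the classpath, one could print parquet-mr's own version constant. A minimal sketch, assuming the fork keeps parquet-common's org.apache.parquet.Version class as in stock parquet-mr:

import org.apache.parquet.Version;

public class ParquetVersionCheck {
  public static void main(String[] args) {
    // Expected to mention 1.8.1-palantir7 once the Bintray artifact resolves;
    // a stock Maven Central build would report plain 1.8.1.
    System.out.println("parquet-mr: " + Version.FULL_VERSION);
  }
}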

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -31,6 +31,8 @@
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.ImmutableList;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
 import scala.Option;
 
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
@@ -105,9 +107,13 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     if (rowGroupOffsets == null) {
       // then we need to apply the predicate push down filter
       footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
-      MessageType fileSchema = footer.getFileMetaData().getSchema();
       FilterCompat.Filter filter = getFilter(configuration);
-      blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+      ParquetFileReader reader = ParquetFileReader.open(taskAttemptContext.getConfiguration(), file, footer);
+      blocks = filterRowGroups(
+          ImmutableList.of(RowGroupFilter.FilterLevel.STATISTICS, RowGroupFilter.FilterLevel.DICTIONARY),
+          filter,
+          footer.getBlocks(),
+          reader);
     } else {
       // otherwise we find the row groups that were selected on the client
       footer = readFooter(configuration, file, NO_FILTER);
@@ -146,8 +152,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     String sparkRequestedSchemaString =
         configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
     this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
-    this.reader = new ParquetFileReader(
-        configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
+    this.reader = new ParquetFileReader(configuration, file, footer);
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();
     }
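
The rewrite above moves row-group pruning onto the patched parquet-mr API, which can discard row groups using dictionary pages as well as min/max statistics (this FilterLevel overload later landed upstream in parquet-mr 1.9.0). A minimal standalone sketch of the same call shape; the file path and the "id" column predicate are assumptions for illustration:

import java.util.List;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;

public class RowGroupPruningSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/example.parquet");  // assumed input file
    ParquetMetadata footer = readFooter(conf, file, NO_FILTER);
    FilterCompat.Filter filter = FilterCompat.get(eq(intColumn("id"), 42));  // assumed column
    ParquetFileReader reader = ParquetFileReader.open(conf, file, footer);
    // Prune first by column statistics, then by dictionary pages, as the reader now does.
    List<BlockMetaData> kept = filterRowGroups(
        ImmutableList.of(RowGroupFilter.FilterLevel.STATISTICS, RowGroupFilter.FilterLevel.DICTIONARY),
        filter, footer.getBlocks(), reader);
    System.out.println(kept.size() + " of " + footer.getBlocks().size() + " row groups survive");
  }
}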

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -539,7 +539,7 @@ private void readPageV2(DataPageV2 page) throws IOException {
     this.defColumn = new VectorizedRleValuesReader(bitWidth);
     this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
     this.defColumn.initFromBuffer(
-        this.pageValueCount, page.getDefinitionLevels().toByteArray());
+        this.pageValueCount, page.getDefinitionLevels().toByteBuffer());
     try {
       initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
     } catch (IOException e) {
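
page.getDefinitionLevels() returns a parquet BytesInput; toByteArray() always materializes a fresh copy, whereas the toByteBuffer() used now can expose the bytes without that copy. A small sketch of the distinction, assuming toByteBuffer() is present as in the patched parquet-mr (it is not in stock 1.8.1); a BytesInput is single-use, hence the two instances:

import java.nio.ByteBuffer;

import org.apache.parquet.bytes.BytesInput;

public class BytesInputSketch {
  public static void main(String[] args) throws Exception {
    byte[] raw = {1, 2, 3};  // stand-in for RLE-encoded definition levels
    byte[] copy = BytesInput.from(raw).toByteArray();       // allocates and copies
    ByteBuffer view = BytesInput.from(raw).toByteBuffer();  // may just wrap the bytes
    System.out.println(copy.length + " bytes copied, " + view.remaining() + " bytes viewed");
  }
}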

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -41,6 +41,11 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
   public VectorizedPlainValuesReader() {
   }
 
+  @Override
+  public void initFromPage(int valueCount, ByteBuffer byteBuffer, int offset) throws IOException {
+    initFromPage(valueCount, byteBuffer.array(), offset);
+  }
+
   @Override
   public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException {
     this.buffer = bytes;
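
One caveat with the new overload above: ByteBuffer.array() only works for heap buffers with an accessible backing array, while a direct or read-only buffer throws UnsupportedOperationException or ReadOnlyBufferException. That is fine as long as the patched parquet-mr hands this reader heap buffers, but a defensive variant (not part of this commit) could route through a hypothetical helper like the following and call initFromPage(valueCount, ByteBufferBytes.toArray(byteBuffer), offset):

import java.nio.ByteBuffer;

public final class ByteBufferBytes {
  private ByteBufferBytes() {}

  /** Returns the buffer's backing array when safely exposed, else a copy of bytes [0, limit). */
  public static byte[] toArray(ByteBuffer buf) {
    if (buf.hasArray() && buf.arrayOffset() == 0) {
      return buf.array();                // zero-copy heap path, offsets index from 0 as before
    }
    byte[] copy = new byte[buf.limit()];
    ByteBuffer dup = buf.duplicate();    // don't disturb the caller's position
    dup.rewind();
    dup.get(copy);                       // copy bytes [0, limit)
    return copy;
  }
}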

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -27,6 +27,8 @@
 
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
 
+import java.nio.ByteBuffer;
+
 /**
  * A values reader for Parquet's run-length encoded data. This is based off of the version in
  * parquet-mr with these changes:
@@ -49,7 +51,7 @@ private enum MODE {
   }
 
   // Encoded data.
-  private byte[] in;
+  private ByteBuffer in;
   private int end;
   private int offset;
 
@@ -81,7 +83,7 @@ public VectorizedRleValuesReader(int bitWidth) {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] page, int start) {
+  public void initFromPage(int valueCount, ByteBuffer page, int start) {
     this.offset = start;
     this.in = page;
     if (fixedWidth) {
@@ -90,8 +92,8 @@ public void initFromPage(int valueCount, byte[] page, int start) {
         this.end = this.offset + length;
       }
     } else {
-      this.end = page.length;
-      if (this.end != this.offset) init(page[this.offset++] & 255);
+      this.end = page.limit();
+      if (this.end != this.offset) init(page.get(this.offset++) & 255);
     }
     if (bitWidth == 0) {
       // 0 bit width, treat this as an RLE run of valueCount number of 0's.
@@ -105,10 +107,10 @@ public void initFromPage(int valueCount, byte[] page, int start) {
 
   // Initialize the reader from a buffer. This is used for the V2 page encoding where the
   // definition are in its own buffer.
-  public void initFromBuffer(int valueCount, byte[] data) {
+  public void initFromBuffer(int valueCount, ByteBuffer data) {
     this.offset = 0;
     this.in = data;
-    this.end = data.length;
+    this.end = data.limit();
     if (bitWidth == 0) {
       // 0 bit width, treat this as an RLE run of valueCount number of 0's.
       this.mode = MODE.RLE;
@@ -527,7 +529,7 @@ private int readUnsignedVarInt() {
     int shift = 0;
     int b;
     do {
-      b = in[offset++] & 255;
+      b = in.get(offset++) & 255;
       value |= (b & 0x7F) << shift;
       shift += 7;
     } while ((b & 0x80) != 0);
@@ -538,10 +540,10 @@
    * Reads the next 4 byte little endian int.
    */
   private int readIntLittleEndian() {
-    int ch4 = in[offset] & 255;
-    int ch3 = in[offset + 1] & 255;
-    int ch2 = in[offset + 2] & 255;
-    int ch1 = in[offset + 3] & 255;
+    int ch4 = in.get(offset) & 255;
+    int ch3 = in.get(offset + 1) & 255;
+    int ch2 = in.get(offset + 2) & 255;
+    int ch1 = in.get(offset + 3) & 255;
     offset += 4;
     return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
   }
@@ -554,17 +556,17 @@ private int readIntLittleEndianPaddedOnBitWidth() {
       case 0:
         return 0;
       case 1:
-        return in[offset++] & 255;
+        return in.get(offset++) & 255;
       case 2: {
-        int ch2 = in[offset] & 255;
-        int ch1 = in[offset + 1] & 255;
+        int ch2 = in.get(offset) & 255;
+        int ch1 = in.get(offset + 1) & 255;
         offset += 2;
         return (ch1 << 8) + ch2;
       }
       case 3: {
-        int ch3 = in[offset] & 255;
-        int ch2 = in[offset + 1] & 255;
-        int ch1 = in[offset + 2] & 255;
+        int ch3 = in.get(offset) & 255;
+        int ch2 = in.get(offset + 1) & 255;
+        int ch1 = in.get(offset + 2) & 255;
         offset += 3;
         return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
       }
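
All of the reads above switch from byte[] indexing to absolute ByteBuffer.get(int) calls, which never move the buffer's position, so offset stays the single cursor exactly as before. For the 4-byte case, the hand-rolled assembly is equivalent to an absolute, order-aware read; a toy demonstration (values are illustrative only, not from the commit):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class LittleEndianSketch {
  public static void main(String[] args) {
    ByteBuffer in = ByteBuffer.wrap(new byte[] {0x78, 0x56, 0x34, 0x12});
    int offset = 0;

    // Hand-rolled little-endian assembly, as in readIntLittleEndian() above.
    int ch4 = in.get(offset) & 255;
    int ch3 = in.get(offset + 1) & 255;
    int ch2 = in.get(offset + 2) & 255;
    int ch1 = in.get(offset + 3) & 255;
    int manual = (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;

    // Same result via an absolute, order-aware read; neither variant moves position().
    int direct = in.duplicate().order(ByteOrder.LITTLE_ENDIAN).getInt(offset);

    System.out.printf("manual=0x%08X direct=0x%08X position=%d%n", manual, direct, in.position());
    // prints: manual=0x12345678 direct=0x12345678 position=0
  }
}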
