Commit

Use separate ValuesWriterFactory instances for non-default page options when creating ParquetWriter

Sharing a single `ValuesWriterFactory` among multiple
`ParquetProperties` instances that carry different write options is not
thread-safe and can leave the resulting writers with inconsistent settings.
hantangwangd committed May 13, 2024
1 parent 00655a0 commit 4616db9
Showing 3 changed files with 31 additions and 5 deletions.
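
For context, the sketch below illustrates the sharing problem the commit message describes. It is a hypothetical, standalone example (the class name, page sizes, and printed values are not part of this commit), and it assumes, as in the parquet-column versions this change targets, that a builder which never calls `withValuesWriterFactory(...)` falls back to a single shared default `ValuesWriterFactory` that is re-initialized by each `ParquetProperties` it serves.

import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;

// Hypothetical illustration class; not part of the commit.
public class SharedValuesWriterFactorySketch
{
    public static void main(String[] args)
    {
        // Neither builder supplies its own factory, so both ParquetProperties
        // fall back to the library's shared default ValuesWriterFactory.
        ParquetProperties smallPages = ParquetProperties.builder()
                .withWriterVersion(WriterVersion.PARQUET_2_0)
                .withPageSize(64 * 1024)                // non-default page size
                .withDictionaryPageSize(64 * 1024)      // non-default dictionary page size
                .build();

        ParquetProperties defaultPages = ParquetProperties.builder()
                .withWriterVersion(WriterVersion.PARQUET_2_0)
                .build();

        // If the shared factory keeps only the options of whichever ParquetProperties
        // initialized it last, values writers created for smallPages can silently pick up
        // defaultPages' settings (or vice versa), especially when writers are built concurrently.
        System.out.println(smallPages.getPageSizeThreshold());    // 65536
        System.out.println(defaultPages.getPageSizeThreshold());  // library default (1 MB)
    }
}
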
ParquetWriter.java
@@ -23,6 +23,7 @@
 import io.airlift.slice.Slices;
 import io.airlift.units.DataSize;
 import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.Builder;
 import org.apache.parquet.format.ColumnMetaData;
 import org.apache.parquet.format.FileMetaData;
 import org.apache.parquet.format.RowGroup;
@@ -38,6 +39,7 @@
 import java.util.Map;
 
 import static com.facebook.presto.parquet.writer.ParquetDataOutput.createDataOutput;
+import static com.facebook.presto.parquet.writer.ParquetWriterOptions.DEFAULT_MAX_PAGE_SIZE;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -98,11 +100,19 @@ public ParquetWriter(OutputStream outputStream,
 
         this.messageType = requireNonNull(messageType, "messageType is null");
 
-        ParquetProperties parquetProperties = ParquetProperties.builder()
+        Builder parquetPropertiesBuilder = ParquetProperties.builder()
                 .withWriterVersion(writerOption.getWriterVersion())
                 .withPageSize(writerOption.getMaxPageSize())
-                .withDictionaryPageSize(writerOption.getMaxDictionaryPageSize())
-                .build();
+                .withDictionaryPageSize(writerOption.getMaxDictionaryPageSize());
+
+        // Sharing a single `ValuesWriterFactory` between `ParquetProperties` instances with different page options is not thread-safe.
+        // So give a `ParquetProperties` with non-default page options its own `ValuesWriterFactory` instance at creation time,
+        // and keep sharing a single `ValuesWriterFactory` instance between `ParquetProperties` instances with default page options.
+        if (!DEFAULT_MAX_PAGE_SIZE.equals(DataSize.succinctBytes(writerOption.getMaxPageSize())) ||
+                !DEFAULT_MAX_PAGE_SIZE.equals(DataSize.succinctBytes(writerOption.getMaxDictionaryPageSize()))) {
+            parquetPropertiesBuilder.withValuesWriterFactory(ParquetWriters.getValuesWriterFactory(writerOption.getWriterVersion()));
+        }
+        ParquetProperties parquetProperties = parquetPropertiesBuilder.build();
         CompressionCodecName compressionCodecName = getCompressionCodecName(compressionCodecClass);
         this.columnWriters = ParquetWriters.getColumnWriters(messageType, primitiveTypes, parquetProperties, compressionCodecName);
 
ParquetWriterOptions.java
@@ -21,8 +21,8 @@
 
 public class ParquetWriterOptions
 {
-    private static final DataSize DEFAULT_MAX_ROW_GROUP_SIZE = DataSize.valueOf("128MB");
-    private static final DataSize DEFAULT_MAX_PAGE_SIZE = DataSize.valueOf("1MB");
+    protected static final DataSize DEFAULT_MAX_ROW_GROUP_SIZE = DataSize.valueOf("128MB");
+    protected static final DataSize DEFAULT_MAX_PAGE_SIZE = DataSize.valueOf("1MB");
     public static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_2_0;
 
     public static ParquetWriterOptions.Builder builder()
ParquetWriters.java
@@ -33,7 +33,11 @@
 import com.google.common.collect.ImmutableList;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
+import org.apache.parquet.column.values.factory.DefaultV2ValuesWriterFactory;
+import org.apache.parquet.column.values.factory.ValuesWriterFactory;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
@@ -61,6 +65,18 @@ class ParquetWriters
 {
     private ParquetWriters() {}
 
+    static ValuesWriterFactory getValuesWriterFactory(WriterVersion writerVersion)
+    {
+        switch (writerVersion) {
+            case PARQUET_1_0:
+                return new DefaultV1ValuesWriterFactory();
+            case PARQUET_2_0:
+                return new DefaultV2ValuesWriterFactory();
+            default:
+                throw new PrestoException(NOT_SUPPORTED, format("Unsupported Parquet writer version: %s", writerVersion));
+        }
+    }
+
     static List<ColumnWriter> getColumnWriters(MessageType messageType, Map<List<String>, Type> prestoTypes, ParquetProperties parquetProperties, CompressionCodecName compressionCodecName)
     {
         WriterBuilder writeBuilder = new WriterBuilder(messageType, prestoTypes, parquetProperties, compressionCodecName);
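
As a usage note on the change above: with non-default page options, the builder now receives a dedicated factory, so its initialization state cannot be overwritten by another writer's `ParquetProperties`. Below is a hedged, standalone sketch of that pairing outside Presto (the class name and sizes are illustrative; `DefaultV2ValuesWriterFactory` is what the new `getValuesWriterFactory` helper returns for `PARQUET_2_0`).

import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.values.factory.DefaultV2ValuesWriterFactory;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;

// Hypothetical illustration class; not part of the commit.
public class DedicatedValuesWriterFactorySketch
{
    public static void main(String[] args)
    {
        // A factory created per writer, mirroring what getValuesWriterFactory(PARQUET_2_0) returns.
        ValuesWriterFactory dedicatedFactory = new DefaultV2ValuesWriterFactory();

        ParquetProperties nonDefaultProperties = ParquetProperties.builder()
                .withWriterVersion(WriterVersion.PARQUET_2_0)
                .withPageSize(256 * 1024)                     // differs from the 1MB default
                .withDictionaryPageSize(256 * 1024)
                .withValuesWriterFactory(dedicatedFactory)    // bound to this ParquetProperties only
                .build();

        System.out.println(nonDefaultProperties.getPageSizeThreshold()); // 262144
    }
}
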
