Add config properties to control Hive writes
dain committed Oct 9, 2015
1 parent 7a068d1 commit 25d44d2
Showing 6 changed files with 85 additions and 9 deletions.
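The commit adds three write-control properties to the Hive connector: hive.respect-table-format, hive.immutable-partitions, and hive.max-partitions-per-writers (defaulting to true, false, and 100 respectively, per the new HiveClientConfig fields and the updated testDefaults below). As a rough illustration — the file path and values here are assumptions, not part of the commit — they would be set in the Hive catalog properties file along these lines:

    # etc/catalog/hive.properties (illustrative values)
    hive.respect-table-format=false
    hive.immutable-partitions=true
    hive.max-partitions-per-writers=200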
@@ -96,6 +96,9 @@ public class HiveClientConfig
private boolean useParquetColumnNames;

private HiveStorageFormat hiveStorageFormat = HiveStorageFormat.RCBINARY;
private boolean respectTableFormat = true;
private boolean immutablePartitions;
private int maxPartitionsPerWriter = 100;

private List<String> resourceConfigFiles;

@@ -473,6 +476,46 @@ public HiveClientConfig setHiveStorageFormat(HiveStorageFormat hiveStorageFormat
return this;
}

public boolean isRespectTableFormat()
{
return respectTableFormat;
}

@Config("hive.respect-table-format")
@ConfigDescription("Should new partitions be written using the existing table format or the default Presto format")
public HiveClientConfig setRespectTableFormat(boolean respectTableFormat)
{
this.respectTableFormat = respectTableFormat;
return this;
}

public boolean isImmutablePartitions()
{
return immutablePartitions;
}

@Config("hive.immutable-partitions")
@ConfigDescription("Can new data be inserted into existing partitions")
public HiveClientConfig setImmutablePartitions(boolean immutablePartitions)
{
this.immutablePartitions = immutablePartitions;
return this;
}

@Min(1)
public int getMaxPartitionsPerWriter()
{
return maxPartitionsPerWriter;
}

@Config("hive.max-partitions-per-writers")
@ConfigDescription("Maximum number of partitions per writer")
public HiveClientConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter)
{
this.maxPartitionsPerWriter = maxPartitionsPerWriter;
return this;
}

public String getDomainSocketPath()
{
return domainSocketPath;
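The new accessors follow the Airlift configuration pattern already used throughout HiveClientConfig: @Config supplies the property name, @ConfigDescription the documentation, and @Min(1) the validation on the writer limit. As a minimal sketch of how such a config class is typically consumed — the module below is hypothetical and not part of this commit — it would be bound through Airlift's ConfigBinder:

    import com.google.inject.Binder;
    import com.google.inject.Module;

    import static io.airlift.configuration.ConfigBinder.configBinder;

    public class ExampleHiveModule
            implements Module
    {
        @Override
        public void configure(Binder binder)
        {
            // binds hive.* properties, including the three new ones, onto HiveClientConfig
            configBinder(binder).bindConfig(HiveClientConfig.class);
        }
    }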
@@ -56,6 +56,7 @@

import static com.facebook.presto.hive.HiveColumnHandle.SAMPLE_WEIGHT_COLUMN_NAME;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_READ_ONLY;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_TOO_MANY_OPEN_PARTITIONS;
@@ -102,13 +103,15 @@ public class HivePageSink
private final HdfsEnvironment hdfsEnvironment;
private final JobConf conf;

private final int maxWriters;
private final int maxOpenPartitions;
private final JsonCodec<PartitionUpdate> partitionUpdateCodec;

private final List<Object> dataRow;
private final List<Object> partitionRow;

private final Table table;
private final boolean immutablePartitions;
private final boolean respectTableFormat;

private HiveRecordWriter[] writers = new HiveRecordWriter[0];

@@ -124,7 +127,9 @@ public HivePageSink(
PageIndexerFactory pageIndexerFactory,
TypeManager typeManager,
HdfsEnvironment hdfsEnvironment,
int maxWriters,
boolean respectTableFormat,
int maxOpenPartitions,
boolean immutablePartitions,
JsonCodec<PartitionUpdate> partitionUpdateCodec)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
@@ -143,7 +148,9 @@ public HivePageSink(
this.typeManager = requireNonNull(typeManager, "typeManager is null");

this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.maxWriters = maxWriters;
this.respectTableFormat = respectTableFormat;
this.maxOpenPartitions = maxOpenPartitions;
this.immutablePartitions = immutablePartitions;
this.partitionUpdateCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null");

// divide input columns into partition and data columns
@@ -243,7 +250,7 @@ public void appendPage(Page page, Block sampleWeightBlock)
Block[] partitionBlocks = getPartitionBlocks(page);

int[] indexes = pageIndexer.indexPage(new Page(page.getPositionCount(), partitionBlocks));
if (pageIndexer.getMaxIndex() >= maxWriters) {
if (pageIndexer.getMaxIndex() >= maxOpenPartitions) {
throw new PrestoException(HIVE_TOO_MANY_OPEN_PARTITIONS, "Too many open partitions");
}
if (pageIndexer.getMaxIndex() >= writers.length) {
@@ -317,7 +324,12 @@ private HiveRecordWriter createWriter(List<Object> partitionRow)
tableName,
table.getPartitionKeys());
target = table.getSd().getLocation();
outputFormat = table.getSd().getOutputFormat();
if (respectTableFormat) {
outputFormat = table.getSd().getOutputFormat();
}
else {
outputFormat = tableStorageFormat.getOutputFormat();
}
serDe = table.getSd().getSerdeInfo().getSerializationLib();
}
if (!partitionName.isEmpty()) {
@@ -353,6 +365,10 @@ private HiveRecordWriter createWriter(List<Object> partitionRow)
conf);
}
else {
if (immutablePartitions) {
throw new PrestoException(HIVE_PARTITION_READ_ONLY, "Hive partitions are immutable");
}

// Append to an existing partition
HiveWriteUtils.checkPartitionIsWritable(partitionName, partition.get());

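Taken together, the HivePageSink changes gate writes in three ways: the number of simultaneously open partition writers is capped by maxOpenPartitions, a new partition reuses the table's own output format only when respectTableFormat is true (otherwise the default Presto storage format is used), and appending to an existing partition fails when immutablePartitions is set. A condensed, self-contained sketch of that decision logic — simplified names and exception types, not the actual class — looks like this:

    // Simplified illustration of the write-control checks added to HivePageSink.
    // The real code resolves formats from metastore storage descriptors and
    // throws PrestoException with Hive error codes.
    final class WritePolicy
    {
        private final boolean respectTableFormat;
        private final boolean immutablePartitions;
        private final int maxOpenPartitions;

        WritePolicy(boolean respectTableFormat, boolean immutablePartitions, int maxOpenPartitions)
        {
            this.respectTableFormat = respectTableFormat;
            this.immutablePartitions = immutablePartitions;
            this.maxOpenPartitions = maxOpenPartitions;
        }

        void checkOpenPartitions(int openPartitions)
        {
            if (openPartitions >= maxOpenPartitions) {
                throw new IllegalStateException("Too many open partitions");
            }
        }

        String outputFormatFor(String tableFormat, String defaultFormat)
        {
            // new partitions keep the table's format only when respect-table-format is enabled
            return respectTableFormat ? tableFormat : defaultFormat;
        }

        void checkAppendAllowed(boolean partitionExists)
        {
            if (partitionExists && immutablePartitions) {
                throw new IllegalStateException("Hive partitions are immutable");
            }
        }
    }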
@@ -36,7 +36,9 @@ public class HivePageSinkProvider
private final HiveMetastore metastore;
private final PageIndexerFactory pageIndexerFactory;
private final TypeManager typeManager;
private final int maxWriters = 100;
private final boolean respectTableFormat;
private final int maxOpenPartitions;
private final boolean immutablePartitions;
private final JsonCodec<PartitionUpdate> partitionUpdateCodec;

@Inject
@@ -45,12 +47,16 @@ public HivePageSinkProvider(
HiveMetastore metastore,
PageIndexerFactory pageIndexerFactory,
TypeManager typeManager,
HiveClientConfig config,
JsonCodec<PartitionUpdate> partitionUpdateCodec)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.metastore = requireNonNull(metastore, "metastore is null");
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.respectTableFormat = config.isRespectTableFormat();
this.maxOpenPartitions = config.getMaxPartitionsPerWriter();
this.immutablePartitions = config.isImmutablePartitions();
this.partitionUpdateCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null");
}

@@ -82,7 +88,9 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
pageIndexerFactory,
typeManager,
hdfsEnvironment,
maxWriters,
respectTableFormat,
maxOpenPartitions,
immutablePartitions,
partitionUpdateCodec);
}
}
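HivePageSinkProvider now derives these limits from the injected HiveClientConfig instead of the hard-coded maxWriters = 100, which is why the test setups below simply pass new HiveClientConfig() for the defaults. A test wanting non-default write behavior could pass a tweaked config instead (illustrative values only):

    HiveClientConfig config = new HiveClientConfig()
            .setRespectTableFormat(false)
            .setImmutablePartitions(true)
            .setMaxPartitionsPerWriter(500);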
@@ -444,7 +444,7 @@ protected void setup(String host, int port, String databaseName, String timeZone
hiveClientConfig.getMaxInitialSplits(),
false
);
pageSinkProvider = new HivePageSinkProvider(hdfsEnvironment, metastoreClient, new GroupByHashPageIndexerFactory(), typeManager, partitionUpdateCodec);
pageSinkProvider = new HivePageSinkProvider(hdfsEnvironment, metastoreClient, new GroupByHashPageIndexerFactory(), typeManager, new HiveClientConfig(), partitionUpdateCodec);
pageSourceProvider = new HivePageSourceProvider(hiveClientConfig, hdfsEnvironment, DEFAULT_HIVE_RECORD_CURSOR_PROVIDER, DEFAULT_HIVE_DATA_STREAM_FACTORIES, TYPE_MANAGER);
}

@@ -166,7 +166,7 @@ protected void setup(String host, int port, String databaseName, String awsAcces
hdfsEnvironment,
new HadoopDirectoryLister(),
executor);
pageSinkProvider = new HivePageSinkProvider(hdfsEnvironment, metastoreClient, new GroupByHashPageIndexerFactory(), typeManager, partitionUpdateCodec);
pageSinkProvider = new HivePageSinkProvider(hdfsEnvironment, metastoreClient, new GroupByHashPageIndexerFactory(), typeManager, new HiveClientConfig(), partitionUpdateCodec);
pageSourceProvider = new HivePageSourceProvider(hiveClientConfig, hdfsEnvironment, DEFAULT_HIVE_RECORD_CURSOR_PROVIDER, DEFAULT_HIVE_DATA_STREAM_FACTORIES, TYPE_MANAGER);
}

@@ -65,6 +65,9 @@ public void testDefaults()
.setDomainSocketPath(null)
.setResourceConfigFiles((String) null)
.setHiveStorageFormat(HiveStorageFormat.RCBINARY)
.setRespectTableFormat(true)
.setImmutablePartitions(false)
.setMaxPartitionsPerWriter(100)
.setUseParquetColumnNames(false)
.setS3AwsAccessKey(null)
.setS3AwsSecretKey(null)
@@ -120,6 +123,9 @@ public void testExplicitPropertyMappings()
.put("hive.domain-compaction-threshold", "42")
.put("hive.recursive-directories", "true")
.put("hive.storage-format", "SEQUENCEFILE")
.put("hive.respect-table-format", "false")
.put("hive.immutable-partitions", "true")
.put("hive.max-partitions-per-writers", "222")
.put("hive.force-local-scheduling", "true")
.put("hive.max-concurrent-file-renames", "100")
.put("hive.assume-canonical-partition-keys", "true")
@@ -175,6 +181,9 @@ public void testExplicitPropertyMappings()
.setVerifyChecksum(false)
.setResourceConfigFiles(ImmutableList.of("/foo.xml", "/bar.xml"))
.setHiveStorageFormat(HiveStorageFormat.SEQUENCEFILE)
.setRespectTableFormat(false)
.setImmutablePartitions(true)
.setMaxPartitionsPerWriter(222)
.setDomainSocketPath("/foo")
.setUseParquetColumnNames(true)
.setS3AwsAccessKey("abc123")
