Skip to content

Commit

Permalink
Add Hive connector procedure call to create empty partition
Browse files Browse the repository at this point in the history
  • Loading branch information
wenleix committed Aug 24, 2018
1 parent 6799240 commit c92cfef
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 14 deletions.
@@ -0,0 +1,128 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.hive.LocationService.WriteInfo;
import com.facebook.presto.hive.PartitionUpdate.UpdateMode;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.procedure.Procedure.Argument;
import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.apache.hadoop.hive.common.FileUtils;

import javax.inject.Inject;
import javax.inject.Provider;

import java.lang.invoke.MethodHandle;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
import static com.facebook.presto.spi.block.MethodHandleUtil.methodHandle;
import static com.facebook.presto.spi.type.StandardTypes.VARCHAR;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

/**
 * Provides the {@code system.create_empty_partition} procedure for the Hive connector.
 * <p>
 * The procedure takes a schema name, a table name, the list of partition column names and
 * the list of partition values, and registers a new partition that contains no data files
 * by running an insert with a single empty {@link PartitionUpdate}.
 */
public class CreateEmptyPartitionProcedure
        implements Provider<Procedure>
{
    // Handle to createEmptyPartition(...); bound to this instance in get().
    private static final MethodHandle CREATE_EMPTY_PARTITION = methodHandle(
            CreateEmptyPartitionProcedure.class,
            "createEmptyPartition",
            ConnectorSession.class,
            String.class,
            String.class,
            List.class,
            List.class);

    private final Supplier<TransactionalMetadata> hiveMetadataFactory;
    private final ExtendedHiveMetastore metastore;
    private final LocationService locationService;
    private final JsonCodec<PartitionUpdate> partitionUpdateJsonCodec;

    @Inject
    public CreateEmptyPartitionProcedure(Supplier<TransactionalMetadata> hiveMetadataFactory, ExtendedHiveMetastore metastore, LocationService locationService, JsonCodec<PartitionUpdate> partitionUpdateCodec)
    {
        this.hiveMetadataFactory = requireNonNull(hiveMetadataFactory, "hiveMetadataFactory is null");
        this.metastore = requireNonNull(metastore, "metastore is null");
        this.locationService = requireNonNull(locationService, "locationService is null");
        this.partitionUpdateJsonCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null");
    }

    /**
     * Builds the procedure definition: {@code system.create_empty_partition(schema, table,
     * partitionColumnNames, partitionValues)}.
     */
    @Override
    public Procedure get()
    {
        return new Procedure(
                "system",
                "create_empty_partition",
                ImmutableList.of(
                        new Argument("schema", VARCHAR),
                        new Argument("table", VARCHAR),
                        new Argument("partitionColumnNames", "array(varchar)"),
                        new Argument("partitionValues", "array(varchar)")),
                CREATE_EMPTY_PARTITION.bindTo(this));
    }

    /**
     * Creates an empty partition in the given table.
     *
     * @param session the session the procedure is invoked in
     * @param schema schema containing the table
     * @param table table to add the partition to
     * @param partitionColumnNames partition column names; must equal the table's partition columns, in order
     * @param partitionValues one value per partition column, in the same order; no null entries
     * @throws PrestoException with {@code INVALID_PROCEDURE_ARGUMENT} if the arguments are null, contain
     *         null values, differ in length, or do not match the table's partition columns;
     *         with {@code ALREADY_EXISTS} if the partition is already present in the metastore
     */
    public void createEmptyPartition(ConnectorSession session, String schema, String table, List<Object> partitionColumnNames, List<Object> partitionValues)
    {
        // Validate the raw arguments before beginInsert so a malformed call fails
        // with a clear error and without starting a write.
        List<String> partitionStringValues = toPartitionStringValues(partitionColumnNames, partitionValues);

        TransactionalMetadata hiveMetadata = hiveMetadataFactory.get();

        HiveInsertTableHandle hiveInsertTableHandle = (HiveInsertTableHandle) hiveMetadata.beginInsert(session, new HiveTableHandle(schema, table));

        List<String> actualPartitionColumnNames = hiveInsertTableHandle.getInputColumns().stream()
                .filter(HiveColumnHandle::isPartitionKey)
                .map(HiveColumnHandle::getName)
                .collect(toImmutableList());
        if (!Objects.equals(partitionColumnNames, actualPartitionColumnNames)) {
            throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "input partition column names doesn't match actual partition column names");
        }

        if (metastore.getPartition(schema, table, partitionStringValues).isPresent()) {
            throw new PrestoException(ALREADY_EXISTS, "Partition already exists");
        }
        String partitionName = FileUtils.makePartName(actualPartitionColumnNames, partitionStringValues);

        WriteInfo writeInfo = locationService.getPartitionWriteInfo(hiveInsertTableHandle.getLocationHandle(), Optional.empty(), partitionName);
        // A NEW partition update with an empty file list and all numeric fields set to zero.
        Slice serializedPartitionUpdate = Slices.wrappedBuffer(
                partitionUpdateJsonCodec.toJsonBytes(
                        new PartitionUpdate(
                                partitionName,
                                UpdateMode.NEW,
                                writeInfo.getWritePath(),
                                writeInfo.getTargetPath(),
                                ImmutableList.of(),
                                0,
                                0,
                                0)));

        hiveMetadata.finishInsert(
                session,
                hiveInsertTableHandle,
                ImmutableList.of(serializedPartitionUpdate),
                ImmutableList.of());
        hiveMetadata.commit();
    }

    /**
     * Checks the shape of the procedure arguments and converts the partition values to strings.
     * Rejects null lists, a value count that differs from the column-name count, and null values
     * (which the immutable-list collector would otherwise reject with a bare NullPointerException).
     */
    private static List<String> toPartitionStringValues(List<Object> partitionColumnNames, List<Object> partitionValues)
    {
        if (partitionColumnNames == null || partitionValues == null) {
            throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "partition column names and partition values must not be null");
        }
        if (partitionColumnNames.size() != partitionValues.size()) {
            throw new PrestoException(
                    INVALID_PROCEDURE_ARGUMENT,
                    String.format("expected %s partition values but got %s", partitionColumnNames.size(), partitionValues.size()));
        }
        if (partitionValues.stream().anyMatch(Objects::isNull)) {
            throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "partition values must not contain null");
        }
        return partitionValues.stream()
                .map(String.class::cast)
                .collect(toImmutableList());
    }
}
Expand Up @@ -27,13 +27,15 @@
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
import io.airlift.event.client.EventClient;

import javax.inject.Singleton;

import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
Expand Down Expand Up @@ -85,6 +87,7 @@ public void configure(Binder binder)
binder.bind(LocationService.class).to(HiveLocationService.class).in(Scopes.SINGLETON);
binder.bind(TableParameterCodec.class).in(Scopes.SINGLETON);
binder.bind(HiveMetadataFactory.class).in(Scopes.SINGLETON);
binder.bind(new TypeLiteral<Supplier<TransactionalMetadata>>() {}).to(HiveMetadataFactory.class).in(Scopes.SINGLETON);
binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON);
binder.bind(ConnectorSplitManager.class).to(HiveSplitManager.class).in(Scopes.SINGLETON);
newExporter(binder).export(ConnectorSplitManager.class).as(generatedNameOf(HiveSplitManager.class, connectorId));
Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.spi.transaction.IsolationLevel;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class HiveConnector
private final ConnectorPageSinkProvider pageSinkProvider;
private final ConnectorNodePartitioningProvider nodePartitioningProvider;
private final Set<SystemTable> systemTables;
private final Set<Procedure> procedures;
private final List<PropertyMetadata<?>> sessionProperties;
private final List<PropertyMetadata<?>> schemaProperties;
private final List<PropertyMetadata<?>> tableProperties;
Expand All @@ -69,6 +71,7 @@ public HiveConnector(
ConnectorPageSinkProvider pageSinkProvider,
ConnectorNodePartitioningProvider nodePartitioningProvider,
Set<SystemTable> systemTables,
Set<Procedure> procedures,
List<PropertyMetadata<?>> sessionProperties,
List<PropertyMetadata<?>> schemaProperties,
List<PropertyMetadata<?>> tableProperties,
Expand All @@ -83,6 +86,7 @@ public HiveConnector(
this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null");
this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null"));
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
this.sessionProperties = ImmutableList.copyOf(requireNonNull(sessionProperties, "sessionProperties is null"));
this.schemaProperties = ImmutableList.copyOf(requireNonNull(schemaProperties, "schemaProperties is null"));
this.tableProperties = ImmutableList.copyOf(requireNonNull(tableProperties, "tableProperties is null"));
Expand Down Expand Up @@ -128,6 +132,12 @@ public Set<SystemTable> getSystemTables()
return systemTables;
}

/**
 * Returns the procedures exposed by this connector: the immutable copy of the
 * set passed to the constructor.
 */
@Override
public Set<Procedure> getProcedures()
{
    return this.procedures;
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
Expand Down
Expand Up @@ -36,9 +36,12 @@
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.type.TypeManager;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.event.client.EventModule;
Expand All @@ -50,6 +53,7 @@
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
Expand Down Expand Up @@ -98,6 +102,7 @@ public Connector create(String connectorId, Map<String, String> config, Connecto
new HiveMetastoreModule(connectorId, Optional.ofNullable(metastore)),
new HiveSecurityModule(),
new HiveAuthenticationModule(),
new HiveProcedureModule(),
binder -> {
MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
binder.bind(MBeanServer.class).toInstance(new RebindSafeMBeanServer(platformMBeanServer));
Expand All @@ -124,6 +129,7 @@ public Connector create(String connectorId, Map<String, String> config, Connecto
HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class);
HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
ConnectorAccessControl accessControl = new PartitionsAwareAccessControl(injector.getInstance(ConnectorAccessControl.class));
Set<Procedure> procedures = injector.getInstance(Key.get(new TypeLiteral<Set<Procedure>>() {}));

return new HiveConnector(
lifeCycleManager,
Expand All @@ -134,6 +140,7 @@ public Connector create(String connectorId, Map<String, String> config, Connecto
new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
ImmutableSet.of(),
procedures,
hiveSessionProperties.getSessionProperties(),
HiveSchemaProperties.SCHEMA_PROPERTIES,
hiveTableProperties.getTableProperties(),
Expand Down
@@ -0,0 +1,33 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.spi.procedure.Procedure;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;

import static com.google.inject.multibindings.Multibinder.newSetBinder;

/**
 * Guice module that contributes the Hive connector's procedures to the
 * {@code Set<Procedure>} multibinding.
 */
public class HiveProcedureModule
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        Multibinder<Procedure> procedureBinder = newSetBinder(binder, Procedure.class);

        // Each procedure is produced by its provider class and bound as a singleton.
        procedureBinder.addBinding()
                .toProvider(CreateEmptyPartitionProcedure.class)
                .in(Scopes.SINGLETON);
    }
}
Expand Up @@ -918,6 +918,34 @@ public void testCastNullToColumnTypes()
assertUpdate("DROP TABLE " + tableName);
}

// Runs the empty-partition scenario once for every storage format under test.
@Test
public void testCreateEmptyBucketedPartition()
{
    for (TestingHiveStorageFormat testingFormat : getAllTestingHiveStorageFormat()) {
        HiveStorageFormat format = testingFormat.getFormat();
        testCreateEmptyBucketedPartition(format);
    }
}

/**
 * Creates a partitioned, bucketed table in the given storage format, adds one empty
 * partition per order status through {@code system.create_empty_partition}, and checks
 * after each call that the partition count grew by one and that repeating the same
 * call fails because the partition already exists.
 * <p>
 * Not annotated with {@code @Test}: it is invoked once per storage format by the
 * no-argument overload.
 */
public void testCreateEmptyBucketedPartition(HiveStorageFormat storageFormat)
{
    // Use a table name of its own so this test cannot collide with
    // testInsertPartitionedBucketedTable, which creates and drops
    // "test_insert_partitioned_bucketed_table".
    String tableName = "test_create_empty_bucketed_partition";
    createPartitionedBucketedTable(tableName, storageFormat);

    // Derive the "$partitions" query from tableName so the two never drift apart.
    String countPartitionsSql = "SELECT count(*) FROM \"" + tableName + "$partitions\"";

    List<String> orderStatusList = ImmutableList.of("F", "O", "P");
    for (int i = 0; i < orderStatusList.size(); i++) {
        String sql = format("CALL system.create_empty_partition('%s', '%s', ARRAY['orderstatus'], ARRAY['%s'])", TPCH_SCHEMA, tableName, orderStatusList.get(i));
        assertUpdate(sql);
        assertQuery(countPartitionsSql, "SELECT " + (i + 1));

        // A second call with identical arguments must be rejected.
        assertQueryFails(sql, "Partition already exists.*");
    }

    assertUpdate("DROP TABLE " + tableName);
    assertFalse(getQueryRunner().tableExists(getSession(), tableName));
}

@Test
public void testInsertPartitionedBucketedTable()
{
Expand All @@ -927,20 +955,9 @@ public void testInsertPartitionedBucketedTable()
private void testInsertPartitionedBucketedTable(HiveStorageFormat storageFormat)
{
String tableName = "test_insert_partitioned_bucketed_table";
createPartitionedBucketedTable(tableName, storageFormat);

assertUpdate("" +
"CREATE TABLE " + tableName + " (" +
" custkey bigint," +
" custkey2 bigint," +
" comment varchar," +
" orderstatus varchar)" +
"WITH (" +
"format = '" + storageFormat + "', " +
"partitioned_by = ARRAY[ 'orderstatus' ], " +
"bucketed_by = ARRAY[ 'custkey', 'custkey2' ], " +
"bucket_count = 11)");

ImmutableList<String> orderStatusList = ImmutableList.of("F", "O", "P");
List<String> orderStatusList = ImmutableList.of("F", "O", "P");
for (int i = 0; i < orderStatusList.size(); i++) {
String orderStatus = orderStatusList.get(i);
assertUpdate(
Expand All @@ -961,6 +978,21 @@ private void testInsertPartitionedBucketedTable(HiveStorageFormat storageFormat)
assertFalse(getQueryRunner().tableExists(getSession(), tableName));
}

/**
 * Issues a CREATE TABLE for a table with columns custkey, custkey2, comment and
 * orderstatus, partitioned by orderstatus and bucketed by (custkey, custkey2)
 * into 11 buckets, stored in the given format.
 */
private void createPartitionedBucketedTable(String tableName, HiveStorageFormat storageFormat)
{
    String columnDefinitions = " custkey bigint, custkey2 bigint, comment varchar, orderstatus varchar";
    String tableProperties = "format = '" + storageFormat + "', " +
            "partitioned_by = ARRAY[ 'orderstatus' ], " +
            "bucketed_by = ARRAY[ 'custkey', 'custkey2' ], " +
            "bucket_count = 11";

    String createTableSql = "CREATE TABLE " + tableName + " (" + columnDefinitions + ")" + "WITH (" + tableProperties + ")";
    assertUpdate(createTableSql);
}

@Test
public void testInsertPartitionedBucketedTableWithUnionAll()
{
Expand All @@ -983,7 +1015,7 @@ private void testInsertPartitionedBucketedTableWithUnionAll(HiveStorageFormat st
"bucketed_by = ARRAY[ 'custkey', 'custkey2' ], " +
"bucket_count = 11)");

ImmutableList<String> orderStatusList = ImmutableList.of("F", "O", "P");
List<String> orderStatusList = ImmutableList.of("F", "O", "P");
for (int i = 0; i < orderStatusList.size(); i++) {
String orderStatus = orderStatusList.get(i);
assertUpdate(
Expand Down

0 comments on commit c92cfef

Please sign in to comment.