Skip to content

Commit

Permalink
Add create table support for Hive
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenxiao authored and electrum committed Mar 31, 2015
1 parent 8ee8f02 commit 3759d1c
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 45 deletions.
143 changes: 105 additions & 38 deletions presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java
Expand Up @@ -32,7 +32,6 @@
import com.facebook.presto.spi.type.TypeManager;
import com.google.common.base.Function;
import com.google.common.base.StandardSystemProperty;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
Expand Down Expand Up @@ -75,6 +74,7 @@
import static com.google.common.collect.Iterables.transform;
import static java.lang.String.format;
import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;

public class HiveMetadata
Expand Down Expand Up @@ -299,7 +299,61 @@ public ColumnMetadata getColumnMetadata(ConnectorTableHandle tableHandle, Connec
/**
 * Registers a new, empty managed table in the Hive metastore.
 *
 * <p>The table owner must be non-empty. Column names and types are gathered through
 * {@code buildColumnInfo}, which rejects unsupported types and appends the sample weight
 * column for sampled tables. Columns flagged as partition keys become Hive partition keys;
 * every other column (including the appended sample weight column) becomes a regular
 * data column. The table location is resolved by {@code getTargetPath}.
 *
 * @param session the session used to select the Hive storage format
 * @param tableMetadata name, owner and column definitions of the table to create
 * @throws IllegalArgumentException if the table owner is null or empty
 */
@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
    checkArgument(!isNullOrEmpty(tableMetadata.getOwner()), "Table owner is null or empty");

    SchemaTableName schemaTableName = tableMetadata.getTable();
    String schemaName = schemaTableName.getSchemaName();
    String tableName = schemaTableName.getTableName();

    ImmutableList.Builder<String> columnNames = ImmutableList.builder();
    ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();

    buildColumnInfo(tableMetadata, columnNames, columnTypes);

    ImmutableList.Builder<FieldSchema> partitionKeys = ImmutableList.builder();
    ImmutableList.Builder<FieldSchema> columns = ImmutableList.builder();

    List<String> names = columnNames.build();
    List<String> typeNames = columnTypes.build().stream()
            .map(HiveType::toHiveType)
            .map(HiveType::getHiveTypeName)
            .collect(toList());

    // buildColumnInfo may append a sample weight column that has no matching entry in
    // tableMetadata.getColumns(); only look up the partition-key flag for declared columns
    // so that the extra entry does not index past the end of the declared column list.
    int declaredColumnCount = tableMetadata.getColumns().size();
    for (int i = 0; i < names.size(); i++) {
        boolean partitionKey = i < declaredColumnCount && tableMetadata.getColumns().get(i).isPartitionKey();
        FieldSchema field = new FieldSchema(names.get(i), typeNames.get(i), null);
        if (partitionKey) {
            partitionKeys.add(field);
        }
        else {
            columns.add(field);
        }
    }

    Path targetPath = getTargetPath(schemaName, tableName, schemaTableName);

    HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(session, this.hiveStorageFormat);
    SerDeInfo serdeInfo = new SerDeInfo();
    serdeInfo.setName(tableName);
    serdeInfo.setSerializationLib(hiveStorageFormat.getSerDe());

    StorageDescriptor sd = new StorageDescriptor();
    sd.setLocation(targetPath.toString());

    sd.setCols(columns.build());
    sd.setSerdeInfo(serdeInfo);
    sd.setInputFormat(hiveStorageFormat.getInputFormat());
    sd.setOutputFormat(hiveStorageFormat.getOutputFormat());

    Table table = new Table();
    table.setDbName(schemaName);
    table.setTableName(tableName);
    table.setOwner(tableMetadata.getOwner());
    table.setTableType(TableType.MANAGED_TABLE.toString());
    String tableComment = "Created by Presto";
    table.setParameters(ImmutableMap.of("comment", tableComment));
    table.setPartitionKeys(partitionKeys.build());
    table.setSd(sd);

    metastore.createTable(table);
}

@Override
Expand Down Expand Up @@ -346,42 +400,15 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto

ImmutableList.Builder<String> columnNames = ImmutableList.builder();
ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
for (ColumnMetadata column : tableMetadata.getColumns()) {
// TODO: also verify that the OutputFormat supports the type
if (!HiveRecordSink.isTypeSupported(column.getType())) {
throw new PrestoException(NOT_SUPPORTED, format("Cannot create table with unsupported type: %s", column.getType().getDisplayName()));
}
columnNames.add(column.getName());
columnTypes.add(column.getType());
}
if (tableMetadata.isSampled()) {
columnNames.add(SAMPLE_WEIGHT_COLUMN_NAME);
columnTypes.add(BIGINT);
}

// get the root directory for the database
SchemaTableName table = tableMetadata.getTable();
String schemaName = table.getSchemaName();
String tableName = table.getTableName();
SchemaTableName schemaTableName = tableMetadata.getTable();
String schemaName = schemaTableName.getSchemaName();
String tableName = schemaTableName.getTableName();

String location = getDatabase(schemaName).getLocationUri();
if (isNullOrEmpty(location)) {
throw new RuntimeException(format("Database '%s' location is not set", schemaName));
}
buildColumnInfo(tableMetadata, columnNames, columnTypes);

Path databasePath = new Path(location);
if (!pathExists(databasePath)) {
throw new RuntimeException(format("Database '%s' location does not exist: %s", schemaName, databasePath));
}
if (!isDirectory(databasePath)) {
throw new RuntimeException(format("Database '%s' location is not a directory: %s", schemaName, databasePath));
}

// verify the target directory for the table
Path targetPath = new Path(databasePath, tableName);
if (pathExists(targetPath)) {
throw new RuntimeException(format("Target directory for table '%s' already exists: %s", table, targetPath));
}
Path targetPath = getTargetPath(schemaName, tableName, schemaTableName);

if (!useTemporaryDirectory(targetPath)) {
return new HiveOutputTableHandle(
Expand Down Expand Up @@ -438,10 +465,10 @@ public void commitCreateTable(ConnectorOutputTableHandle tableHandle, Collection
}

// create the table in the metastore
List<String> types = FluentIterable.from(handle.getColumnTypes())
.transform(HiveType::toHiveType)
.transform(HiveType::getHiveTypeName)
.toList();
List<String> types = handle.getColumnTypes().stream()
.map(HiveType::toHiveType)
.map(HiveType::getHiveTypeName)
.collect(toList());

boolean sampled = false;
ImmutableList.Builder<FieldSchema> columns = ImmutableList.builder();
Expand Down Expand Up @@ -488,6 +515,29 @@ public void commitCreateTable(ConnectorOutputTableHandle tableHandle, Collection
metastore.createTable(table);
}

/**
 * Resolves the directory a new table should be written to and validates it.
 *
 * <p>The database location must be set, must exist and must be a directory; the
 * table directory beneath it must not exist yet.
 *
 * @param schemaName name of the database whose location is looked up
 * @param tableName name of the table, used as the child directory name
 * @param schemaTableName qualified table name, used only in the error message
 * @return the path {@code <database location>/<tableName>}
 * @throws RuntimeException if any of the checks above fails
 */
private Path getTargetPath(String schemaName, String tableName, SchemaTableName schemaTableName)
{
    String databaseLocation = getDatabase(schemaName).getLocationUri();
    if (isNullOrEmpty(databaseLocation)) {
        throw new RuntimeException(format("Database '%s' location is not set", schemaName));
    }

    // the database root must already be present as a directory
    Path databaseRoot = new Path(databaseLocation);
    if (!pathExists(databaseRoot)) {
        throw new RuntimeException(format("Database '%s' location does not exist: %s", schemaName, databaseRoot));
    }
    if (!isDirectory(databaseRoot)) {
        throw new RuntimeException(format("Database '%s' location is not a directory: %s", schemaName, databaseRoot));
    }

    // the table directory itself must not be present yet
    Path tableDirectory = new Path(databaseRoot, tableName);
    if (pathExists(tableDirectory)) {
        throw new RuntimeException(format("Target directory for table '%s' already exists: %s", schemaTableName, tableDirectory));
    }

    return tableDirectory;
}

private Database getDatabase(String database)
{
try {
Expand Down Expand Up @@ -683,6 +733,23 @@ private void verifyJvmTimeZone()
}
}

/**
 * Appends the name and type of every column in {@code tableMetadata} to the given builders,
 * followed by the sample weight column when the table is sampled.
 *
 * @param tableMetadata source of the column definitions and the sampled flag
 * @param names builder receiving column names, in column order
 * @param types builder receiving column types, in column order
 * @throws PrestoException with {@code NOT_SUPPORTED} if {@code HiveRecordSink} does not support a column type
 */
private static void buildColumnInfo(ConnectorTableMetadata tableMetadata, ImmutableList.Builder<String> names, ImmutableList.Builder<Type> types)
{
    for (ColumnMetadata columnMetadata : tableMetadata.getColumns()) {
        Type columnType = columnMetadata.getType();
        // TODO: also verify that the OutputFormat supports the type
        if (HiveRecordSink.isTypeSupported(columnType)) {
            names.add(columnMetadata.getName());
            types.add(columnType);
            continue;
        }
        throw new PrestoException(NOT_SUPPORTED, format("Cannot create table with unsupported type: %s", columnType.getDisplayName()));
    }

    if (!tableMetadata.isSampled()) {
        return;
    }

    // sampled tables carry one extra trailing column
    names.add(SAMPLE_WEIGHT_COLUMN_NAME);
    types.add(BIGINT);
}

private static Function<HiveColumnHandle, ColumnMetadata> columnMetadataGetter(Table table, final TypeManager typeManager)
{
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
Expand Down
Expand Up @@ -34,11 +34,4 @@ public void testInsert()
{
// Hive connector currently does not support insert
}

/**
 * Empty override of {@code testCreateTable}: the body contains no statements, so
 * invoking this method performs no checks.
 *
 * NOTE(review): the enclosing commit is titled "Add create table support for Hive" and
 * its hunk header for this file shows seven lines removed, so this override looks like
 * the part being deleted - confirm before keeping it.
 */
@Override
public void testCreateTable()
throws Exception
{
// Hive connector currently does not support create table
}
}

0 comments on commit 3759d1c

Please sign in to comment.