Refactor Hive write path, target path and write mode decision
Historically, we used the equality of `writePath` and `targetPath`,
and whether `writePath` exists, to decide the write mode.
This was difficult to understand, so the `WriteMode` enum was
introduced in 126fac0. However, it is only used in
`SemiTransactionalHiveMetastore`.

Also, `targetPath` and `writePath` have to be fetched separately
from `LocationService`, which exposes both root-path (query-level)
and non-root-path (partition/table-level) variants. These APIs are
confusing.

This commit makes the following refactors:
- Store the `WriteMode` in `LocationHandle`.
- Return a single `WriteInfo` that contains `targetPath`, `writePath`, and the `writeMode`.
wenleix committed May 11, 2018
1 parent 932060b commit 0b27478
Showing 9 changed files with 229 additions and 145 deletions.
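For orientation, here is a minimal sketch of the two types this commit revolves around, reconstructed from the call sites in the diff below. In the commit they are nested as `LocationHandle.WriteMode` and `LocationService.WriteInfo` (see the imports in the first file); the boolean-per-constant encoding of `isWritePathSameAsTargetPath()` and the exact accessors are assumptions for illustration, not the authoritative implementation.

```java
// Sketch only: reconstructed from call sites in this diff.
import org.apache.hadoop.fs.Path;

import static java.util.Objects.requireNonNull;

enum WriteMode
{
    // Write to a temporary directory, then move to the target on commit.
    STAGE_AND_MOVE_TO_TARGET_DIRECTORY(false),
    // Write directly into a target directory that already exists (e.g. INSERT).
    DIRECT_TO_TARGET_EXISTING_DIRECTORY(true),
    // Write directly into a freshly created target directory (e.g. CREATE TABLE).
    DIRECT_TO_TARGET_NEW_DIRECTORY(true);

    private final boolean writePathSameAsTargetPath;

    WriteMode(boolean writePathSameAsTargetPath)
    {
        this.writePathSameAsTargetPath = writePathSameAsTargetPath;
    }

    boolean isWritePathSameAsTargetPath()
    {
        return writePathSameAsTargetPath;
    }
}

// Bundles the values that callers previously fetched piecemeal from
// LocationService (targetPathRoot, writePathRoot, and a mode re-derived
// from path equality).
class WriteInfo
{
    private final Path targetPath;
    private final Path writePath;
    private final WriteMode writeMode;

    WriteInfo(Path targetPath, Path writePath, WriteMode writeMode)
    {
        this.targetPath = requireNonNull(targetPath, "targetPath is null");
        this.writePath = requireNonNull(writePath, "writePath is null");
        this.writeMode = requireNonNull(writeMode, "writeMode is null");
    }

    Path getTargetPath()
    {
        return targetPath;
    }

    Path getWritePath()
    {
        return writePath;
    }

    WriteMode getWriteMode()
    {
        return writeMode;
    }
}
```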
HiveLocationService.java
@@ -14,6 +14,7 @@
 package com.facebook.presto.hive;
 
 import com.facebook.presto.hive.HdfsEnvironment.HdfsContext;
+import com.facebook.presto.hive.LocationHandle.WriteMode;
 import com.facebook.presto.hive.metastore.Partition;
 import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
 import com.facebook.presto.hive.metastore.Table;
@@ -30,6 +31,9 @@
 import static com.facebook.presto.hive.HiveWriteUtils.getTableDefaultLocation;
 import static com.facebook.presto.hive.HiveWriteUtils.isS3FileSystem;
 import static com.facebook.presto.hive.HiveWriteUtils.pathExists;
+import static com.facebook.presto.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY;
+import static com.facebook.presto.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_NEW_DIRECTORY;
+import static com.facebook.presto.hive.LocationHandle.WriteMode.STAGE_AND_MOVE_TO_TARGET_DIRECTORY;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
@@ -55,15 +59,13 @@ public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore, Conn
             throw new PrestoException(HIVE_PATH_ALREADY_EXISTS, format("Target directory for table '%s.%s' already exists: %s", schemaName, tableName, targetPath));
         }
 
-        Path writePath;
         if (shouldUseTemporaryDirectory(context, targetPath)) {
-            writePath = createTemporaryPath(context, hdfsEnvironment, targetPath);
+            Path writePath = createTemporaryPath(context, hdfsEnvironment, targetPath);
+            return new LocationHandle(targetPath, writePath, false, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
         }
         else {
-            writePath = targetPath;
+            return new LocationHandle(targetPath, targetPath, false, DIRECT_TO_TARGET_NEW_DIRECTORY);
         }
-
-        return new LocationHandle(targetPath, Optional.of(writePath), false);
     }
@@ -72,15 +74,13 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore,
         HdfsContext context = new HdfsContext(session, table.getDatabaseName(), table.getTableName());
         Path targetPath = new Path(table.getStorage().getLocation());
 
-        Optional<Path> writePath;
         if (shouldUseTemporaryDirectory(context, targetPath)) {
-            writePath = Optional.of(createTemporaryPath(context, hdfsEnvironment, targetPath));
+            Path writePath = createTemporaryPath(context, hdfsEnvironment, targetPath);
+            return new LocationHandle(targetPath, writePath, true, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
         }
         else {
-            writePath = Optional.empty();
+            return new LocationHandle(targetPath, targetPath, true, DIRECT_TO_TARGET_EXISTING_DIRECTORY);
         }
-
-        return new LocationHandle(targetPath, writePath, true);
     }
@@ -90,38 +90,46 @@ private boolean shouldUseTemporaryDirectory(HdfsContext context, Path path)
     }
 
     @Override
-    public Path targetPath(LocationHandle locationHandle, Partition partition, String partitionName)
+    public WriteInfo getQueryWriteInfo(LocationHandle locationHandle)
     {
-        return new Path(partition.getStorage().getLocation());
-    }
-
-    @Override
-    public Path targetPath(LocationHandle locationHandle, Optional<String> partitionName)
-    {
-        if (!partitionName.isPresent()) {
-            return locationHandle.getTargetPath();
-        }
-        return new Path(locationHandle.getTargetPath(), partitionName.get());
+        return new WriteInfo(locationHandle.getTargetPath(), locationHandle.getWritePath(), locationHandle.getWriteMode());
     }
 
     @Override
-    public Path targetPathRoot(LocationHandle locationHandle)
+    public WriteInfo getTableWriteInfo(LocationHandle locationHandle)
     {
-        return locationHandle.getTargetPath();
+        return new WriteInfo(locationHandle.getTargetPath(), locationHandle.getWritePath(), locationHandle.getWriteMode());
     }
 
     @Override
-    public Optional<Path> writePath(LocationHandle locationHandle, Optional<String> partitionName)
+    public WriteInfo getPartitionWriteInfo(LocationHandle locationHandle, Optional<Partition> partition, String partitionName)
     {
-        if (!partitionName.isPresent()) {
-            return locationHandle.getWritePath();
-        }
-        return locationHandle.getWritePath().map(path -> new Path(path, partitionName.get()));
+        if (partition.isPresent()) {
+            // existing partition
+            WriteMode writeMode = locationHandle.getWriteMode();
+            Path targetPath = new Path(partition.get().getStorage().getLocation());
+
+            Path writePath;
+            switch (writeMode) {
+                case STAGE_AND_MOVE_TO_TARGET_DIRECTORY:
+                    writePath = new Path(locationHandle.getWritePath(), partitionName);
+                    break;
+                case DIRECT_TO_TARGET_EXISTING_DIRECTORY:
+                    writePath = targetPath;
+                    break;
+                case DIRECT_TO_TARGET_NEW_DIRECTORY:
+                default:
+                    throw new UnsupportedOperationException(format("inserting into existing partition is not supported for %s", writeMode));
+            }
+
+            return new WriteInfo(targetPath, writePath, writeMode);
+        }
+        else {
+            // new partition
+            return new WriteInfo(
+                    new Path(locationHandle.getTargetPath(), partitionName),
+                    new Path(locationHandle.getWritePath(), partitionName),
+                    locationHandle.getWriteMode());
+        }
     }
 
-    @Override
-    public Optional<Path> writePathRoot(LocationHandle locationHandle)
-    {
-        return locationHandle.getWritePath();
-    }
 }
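The payoff is visible at the call sites in HiveMetadata below: the write mode is no longer re-derived from path equality at each caller. A before/after fragment (not compilable on its own; `locationService`, `metastore`, `session`, and `locationHandle` come from the surrounding diff context, while `filePrefix` and `tableName` are stand-ins for the real arguments):

```java
// Before: each caller re-derived the mode from path equality.
Path writePathRoot = locationService.writePathRoot(locationHandle).get();
Path targetPathRoot = locationService.targetPathRoot(locationHandle);
WriteMode mode = writePathRoot.equals(targetPathRoot)
        ? DIRECT_TO_TARGET_NEW_DIRECTORY
        : STAGE_AND_MOVE_TO_TARGET_DIRECTORY;
metastore.declareIntentionToWrite(session, mode, writePathRoot, filePrefix, tableName);

// After: one call returns both paths and the mode that was decided up front.
WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
metastore.declareIntentionToWrite(
        session,
        writeInfo.getWriteMode(),
        writeInfo.getWritePath(),
        filePrefix,
        tableName);
```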
HiveMetadata.java
@@ -14,14 +14,14 @@
 package com.facebook.presto.hive;
 
 import com.facebook.presto.hive.HdfsEnvironment.HdfsContext;
+import com.facebook.presto.hive.LocationService.WriteInfo;
 import com.facebook.presto.hive.metastore.Column;
 import com.facebook.presto.hive.metastore.Database;
 import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
 import com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege;
 import com.facebook.presto.hive.metastore.Partition;
 import com.facebook.presto.hive.metastore.PrincipalPrivileges;
 import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
-import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore.WriteMode;
 import com.facebook.presto.hive.metastore.StorageFormat;
 import com.facebook.presto.hive.metastore.Table;
 import com.facebook.presto.hive.statistics.HiveStatisticsProvider;
@@ -149,9 +149,6 @@
 import static com.facebook.presto.hive.metastore.MetastoreUtil.getProtectMode;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyOnline;
 import static com.facebook.presto.hive.metastore.PrincipalType.USER;
-import static com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore.WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY;
-import static com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore.WriteMode.DIRECT_TO_TARGET_NEW_DIRECTORY;
-import static com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore.WriteMode.STAGE_AND_MOVE_TO_TARGET_DIRECTORY;
 import static com.facebook.presto.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT;
 import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
 import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf;
@@ -629,7 +626,7 @@ private void createHiveTable(ConnectorSession session, ConnectorTableMetadata ta
         else {
             external = false;
             LocationHandle locationHandle = locationService.forNewTable(metastore, session, schemaName, tableName);
-            targetPath = locationService.targetPathRoot(locationHandle);
+            targetPath = locationService.getQueryWriteInfo(locationHandle).getTargetPath();
         }
 
         Table table = buildTableObject(
@@ -848,10 +845,8 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
                 session.getUser(),
                 tableProperties);
 
-        Path writePathRoot = locationService.writePathRoot(locationHandle).get();
-        Path targetPathRoot = locationService.targetPathRoot(locationHandle);
-        WriteMode mode = writePathRoot.equals(targetPathRoot) ? DIRECT_TO_TARGET_NEW_DIRECTORY : STAGE_AND_MOVE_TO_TARGET_DIRECTORY;
-        metastore.declareIntentionToWrite(session, mode, writePathRoot, result.getFilePrefix(), schemaTableName);
+        WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
+        metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), result.getFilePrefix(), schemaTableName);
 
         return result;
     }
@@ -866,9 +861,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
                 .map(partitionUpdateCodec::fromJson)
                 .collect(toList());
 
-        Path targetPath = locationService.targetPathRoot(handle.getLocationHandle());
-        Path writePath = locationService.writePathRoot(handle.getLocationHandle()).get();
-
+        WriteInfo writeInfo = locationService.getQueryWriteInfo(handle.getLocationHandle());
         Table table = buildTableObject(
                 session.getQueryId(),
                 handle.getSchemaName(),
@@ -879,7 +872,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
                 handle.getPartitionedBy(),
                 handle.getBucketProperty(),
                 handle.getAdditionalTableParameters(),
-                targetPath,
+                writeInfo.getTargetPath(),
                 false,
                 prestoVersion);
         PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(handle.getTableOwner());
@@ -904,7 +897,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
             table = updateStatistics(table, tableStatistic, ADD);
         }
 
-        metastore.createTable(session, table, principalPrivileges, Optional.of(writePath), false);
+        metastore.createTable(session, table, principalPrivileges, Optional.of(writeInfo.getWritePath()), false);
 
         if (!handle.getPartitionedBy().isEmpty()) {
             if (isRespectTableFormat(session)) {
@@ -1055,16 +1048,8 @@ public HiveInsertTableHandle beginInsert(ConnectorSession session, ConnectorTabl
                 tableStorageFormat,
                 isRespectTableFormat(session) ? tableStorageFormat : HiveSessionProperties.getHiveStorageFormat(session));
 
-        Optional<Path> writePathRoot = locationService.writePathRoot(locationHandle);
-        Path targetPathRoot = locationService.targetPathRoot(locationHandle);
-        if (writePathRoot.isPresent()) {
-            WriteMode mode = writePathRoot.get().equals(targetPathRoot) ? DIRECT_TO_TARGET_NEW_DIRECTORY : STAGE_AND_MOVE_TO_TARGET_DIRECTORY;
-            metastore.declareIntentionToWrite(session, mode, writePathRoot.get(), result.getFilePrefix(), tableName);
-        }
-        else {
-            metastore.declareIntentionToWrite(session, DIRECT_TO_TARGET_EXISTING_DIRECTORY, targetPathRoot, result.getFilePrefix(), tableName);
-        }
-
+        WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
+        metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), result.getFilePrefix(), tableName);
         return result;
     }
HiveWriterFactory.java
@@ -14,6 +14,7 @@
 package com.facebook.presto.hive;
 
 import com.facebook.presto.hive.HdfsEnvironment.HdfsContext;
+import com.facebook.presto.hive.LocationService.WriteInfo;
 import com.facebook.presto.hive.metastore.Column;
 import com.facebook.presto.hive.metastore.HivePageSinkMetadataProvider;
 import com.facebook.presto.hive.metastore.Partition;
@@ -64,6 +65,7 @@
 import static com.facebook.presto.hive.HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION;
 import static com.facebook.presto.hive.HiveType.toHiveTypes;
 import static com.facebook.presto.hive.HiveWriteUtils.getField;
+import static com.facebook.presto.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveSchema;
 import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
 import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf;
@@ -182,17 +184,17 @@ public HiveWriterFactory(
         Path writePath;
         if (isCreateTable) {
             this.table = null;
-            writePath = locationService.writePathRoot(locationHandle)
-                    .orElseThrow(() -> new IllegalArgumentException("CREATE TABLE must have a write path"));
+            WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
+            checkArgument(writeInfo.getWriteMode() != DIRECT_TO_TARGET_EXISTING_DIRECTORY, "CREATE TABLE write mode cannot be DIRECT_TO_TARGET_EXISTING_DIRECTORY");
+            writePath = writeInfo.getWritePath();
         }
         else {
             Optional<Table> table = pageSinkMetadataProvider.getTable();
             if (!table.isPresent()) {
                 throw new PrestoException(HIVE_INVALID_METADATA, format("Table %s.%s was dropped during insert", schemaName, tableName));
             }
             this.table = table.get();
-            writePath = locationService.writePathRoot(locationHandle)
-                    .orElseGet(() -> locationService.targetPathRoot(locationHandle));
+            writePath = locationService.getQueryWriteInfo(locationHandle).getWritePath();
         }
 
         this.bucketCount = requireNonNull(bucketCount, "bucketCount is null");
@@ -259,8 +261,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
 
         boolean isNew;
         Properties schema;
-        Path target;
-        Path write;
+        WriteInfo writeInfo;
         StorageFormat outputStorageFormat;
         if (!partition.isPresent()) {
             if (table == null) {
@@ -276,27 +277,36 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
                     .map(HiveType::getHiveTypeName)
                     .map(HiveTypeName::toString)
                     .collect(joining(":")));
-            target = locationService.targetPath(locationHandle, partitionName);
-            write = locationService.writePath(locationHandle, partitionName).get();
-
-            if (partitionName.isPresent() && !target.equals(write)) {
-                // When target path is different from write path,
-                // verify that the target directory for the partition does not already exist
-                if (HiveWriteUtils.pathExists(new HdfsContext(session, schemaName, tableName), hdfsEnvironment, target)) {
-                    throw new PrestoException(HIVE_PATH_ALREADY_EXISTS, format(
-                            "Target directory for new partition '%s' of table '%s.%s' already exists: %s",
-                            partitionName,
-                            schemaName,
-                            tableName,
-                            target));
-                }
-            }
+
+            if (!partitionName.isPresent()) {
+                // new unpartitioned table
+                writeInfo = locationService.getTableWriteInfo(locationHandle);
+            }
+            else {
+                // a new partition in a new partitioned table
+                writeInfo = locationService.getPartitionWriteInfo(locationHandle, partition, partitionName.get());
+
+                if (!writeInfo.getWriteMode().isWritePathSameAsTargetPath()) {
+                    // When target path is different from write path,
+                    // verify that the target directory for the partition does not already exist
+                    if (HiveWriteUtils.pathExists(new HdfsContext(session, schemaName, tableName), hdfsEnvironment, writeInfo.getTargetPath())) {
+                        throw new PrestoException(HIVE_PATH_ALREADY_EXISTS, format(
+                                "Target directory for new partition '%s' of table '%s.%s' already exists: %s",
+                                partitionName,
+                                schemaName,
+                                tableName,
+                                writeInfo.getTargetPath()));
+                    }
+                }
+            }
         }
         else {
             // Write to: a new partition in an existing partitioned table,
             // or an existing unpartitioned table
             if (partitionName.isPresent()) {
+                // a new partition in an existing partitioned table
                 isNew = true;
+                writeInfo = locationService.getPartitionWriteInfo(locationHandle, partition, partitionName.get());
             }
             else {
                 if (bucketNumber.isPresent()) {
@@ -306,10 +316,10 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
                     throw new PrestoException(HIVE_PARTITION_READ_ONLY, "Unpartitioned Hive tables are immutable");
                 }
                 isNew = false;
+                writeInfo = locationService.getTableWriteInfo(locationHandle);
             }
 
             schema = getHiveSchema(table);
-            target = locationService.targetPath(locationHandle, partitionName);
-            write = locationService.writePath(locationHandle, partitionName).orElse(target);
         }
 
         if (partitionName.isPresent()) {
@@ -357,15 +367,14 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
             outputStorageFormat = partition.get().getStorage().getStorageFormat();
             schema = getHiveSchema(partition.get(), table);
 
-            target = locationService.targetPath(locationHandle, partition.get(), partitionName.get());
-            write = locationService.writePath(locationHandle, partitionName).orElse(target);
+            writeInfo = locationService.getPartitionWriteInfo(locationHandle, partition, partitionName.get());
         }
 
         validateSchema(partitionName, schema);
 
         String fileNameWithExtension = fileName + getFileExtension(conf, outputStorageFormat);
 
-        Path path = new Path(write, fileNameWithExtension);
+        Path path = new Path(writeInfo.getWritePath(), fileNameWithExtension);
 
         HiveFileWriter hiveFileWriter = null;
         for (HiveFileWriterFactory fileWriterFactory : fileWriterFactories) {
@@ -426,7 +435,15 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
                             hiveWriter.getRowCount()));
         };
 
-        return new HiveWriter(hiveFileWriter, partitionName, isNew, fileNameWithExtension, write.toString(), target.toString(), onCommit, hiveWriterStats);
+        return new HiveWriter(
+                hiveFileWriter,
+                partitionName,
+                isNew,
+                fileNameWithExtension,
+                writeInfo.getWritePath().toString(),
+                writeInfo.getTargetPath().toString(),
+                onCommit,
+                hiveWriterStats);
     }
 
     private void validateSchema(Optional<String> partitionName, Properties schema)
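To make the partition path arithmetic concrete, here is a small, self-contained example. All locations are illustrative (the real staging directory comes from `createTemporaryPath` in `HiveLocationService`); only `org.apache.hadoop.fs.Path` semantics are relied on. It shows how write and target paths diverge under `STAGE_AND_MOVE_TO_TARGET_DIRECTORY` and coincide under `DIRECT_TO_TARGET_EXISTING_DIRECTORY`:

```java
import org.apache.hadoop.fs.Path;

public class PartitionPathExample
{
    public static void main(String[] args)
    {
        // Illustrative locations only.
        Path targetRoot = new Path("hdfs://warehouse/orders");
        Path stagingRoot = new Path("hdfs://warehouse/orders/.staging-query1");
        String partitionName = "ds=2018-05-11";

        // New partition, STAGE_AND_MOVE_TO_TARGET_DIRECTORY: both roots are
        // extended by the partition name; files land under writePath and are
        // moved to targetPath on commit.
        Path targetPath = new Path(targetRoot, partitionName);
        Path writePath = new Path(stagingRoot, partitionName);
        System.out.println(targetPath); // hdfs://warehouse/orders/ds=2018-05-11
        System.out.println(writePath);  // hdfs://warehouse/orders/.staging-query1/ds=2018-05-11

        // Existing partition: targetPath comes from the partition's own
        // storage location, which may live outside the table directory.
        // Under DIRECT_TO_TARGET_EXISTING_DIRECTORY the write path is the
        // target path itself; DIRECT_TO_TARGET_NEW_DIRECTORY rejects inserts
        // into existing partitions (see getPartitionWriteInfo above).
        Path existingPartitionLocation = new Path("hdfs://other-volume/orders/ds=2018-05-10");
        Path directWritePath = existingPartitionLocation;
        System.out.println(directWritePath); // hdfs://other-volume/orders/ds=2018-05-10
    }
}
```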
