Skip to content

Commit

Permalink
Extract partitionUpdates deserialization logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shixuan-fan committed Mar 9, 2019
1 parent 3199fb5 commit edcd840
Showing 1 changed file with 10 additions and 8 deletions.
Expand Up @@ -1075,10 +1075,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
{
HiveOutputTableHandle handle = (HiveOutputTableHandle) tableHandle;

List<PartitionUpdate> partitionUpdates = fragments.stream()
.map(Slice::getBytes)
.map(partitionUpdateCodec::fromJson)
.collect(toList());
List<PartitionUpdate> partitionUpdates = getPartitionUpdates(fragments);

WriteInfo writeInfo = locationService.getQueryWriteInfo(handle.getLocationHandle());
Table table = buildTableObject(
Expand Down Expand Up @@ -1297,10 +1294,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
{
HiveInsertTableHandle handle = (HiveInsertTableHandle) insertHandle;

List<PartitionUpdate> partitionUpdates = fragments.stream()
.map(Slice::getBytes)
.map(partitionUpdateCodec::fromJson)
.collect(toList());
List<PartitionUpdate> partitionUpdates = getPartitionUpdates(fragments);

HiveStorageFormat tableStorageFormat = handle.getTableStorageFormat();
partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates);
Expand Down Expand Up @@ -2044,6 +2038,14 @@ private static boolean hasAdminRole(Set<PrestoPrincipal> roles)
return roles.stream().anyMatch(principal -> principal.getName().equalsIgnoreCase(ADMIN_ROLE_NAME));
}

private List<PartitionUpdate> getPartitionUpdates(Collection<Slice> fragments)
{
return fragments.stream()
.map(Slice::getBytes)
.map(partitionUpdateCodec::fromJson)
.collect(toList());
}

private void verifyJvmTimeZone()
{
if (!allowCorruptWritesForTesting && !timeZone.equals(DateTimeZone.getDefault())) {
Expand Down

0 comments on commit edcd840

Please sign in to comment.