Add shared distributions for Raptor tables
electrum committed Feb 3, 2016
1 parent b43cd37 commit d7dd2dc
Showing 9 changed files with 366 additions and 32 deletions.
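This commit threads a new distribution_name table property through Raptor so that bucketed tables can share one distribution and be colocated. A minimal usage sketch, assuming the standard Raptor CREATE TABLE ... WITH (...) syntax; the table, column, and distribution names below are invented for illustration:

-- Hypothetical example: the first table creates the named distribution,
-- the second reuses it. Bucket columns must be BIGINT, and bucket_count
-- must be given with bucketed_on and match the existing distribution.
CREATE TABLE orders (
    orderkey BIGINT,
    totalprice DOUBLE
)
WITH (
    bucket_count = 16,
    bucketed_on = ARRAY['orderkey'],
    distribution_name = 'order_dist'
);

CREATE TABLE lineitem (
    orderkey BIGINT,
    quantity BIGINT
)
WITH (
    bucket_count = 16,
    bucketed_on = ARRAY['orderkey'],
    distribution_name = 'order_dist'
);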
@@ -14,6 +14,7 @@
package com.facebook.presto.raptor;

import com.facebook.presto.raptor.metadata.ColumnInfo;
import com.facebook.presto.raptor.metadata.Distribution;
import com.facebook.presto.raptor.metadata.MetadataDao;
import com.facebook.presto.raptor.metadata.ShardDelta;
import com.facebook.presto.raptor.metadata.ShardInfo;
@@ -72,11 +73,14 @@
import static com.facebook.presto.raptor.RaptorSessionProperties.getExternalBatchId;
import static com.facebook.presto.raptor.RaptorTableProperties.BUCKETED_ON_PROPERTY;
import static com.facebook.presto.raptor.RaptorTableProperties.BUCKET_COUNT_PROPERTY;
import static com.facebook.presto.raptor.RaptorTableProperties.DISTRIBUTION_NAME_PROPERTY;
import static com.facebook.presto.raptor.RaptorTableProperties.getBucketColumns;
import static com.facebook.presto.raptor.RaptorTableProperties.getBucketCount;
import static com.facebook.presto.raptor.RaptorTableProperties.getDistributionName;
import static com.facebook.presto.raptor.RaptorTableProperties.getSortColumns;
import static com.facebook.presto.raptor.RaptorTableProperties.getTemporalColumn;
import static com.facebook.presto.raptor.util.DatabaseUtil.onDemandDao;
import static com.facebook.presto.raptor.util.DatabaseUtil.runIgnoringConstraintViolation;
import static com.facebook.presto.raptor.util.DatabaseUtil.runTransaction;
import static com.facebook.presto.raptor.util.Types.checkType;
import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS;
@@ -334,21 +338,6 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
}
}

OptionalInt bucketCount = getBucketCount(tableMetadata.getProperties());
List<RaptorColumnHandle> bucketColumnHandles = getBucketColumnHandles(getBucketColumns(tableMetadata.getProperties()), columnHandleMap);

if (bucketCount.isPresent() && bucketColumnHandles.isEmpty()) {
throw new PrestoException(INVALID_TABLE_PROPERTY, format("Must specify '%s' along with '%s'", BUCKETED_ON_PROPERTY, BUCKET_COUNT_PROPERTY));
}
if (!bucketCount.isPresent() && !bucketColumnHandles.isEmpty()) {
throw new PrestoException(INVALID_TABLE_PROPERTY, format("Must specify '%s' along with '%s'", BUCKET_COUNT_PROPERTY, BUCKETED_ON_PROPERTY));
}
for (RaptorColumnHandle column : bucketColumnHandles) {
if (!column.getColumnType().equals(BIGINT)) {
throw new PrestoException(NOT_SUPPORTED, "Bucketing is only supported for BIGINT columns");
}
}

RaptorColumnHandle sampleWeightColumnHandle = null;
if (tableMetadata.isSampled()) {
sampleWeightColumnHandle = new RaptorColumnHandle(connectorId, SAMPLE_WEIGHT_COLUMN_NAME, columnId, BIGINT);
@@ -360,6 +349,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

setTransactionId(transactionId);

Optional<DistributionInfo> distribution = getOrCreateDistribution(columnHandleMap, tableMetadata.getProperties());

return new RaptorOutputTableHandle(
connectorId,
transactionId,
@@ -371,8 +362,74 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
sortColumnHandles,
nCopies(sortColumnHandles.size(), ASC_NULLS_FIRST),
temporalColumnHandle,
bucketCount,
bucketColumnHandles);
distribution.map(info -> OptionalLong.of(info.getDistributionId())).orElse(OptionalLong.empty()),
distribution.map(info -> OptionalInt.of(info.getBucketCount())).orElse(OptionalInt.empty()),
distribution.map(DistributionInfo::getBucketColumns).orElse(ImmutableList.of()));
}

private Optional<DistributionInfo> getOrCreateDistribution(Map<String, RaptorColumnHandle> columnHandleMap, Map<String, Object> properties)
{
OptionalInt bucketCount = getBucketCount(properties);
List<RaptorColumnHandle> bucketColumnHandles = getBucketColumnHandles(getBucketColumns(properties), columnHandleMap);

if (bucketCount.isPresent() && bucketColumnHandles.isEmpty()) {
throw new PrestoException(INVALID_TABLE_PROPERTY, format("Must specify '%s' along with '%s'", BUCKETED_ON_PROPERTY, BUCKET_COUNT_PROPERTY));
}
if (!bucketCount.isPresent() && !bucketColumnHandles.isEmpty()) {
throw new PrestoException(INVALID_TABLE_PROPERTY, format("Must specify '%s' along with '%s'", BUCKET_COUNT_PROPERTY, BUCKETED_ON_PROPERTY));
}
ImmutableList.Builder<Type> bucketColumnTypes = ImmutableList.builder();
for (RaptorColumnHandle column : bucketColumnHandles) {
if (!column.getColumnType().equals(BIGINT)) {
throw new PrestoException(NOT_SUPPORTED, "Bucketing is only supported for BIGINT columns");
}
bucketColumnTypes.add(column.getColumnType());
}

long distributionId;
String distributionName = getDistributionName(properties);
if (distributionName != null) {
if (bucketColumnHandles.isEmpty()) {
throw new PrestoException(INVALID_TABLE_PROPERTY, format("Must specify '%s' along with '%s'", BUCKETED_ON_PROPERTY, DISTRIBUTION_NAME_PROPERTY));
}

Distribution distribution = dao.getDistribution(distributionName);
if (distribution == null) {
if (!bucketCount.isPresent()) {
throw new PrestoException(INVALID_TABLE_PROPERTY, "Distribution does not exist and bucket count is not specified");
}
distribution = getOrCreateDistribution(distributionName, bucketColumnTypes.build(), bucketCount.getAsInt());
}
distributionId = distribution.getId();

if (bucketCount.isPresent() && (distribution.getBucketCount() != bucketCount.getAsInt())) {
throw new PrestoException(INVALID_TABLE_PROPERTY, "Bucket count must match distribution");
}
if (!distribution.getColumnTypes().equals(bucketColumnTypes.build())) {
throw new PrestoException(INVALID_TABLE_PROPERTY, "Bucket column types must match distribution");
}
}
else if (bucketCount.isPresent()) {
String types = Distribution.serializeColumnTypes(bucketColumnTypes.build());
distributionId = dao.insertDistribution(null, types, bucketCount.getAsInt());
}
else {
return Optional.empty();
}

return Optional.of(new DistributionInfo(distributionId, bucketCount.getAsInt(), bucketColumnHandles));
}

private Distribution getOrCreateDistribution(String name, List<Type> columnTypes, int bucketCount)
{
String types = Distribution.serializeColumnTypes(columnTypes);
runIgnoringConstraintViolation(() -> dao.insertDistribution(name, types, bucketCount));

Distribution distribution = dao.getDistribution(name);
if (distribution == null) {
throw new PrestoException(RAPTOR_ERROR, "Distribution does not exist after insert");
}
return distribution;
}

private static Optional<RaptorColumnHandle> getTemporalColumnHandle(String temporalColumn, Map<String, RaptorColumnHandle> columnHandleMap)
@@ -421,8 +478,8 @@ public void finishCreateTable(ConnectorSession session, ConnectorOutputTableHand
long newTableId = runTransaction(dbi, (dbiHandle, status) -> {
MetadataDao dao = dbiHandle.attach(MetadataDao.class);

Integer bucketCount = table.getBucketCount().isPresent() ? table.getBucketCount().getAsInt() : null;
long tableId = dao.insertTable(table.getSchemaName(), table.getTableName(), true, bucketCount);
Long distributionId = table.getDistributionId().isPresent() ? table.getDistributionId().getAsLong() : null;
long tableId = dao.insertTable(table.getSchemaName(), table.getTableName(), true, distributionId);

List<RaptorColumnHandle> sortColumnHandles = table.getSortColumnHandles();
List<RaptorColumnHandle> bucketColumnHandles = table.getBucketColumnHandles();
@@ -667,4 +724,33 @@ public void rollback()
shardManager.rollbackTransaction(transactionId);
}
}

private static class DistributionInfo
{
private final long distributionId;
private final int bucketCount;
private final List<RaptorColumnHandle> bucketColumns;

public DistributionInfo(long distributionId, int bucketCount, List<RaptorColumnHandle> bucketColumns)
{
this.distributionId = distributionId;
this.bucketCount = bucketCount;
this.bucketColumns = ImmutableList.copyOf(requireNonNull(bucketColumns, "bucketColumns is null"));
}

public long getDistributionId()
{
return distributionId;
}

public int getBucketCount()
{
return bucketCount;
}

public List<RaptorColumnHandle> getBucketColumns()
{
return bucketColumns;
}
}
}
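The new getOrCreateDistribution validation above rejects inconsistent property combinations. A few hypothetical statements and the errors they would trigger (names invented, syntax as in the sketch at the top):

-- Rejected: 'bucketed_on' must accompany 'distribution_name'
CREATE TABLE t1 (x BIGINT) WITH (distribution_name = 'd1');

-- Rejected: bucketing is only supported for BIGINT columns
CREATE TABLE t2 (x VARCHAR) WITH (bucket_count = 8, bucketed_on = ARRAY['x']);

-- Rejected if 'd1' already exists with a different bucket count or column types
CREATE TABLE t3 (x BIGINT) WITH (bucket_count = 32, bucketed_on = ARRAY['x'], distribution_name = 'd1');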
@@ -13,6 +13,7 @@
*/
package com.facebook.presto.raptor;

import com.facebook.presto.raptor.metadata.Distribution;
import com.facebook.presto.raptor.metadata.ForMetadata;
import com.facebook.presto.raptor.metadata.ShardDelta;
import com.facebook.presto.raptor.metadata.ShardInfo;
@@ -72,6 +73,7 @@ public IDBI createDBI(@ForMetadata ConnectionFactory connectionFactory, TypeMana
{
DBI dbi = new DBI(connectionFactory);
dbi.registerMapper(new TableColumn.Mapper(typeManager));
dbi.registerMapper(new Distribution.Mapper(typeManager));
return dbi;
}
}
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

import static com.facebook.presto.raptor.util.MetadataUtil.checkSchemaName;
import static com.facebook.presto.raptor.util.MetadataUtil.checkTableName;
@@ -41,6 +42,7 @@ public class RaptorOutputTableHandle
private final List<RaptorColumnHandle> sortColumnHandles;
private final List<SortOrder> sortOrders;
private final Optional<RaptorColumnHandle> temporalColumnHandle;
private final OptionalLong distributionId;
private final OptionalInt bucketCount;
private final List<RaptorColumnHandle> bucketColumnHandles;

@@ -56,6 +58,7 @@ public RaptorOutputTableHandle(
@JsonProperty("sortColumnHandles") List<RaptorColumnHandle> sortColumnHandles,
@JsonProperty("sortOrders") List<SortOrder> sortOrders,
@JsonProperty("temporalColumnHandle") Optional<RaptorColumnHandle> temporalColumnHandle,
@JsonProperty("distributionId") OptionalLong distributionId,
@JsonProperty("bucketCount") OptionalInt bucketCount,
@JsonProperty("bucketColumnHandles") List<RaptorColumnHandle> bucketColumnHandles)
{
@@ -69,6 +72,7 @@ public RaptorOutputTableHandle(
this.sortOrders = requireNonNull(sortOrders, "sortOrders is null");
this.sortColumnHandles = requireNonNull(sortColumnHandles, "sortColumnHandles is null");
this.temporalColumnHandle = requireNonNull(temporalColumnHandle, "temporalColumnHandle is null");
this.distributionId = requireNonNull(distributionId, "distributionId is null");
this.bucketCount = requireNonNull(bucketCount, "bucketCount is null");
this.bucketColumnHandles = ImmutableList.copyOf(requireNonNull(bucketColumnHandles, "bucketColumnHandles is null"));
}
@@ -133,6 +137,12 @@ public Optional<RaptorColumnHandle> getTemporalColumnHandle()
return temporalColumnHandle;
}

@JsonProperty
public OptionalLong getDistributionId()
{
return distributionId;
}

@JsonProperty
public OptionalInt getBucketCount()
{
@@ -35,6 +35,7 @@ public class RaptorTableProperties
public static final String TEMPORAL_COLUMN_PROPERTY = "temporal_column";
public static final String BUCKET_COUNT_PROPERTY = "bucket_count";
public static final String BUCKETED_ON_PROPERTY = "bucketed_on";
public static final String DISTRIBUTION_NAME_PROPERTY = "distribution_name";

private final List<PropertyMetadata<?>> tableProperties;

@@ -58,6 +59,9 @@ public RaptorTableProperties(TypeManager typeManager)
typeManager,
BUCKETED_ON_PROPERTY,
"Table columns on which to bucket the table"))
.add(lowerCaseStringSessionProperty(
DISTRIBUTION_NAME_PROPERTY,
"Shared distribution name for colocated tables"))
.build();
}

@@ -87,6 +91,11 @@ public static List<String> getBucketColumns(Map<String, Object> tableProperties)
return stringList(tableProperties.get(BUCKETED_ON_PROPERTY));
}

public static String getDistributionName(Map<String, Object> tableProperties)
{
return (String) tableProperties.get(DISTRIBUTION_NAME_PROPERTY);
}

public static PropertyMetadata<String> lowerCaseStringSessionProperty(String name, String description)
{
return new PropertyMetadata<>(
@@ -0,0 +1,114 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.raptor.metadata;

import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.ResultSetMapper;

import javax.inject.Inject;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;

import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_ERROR;
import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
import static io.airlift.json.JsonCodec.listJsonCodec;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

public class Distribution
{
private static final JsonCodec<List<String>> LIST_CODEC = listJsonCodec(String.class);

private final long id;
private final Optional<String> name;
private final List<Type> columnTypes;
private final int bucketCount;

public Distribution(long id, Optional<String> name, List<Type> columnTypes, int bucketCount)
{
this.id = id;
this.name = requireNonNull(name, "name is null");
this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
this.bucketCount = bucketCount;
}

public long getId()
{
return id;
}

public Optional<String> getName()
{
return name;
}

public List<Type> getColumnTypes()
{
return columnTypes;
}

public int getBucketCount()
{
return bucketCount;
}

public static class Mapper
implements ResultSetMapper<Distribution>
{
private final TypeManager typeManager;

@Inject
public Mapper(TypeManager typeManager)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

@Override
public Distribution map(int index, ResultSet rs, StatementContext ctx)
throws SQLException
{
List<String> typeNames = LIST_CODEC.fromJson(rs.getString("column_types"));

ImmutableList.Builder<Type> types = ImmutableList.builder();
for (String typeName : typeNames) {
Type type = typeManager.getType(parseTypeSignature(typeName));
if (type == null) {
throw new PrestoException(RAPTOR_ERROR, "Unknown distribution column type: " + typeName);
}
types.add(type);
}

return new Distribution(
rs.getLong("distribution_id"),
Optional.ofNullable(rs.getString("distribution_name")),
types.build(),
rs.getInt("bucket_count"));
}
}

public static String serializeColumnTypes(List<Type> columnTypes)
{
return LIST_CODEC.toJson(columnTypes.stream()
.map(type -> type.getTypeSignature().toString())
.collect(toList()));
}
}
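Distribution.Mapper reads rows with distribution_id, distribution_name, column_types (a JSON array of type signatures produced by serializeColumnTypes, e.g. ["bigint"]), and bucket_count. The actual DDL lives in the schema migration, which is collapsed in this view; a rough sketch of the implied table, with types and constraints assumed rather than taken from the commit:

-- Hypothetical sketch only: the column names come from the Mapper, while the
-- types, auto-increment key, and the unique name constraint (implied by the
-- runIgnoringConstraintViolation wrapper around insertDistribution) are assumed.
CREATE TABLE distributions (
    distribution_id   BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
    distribution_name VARCHAR(255),
    column_types      TEXT NOT NULL,
    bucket_count      INT NOT NULL,
    UNIQUE (distribution_name)
);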
