Skip to content

Commit

Permalink
Convert Raptor to use TableLayout API
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Dec 4, 2015
1 parent 9509dfb commit 42c666c
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 102 deletions.
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;


import javax.inject.Inject; import javax.inject.Inject;


Expand Down Expand Up @@ -56,6 +57,13 @@ public boolean canHandle(ConnectorSplit split)
((RaptorSplit) split).getConnectorId().equals(connectorId); ((RaptorSplit) split).getConnectorId().equals(connectorId);
} }


@Override
public boolean canHandle(ConnectorTableLayoutHandle handle)
{
return (handle instanceof RaptorTableLayoutHandle) &&
((RaptorTableLayoutHandle) handle).getTable().getConnectorId().equals(connectorId);
}

@Override @Override
public boolean canHandle(ConnectorOutputTableHandle tableHandle) public boolean canHandle(ConnectorOutputTableHandle tableHandle)
{ {
Expand All @@ -82,6 +90,12 @@ public Class<? extends ColumnHandle> getColumnHandleClass()
return RaptorColumnHandle.class; return RaptorColumnHandle.class;
} }


@Override
public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
{
return RaptorTableLayoutHandle.class;
}

@Override @Override
public Class<? extends ConnectorSplit> getSplitClass() public Class<? extends ConnectorSplit> getSplitClass()
{ {
Expand Down
Expand Up @@ -29,9 +29,12 @@
import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayout;
import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.ConnectorTableMetadata; import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.ConnectorViewDefinition; import com.facebook.presto.spi.ConnectorViewDefinition;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix; import com.facebook.presto.spi.SchemaTablePrefix;
Expand All @@ -56,6 +59,7 @@
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.OptionalLong; import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.Predicate; import java.util.function.Predicate;


Expand Down Expand Up @@ -239,6 +243,20 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
return Multimaps.asMap(columns.build()); return Multimaps.asMap(columns.build());
} }


@Override
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
RaptorTableHandle handle = checkType(table, RaptorTableHandle.class, "table");
ConnectorTableLayout layout = new ConnectorTableLayout(new RaptorTableLayoutHandle(handle, constraint.getSummary()));
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
}

@Override
public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle)
{
return new ConnectorTableLayout(handle);
}

@Override @Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{ {
Expand Down

This file was deleted.

Expand Up @@ -18,13 +18,11 @@
import com.facebook.presto.raptor.metadata.ShardNodes; import com.facebook.presto.raptor.metadata.ShardNodes;
import com.facebook.presto.raptor.util.SynchronizedResultIterator; import com.facebook.presto.raptor.util.SynchronizedResultIterator;
import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPartition;
import com.facebook.presto.spi.ConnectorPartitionResult;
import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitManager; import com.facebook.presto.spi.ConnectorSplitManager;
import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.HostAddress; import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node; import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager; import com.facebook.presto.spi.NodeManager;
Expand Down Expand Up @@ -52,9 +50,7 @@
import static com.facebook.presto.raptor.util.Types.checkType; import static com.facebook.presto.raptor.util.Types.checkType;
import static com.facebook.presto.spi.NodeState.ACTIVE; import static com.facebook.presto.spi.NodeState.ACTIVE;
import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE; import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Maps.uniqueIndex; import static com.google.common.collect.Maps.uniqueIndex;
import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.lang.String.format; import static java.lang.String.format;
Expand Down Expand Up @@ -93,23 +89,12 @@ public void destroy()
} }


@Override @Override
public ConnectorPartitionResult getPartitions(ConnectorSession session, ConnectorTableHandle tableHandle, TupleDomain<ColumnHandle> tupleDomain) public ConnectorSplitSource getSplits(ConnectorSession session, ConnectorTableLayoutHandle layout)
{ {
RaptorTableHandle handle = checkType(tableHandle, RaptorTableHandle.class, "table"); RaptorTableLayoutHandle handle = checkType(layout, RaptorTableLayoutHandle.class, "layout");
ConnectorPartition partition = new RaptorPartition(handle.getTableId(), tupleDomain); RaptorTableHandle table = handle.getTable();
return new ConnectorPartitionResult(ImmutableList.of(partition), tupleDomain); TupleDomain<RaptorColumnHandle> effectivePredicate = toRaptorTupleDomain(handle.getConstraint());
} return new RaptorSplitSource(table.getTableId(), effectivePredicate, table.getTransactionId());

@Override
public ConnectorSplitSource getPartitionSplits(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorPartition> partitions)
{
RaptorTableHandle raptorTableHandle = checkType(tableHandle, RaptorTableHandle.class, "tableHandle");

checkArgument(partitions.size() == 1, "expected exactly one partition");
RaptorPartition partition = checkType(getOnlyElement(partitions), RaptorPartition.class, "partition");
TupleDomain<RaptorColumnHandle> effectivePredicate = toRaptorTupleDomain(partition.getEffectivePredicate());

return new RaptorSplitSource(raptorTableHandle.getTableId(), effectivePredicate, raptorTableHandle.getTransactionId());
} }


private static List<HostAddress> getAddressesForNodes(Map<String, Node> nodeMap, Iterable<String> nodeIdentifiers) private static List<HostAddress> getAddressesForNodes(Map<String, Node> nodeMap, Iterable<String> nodeIdentifiers)
Expand Down
@@ -0,0 +1,56 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.raptor;

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import static java.util.Objects.requireNonNull;

public class RaptorTableLayoutHandle
implements ConnectorTableLayoutHandle
{
private final RaptorTableHandle table;
private final TupleDomain<ColumnHandle> constraint;

@JsonCreator
public RaptorTableLayoutHandle(
@JsonProperty("table") RaptorTableHandle table,
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint)
{
this.table = requireNonNull(table, "table is null");
this.constraint = requireNonNull(constraint, "constraint is null");
}

@JsonProperty
public RaptorTableHandle getTable()
{
return table;
}

@JsonProperty
public TupleDomain<ColumnHandle> getConstraint()
{
return constraint;
}

@Override
public String toString()
{
return table.toString();
}
}
Expand Up @@ -22,15 +22,14 @@
import com.facebook.presto.raptor.RaptorMetadata; import com.facebook.presto.raptor.RaptorMetadata;
import com.facebook.presto.raptor.RaptorSplitManager; import com.facebook.presto.raptor.RaptorSplitManager;
import com.facebook.presto.raptor.RaptorTableHandle; import com.facebook.presto.raptor.RaptorTableHandle;
import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.raptor.RaptorTableLayoutHandle;
import com.facebook.presto.spi.ConnectorPartition;
import com.facebook.presto.spi.ConnectorPartitionResult;
import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.ConnectorTableMetadata; import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.BigintType; import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.type.TypeRegistry; import com.facebook.presto.type.TypeRegistry;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
Expand All @@ -57,11 +56,11 @@
import static com.google.common.io.Files.createTempDir; import static com.google.common.io.Files.createTempDir;
import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.json.JsonCodec.jsonCodec; import static io.airlift.json.JsonCodec.jsonCodec;
import static io.airlift.testing.Assertions.assertInstanceOf;
import static io.airlift.testing.FileUtils.deleteRecursively; import static io.airlift.testing.FileUtils.deleteRecursively;
import static java.lang.String.format; import static java.lang.String.format;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;


@Test(singleThreaded = true) @Test(singleThreaded = true)
public class TestRaptorSplitManager public class TestRaptorSplitManager
Expand All @@ -76,6 +75,7 @@ public class TestRaptorSplitManager


private Handle dummyHandle; private Handle dummyHandle;
private File temporary; private File temporary;
private RaptorMetadata metadata;
private RaptorSplitManager raptorSplitManager; private RaptorSplitManager raptorSplitManager;
private ConnectorTableHandle tableHandle; private ConnectorTableHandle tableHandle;
private ShardManager shardManager; private ShardManager shardManager;
Expand All @@ -97,7 +97,7 @@ public void setup()
nodeManager.addNode("raptor", new PrestoNode(nodeName, new URI("http://127.0.0.1/"), NodeVersion.UNKNOWN)); nodeManager.addNode("raptor", new PrestoNode(nodeName, new URI("http://127.0.0.1/"), NodeVersion.UNKNOWN));


RaptorConnectorId connectorId = new RaptorConnectorId("raptor"); RaptorConnectorId connectorId = new RaptorConnectorId("raptor");
RaptorMetadata metadata = new RaptorMetadata(connectorId, dbi, shardManager, SHARD_INFO_CODEC, SHARD_DELTA_CODEC); metadata = new RaptorMetadata(connectorId, dbi, shardManager, SHARD_INFO_CODEC, SHARD_DELTA_CODEC);


metadata.createTable(SESSION, TEST_TABLE); metadata.createTable(SESSION, TEST_TABLE);
tableHandle = metadata.getTableHandle(SESSION, TEST_TABLE.getTable()); tableHandle = metadata.getTableHandle(SESSION, TEST_TABLE.getTable());
Expand Down Expand Up @@ -133,16 +133,12 @@ public void teardown()
public void testSanity() public void testSanity()
throws InterruptedException throws InterruptedException
{ {
ConnectorPartitionResult partitionResult = raptorSplitManager.getPartitions(SESSION, tableHandle, TupleDomain.<ColumnHandle>all()); List<ConnectorTableLayoutResult> layouts = metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty());
assertEquals(partitionResult.getPartitions().size(), 1); assertEquals(layouts.size(), 1);
assertTrue(partitionResult.getUndeterminedTupleDomain().isAll()); ConnectorTableLayoutResult layout = getOnlyElement(layouts);
assertInstanceOf(layout.getTableLayout().getHandle(), RaptorTableLayoutHandle.class);


List<ConnectorPartition> partitions = partitionResult.getPartitions(); ConnectorSplitSource splitSource = raptorSplitManager.getSplits(SESSION, layout.getTableLayout().getHandle());
ConnectorPartition partition = getOnlyElement(partitions);
TupleDomain<ColumnHandle> columnUnionedTupleDomain = TupleDomain.columnWiseUnion(partition.getTupleDomain(), partition.getTupleDomain());
assertEquals(columnUnionedTupleDomain, TupleDomain.<ColumnHandle>all());

ConnectorSplitSource splitSource = raptorSplitManager.getPartitionSplits(SESSION, tableHandle, partitions);
int splitCount = 0; int splitCount = 0;
while (!splitSource.isFinished()) { while (!splitSource.isFinished()) {
splitCount += getFutureValue(splitSource.getNextBatch(1000)).size(); splitCount += getFutureValue(splitSource.getNextBatch(1000)).size();
Expand All @@ -156,9 +152,8 @@ public void testNoHostForShard()
{ {
deleteShardNodes(); deleteShardNodes();


ConnectorPartitionResult result = raptorSplitManager.getPartitions(SESSION, tableHandle, TupleDomain.<ColumnHandle>all()); ConnectorTableLayoutResult layout = getOnlyElement(metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty()));

ConnectorSplitSource splitSource = raptorSplitManager.getSplits(SESSION, layout.getTableLayout().getHandle());
ConnectorSplitSource splitSource = raptorSplitManager.getPartitionSplits(SESSION, tableHandle, result.getPartitions());
getFutureValue(splitSource.getNextBatch(1000)); getFutureValue(splitSource.getNextBatch(1000));
} }


Expand All @@ -173,8 +168,8 @@ public void testAssignRandomNodeWhenBackupAvailable()


deleteShardNodes(); deleteShardNodes();


ConnectorPartitionResult result = raptorSplitManagerWithBackup.getPartitions(SESSION, tableHandle, TupleDomain.<ColumnHandle>all()); ConnectorTableLayoutResult layout = getOnlyElement(metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty()));
ConnectorSplitSource partitionSplit = raptorSplitManagerWithBackup.getPartitionSplits(SESSION, tableHandle, result.getPartitions()); ConnectorSplitSource partitionSplit = raptorSplitManagerWithBackup.getSplits(SESSION, layout.getTableLayout().getHandle());
List<ConnectorSplit> batch = getFutureValue(partitionSplit.getNextBatch(1), PrestoException.class); List<ConnectorSplit> batch = getFutureValue(partitionSplit.getNextBatch(1), PrestoException.class);
assertEquals(getOnlyElement(getOnlyElement(batch).getAddresses()), node.getHostAndPort()); assertEquals(getOnlyElement(getOnlyElement(batch).getAddresses()), node.getHostAndPort());
} }
Expand All @@ -186,8 +181,8 @@ public void testNoNodes()
deleteShardNodes(); deleteShardNodes();


RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(new RaptorConnectorId("fbraptor"), new InMemoryNodeManager(), shardManager, true); RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(new RaptorConnectorId("fbraptor"), new InMemoryNodeManager(), shardManager, true);
ConnectorPartitionResult result = raptorSplitManagerWithBackup.getPartitions(SESSION, tableHandle, TupleDomain.<ColumnHandle>all()); ConnectorTableLayoutResult layout = getOnlyElement(metadata.getTableLayouts(SESSION, tableHandle, Constraint.alwaysTrue(), Optional.empty()));
ConnectorSplitSource splitSource = raptorSplitManagerWithBackup.getPartitionSplits(SESSION, tableHandle, result.getPartitions()); ConnectorSplitSource splitSource = raptorSplitManagerWithBackup.getSplits(SESSION, layout.getTableLayout().getHandle());
getFutureValue(splitSource.getNextBatch(1000), PrestoException.class); getFutureValue(splitSource.getNextBatch(1000), PrestoException.class);
} }


Expand Down

0 comments on commit 42c666c

Please sign in to comment.