Skip to content

Commit

Permalink
Add transactional index interface
Browse files Browse the repository at this point in the history
- Merged the metadata calls from IndexResolver into Metadata
  • Loading branch information
erichwang committed Dec 28, 2015
1 parent 385b49f commit 04c5515
Show file tree
Hide file tree
Showing 18 changed files with 165 additions and 84 deletions.
Expand Up @@ -21,9 +21,9 @@
import com.facebook.presto.security.AccessControlManager;
import com.facebook.presto.spi.ConnectorFactory;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexResolver;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.TransactionalConnectorIndexProvider;
import com.facebook.presto.spi.TransactionalConnectorPageSinkProvider;
import com.facebook.presto.spi.TransactionalConnectorPageSourceProvider;
import com.facebook.presto.spi.TransactionalConnectorRecordSetProvider;
Expand Down Expand Up @@ -238,10 +238,10 @@ private synchronized void addConnectorInternal(ConnectorType type, String catalo
}
}

ConnectorIndexResolver indexResolver = null;
TransactionalConnectorIndexProvider indexProvider = null;
try {
indexResolver = connector.getIndexResolver();
requireNonNull(indexResolver, format("Connector %s returned a null index resolver", connectorId));
indexProvider = connector.getIndexProvider();
requireNonNull(indexProvider, format("Connector %s returned a null index provider", connectorId));
}
catch (UnsupportedOperationException ignored) {
}
Expand Down Expand Up @@ -282,8 +282,8 @@ else if (type == ConnectorType.SYSTEM) {
pageSinkManager.addConnectorPageSinkProvider(connectorId, connectorPageSinkProvider);
}

if (indexResolver != null) {
indexManager.addIndexResolver(connectorId, indexResolver);
if (indexProvider != null) {
indexManager.addIndexProvider(connectorId, indexProvider);
}

if (accessControl != null) {
Expand Down
Expand Up @@ -15,18 +15,12 @@

import com.facebook.presto.Session;
import com.facebook.presto.metadata.IndexHandle;
import com.facebook.presto.metadata.ResolvedIndex;
import com.facebook.presto.metadata.TableHandle;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorIndex;
import com.facebook.presto.spi.ConnectorIndexResolver;
import com.facebook.presto.spi.ConnectorResolvedIndex;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.TransactionalConnectorIndexProvider;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

Expand All @@ -35,49 +29,24 @@

public class IndexManager
{
private final ConcurrentMap<String, ConnectorIndexResolver> resolvers = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TransactionalConnectorIndexProvider> providers = new ConcurrentHashMap<>();

public void addIndexResolver(String connectorId, ConnectorIndexResolver resolver)
public void addIndexProvider(String connectorId, TransactionalConnectorIndexProvider provider)
{
checkState(resolvers.putIfAbsent(connectorId, resolver) == null, "IndexResolver for connector '%s' is already registered", connectorId);
}

public Optional<ResolvedIndex> resolveIndex(
Session session,
TableHandle tableHandle,
Set<ColumnHandle> indexableColumns,
Set<ColumnHandle> outputColumns,
TupleDomain<ColumnHandle> tupleDomain)
{
ConnectorIndexResolver resolver = resolvers.get(tableHandle.getConnectorId());
if (resolver == null) {
return Optional.empty();
}

ConnectorSession connectorSession = session.toConnectorSession(tableHandle.getConnectorId());
ConnectorResolvedIndex resolved = resolver.resolveIndex(connectorSession, tableHandle.getConnectorHandle(), indexableColumns, outputColumns, tupleDomain);

if (resolved == null) {
return Optional.empty();
}

return Optional.of(new ResolvedIndex(tableHandle.getConnectorId(), resolved));
checkState(providers.putIfAbsent(connectorId, provider) == null, "IndexProvider for connector '%s' is already registered", connectorId);
}

public ConnectorIndex getIndex(Session session, IndexHandle indexHandle, List<ColumnHandle> lookupSchema, List<ColumnHandle> outputSchema)
{
// assumes connectorId and catalog are the same
ConnectorSession connectorSession = session.toConnectorSession(indexHandle.getConnectorId());
return getResolver(indexHandle)
.getIndex(connectorSession, indexHandle.getConnectorHandle(), lookupSchema, outputSchema);
TransactionalConnectorIndexProvider provider = getProvider(indexHandle);
return provider.getIndex(indexHandle.getTransactionHandle(), connectorSession, indexHandle.getConnectorHandle(), lookupSchema, outputSchema);
}

private ConnectorIndexResolver getResolver(IndexHandle handle)
private TransactionalConnectorIndexProvider getProvider(IndexHandle handle)
{
ConnectorIndexResolver result = resolvers.get(handle.getConnectorId());

checkArgument(result != null, "No index resolver for connector '%s'", handle.getConnectorId());

TransactionalConnectorIndexProvider result = providers.get(handle.getConnectorId());
checkArgument(result != null, "No index provider for connector '%s'", handle.getConnectorId());
return result;
}
}
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.metadata;

import com.facebook.presto.spi.ConnectorIndexHandle;
import com.facebook.presto.spi.transaction.ConnectorTransactionHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

Expand All @@ -24,14 +25,17 @@
public final class IndexHandle
{
private final String connectorId;
private final ConnectorTransactionHandle transactionHandle;
private final ConnectorIndexHandle connectorHandle;

@JsonCreator
public IndexHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle,
@JsonProperty("connectorHandle") ConnectorIndexHandle connectorHandle)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
this.connectorHandle = requireNonNull(connectorHandle, "connectorHandle is null");
}

Expand All @@ -41,6 +45,12 @@ public String getConnectorId()
return connectorId;
}

@JsonProperty
public ConnectorTransactionHandle getTransactionHandle()
{
return transactionHandle;
}

@JsonProperty
public ConnectorIndexHandle getConnectorHandle()
{
Expand All @@ -50,7 +60,7 @@ public ConnectorIndexHandle getConnectorHandle()
@Override
public int hashCode()
{
return Objects.hash(connectorId, connectorHandle);
return Objects.hash(connectorId, transactionHandle, connectorHandle);
}

@Override
Expand All @@ -64,12 +74,13 @@ public boolean equals(Object obj)
}
final IndexHandle other = (IndexHandle) obj;
return Objects.equals(this.connectorId, other.connectorId) &&
Objects.equals(this.transactionHandle, other.transactionHandle) &&
Objects.equals(this.connectorHandle, other.connectorHandle);
}

@Override
public String toString()
{
return connectorId + ":" + connectorHandle;
return connectorId + ":" + transactionHandle + ":" + connectorHandle;
}
}
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spi.type.TypeSignature;
Expand Down Expand Up @@ -222,6 +223,11 @@ public interface Metadata
*/
void dropView(Session session, QualifiedObjectName viewName);

/**
* Try to locate a table index that can lookup results by indexableColumns and provide the requested outputColumns.
*/
Optional<ResolvedIndex> resolveIndex(Session session, TableHandle tableHandle, Set<ColumnHandle> indexableColumns, Set<ColumnHandle> outputColumns, TupleDomain<ColumnHandle> tupleDomain);

FunctionRegistry getFunctionRegistry();

TypeManager getTypeManager();
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorPartition;
import com.facebook.presto.spi.ConnectorPartitionResult;
import com.facebook.presto.spi.ConnectorResolvedIndex;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayout;
Expand Down Expand Up @@ -683,6 +684,17 @@ public void dropView(Session session, QualifiedObjectName viewName)
metadata.dropView(session.toConnectorSession(entry.getCatalog()), viewName.asSchemaTableName());
}

@Override
public Optional<ResolvedIndex> resolveIndex(Session session, TableHandle tableHandle, Set<ColumnHandle> indexableColumns, Set<ColumnHandle> outputColumns, TupleDomain<ColumnHandle> tupleDomain)
{
ConnectorEntry entry = lookupConnectorFor(tableHandle);
TransactionalConnectorMetadata metadata = entry.getMetadata(session);
ConnectorTransactionHandle transaction = entry.getTransactionHandle(session);
ConnectorSession connectorSession = session.toConnectorSession(entry.getCatalog());
Optional<ConnectorResolvedIndex> resolvedIndex = metadata.resolveIndex(connectorSession, tableHandle.getConnectorHandle(), indexableColumns, outputColumns, tupleDomain);
return resolvedIndex.map(resolved -> new ResolvedIndex(tableHandle.getConnectorId(), transaction, resolved));
}

@Override
public FunctionRegistry getFunctionRegistry()
{
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorResolvedIndex;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.transaction.ConnectorTransactionHandle;

import static java.util.Objects.requireNonNull;

Expand All @@ -24,12 +25,12 @@ public final class ResolvedIndex
private final IndexHandle indexHandle;
private final TupleDomain<ColumnHandle> undeterminedTupleDomain;

public ResolvedIndex(String connectorId, ConnectorResolvedIndex index)
public ResolvedIndex(String connectorId, ConnectorTransactionHandle transactionHandle, ConnectorResolvedIndex index)
{
requireNonNull(connectorId, "connectorId is null");
requireNonNull(index, "index is null");

indexHandle = new IndexHandle(connectorId, index.getIndexHandle());
indexHandle = new IndexHandle(connectorId, transactionHandle, index.getIndexHandle());
undeterminedTupleDomain = index.getUnresolvedTupleDomain();
}

Expand Down
Expand Up @@ -13,7 +13,6 @@
*/
package com.facebook.presto.sql.planner;

import com.facebook.presto.index.IndexManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.parser.SqlParser;
Expand Down Expand Up @@ -54,12 +53,12 @@ public class PlanOptimizersFactory
private final List<PlanOptimizer> optimizers;

@Inject
public PlanOptimizersFactory(Metadata metadata, SqlParser sqlParser, IndexManager indexManager, FeaturesConfig featuresConfig)
public PlanOptimizersFactory(Metadata metadata, SqlParser sqlParser, FeaturesConfig featuresConfig)
{
this(metadata, sqlParser, indexManager, featuresConfig, false);
this(metadata, sqlParser, featuresConfig, false);
}

public PlanOptimizersFactory(Metadata metadata, SqlParser sqlParser, IndexManager indexManager, FeaturesConfig featuresConfig, boolean forceSingleNode)
public PlanOptimizersFactory(Metadata metadata, SqlParser sqlParser, FeaturesConfig featuresConfig, boolean forceSingleNode)
{
ImmutableList.Builder<PlanOptimizer> builder = ImmutableList.builder();

Expand All @@ -76,7 +75,7 @@ public PlanOptimizersFactory(Metadata metadata, SqlParser sqlParser, IndexManage
new ProjectionPushDown(),
new UnaliasSymbolReferences(), // Run again because predicate pushdown and projection pushdown might add more projections
new PruneUnreferencedOutputs(), // Make sure to run this before index join. Filtered projections may not have all the columns.
new IndexJoinOptimizer(metadata, indexManager), // Run this after projections and filters have been fully simplified and pushed down
new IndexJoinOptimizer(metadata), // Run this after projections and filters have been fully simplified and pushed down
new CountConstantOptimizer(),
new WindowFilterPushDown(metadata), // This must run after PredicatePushDown and LimitPushDown so that it squashes any successive filter nodes and limits
new HashGenerationOptimizer(), // This must run after all other optimizers have run to that all the PlanNodes are created
Expand Down
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.sql.planner.optimizations;

import com.facebook.presto.Session;
import com.facebook.presto.index.IndexManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.ResolvedIndex;
import com.facebook.presto.spi.ColumnHandle;
Expand Down Expand Up @@ -64,13 +63,11 @@
public class IndexJoinOptimizer
extends PlanOptimizer
{
private final IndexManager indexManager;
private final Metadata metadata;

public IndexJoinOptimizer(Metadata metadata, IndexManager indexManager)
public IndexJoinOptimizer(Metadata metadata)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.indexManager = requireNonNull(indexManager, "indexManager is null");
}

@Override
Expand All @@ -82,23 +79,21 @@ public PlanNode optimize(PlanNode plan, Session session, Map<Symbol, com.faceboo
requireNonNull(symbolAllocator, "symbolAllocator is null");
requireNonNull(idAllocator, "idAllocator is null");

return SimplePlanRewriter.rewriteWith(new Rewriter(symbolAllocator, idAllocator, indexManager, metadata, session), plan, null);
return SimplePlanRewriter.rewriteWith(new Rewriter(symbolAllocator, idAllocator, metadata, session), plan, null);
}

private static class Rewriter
extends SimplePlanRewriter<Void>
{
private final IndexManager indexManager;
private final SymbolAllocator symbolAllocator;
private final PlanNodeIdAllocator idAllocator;
private final Metadata metadata;
private final Session session;

private Rewriter(SymbolAllocator symbolAllocator, PlanNodeIdAllocator idAllocator, IndexManager indexManager, Metadata metadata, Session session)
private Rewriter(SymbolAllocator symbolAllocator, PlanNodeIdAllocator idAllocator, Metadata metadata, Session session)
{
this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null");
this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
this.indexManager = requireNonNull(indexManager, "indexManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.session = requireNonNull(session, "session is null");
}
Expand All @@ -116,7 +111,6 @@ public PlanNode visitJoin(JoinNode node, RewriteContext<Void> context)
Optional<PlanNode> leftIndexCandidate = IndexSourceRewriter.rewriteWithIndex(
leftRewritten,
ImmutableSet.copyOf(leftJoinSymbols),
indexManager,
symbolAllocator,
idAllocator,
metadata,
Expand All @@ -130,7 +124,6 @@ public PlanNode visitJoin(JoinNode node, RewriteContext<Void> context)
Optional<PlanNode> rightIndexCandidate = IndexSourceRewriter.rewriteWithIndex(
rightRewritten,
ImmutableSet.copyOf(rightJoinSymbols),
indexManager,
symbolAllocator,
idAllocator,
metadata,
Expand Down Expand Up @@ -201,32 +194,29 @@ private static Symbol referenceToSymbol(Expression expression)
private static class IndexSourceRewriter
extends SimplePlanRewriter<IndexSourceRewriter.Context>
{
private final IndexManager indexManager;
private final SymbolAllocator symbolAllocator;
private final PlanNodeIdAllocator idAllocator;
private final Metadata metadata;
private final Session session;

private IndexSourceRewriter(IndexManager indexManager, SymbolAllocator symbolAllocator, PlanNodeIdAllocator idAllocator, Metadata metadata, Session session)
private IndexSourceRewriter(SymbolAllocator symbolAllocator, PlanNodeIdAllocator idAllocator, Metadata metadata, Session session)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null");
this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
this.indexManager = requireNonNull(indexManager, "indexManager is null");
this.session = requireNonNull(session, "session is null");
}

public static Optional<PlanNode> rewriteWithIndex(
PlanNode planNode,
Set<Symbol> lookupSymbols,
IndexManager indexManager,
SymbolAllocator symbolAllocator,
PlanNodeIdAllocator idAllocator,
Metadata metadata,
Session session)
{
AtomicBoolean success = new AtomicBoolean();
IndexSourceRewriter indexSourceRewriter = new IndexSourceRewriter(indexManager, symbolAllocator, idAllocator, metadata, session);
IndexSourceRewriter indexSourceRewriter = new IndexSourceRewriter(symbolAllocator, idAllocator, metadata, session);
PlanNode rewritten = SimplePlanRewriter.rewriteWith(indexSourceRewriter, planNode, new Context(lookupSymbols, success));
if (success.get()) {
return Optional.of(rewritten);
Expand Down Expand Up @@ -268,7 +258,7 @@ private PlanNode planTableScan(TableScanNode node, Expression predicate, Context

Set<ColumnHandle> outputColumns = node.getOutputSymbols().stream().map(node.getAssignments()::get).collect(toImmutableSet());

Optional<ResolvedIndex> optionalResolvedIndex = indexManager.resolveIndex(session, node.getTable(), lookupColumns, outputColumns, simplifiedConstraint);
Optional<ResolvedIndex> optionalResolvedIndex = metadata.resolveIndex(session, node.getTable(), lookupColumns, outputColumns, simplifiedConstraint);
if (!optionalResolvedIndex.isPresent()) {
// No index available, so give up by returning something
return node;
Expand Down
Expand Up @@ -488,7 +488,7 @@ public List<Driver> createDrivers(Session session, @Language("SQL") String sql,
.setExperimentalSyntaxEnabled(true)
.setDistributedIndexJoinsEnabled(false)
.setOptimizeHashGeneration(true);
PlanOptimizersFactory planOptimizersFactory = new PlanOptimizersFactory(metadata, sqlParser, indexManager, featuresConfig, true);
PlanOptimizersFactory planOptimizersFactory = new PlanOptimizersFactory(metadata, sqlParser, featuresConfig, true);

QueryExplainer queryExplainer = new QueryExplainer(
planOptimizersFactory.get(),
Expand Down

0 comments on commit 04c5515

Please sign in to comment.