Skip to content

Commit

Permalink
Pass table handles to beginQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Aug 4, 2019
1 parent 8e3d74d commit 24a40be
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 21 deletions.
Expand Up @@ -13,7 +13,8 @@
*/
package io.prestosql.execution;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.SetThreadName;
import io.airlift.log.Logger;
Expand Down Expand Up @@ -42,6 +43,7 @@
import io.prestosql.server.BasicQueryInfo;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.QueryId;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.split.SplitManager;
import io.prestosql.split.SplitSource;
import io.prestosql.sql.analyzer.Analysis;
Expand Down Expand Up @@ -71,7 +73,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -315,7 +316,7 @@ public void start()
// analyze query
PlanRoot plan = analyzeQuery();

metadata.beginQuery(getSession(), plan.getConnectors());
metadata.beginQuery(getSession(), plan.getTableHandles());

// plan distribution of query
planDistribution(plan);
Expand Down Expand Up @@ -396,23 +397,23 @@ private PlanRoot doAnalyzeQuery()
stateMachine.endAnalysis();

boolean explainAnalyze = analysis.getStatement() instanceof Explain && ((Explain) analysis.getStatement()).isAnalyze();
return new PlanRoot(fragmentedPlan, !explainAnalyze, extractConnectors(analysis));
return new PlanRoot(fragmentedPlan, !explainAnalyze, extractTableHandles(analysis));
}

private static Set<CatalogName> extractConnectors(Analysis analysis)
private static Multimap<CatalogName, ConnectorTableHandle> extractTableHandles(Analysis analysis)
{
ImmutableSet.Builder<CatalogName> connectors = ImmutableSet.builder();
ImmutableMultimap.Builder<CatalogName, ConnectorTableHandle> tableHandles = ImmutableMultimap.builder();

for (TableHandle tableHandle : analysis.getTables()) {
connectors.add(tableHandle.getCatalogName());
tableHandles.put(tableHandle.getCatalogName(), tableHandle.getConnectorHandle());
}

if (analysis.getInsert().isPresent()) {
TableHandle target = analysis.getInsert().get().getTarget();
connectors.add(target.getCatalogName());
tableHandles.put(target.getCatalogName(), target.getConnectorHandle());
}

return connectors.build();
return tableHandles.build();
}

private void planDistribution(PlanRoot plan)
Expand Down Expand Up @@ -598,13 +599,13 @@ private static class PlanRoot
{
private final SubPlan root;
private final boolean summarizeTaskInfos;
private final Set<CatalogName> connectors;
private final Multimap<CatalogName, ConnectorTableHandle> tableHandles;

public PlanRoot(SubPlan root, boolean summarizeTaskInfos, Set<CatalogName> connectors)
public PlanRoot(SubPlan root, boolean summarizeTaskInfos, Multimap<CatalogName, ConnectorTableHandle> tableHandles)
{
this.root = requireNonNull(root, "root is null");
this.summarizeTaskInfos = summarizeTaskInfos;
this.connectors = ImmutableSet.copyOf(connectors);
this.tableHandles = ImmutableMultimap.copyOf(requireNonNull(tableHandles, "tableHandles is null"));
}

public SubPlan getRoot()
Expand All @@ -617,9 +618,9 @@ public boolean isSummarizeTaskInfos()
return summarizeTaskInfos;
}

public Set<CatalogName> getConnectors()
public Multimap<CatalogName, ConnectorTableHandle> getTableHandles()
{
return connectors;
return tableHandles;
}
}

Expand Down
Expand Up @@ -13,6 +13,7 @@
*/
package io.prestosql.metadata;

import com.google.common.collect.Multimap;
import io.airlift.slice.Slice;
import io.prestosql.Session;
import io.prestosql.connector.CatalogName;
Expand All @@ -27,6 +28,7 @@
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorCapabilities;
import io.prestosql.spi.connector.ConnectorOutputMetadata;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorViewDefinition;
import io.prestosql.spi.connector.Constraint;
Expand Down Expand Up @@ -227,7 +229,7 @@ public interface Metadata
/**
* Start a SELECT/UPDATE/INSERT/DELETE query
*/
void beginQuery(Session session, Set<CatalogName> connectors);
void beginQuery(Session session, Multimap<CatalogName, ConnectorTableHandle> connectors);

/**
* Cleanup after a query. This is the very last notification after the query finishes, regardless if it succeeds or fails.
Expand Down
Expand Up @@ -704,12 +704,12 @@ public Optional<NewTableLayout> getNewTableLayout(Session session, String catalo
}

@Override
public void beginQuery(Session session, Set<CatalogName> connectors)
public void beginQuery(Session session, Multimap<CatalogName, ConnectorTableHandle> tableHandles)
{
for (CatalogName catalogName : connectors) {
ConnectorMetadata metadata = getMetadata(session, catalogName);
ConnectorSession connectorSession = session.toConnectorSession(catalogName);
metadata.beginQuery(connectorSession);
for (Entry<CatalogName, Collection<ConnectorTableHandle>> entry : tableHandles.asMap().entrySet()) {
ConnectorMetadata metadata = getMetadata(session, entry.getKey());
ConnectorSession connectorSession = session.toConnectorSession(entry.getKey());
metadata.beginQuery(connectorSession, entry.getValue());
registerCatalogForQueryId(session.getQueryId(), metadata);
}
}
Expand Down
Expand Up @@ -15,6 +15,7 @@

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import io.airlift.slice.Slice;
import io.prestosql.Session;
import io.prestosql.connector.CatalogName;
Expand All @@ -29,6 +30,7 @@
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorCapabilities;
import io.prestosql.spi.connector.ConnectorOutputMetadata;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorViewDefinition;
import io.prestosql.spi.connector.Constraint;
Expand Down Expand Up @@ -289,7 +291,7 @@ public void finishStatisticsCollection(Session session, AnalyzeTableHandle table
}

@Override
public void beginQuery(Session session, Set<CatalogName> connectors)
public void beginQuery(Session session, Multimap<CatalogName, ConnectorTableHandle> connectors)
{
throw new UnsupportedOperationException();
}
Expand Down
Expand Up @@ -390,9 +390,20 @@ default Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession ses

/**
* Start a SELECT/UPDATE/INSERT/DELETE query. This notification is triggered after the planning phase completes.
*
* @deprecated Use {@link #beginQuery(ConnectorSession, Collection)} instead.
*/
@Deprecated
default void beginQuery(ConnectorSession session) {}

/**
* Start a SELECT/UPDATE/INSERT/DELETE query. This notification is triggered after the planning phase completes.
*/
default void beginQuery(ConnectorSession session, Collection<ConnectorTableHandle> tableHandles)
{
beginQuery(session);
}

/**
* Cleanup after a SELECT/UPDATE/INSERT/DELETE query. This is the very last notification after the query finishes, whether it succeeds or fails.
* An exception thrown in this method will not affect the result of the query.
Expand Down
Expand Up @@ -371,6 +371,14 @@ public void beginQuery(ConnectorSession session)
}
}

@Override
public void beginQuery(ConnectorSession session, Collection<ConnectorTableHandle> tableHandles)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
delegate.beginQuery(session, tableHandles);
}
}

@Override
public void cleanupQuery(ConnectorSession session)
{
Expand Down

0 comments on commit 24a40be

Please sign in to comment.