diff --git a/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java b/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java index 6f0a2bea3374f..ab8c98736aea0 100644 --- a/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java +++ b/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java @@ -13,7 +13,8 @@ */ package io.prestosql.execution; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Multimap; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.concurrent.SetThreadName; import io.airlift.log.Logger; @@ -42,6 +43,7 @@ import io.prestosql.server.BasicQueryInfo; import io.prestosql.spi.PrestoException; import io.prestosql.spi.QueryId; +import io.prestosql.spi.connector.ConnectorTableHandle; import io.prestosql.split.SplitManager; import io.prestosql.split.SplitSource; import io.prestosql.sql.analyzer.Analysis; @@ -71,7 +73,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -315,7 +316,7 @@ public void start() // analyze query PlanRoot plan = analyzeQuery(); - metadata.beginQuery(getSession(), plan.getConnectors()); + metadata.beginQuery(getSession(), plan.getTableHandles()); // plan distribution of query planDistribution(plan); @@ -396,23 +397,23 @@ private PlanRoot doAnalyzeQuery() stateMachine.endAnalysis(); boolean explainAnalyze = analysis.getStatement() instanceof Explain && ((Explain) analysis.getStatement()).isAnalyze(); - return new PlanRoot(fragmentedPlan, !explainAnalyze, extractConnectors(analysis)); + return new PlanRoot(fragmentedPlan, !explainAnalyze, extractTableHandles(analysis)); } - private static Set extractConnectors(Analysis analysis) + private static Multimap extractTableHandles(Analysis analysis) { - ImmutableSet.Builder connectors = ImmutableSet.builder(); + ImmutableMultimap.Builder tableHandles = ImmutableMultimap.builder(); for (TableHandle tableHandle : analysis.getTables()) { - connectors.add(tableHandle.getCatalogName()); + tableHandles.put(tableHandle.getCatalogName(), tableHandle.getConnectorHandle()); } if (analysis.getInsert().isPresent()) { TableHandle target = analysis.getInsert().get().getTarget(); - connectors.add(target.getCatalogName()); + tableHandles.put(target.getCatalogName(), target.getConnectorHandle()); } - return connectors.build(); + return tableHandles.build(); } private void planDistribution(PlanRoot plan) @@ -598,13 +599,13 @@ private static class PlanRoot { private final SubPlan root; private final boolean summarizeTaskInfos; - private final Set connectors; + private final Multimap tableHandles; - public PlanRoot(SubPlan root, boolean summarizeTaskInfos, Set connectors) + public PlanRoot(SubPlan root, boolean summarizeTaskInfos, Multimap tableHandles) { this.root = requireNonNull(root, "root is null"); this.summarizeTaskInfos = summarizeTaskInfos; - this.connectors = ImmutableSet.copyOf(connectors); + this.tableHandles = ImmutableMultimap.copyOf(requireNonNull(tableHandles, "tableHandles is null")); } public SubPlan getRoot() @@ -617,9 +618,9 @@ public boolean isSummarizeTaskInfos() return summarizeTaskInfos; } - public Set getConnectors() + public Multimap getTableHandles() { - return connectors; + return tableHandles; } } diff --git a/presto-main/src/main/java/io/prestosql/metadata/Metadata.java b/presto-main/src/main/java/io/prestosql/metadata/Metadata.java index b726729f74dcd..f4e23543c112f 100644 --- a/presto-main/src/main/java/io/prestosql/metadata/Metadata.java +++ b/presto-main/src/main/java/io/prestosql/metadata/Metadata.java @@ -13,6 +13,7 @@ */ package io.prestosql.metadata; +import com.google.common.collect.Multimap; import io.airlift.slice.Slice; import io.prestosql.Session; import io.prestosql.connector.CatalogName; @@ -27,6 +28,7 @@ import io.prestosql.spi.connector.ColumnMetadata; import io.prestosql.spi.connector.ConnectorCapabilities; import io.prestosql.spi.connector.ConnectorOutputMetadata; +import io.prestosql.spi.connector.ConnectorTableHandle; import io.prestosql.spi.connector.ConnectorTableMetadata; import io.prestosql.spi.connector.ConnectorViewDefinition; import io.prestosql.spi.connector.Constraint; @@ -227,7 +229,7 @@ public interface Metadata /** * Start a SELECT/UPDATE/INSERT/DELETE query */ - void beginQuery(Session session, Set connectors); + void beginQuery(Session session, Multimap connectors); /** * Cleanup after a query. This is the very last notification after the query finishes, regardless if it succeeds or fails. diff --git a/presto-main/src/main/java/io/prestosql/metadata/MetadataManager.java b/presto-main/src/main/java/io/prestosql/metadata/MetadataManager.java index 9040f77d072f6..9c2f65a3b1828 100644 --- a/presto-main/src/main/java/io/prestosql/metadata/MetadataManager.java +++ b/presto-main/src/main/java/io/prestosql/metadata/MetadataManager.java @@ -704,12 +704,12 @@ public Optional getNewTableLayout(Session session, String catalo } @Override - public void beginQuery(Session session, Set connectors) + public void beginQuery(Session session, Multimap tableHandles) { - for (CatalogName catalogName : connectors) { - ConnectorMetadata metadata = getMetadata(session, catalogName); - ConnectorSession connectorSession = session.toConnectorSession(catalogName); - metadata.beginQuery(connectorSession); + for (Entry> entry : tableHandles.asMap().entrySet()) { + ConnectorMetadata metadata = getMetadata(session, entry.getKey()); + ConnectorSession connectorSession = session.toConnectorSession(entry.getKey()); + metadata.beginQuery(connectorSession, entry.getValue()); registerCatalogForQueryId(session.getQueryId(), metadata); } } diff --git a/presto-main/src/test/java/io/prestosql/metadata/AbstractMockMetadata.java b/presto-main/src/test/java/io/prestosql/metadata/AbstractMockMetadata.java index 2c5682e8d344b..75ec61d918e79 100644 --- a/presto-main/src/test/java/io/prestosql/metadata/AbstractMockMetadata.java +++ b/presto-main/src/test/java/io/prestosql/metadata/AbstractMockMetadata.java @@ -15,6 +15,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Multimap; import io.airlift.slice.Slice; import io.prestosql.Session; import io.prestosql.connector.CatalogName; @@ -29,6 +30,7 @@ import io.prestosql.spi.connector.ColumnMetadata; import io.prestosql.spi.connector.ConnectorCapabilities; import io.prestosql.spi.connector.ConnectorOutputMetadata; +import io.prestosql.spi.connector.ConnectorTableHandle; import io.prestosql.spi.connector.ConnectorTableMetadata; import io.prestosql.spi.connector.ConnectorViewDefinition; import io.prestosql.spi.connector.Constraint; @@ -289,7 +291,7 @@ public void finishStatisticsCollection(Session session, AnalyzeTableHandle table } @Override - public void beginQuery(Session session, Set connectors) + public void beginQuery(Session session, Multimap connectors) { throw new UnsupportedOperationException(); } diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorMetadata.java b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorMetadata.java index 820829e4e78bd..3dfe6bfc2a950 100644 --- a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorMetadata.java +++ b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorMetadata.java @@ -390,9 +390,20 @@ default Optional finishCreateTable(ConnectorSession ses /** * Start a SELECT/UPDATE/INSERT/DELETE query. This notification is triggered after the planning phase completes. + * + * @deprecated Use {@link #beginQuery(ConnectorSession, Collection)} instead. */ + @Deprecated default void beginQuery(ConnectorSession session) {} + /** + * Start a SELECT/UPDATE/INSERT/DELETE query. This notification is triggered after the planning phase completes. + */ + default void beginQuery(ConnectorSession session, Collection tableHandles) + { + beginQuery(session); + } + /** * Cleanup after a SELECT/UPDATE/INSERT/DELETE query. This is the very last notification after the query finishes, whether it succeeds or fails. * An exception thrown in this method will not affect the result of the query. diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java b/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java index 80c1df477e4e7..dec970ff87323 100644 --- a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java @@ -371,6 +371,14 @@ public void beginQuery(ConnectorSession session) } } + @Override + public void beginQuery(ConnectorSession session, Collection tableHandles) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + delegate.beginQuery(session, tableHandles); + } + } + @Override public void cleanupQuery(ConnectorSession session) {