Skip to content

Commit

Permalink
Make catalog names transactional
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Oct 19, 2016
1 parent 88b42d4 commit cf04cf0
Show file tree
Hide file tree
Showing 21 changed files with 258 additions and 117 deletions.
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.connector.system.SystemConnector;
import com.facebook.presto.index.IndexManager;
import com.facebook.presto.metadata.Catalog;
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.metadata.HandleResolver;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.metadata.MetadataManager;
Expand Down Expand Up @@ -69,7 +70,6 @@
import static com.facebook.presto.connector.ConnectorId.createSystemTablesConnectorId;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

Expand All @@ -79,6 +79,7 @@ public class ConnectorManager
private static final Logger log = Logger.get(ConnectorManager.class);

private final MetadataManager metadataManager;
private final CatalogManager catalogManager;
private final AccessControlManager accessControlManager;
private final SplitManager splitManager;
private final PageSourceManager pageSourceManager;
Expand All @@ -97,9 +98,6 @@ public class ConnectorManager
@GuardedBy("this")
private final ConcurrentMap<String, ConnectorFactory> connectorFactories = new ConcurrentHashMap<>();

@GuardedBy("this")
private final Set<String> catalogs = newConcurrentHashSet();

@GuardedBy("this")
private final ConcurrentMap<ConnectorId, MaterializedConnector> connectors = new ConcurrentHashMap<>();

Expand All @@ -108,6 +106,7 @@ public class ConnectorManager
@Inject
public ConnectorManager(
MetadataManager metadataManager,
CatalogManager catalogManager,
AccessControlManager accessControlManager,
SplitManager splitManager,
PageSourceManager pageSourceManager,
Expand All @@ -123,6 +122,7 @@ public ConnectorManager(
TransactionManager transactionManager)
{
this.metadataManager = metadataManager;
this.catalogManager = catalogManager;
this.accessControlManager = accessControlManager;
this.splitManager = splitManager;
this.pageSourceManager = pageSourceManager;
Expand Down Expand Up @@ -178,7 +178,7 @@ private synchronized ConnectorId createConnection(String catalogName, ConnectorF
requireNonNull(catalogName, "catalogName is null");
requireNonNull(properties, "properties is null");
requireNonNull(connectorFactory, "connectorFactory is null");
checkArgument(!catalogs.contains(catalogName), "A catalog already exists for %s", catalogName);
checkArgument(!catalogManager.getCatalog(catalogName).isPresent(), "A catalog already exists for %s", catalogName);

ConnectorId connectorId = new ConnectorId(catalogName);
checkState(!connectors.containsKey(connectorId), "A connector %s already exists", connectorId);
Expand Down Expand Up @@ -212,8 +212,7 @@ private synchronized void addCatalogConnector(String catalogName, ConnectorId co
informationSchemaConnector.getConnector(),
systemConnector.getConnectorId(),
systemConnector.getConnector());
transactionManager.registerCatalog(catalog);
metadataManager.registerConnectorCatalog(connectorId, catalogName);
catalogManager.registerCatalog(catalog);

// todo the following managers currently need access to catalog name, so they must be handled specially
for (Procedure procedure : connector.getProcedures()) {
Expand Down
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.connector.system;

import com.facebook.presto.connector.ConnectorId;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.InMemoryRecordSet;
Expand All @@ -24,6 +23,8 @@
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.transaction.TransactionId;
import com.facebook.presto.transaction.TransactionManager;

import javax.inject.Inject;

Expand All @@ -32,6 +33,7 @@
import static com.facebook.presto.metadata.MetadataUtil.TableMetadataBuilder.tableMetadataBuilder;
import static com.facebook.presto.spi.SystemTable.Distribution.SINGLE_COORDINATOR;
import static com.facebook.presto.spi.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.util.Types.checkType;
import static java.util.Objects.requireNonNull;

public class CatalogSystemTable
Expand All @@ -43,12 +45,12 @@ public class CatalogSystemTable
.column("catalog_name", createUnboundedVarcharType())
.column("connector_id", createUnboundedVarcharType())
.build();
private final Metadata metadata;
private final TransactionManager transactionManager;

@Inject
public CatalogSystemTable(Metadata metadata)
public CatalogSystemTable(TransactionManager transactionManager)
{
this.metadata = requireNonNull(metadata);
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
}

@Override
Expand All @@ -66,8 +68,9 @@ public ConnectorTableMetadata getTableMetadata()
@Override
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
TransactionId transactionId = checkType(transactionHandle, SystemTransactionHandle.class, "transactionHandle").getTransactionId();
Builder table = InMemoryRecordSet.builder(CATALOG_TABLE);
for (Map.Entry<String, ConnectorId> entry : metadata.getCatalogNames().entrySet()) {
for (Map.Entry<String, ConnectorId> entry : transactionManager.getCatalogNames(transactionId).entrySet()) {
table.addRow(entry.getKey(), entry.getValue().toString());
}
return table.build().cursor();
Expand Down
Expand Up @@ -13,7 +13,7 @@
*/
package com.facebook.presto.connector.system.jdbc;

import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.connector.system.GlobalSystemTransactionHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.InMemoryRecordSet;
Expand All @@ -22,6 +22,9 @@
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.transaction.TransactionId;
import com.facebook.presto.transaction.TransactionManager;
import com.facebook.presto.util.Types;

import javax.inject.Inject;

Expand All @@ -38,12 +41,12 @@ public class CatalogJdbcTable
.column("table_cat", createUnboundedVarcharType())
.build();

private final Metadata metadata;
private final TransactionManager transactionManager;

@Inject
public CatalogJdbcTable(Metadata metadata)
public CatalogJdbcTable(TransactionManager transactionManager)
{
this.metadata = requireNonNull(metadata);
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
}

@Override
Expand All @@ -55,8 +58,9 @@ public ConnectorTableMetadata getTableMetadata()
@Override
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
TransactionId transactionId = Types.checkType(transactionHandle, GlobalSystemTransactionHandle.class, "transactionHandle").getTransactionId();
Builder table = InMemoryRecordSet.builder(METADATA);
for (String name : metadata.getCatalogNames().keySet()) {
for (String name : transactionManager.getCatalogNames(transactionId).keySet()) {
table.addRow(name);
}
return table.build().cursor();
Expand Down
Expand Up @@ -120,7 +120,7 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
Optional<String> tableFilter = stringFilter(constraint, 2);

Builder table = InMemoryRecordSet.builder(METADATA);
for (String catalog : filter(metadata.getCatalogNames().keySet(), catalogFilter)) {
for (String catalog : filter(metadata.getCatalogNames(session).keySet(), catalogFilter)) {
QualifiedTablePrefix prefix = FilterUtil.tablePrefix(catalog, schemaFilter, tableFilter);
for (Entry<QualifiedObjectName, List<ColumnMetadata>> entry : metadata.listTableColumns(session, prefix).entrySet()) {
addColumnRows(table, entry.getKey(), entry.getValue());
Expand Down
Expand Up @@ -68,7 +68,7 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
Optional<String> catalogFilter = FilterUtil.stringFilter(constraint, 1);

Builder table = InMemoryRecordSet.builder(METADATA);
for (String catalog : filter(metadata.getCatalogNames().keySet(), catalogFilter)) {
for (String catalog : filter(metadata.getCatalogNames(session).keySet(), catalogFilter)) {
for (String schema : metadata.listSchemaNames(session, catalog)) {
table.addRow(schema, catalog);
}
Expand Down
Expand Up @@ -83,7 +83,7 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
Optional<String> typeFilter = stringFilter(constraint, 3);

Builder table = InMemoryRecordSet.builder(METADATA);
for (String catalog : filter(metadata.getCatalogNames().keySet(), catalogFilter)) {
for (String catalog : filter(metadata.getCatalogNames(session).keySet(), catalogFilter)) {
QualifiedTablePrefix prefix = tablePrefix(catalog, schemaFilter, tableFilter);

if (FilterUtil.emptyOrEquals(typeFilter, "TABLE")) {
Expand Down
Expand Up @@ -97,7 +97,7 @@ public CompletableFuture<?> execute(CreateTable statement, TransactionManager tr
else if (element instanceof LikeClause) {
LikeClause likeClause = (LikeClause) element;
QualifiedObjectName likeTableName = createQualifiedObjectName(session, statement, likeClause.getTableName());
if (!metadata.getCatalogNames().containsKey(likeTableName.getCatalogName())) {
if (!metadata.getCatalogNames(session).containsKey(likeTableName.getCatalogName())) {
throw new SemanticException(MISSING_CATALOG, statement, "LIKE table catalog '%s' does not exist", likeTableName.getCatalogName());
}
if (!tableName.getCatalogName().equals(likeTableName.getCatalogName())) {
Expand Down
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.metadata;

import com.google.common.collect.ImmutableList;

import javax.annotation.concurrent.ThreadSafe;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

@ThreadSafe
public class CatalogManager
{
private final ConcurrentMap<String, Catalog> catalogs = new ConcurrentHashMap<>();

public synchronized void registerCatalog(Catalog catalog)
{
requireNonNull(catalog, "catalog is null");

checkState(catalogs.put(catalog.getCatalogName(), catalog) == null, "Catalog '%s' is already registered", catalog.getCatalogName());
}

public void removeCatalog(String catalogName)
{
catalogs.remove(catalogName);
}

public List<Catalog> getCatalogs()
{
return ImmutableList.copyOf(catalogs.values());
}

public Optional<Catalog> getCatalog(String catalogName)
{
return Optional.ofNullable(catalogs.get(catalogName));
}
}
Expand Up @@ -227,7 +227,7 @@ public interface Metadata
* @return Map of catalog name to connector id
*/
@NotNull
Map<String, ConnectorId> getCatalogNames();
Map<String, ConnectorId> getCatalogNames(Session session);

/**
* Get the names that match the specified table prefix (never null).
Expand Down
Expand Up @@ -72,15 +72,12 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

import static com.facebook.presto.metadata.QualifiedObjectName.convertFromSchemaTableName;
import static com.facebook.presto.metadata.TableLayout.fromConnectorLayout;
import static com.facebook.presto.metadata.ViewDefinition.ViewColumn;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_VIEW;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.SYNTAX_ERROR;
import static com.facebook.presto.spi.function.OperatorType.BETWEEN;
import static com.facebook.presto.spi.function.OperatorType.EQUAL;
Expand All @@ -94,16 +91,13 @@
import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
import static com.facebook.presto.transaction.TransactionManager.createTestTransactionManager;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public class MetadataManager
implements Metadata
{
private final ConcurrentMap<String, ConnectorId> connectorsByCatalog = new ConcurrentHashMap<>();

private final FunctionRegistry functions;
private final ProcedureRegistry procedures;
private final TypeManager typeManager;
Expand Down Expand Up @@ -156,6 +150,11 @@ public MetadataManager(FeaturesConfig featuresConfig,
}

public static MetadataManager createTestMetadataManager()
{
return createTestMetadataManager(new CatalogManager());
}

public static MetadataManager createTestMetadataManager(CatalogManager catalogManager)
{
TypeManager typeManager = new TypeRegistry();
return new MetadataManager(
Expand All @@ -165,14 +164,7 @@ public static MetadataManager createTestMetadataManager()
new SessionPropertyManager(),
new SchemaPropertyManager(),
new TablePropertyManager(),
createTestTransactionManager());
}

public synchronized void registerConnectorCatalog(ConnectorId connectorId, String catalogName)
{
requireNonNull(connectorId, "connectorId is null");
requireNonNull(catalogName, "catalogName is null");
checkArgument(connectorsByCatalog.putIfAbsent(catalogName, connectorId) == null, "Catalog '%s' is already registered", catalogName);
createTestTransactionManager(catalogManager));
}

@Override
Expand Down Expand Up @@ -639,13 +631,13 @@ public void finishDelete(Session session, TableHandle tableHandle, Collection<Sl
@Override
public Optional<ConnectorId> getCatalogHandle(Session session, String catalogName)
{
return Optional.ofNullable(connectorsByCatalog.get(catalogName));
return transactionManager.getOptionalCatalogMetadata(session.getRequiredTransactionId(), catalogName).map(CatalogMetadata::getConnectorId);
}

@Override
public Map<String, ConnectorId> getCatalogNames()
public Map<String, ConnectorId> getCatalogNames(Session session)
{
return ImmutableMap.copyOf(connectorsByCatalog);
return transactionManager.getCatalogNames(session.getRequiredTransactionId());
}

@Override
Expand Down Expand Up @@ -826,8 +818,7 @@ private ViewDefinition deserializeView(String data)

private Optional<CatalogMetadata> getOptionalCatalogMetadata(Session session, String catalogName)
{
return Optional.ofNullable(connectorsByCatalog.get(catalogName))
.map(connectorId -> getCatalogMetadata(session, connectorId));
return transactionManager.getOptionalCatalogMetadata(session.getRequiredTransactionId(), catalogName);
}

private CatalogMetadata getCatalogMetadata(Session session, ConnectorId connectorId)
Expand All @@ -837,11 +828,7 @@ private CatalogMetadata getCatalogMetadata(Session session, ConnectorId connecto

private CatalogMetadata getCatalogMetadataForWrite(Session session, String catalogName)
{
ConnectorId connectorId = connectorsByCatalog.get(catalogName);
if (connectorId == null) {
throw new PrestoException(NOT_FOUND, "Catalog does not exist: " + catalogName);
}
return getCatalogMetadataForWrite(session, connectorId);
return transactionManager.getCatalogMetadataForWrite(session.getRequiredTransactionId(), catalogName);
}

private CatalogMetadata getCatalogMetadataForWrite(Session session, ConnectorId connectorId)
Expand Down

0 comments on commit cf04cf0

Please sign in to comment.