Skip to content

Commit

Permalink
Add ConnectorId to enforce separation from catalog name
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Sep 19, 2016
1 parent e66af52 commit 6dd7dd2
Show file tree
Hide file tree
Showing 84 changed files with 589 additions and 423 deletions.
Expand Up @@ -13,6 +13,7 @@
*/ */
package com.facebook.presto.hive; package com.facebook.presto.hive;


import com.facebook.presto.connector.ConnectorId;
import com.facebook.presto.hive.orc.OrcPageSourceFactory; import com.facebook.presto.hive.orc.OrcPageSourceFactory;
import com.facebook.presto.metadata.Split; import com.facebook.presto.metadata.Split;
import com.facebook.presto.operator.DriverContext; import com.facebook.presto.operator.DriverContext;
Expand Down Expand Up @@ -405,7 +406,7 @@ public SourceOperator newTableScanOperator(DriverContext driverContext)
columns.stream().map(columnHandle -> (ColumnHandle) columnHandle).collect(toList()) columns.stream().map(columnHandle -> (ColumnHandle) columnHandle).collect(toList())
); );
SourceOperator operator = sourceOperatorFactory.createOperator(driverContext); SourceOperator operator = sourceOperatorFactory.createOperator(driverContext);
operator.addSplit(new Split("test", TestingTransactionHandle.create("test"), TestingSplit.createLocalSplit())); operator.addSplit(new Split(new ConnectorId("test"), TestingTransactionHandle.create(), TestingSplit.createLocalSplit()));
return operator; return operator;
} }


Expand All @@ -428,7 +429,7 @@ public SourceOperator newScanFilterAndProjectOperator(DriverContext driverContex
types types
); );
SourceOperator operator = sourceOperatorFactory.createOperator(driverContext); SourceOperator operator = sourceOperatorFactory.createOperator(driverContext);
operator.addSplit(new Split("test", TestingTransactionHandle.create("test"), TestingSplit.createLocalSplit())); operator.addSplit(new Split(new ConnectorId("test"), TestingTransactionHandle.create(), TestingSplit.createLocalSplit()));
return operator; return operator;
} }


Expand Down
9 changes: 5 additions & 4 deletions presto-main/src/main/java/com/facebook/presto/Session.java
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto; package com.facebook.presto;


import com.facebook.presto.client.ClientSession; import com.facebook.presto.client.ClientSession;
import com.facebook.presto.connector.ConnectorId;
import com.facebook.presto.metadata.SessionPropertyManager; import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.QueryId;
Expand Down Expand Up @@ -335,17 +336,17 @@ public ConnectorSession toConnectorSession()
return new FullConnectorSession(queryId.toString(), identity, timeZoneKey, locale, startTime); return new FullConnectorSession(queryId.toString(), identity, timeZoneKey, locale, startTime);
} }


public ConnectorSession toConnectorSession(String catalog) public ConnectorSession toConnectorSession(ConnectorId connectorId)
{ {
requireNonNull(catalog, "catalog is null"); requireNonNull(connectorId, "connectorId is null");
return new FullConnectorSession( return new FullConnectorSession(
queryId.toString(), queryId.toString(),
identity, identity,
timeZoneKey, timeZoneKey,
locale, locale,
startTime, startTime,
catalogProperties.getOrDefault(catalog, ImmutableMap.of()), catalogProperties.getOrDefault(connectorId.getCatalogName(), ImmutableMap.of()),
catalog, connectorId.getCatalogName(),
sessionPropertyManager); sessionPropertyManager);
} }


Expand Down
Expand Up @@ -27,9 +27,9 @@ public class ConnectorAwareNodeManager
{ {
private final InternalNodeManager nodeManager; private final InternalNodeManager nodeManager;
private final String environment; private final String environment;
private final String connectorId; private final ConnectorId connectorId;


public ConnectorAwareNodeManager(InternalNodeManager nodeManager, String environment, String connectorId) public ConnectorAwareNodeManager(InternalNodeManager nodeManager, String environment, ConnectorId connectorId)
{ {
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.environment = requireNonNull(environment, "environment is null"); this.environment = requireNonNull(environment, "environment is null");
Expand Down
@@ -0,0 +1,84 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.connector;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

import java.util.Objects;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public final class ConnectorId
{
private static final String INFORMATION_SCHEMA_CONNECTOR_PREFIX = "$info_schema@";
private static final String SYSTEM_TABLES_CONNECTOR_PREFIX = "$system@";

private final String catalogName;

@JsonCreator
public ConnectorId(String catalogName)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
checkArgument(!catalogName.isEmpty(), "catalogName is empty");
}

public String getCatalogName()
{
return catalogName;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ConnectorId that = (ConnectorId) o;
return Objects.equals(catalogName, that.catalogName);
}

@Override
public int hashCode()
{
return Objects.hash(catalogName);
}

@JsonValue
@Override
public String toString()
{
return catalogName;
}

public static boolean isInternalSystemConnector(ConnectorId connectorId)
{
return connectorId.getCatalogName().startsWith(SYSTEM_TABLES_CONNECTOR_PREFIX) ||
connectorId.getCatalogName().startsWith(INFORMATION_SCHEMA_CONNECTOR_PREFIX);
}

public static ConnectorId createInformationSchemaConnectorId(ConnectorId connectorId)
{
return new ConnectorId(INFORMATION_SCHEMA_CONNECTOR_PREFIX + connectorId.getCatalogName());
}

public static ConnectorId createSystemTablesConnectorId(ConnectorId connectorId)
{
return new ConnectorId(SYSTEM_TABLES_CONNECTOR_PREFIX + connectorId.getCatalogName());
}
}
Expand Up @@ -59,6 +59,8 @@
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;


import static com.facebook.presto.connector.ConnectorId.createInformationSchemaConnectorId;
import static com.facebook.presto.connector.ConnectorId.createSystemTablesConnectorId;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Sets.newConcurrentHashSet; import static com.google.common.collect.Sets.newConcurrentHashSet;
Expand All @@ -68,9 +70,6 @@
@ThreadSafe @ThreadSafe
public class ConnectorManager public class ConnectorManager
{ {
public static final String INFORMATION_SCHEMA_CONNECTOR_PREFIX = "$info_schema@";
public static final String SYSTEM_TABLES_CONNECTOR_PREFIX = "$system@";

private static final Logger log = Logger.get(ConnectorManager.class); private static final Logger log = Logger.get(ConnectorManager.class);


private final MetadataManager metadataManager; private final MetadataManager metadataManager;
Expand All @@ -95,7 +94,7 @@ public class ConnectorManager
@GuardedBy("this") @GuardedBy("this")
private final Set<String> catalogs = newConcurrentHashSet(); private final Set<String> catalogs = newConcurrentHashSet();
@GuardedBy("this") @GuardedBy("this")
private final ConcurrentMap<String, Connector> connectors = new ConcurrentHashMap<>(); private final ConcurrentMap<ConnectorId, Connector> connectors = new ConcurrentHashMap<>();


private final AtomicBoolean stopped = new AtomicBoolean(); private final AtomicBoolean stopped = new AtomicBoolean();


Expand Down Expand Up @@ -138,7 +137,7 @@ public synchronized void stop()
return; return;
} }


for (Map.Entry<String, Connector> entry : connectors.entrySet()) { for (Map.Entry<ConnectorId, Connector> entry : connectors.entrySet()) {
Connector connector = entry.getValue(); Connector connector = entry.getValue();
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(connector.getClass().getClassLoader())) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(connector.getClass().getClassLoader())) {
connector.shutdown(); connector.shutdown();
Expand All @@ -163,40 +162,42 @@ public synchronized void addConnectorFactory(ConnectorFactory connectorFactory)
handleResolver.addConnectorName(connectorFactory.getName(), connectorFactory.getHandleResolver()); handleResolver.addConnectorName(connectorFactory.getName(), connectorFactory.getHandleResolver());
} }


public synchronized void createConnection(String catalogName, String connectorName, Map<String, String> properties) public synchronized ConnectorId createConnection(String catalogName, String connectorName, Map<String, String> properties)
{ {
requireNonNull(connectorName, "connectorName is null"); requireNonNull(connectorName, "connectorName is null");
ConnectorFactory connectorFactory = connectorFactories.get(connectorName); ConnectorFactory connectorFactory = connectorFactories.get(connectorName);
checkArgument(connectorFactory != null, "No factory for connector %s", connectorName); checkArgument(connectorFactory != null, "No factory for connector %s", connectorName);
createConnection(catalogName, connectorFactory, properties); return createConnection(catalogName, connectorFactory, properties);
} }


private synchronized void createConnection(String catalogName, ConnectorFactory connectorFactory, Map<String, String> properties) private synchronized ConnectorId createConnection(String catalogName, ConnectorFactory connectorFactory, Map<String, String> properties)
{ {
checkState(!stopped.get(), "ConnectorManager is stopped"); checkState(!stopped.get(), "ConnectorManager is stopped");
requireNonNull(catalogName, "catalogName is null"); requireNonNull(catalogName, "catalogName is null");
requireNonNull(properties, "properties is null"); requireNonNull(properties, "properties is null");
requireNonNull(connectorFactory, "connectorFactory is null"); requireNonNull(connectorFactory, "connectorFactory is null");
checkArgument(!catalogs.contains(catalogName), "A catalog already exists for %s", catalogName); checkArgument(!catalogs.contains(catalogName), "A catalog already exists for %s", catalogName);


String connectorId = getConnectorId(catalogName); ConnectorId connectorId = new ConnectorId(catalogName);
checkState(!connectors.containsKey(connectorId), "A connector %s already exists", connectorId); checkState(!connectors.containsKey(connectorId), "A connector %s already exists", connectorId);


addCatalogConnector(catalogName, connectorId, connectorFactory, properties); addCatalogConnector(catalogName, connectorId, connectorFactory, properties);


catalogs.add(catalogName); catalogs.add(catalogName);

return connectorId;
} }


private synchronized void addCatalogConnector(String catalogName, String connectorId, ConnectorFactory factory, Map<String, String> properties) private synchronized void addCatalogConnector(String catalogName, ConnectorId connectorId, ConnectorFactory factory, Map<String, String> properties)
{ {
Connector connector = createConnector(connectorId, factory, properties); Connector connector = createConnector(connectorId, factory, properties);


addConnectorInternal(ConnectorType.STANDARD, catalogName, connectorId, connector); addConnectorInternal(ConnectorType.STANDARD, catalogName, connectorId, connector);


String informationSchemaId = makeInformationSchemaConnectorId(connectorId); ConnectorId informationSchemaId = createInformationSchemaConnectorId(connectorId);
addConnectorInternal(ConnectorType.INFORMATION_SCHEMA, catalogName, informationSchemaId, new InformationSchemaConnector(catalogName, nodeManager, metadataManager)); addConnectorInternal(ConnectorType.INFORMATION_SCHEMA, catalogName, informationSchemaId, new InformationSchemaConnector(catalogName, nodeManager, metadataManager));


String systemId = makeSystemTablesConnectorId(connectorId); ConnectorId systemId = createSystemTablesConnectorId(connectorId);
addConnectorInternal(ConnectorType.SYSTEM, catalogName, systemId, new SystemConnector( addConnectorInternal(ConnectorType.SYSTEM, catalogName, systemId, new SystemConnector(
systemId, systemId,
nodeManager, nodeManager,
Expand All @@ -208,7 +209,7 @@ private synchronized void addCatalogConnector(String catalogName, String connect
metadataManager.getTablePropertyManager().addProperties(catalogName, connector.getTableProperties()); metadataManager.getTablePropertyManager().addProperties(catalogName, connector.getTableProperties());
} }


private synchronized void addConnectorInternal(ConnectorType type, String catalogName, String connectorId, Connector connector) private synchronized void addConnectorInternal(ConnectorType type, String catalogName, ConnectorId connectorId, Connector connector)
{ {
checkState(!stopped.get(), "ConnectorManager is stopped"); checkState(!stopped.get(), "ConnectorManager is stopped");
checkState(!connectors.containsKey(connectorId), "A connector %s already exists", connectorId); checkState(!connectors.containsKey(connectorId), "A connector %s already exists", connectorId);
Expand Down Expand Up @@ -327,7 +328,7 @@ else if (type == ConnectorType.SYSTEM) {
} }
} }


private Connector createConnector(String connectorId, ConnectorFactory factory, Map<String, String> properties) private Connector createConnector(ConnectorId connectorId, ConnectorFactory factory, Map<String, String> properties)
{ {
Class<?> factoryClass = factory.getClass(); Class<?> factoryClass = factory.getClass();
if (factory instanceof LegacyTransactionConnectorFactory) { if (factory instanceof LegacyTransactionConnectorFactory) {
Expand All @@ -341,7 +342,7 @@ private Connector createConnector(String connectorId, ConnectorFactory factory,
pageIndexerFactory); pageIndexerFactory);


try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(factoryClass.getClassLoader())) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(factoryClass.getClassLoader())) {
return factory.create(connectorId, properties, context); return factory.create(connectorId.getCatalogName(), properties, context);
} }
} }


Expand All @@ -351,20 +352,4 @@ private enum ConnectorType
INFORMATION_SCHEMA, INFORMATION_SCHEMA,
SYSTEM SYSTEM
} }

private static String makeInformationSchemaConnectorId(String connectorId)
{
return INFORMATION_SCHEMA_CONNECTOR_PREFIX + connectorId;
}

private static String makeSystemTablesConnectorId(String connectorId)
{
return SYSTEM_TABLES_CONNECTOR_PREFIX + connectorId;
}

private static String getConnectorId(String catalogName)
{
// for now connectorId == catalogName
return catalogName;
}
} }
Expand Up @@ -13,6 +13,7 @@
*/ */
package com.facebook.presto.connector.system; package com.facebook.presto.connector.system;


import com.facebook.presto.connector.ConnectorId;
import com.facebook.presto.metadata.Metadata; import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableMetadata; import com.facebook.presto.spi.ConnectorTableMetadata;
Expand Down Expand Up @@ -66,8 +67,8 @@ public ConnectorTableMetadata getTableMetadata()
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint) public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{ {
Builder table = InMemoryRecordSet.builder(CATALOG_TABLE); Builder table = InMemoryRecordSet.builder(CATALOG_TABLE);
for (Map.Entry<String, String> entry : metadata.getCatalogNames().entrySet()) { for (Map.Entry<String, ConnectorId> entry : metadata.getCatalogNames().entrySet()) {
table.addRow(entry.getKey(), entry.getValue()); table.addRow(entry.getKey(), entry.getValue().toString());
} }
return table.build().cursor(); return table.build().cursor();
} }
Expand Down
Expand Up @@ -13,6 +13,7 @@
*/ */
package com.facebook.presto.connector.system; package com.facebook.presto.connector.system;


import com.facebook.presto.connector.ConnectorId;
import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata; import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorTableMetadata; import com.facebook.presto.spi.ConnectorTableMetadata;
Expand All @@ -28,20 +29,20 @@
public class SystemColumnHandle public class SystemColumnHandle
implements ColumnHandle implements ColumnHandle
{ {
private final String connectorId; private final ConnectorId connectorId;
private final String columnName; private final String columnName;


@JsonCreator @JsonCreator
public SystemColumnHandle( public SystemColumnHandle(
@JsonProperty("connectorId") String connectorId, @JsonProperty("connectorId") ConnectorId connectorId,
@JsonProperty("columnName") String columnName) @JsonProperty("columnName") String columnName)
{ {
this.connectorId = requireNonNull(connectorId, "connectorId is null"); this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.columnName = requireNonNull(columnName, "columnName is null"); this.columnName = requireNonNull(columnName, "columnName is null");
} }


@JsonProperty @JsonProperty
public String getConnectorId() public ConnectorId getConnectorId()
{ {
return connectorId; return connectorId;
} }
Expand Down Expand Up @@ -78,7 +79,7 @@ public String toString()
return connectorId + ":" + columnName; return connectorId + ":" + columnName;
} }


public static Map<String, ColumnHandle> toSystemColumnHandles(String connectorId, ConnectorTableMetadata tableMetadata) public static Map<String, ColumnHandle> toSystemColumnHandles(ConnectorId connectorId, ConnectorTableMetadata tableMetadata)
{ {
ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder(); ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
for (ColumnMetadata columnMetadata : tableMetadata.getColumns()) { for (ColumnMetadata columnMetadata : tableMetadata.getColumns()) {
Expand Down
Expand Up @@ -13,6 +13,7 @@
*/ */
package com.facebook.presto.connector.system; package com.facebook.presto.connector.system;


import com.facebook.presto.connector.ConnectorId;
import com.facebook.presto.metadata.InternalNodeManager; import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.spi.SystemTable; import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.connector.ConnectorMetadata;
Expand All @@ -31,14 +32,14 @@
public class SystemConnector public class SystemConnector
implements InternalConnector implements InternalConnector
{ {
private final String connectorId; private final ConnectorId connectorId;
private final ConnectorMetadata metadata; private final ConnectorMetadata metadata;
private final ConnectorSplitManager splitManager; private final ConnectorSplitManager splitManager;
private final ConnectorPageSourceProvider pageSourceProvider; private final ConnectorPageSourceProvider pageSourceProvider;
private final Function<TransactionId, ConnectorTransactionHandle> transactionHandleFunction; private final Function<TransactionId, ConnectorTransactionHandle> transactionHandleFunction;


public SystemConnector( public SystemConnector(
String connectorId, ConnectorId connectorId,
InternalNodeManager nodeManager, InternalNodeManager nodeManager,
Set<SystemTable> tables, Set<SystemTable> tables,
Function<TransactionId, ConnectorTransactionHandle> transactionHandleFunction) Function<TransactionId, ConnectorTransactionHandle> transactionHandleFunction)
Expand Down

0 comments on commit 6dd7dd2

Please sign in to comment.