Skip to content

Commit

Permalink
Properly implement SystemConnector as a Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
erichwang committed Dec 11, 2015
1 parent 6dc9b56 commit 4f47ea7
Show file tree
Hide file tree
Showing 14 changed files with 162 additions and 54 deletions.
Expand Up @@ -153,15 +153,6 @@ public synchronized void createConnection(String catalogName, ConnectorFactory c
addConnector(catalogName, connectorId, connector);
}

public synchronized void createConnection(String catalogName, Connector connector)
{
checkState(!stopped.get(), "ConnectorManager is stopped");
requireNonNull(catalogName, "catalogName is null");
requireNonNull(connector, "connector is null");

addConnector(catalogName, getConnectorId(catalogName), connector);
}

private synchronized void addConnector(String catalogName, String connectorId, Connector connector)
{
checkState(!stopped.get(), "ConnectorManager is stopped");
Expand Down Expand Up @@ -248,7 +239,8 @@ private synchronized void addConnector(String catalogName, String connectorId, C
splitManager.addConnectorSplitManager(makeInformationSchemaConnectorId(connectorId), informationSchemaConnector.getSplitManager());
pageSourceManager.addConnectorPageSourceProvider(makeInformationSchemaConnectorId(connectorId), informationSchemaConnector.getPageSourceProvider());

Connector systemConnector = new SystemConnector(nodeManager, systemTables);
Connector systemConnector = new SystemConnector(makeSystemTablesConnectorId(connectorId), nodeManager, systemTables);
handleResolver.addHandleResolver(makeSystemTablesConnectorId(connectorId), systemConnector.getHandleResolver());
metadataManager.addSystemTablesMetadata(makeSystemTablesConnectorId(connectorId), catalogName, systemConnector.getMetadata());
splitManager.addConnectorSplitManager(makeSystemTablesConnectorId(connectorId), systemConnector.getSplitManager());
pageSourceManager.addConnectorPageSourceProvider(makeSystemTablesConnectorId(connectorId), new RecordPageSourceProvider(systemConnector.getRecordSetProvider()));
Expand Down
Expand Up @@ -23,15 +23,27 @@
import java.util.Map;
import java.util.Objects;

import static java.util.Objects.requireNonNull;

public class SystemColumnHandle
implements ColumnHandle
{
private final String connectorId;
private final String columnName;

@JsonCreator
public SystemColumnHandle(@JsonProperty("columnName") String columnName)
public SystemColumnHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("columnName") String columnName)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.columnName = requireNonNull(columnName, "columnName is null");
}

@JsonProperty
public String getConnectorId()
{
this.columnName = columnName;
return connectorId;
}

@JsonProperty
Expand All @@ -43,7 +55,7 @@ public String getColumnName()
@Override
public int hashCode()
{
return Objects.hash(columnName);
return Objects.hash(connectorId, columnName);
}

@Override
Expand All @@ -56,20 +68,21 @@ public boolean equals(Object obj)
return false;
}
final SystemColumnHandle other = (SystemColumnHandle) obj;
return Objects.equals(this.columnName, other.columnName);
return Objects.equals(this.connectorId, other.connectorId) &&
Objects.equals(this.columnName, other.columnName);
}

@Override
public String toString()
{
return "system:" + columnName;
return connectorId + ":" + columnName;
}

public static Map<String, ColumnHandle> toSystemColumnHandles(ConnectorTableMetadata tableMetadata)
public static Map<String, ColumnHandle> toSystemColumnHandles(String connectorId, ConnectorTableMetadata tableMetadata)
{
ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
for (ColumnMetadata columnMetadata : tableMetadata.getColumns()) {
columnHandles.put(columnMetadata.getName(), new SystemColumnHandle(columnMetadata.getName()));
columnHandles.put(columnMetadata.getName(), new SystemColumnHandle(connectorId, columnMetadata.getName()));
}

return columnHandles.build();
Expand Down
Expand Up @@ -21,23 +21,22 @@
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.SystemTable;

import javax.inject.Inject;

import java.util.Set;

public class SystemConnector
implements Connector
{
public static final String NAME = "system";

private final SystemHandleResolver handleResolver;
private final ConnectorMetadata metadata;
private final ConnectorSplitManager splitManager;
private final ConnectorRecordSetProvider recordSetProvider;

@Inject
public SystemConnector(NodeManager nodeManager, Set<SystemTable> tables)
public SystemConnector(String connectorId, NodeManager nodeManager, Set<SystemTable> tables)
{
metadata = new SystemTablesMetadata(tables);
handleResolver = new SystemHandleResolver(connectorId);
metadata = new SystemTablesMetadata(connectorId, tables);
splitManager = new SystemSplitManager(nodeManager, tables);
recordSetProvider = new SystemRecordSetProvider(tables);
}
Expand All @@ -57,7 +56,7 @@ public ConnectorSplitManager getSplitManager()
@Override
public ConnectorHandleResolver getHandleResolver()
{
return new SystemHandleResolver();
return handleResolver;
}

@Override
Expand Down
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.connector.system;

import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorFactory;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.SystemTable;
import com.google.common.collect.ImmutableSet;

import javax.inject.Inject;

import java.util.Map;
import java.util.Set;

import static java.util.Objects.requireNonNull;

public class
SystemConnectorFactory
implements ConnectorFactory
{
private final NodeManager nodeManager;
private final Set<SystemTable> tables;

@Inject
public SystemConnectorFactory(NodeManager nodeManager, Set<SystemTable> tables)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.tables = ImmutableSet.copyOf(requireNonNull(tables, "tables is null"));
}

@Override
public String getName()
{
return SystemConnector.NAME;
}

@Override
public Connector create(String connectorId, Map<String, String> config)
{
return new SystemConnector(connectorId, nodeManager, tables);
}
}
Expand Up @@ -19,31 +19,44 @@
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;

import static java.util.Objects.requireNonNull;

public class SystemHandleResolver
implements ConnectorHandleResolver
{
private final String connectorId;

public SystemHandleResolver(String connectorId)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
}

@Override
public boolean canHandle(ConnectorTableHandle tableHandle)
{
return tableHandle instanceof SystemTableHandle;
return (tableHandle instanceof SystemTableHandle) &&
((SystemTableHandle) tableHandle).getConnectorId().equals(connectorId);
}

@Override
public boolean canHandle(ColumnHandle columnHandle)
{
return columnHandle instanceof SystemColumnHandle;
return (columnHandle instanceof SystemColumnHandle) &&
((SystemColumnHandle) columnHandle).getConnectorId().equals(connectorId);
}

@Override
public boolean canHandle(ConnectorSplit split)
{
return split instanceof SystemSplit;
return (split instanceof SystemSplit) &&
((SystemSplit) split).getConnectorId().equals(connectorId);
}

@Override
public boolean canHandle(ConnectorTableLayoutHandle handle)
{
return handle instanceof SystemTableLayoutHandle;
return (handle instanceof SystemTableLayoutHandle) &&
((SystemTableLayoutHandle) handle).getConnectorId().equals(connectorId);
}

@Override
Expand Down
Expand Up @@ -30,21 +30,24 @@
public class SystemSplit
implements ConnectorSplit
{
private final String connectorId;
private final SystemTableHandle tableHandle;
private final List<HostAddress> addresses;
private final TupleDomain<ColumnHandle> constraint;

public SystemSplit(SystemTableHandle tableHandle, HostAddress address, TupleDomain<ColumnHandle> constraint)
public SystemSplit(String connectorId, SystemTableHandle tableHandle, HostAddress address, TupleDomain<ColumnHandle> constraint)
{
this(tableHandle, ImmutableList.of(requireNonNull(address, "address is null")), constraint);
this(connectorId, tableHandle, ImmutableList.of(requireNonNull(address, "address is null")), constraint);
}

@JsonCreator
public SystemSplit(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("tableHandle") SystemTableHandle tableHandle,
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");

requireNonNull(addresses, "hosts is null");
Expand All @@ -53,6 +56,12 @@ public SystemSplit(
this.constraint = requireNonNull(constraint, "constraint is null");
}

@JsonProperty
public String getConnectorId()
{
return connectorId;
}

@Override
public boolean isRemotelyAccessible()
{
Expand Down Expand Up @@ -88,6 +97,7 @@ public Object getInfo()
public String toString()
{
return toStringHelper(this)
.add("connectorId", connectorId)
.add("tableHandle", tableHandle)
.add("addresses", addresses)
.toString();
Expand Down
Expand Up @@ -65,7 +65,7 @@ public ConnectorSplitSource getSplits(ConnectorSession session, ConnectorTableLa
Distribution tableDistributionMode = systemTable.getDistribution();
if (tableDistributionMode == SINGLE_COORDINATOR) {
HostAddress address = nodeManager.getCurrentNode().getHostAndPort();
ConnectorSplit split = new SystemSplit(tableHandle, address, constraint);
ConnectorSplit split = new SystemSplit(tableHandle.getConnectorId(), tableHandle, address, constraint);
return new FixedSplitSource(SystemConnector.NAME, ImmutableList.of(split));
}

Expand All @@ -79,7 +79,7 @@ else if (tableDistributionMode == ALL_NODES) {
}
Set<Node> nodeSet = nodes.build();
for (Node node : nodeSet) {
splits.add(new SystemSplit(tableHandle, node.getHostAndPort(), constraint));
splits.add(new SystemSplit(tableHandle.getConnectorId(), tableHandle, node.getHostAndPort(), constraint));
}
return new FixedSplitSource(SystemConnector.NAME, splits.build());
}
Expand Down
Expand Up @@ -27,21 +27,31 @@
public class SystemTableHandle
implements ConnectorTableHandle
{
private final String connectorId;
private final String schemaName;
private final String tableName;

@JsonCreator
public SystemTableHandle(@JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName)
public SystemTableHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.schemaName = checkSchemaName(schemaName);
this.tableName = checkTableName(tableName);
}

public SystemTableHandle(SchemaTableName tableName)
public static SystemTableHandle fromSchemaTableName(String connectorId, SchemaTableName tableName)
{
requireNonNull(tableName, "tableName is null");
this.schemaName = tableName.getSchemaName();
this.tableName = tableName.getTableName();
return new SystemTableHandle(connectorId, tableName.getSchemaName(), tableName.getTableName());
}

@JsonProperty
public String getConnectorId()
{
return connectorId;
}

@JsonProperty
Expand All @@ -64,13 +74,13 @@ public SchemaTableName getSchemaTableName()
@Override
public String toString()
{
return "system:" + schemaName + "." + tableName;
return connectorId + ":" + schemaName + "." + tableName;
}

@Override
public int hashCode()
{
return Objects.hash(schemaName, tableName);
return Objects.hash(connectorId, schemaName, tableName);
}

@Override
Expand All @@ -83,7 +93,8 @@ public boolean equals(Object obj)
return false;
}
final SystemTableHandle other = (SystemTableHandle) obj;
return Objects.equals(this.schemaName, other.schemaName) &&
return Objects.equals(this.connectorId, other.connectorId) &&
Objects.equals(this.schemaName, other.schemaName) &&
Objects.equals(this.tableName, other.tableName);
}
}
Expand Up @@ -24,18 +24,27 @@
public class SystemTableLayoutHandle
implements ConnectorTableLayoutHandle
{
private final String connectorId;
private final SystemTableHandle table;
private final TupleDomain<ColumnHandle> constraint;

@JsonCreator
public SystemTableLayoutHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("table") SystemTableHandle table,
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.table = requireNonNull(table, "table is null");
this.constraint = requireNonNull(constraint, "constraint is null");
}

@JsonProperty
public String getConnectorId()
{
return connectorId;
}

@JsonProperty
public SystemTableHandle getTable()
{
Expand Down

0 comments on commit 4f47ea7

Please sign in to comment.