Skip to content

Commit

Permalink
Formalize InformationSchema as a Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
erichwang committed Dec 11, 2015
1 parent 65e079a commit 6dc9b56
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 67 deletions.
Expand Up @@ -13,9 +13,7 @@
*/
package com.facebook.presto.connector;

import com.facebook.presto.connector.informationSchema.InformationSchemaMetadata;
import com.facebook.presto.connector.informationSchema.InformationSchemaPageSourceProvider;
import com.facebook.presto.connector.informationSchema.InformationSchemaSplitManager;
import com.facebook.presto.connector.informationSchema.InformationSchemaConnector;
import com.facebook.presto.connector.system.SystemConnector;
import com.facebook.presto.index.IndexManager;
import com.facebook.presto.metadata.HandleResolver;
Expand Down Expand Up @@ -244,9 +242,11 @@ private synchronized void addConnector(String catalogName, String connectorId, C

metadataManager.addConnectorMetadata(connectorId, catalogName, connectorMetadata);

metadataManager.addInformationSchemaMetadata(makeInformationSchemaConnectorId(connectorId), catalogName, new InformationSchemaMetadata(catalogName));
splitManager.addConnectorSplitManager(makeInformationSchemaConnectorId(connectorId), new InformationSchemaSplitManager(nodeManager));
pageSourceManager.addConnectorPageSourceProvider(makeInformationSchemaConnectorId(connectorId), new InformationSchemaPageSourceProvider(metadataManager));
Connector informationSchemaConnector = new InformationSchemaConnector(makeInformationSchemaConnectorId(connectorId), catalogName, nodeManager, metadataManager);
handleResolver.addHandleResolver(makeInformationSchemaConnectorId(connectorId), informationSchemaConnector.getHandleResolver());
metadataManager.addInformationSchemaMetadata(makeInformationSchemaConnectorId(connectorId), catalogName, informationSchemaConnector.getMetadata());
splitManager.addConnectorSplitManager(makeInformationSchemaConnectorId(connectorId), informationSchemaConnector.getSplitManager());
pageSourceManager.addConnectorPageSourceProvider(makeInformationSchemaConnectorId(connectorId), informationSchemaConnector.getPageSourceProvider());

Connector systemConnector = new SystemConnector(nodeManager, systemTables);
metadataManager.addSystemTablesMetadata(makeSystemTablesConnectorId(connectorId), catalogName, systemConnector.getMetadata());
Expand Down
Expand Up @@ -23,15 +23,27 @@
import java.util.Map;
import java.util.Objects;

import static java.util.Objects.requireNonNull;

public class InformationSchemaColumnHandle
implements ColumnHandle
{
private final String connectorId;
private final String columnName;

@JsonCreator
public InformationSchemaColumnHandle(@JsonProperty("columnName") String columnName)
public InformationSchemaColumnHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("columnName") String columnName)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.columnName = requireNonNull(columnName, "columnName is null");
}

@JsonProperty
public String getConnectorId()
{
this.columnName = columnName;
return connectorId;
}

@JsonProperty
Expand All @@ -43,7 +55,7 @@ public String getColumnName()
@Override
public int hashCode()
{
return Objects.hash(columnName);
return Objects.hash(connectorId, columnName);
}

@Override
Expand All @@ -56,20 +68,21 @@ public boolean equals(Object obj)
return false;
}
final InformationSchemaColumnHandle other = (InformationSchemaColumnHandle) obj;
return Objects.equals(this.columnName, other.columnName);
return Objects.equals(this.connectorId, other.connectorId) &&
Objects.equals(this.columnName, other.columnName);
}

@Override
public String toString()
{
return "information_schema:" + columnName;
return connectorId + ":" + columnName;
}

public static Map<String, ColumnHandle> toInformationSchemaColumnHandles(ConnectorTableMetadata tableMetadata)
public static Map<String, ColumnHandle> toInformationSchemaColumnHandles(String connectorId, ConnectorTableMetadata tableMetadata)
{
ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
for (ColumnMetadata columnMetadata : tableMetadata.getColumns()) {
columnHandles.put(columnMetadata.getName(), new InformationSchemaColumnHandle(columnMetadata.getName()));
columnHandles.put(columnMetadata.getName(), new InformationSchemaColumnHandle(connectorId, columnMetadata.getName()));
}

return columnHandles.build();
Expand Down
@@ -0,0 +1,70 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.connector.informationSchema;

import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorPageSourceProvider;
import com.facebook.presto.spi.ConnectorSplitManager;
import com.facebook.presto.spi.NodeManager;

import static java.util.Objects.requireNonNull;

public class InformationSchemaConnector
implements Connector
{
private final ConnectorHandleResolver handleResolver;
private final ConnectorMetadata metadata;
private final ConnectorSplitManager splitManager;
private final ConnectorPageSourceProvider pageSourceProvider;

public InformationSchemaConnector(String connectorId, String catalogName, NodeManager nodeManager, Metadata metadata)
{
requireNonNull(connectorId, "connectorId is null");
requireNonNull(catalogName, "catalogName is null");
requireNonNull(nodeManager, "nodeManager is null");
requireNonNull(metadata, "metadata is null");

this.handleResolver = new InformationSchemaHandleResolver(connectorId);
this.metadata = new InformationSchemaMetadata(connectorId, catalogName);
this.splitManager = new InformationSchemaSplitManager(nodeManager);
this.pageSourceProvider = new InformationSchemaPageSourceProvider(metadata);
}

@Override
public ConnectorHandleResolver getHandleResolver()
{
return handleResolver;
}

@Override
public ConnectorMetadata getMetadata()
{
return metadata;
}

@Override
public ConnectorSplitManager getSplitManager()
{
return splitManager;
}

@Override
public ConnectorPageSourceProvider getPageSourceProvider()
{
return pageSourceProvider;
}
}
Expand Up @@ -18,25 +18,37 @@
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle;

import static java.util.Objects.requireNonNull;

public class InformationSchemaHandleResolver
implements ConnectorHandleResolver
{
private final String connectorId;

public InformationSchemaHandleResolver(String connectorId)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
}

@Override
public boolean canHandle(ConnectorTableHandle tableHandle)
{
return tableHandle instanceof InformationSchemaTableHandle;
return (tableHandle instanceof InformationSchemaTableHandle) &&
((InformationSchemaTableHandle) tableHandle).getConnectorId().equals(connectorId);
}

@Override
public boolean canHandle(ColumnHandle columnHandle)
{
return columnHandle instanceof InformationSchemaColumnHandle;
return (columnHandle instanceof InformationSchemaColumnHandle) &&
((InformationSchemaColumnHandle) columnHandle).getConnectorId().equals(connectorId);
}

@Override
public boolean canHandle(ConnectorSplit split)
{
return split instanceof InformationSchemaSplit;
return (split instanceof InformationSchemaSplit) &&
((InformationSchemaSplit) split).getConnectorId().equals(connectorId);
}

@Override
Expand Down
Expand Up @@ -91,11 +91,13 @@ public class InformationSchemaMetadata
.build())
.build();

private final String connectorId;
private final String catalogName;

public InformationSchemaMetadata(String catalogName)
public InformationSchemaMetadata(String connectorId, String catalogName)
{
this.catalogName = catalogName;
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.catalogName = requireNonNull(catalogName, "catalogName is null");
}

private InformationSchemaTableHandle checkTableHandle(ConnectorTableHandle tableHandle)
Expand All @@ -119,7 +121,7 @@ public ConnectorTableHandle getTableHandle(ConnectorSession connectorSession, Sc
return null;
}

return new InformationSchemaTableHandle(catalogName, tableName.getSchemaName(), tableName.getTableName());
return new InformationSchemaTableHandle(connectorId, catalogName, tableName.getSchemaName(), tableName.getTableName());
}

@Override
Expand Down Expand Up @@ -159,7 +161,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn

ConnectorTableMetadata tableMetadata = TABLES.get(informationSchemaTableHandle.getSchemaTableName());

return toInformationSchemaColumnHandles(tableMetadata);
return toInformationSchemaColumnHandles(informationSchemaTableHandle.getConnectorId(), tableMetadata);
}

@Override
Expand Down

This file was deleted.

Expand Up @@ -30,16 +30,19 @@
public class InformationSchemaSplit
implements ConnectorSplit
{
private final String connectorId;
private final InformationSchemaTableHandle tableHandle;
private final Map<String, NullableValue> filters;
private final List<HostAddress> addresses;

@JsonCreator
public InformationSchemaSplit(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("tableHandle") InformationSchemaTableHandle tableHandle,
@JsonProperty("filters") Map<String, NullableValue> filters,
@JsonProperty("addresses") List<HostAddress> addresses)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
this.filters = requireNonNull(filters, "filters is null");

Expand All @@ -48,6 +51,12 @@ public InformationSchemaSplit(
this.addresses = ImmutableList.copyOf(addresses);
}

@JsonProperty
public String getConnectorId()
{
return connectorId;
}

@Override
public boolean isRemotelyAccessible()
{
Expand Down Expand Up @@ -83,6 +92,7 @@ public Object getInfo()
public String toString()
{
return toStringHelper(this)
.add("connectorId", connectorId)
.add("tableHandle", tableHandle)
.add("filters", filters)
.add("addresses", addresses)
Expand Down
Expand Up @@ -73,6 +73,8 @@ public ConnectorSplitSource getPartitionSplits(ConnectorSession session, Connect
return new FixedSplitSource(null, ImmutableList.<ConnectorSplit>of());
}

InformationSchemaTableHandle tableHandle = checkType(table, InformationSchemaTableHandle.class, "table");

ConnectorPartition partition = Iterables.getOnlyElement(partitions);
InformationSchemaPartition informationSchemaPartition = checkType(partition, InformationSchemaPartition.class, "partition");

Expand All @@ -84,7 +86,7 @@ public ConnectorSplitSource getPartitionSplits(ConnectorSession session, Connect
filters.put(informationSchemaColumnHandle.getColumnName(), entry.getValue());
}

ConnectorSplit split = new InformationSchemaSplit(informationSchemaPartition.getTable(), filters.build(), localAddress);
ConnectorSplit split = new InformationSchemaSplit(tableHandle.getConnectorId(), informationSchemaPartition.getTable(), filters.build(), localAddress);

return new FixedSplitSource(null, ImmutableList.of(split));
}
Expand Down
Expand Up @@ -25,21 +25,30 @@
public class InformationSchemaTableHandle
implements ConnectorTableHandle
{
private final String connectorId;
private final String catalogName;
private final String schemaName;
private final String tableName;

@JsonCreator
public InformationSchemaTableHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("catalogName") String catalogName,
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
}

@JsonProperty
public String getConnectorId()
{
return connectorId;
}

@JsonProperty
public String getCatalogName()
{
Expand All @@ -66,13 +75,13 @@ public SchemaTableName getSchemaTableName()
@Override
public String toString()
{
return "information_schema:" + catalogName + ":" + schemaName + ":" + tableName;
return connectorId + ":" + catalogName + ":" + schemaName + ":" + tableName;
}

@Override
public int hashCode()
{
return Objects.hash(catalogName, schemaName, tableName);
return Objects.hash(connectorId, catalogName, schemaName, tableName);
}

@Override
Expand All @@ -85,7 +94,8 @@ public boolean equals(Object obj)
return false;
}
InformationSchemaTableHandle other = (InformationSchemaTableHandle) obj;
return Objects.equals(this.catalogName, other.catalogName) &&
return Objects.equals(this.connectorId, other.connectorId) &&
Objects.equals(this.catalogName, other.catalogName) &&
Objects.equals(this.schemaName, other.schemaName) &&
Objects.equals(this.tableName, other.tableName);
}
Expand Down

0 comments on commit 6dc9b56

Please sign in to comment.