Skip to content

Commit

Permalink
Port system connector to TableLayout API
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Jul 22, 2015
1 parent 42297fc commit a135cb6
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 69 deletions.
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;

public class SystemHandleResolver
implements ConnectorHandleResolver
Expand All @@ -39,12 +40,24 @@ public boolean canHandle(ConnectorSplit split)
return split instanceof SystemSplit;
}

@Override
public boolean canHandle(ConnectorTableLayoutHandle handle)
{
return handle instanceof SystemTableLayoutHandle;
}

@Override
public Class<? extends ConnectorTableHandle> getTableHandleClass()
{
return SystemTableHandle.class;
}

@Override
public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
{
return SystemTableLayoutHandle.class;
}

@Override
public Class<? extends ColumnHandle> getColumnHandleClass()
{
Expand Down
Expand Up @@ -13,30 +13,22 @@
*/
package com.facebook.presto.connector.system;

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPartition;
import com.facebook.presto.spi.ConnectorPartitionResult;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitManager;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.TupleDomain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.facebook.presto.util.Types.checkType;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Maps.uniqueIndex;

Expand All @@ -53,75 +45,22 @@ public SystemSplitManager(NodeManager nodeManager, Set<SystemTable> tables)
}

@Override
public ConnectorPartitionResult getPartitions(ConnectorTableHandle table, TupleDomain<ColumnHandle> tupleDomain)
public ConnectorSplitSource getSplits(ConnectorTableLayoutHandle layout)
{
checkNotNull(tupleDomain, "tupleDomain is null");
SystemTableHandle systemTableHandle = checkType(table, SystemTableHandle.class, "table");

List<ConnectorPartition> partitions = ImmutableList.<ConnectorPartition>of(new SystemPartition(systemTableHandle));
return new ConnectorPartitionResult(partitions, tupleDomain);
}

@Override
public ConnectorSplitSource getPartitionSplits(ConnectorTableHandle table, List<ConnectorPartition> partitions)
{
checkNotNull(partitions, "partitions is null");
if (partitions.isEmpty()) {
return new FixedSplitSource(SystemConnector.NAME, ImmutableList.<ConnectorSplit>of());
}

ConnectorPartition partition = Iterables.getOnlyElement(partitions);
SystemPartition systemPartition = checkType(partition, SystemPartition.class, "partition");

SystemTable systemTable = tables.get(systemPartition.getTableHandle().getSchemaTableName());
checkArgument(systemTable != null, "Table %s does not exist", systemPartition.getTableHandle().getTableName());
SystemTableLayoutHandle layoutHandle = checkType(layout, SystemTableLayoutHandle.class, "layout");
SystemTableHandle tableHandle = layoutHandle.getTable();
SystemTable systemTable = tables.get(tableHandle.getSchemaTableName());

if (systemTable.isDistributed()) {
ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
for (Node node : nodeManager.getActiveNodes()) {
splits.add(new SystemSplit(systemPartition.getTableHandle(), node.getHostAndPort()));
splits.add(new SystemSplit(tableHandle, node.getHostAndPort()));
}
return new FixedSplitSource(SystemConnector.NAME, splits.build());
}

HostAddress address = nodeManager.getCurrentNode().getHostAndPort();
ConnectorSplit split = new SystemSplit(systemPartition.getTableHandle(), address);
ConnectorSplit split = new SystemSplit(tableHandle, address);
return new FixedSplitSource(SystemConnector.NAME, ImmutableList.of(split));
}

public static class SystemPartition
implements ConnectorPartition
{
private final SystemTableHandle tableHandle;

public SystemPartition(SystemTableHandle tableHandle)
{
this.tableHandle = checkNotNull(tableHandle, "tableHandle is null");
}

public SystemTableHandle getTableHandle()
{
return tableHandle;
}

@Override
public String getPartitionId()
{
return tableHandle.getSchemaTableName().toString();
}

@Override
public TupleDomain<ColumnHandle> getTupleDomain()
{
return TupleDomain.all();
}

@Override
public String toString()
{
return toStringHelper(this)
.add("tableHandle", tableHandle)
.toString();
}
}
}
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.connector.system;

import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class SystemTableLayoutHandle
implements ConnectorTableLayoutHandle
{
private final SystemTableHandle table;

@JsonCreator
public SystemTableLayoutHandle(@JsonProperty("table") SystemTableHandle table)
{
this.table = table;
}

@JsonProperty
public SystemTableHandle getTable()
{
return table;
}

@Override
public String toString()
{
return table.toString();
}
}
Expand Up @@ -13,21 +13,27 @@
*/
package com.facebook.presto.connector.system;

import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayout;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.ReadOnlyConnectorMetadata;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.TupleDomain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.connector.system.SystemColumnHandle.toSystemColumnHandles;
Expand Down Expand Up @@ -76,6 +82,27 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
return new SystemTableHandle(tableName);
}

@Override
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
SystemTableHandle tableHandle = checkType(table, SystemTableHandle.class, "table");
ConnectorTableLayout layout = new ConnectorTableLayout(
new SystemTableLayoutHandle(tableHandle),
Optional.empty(),
TupleDomain.<ColumnHandle>all(),
Optional.empty(),
Optional.empty(),
ImmutableList.of());
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
}

@Override
public ConnectorTableLayout getTableLayout(ConnectorTableLayoutHandle handle)
{
SystemTableLayoutHandle layout = checkType(handle, SystemTableLayoutHandle.class, "layout");
return new ConnectorTableLayout(layout, Optional.empty(), TupleDomain.<ColumnHandle>all(), Optional.empty(), Optional.empty(), ImmutableList.of());
}

@Override
public ConnectorTableMetadata getTableMetadata(ConnectorTableHandle tableHandle)
{
Expand Down

0 comments on commit a135cb6

Please sign in to comment.