Skip to content

Commit

Permalink
Make SystemTables transactional
Browse files Browse the repository at this point in the history
  • Loading branch information
erichwang committed Dec 28, 2015
1 parent 03a92db commit 1e1df56
Show file tree
Hide file tree
Showing 38 changed files with 532 additions and 81 deletions.
Expand Up @@ -38,7 +38,6 @@
import com.facebook.presto.split.RecordPageSinkProvider;
import com.facebook.presto.split.RecordPageSourceProvider;
import com.facebook.presto.split.SplitManager;
import com.facebook.presto.transaction.LegacyTransactionConnector;
import com.facebook.presto.transaction.LegacyTransactionConnectorFactory;
import com.facebook.presto.transaction.TransactionManager;
import io.airlift.log.Logger;
Expand Down Expand Up @@ -177,7 +176,11 @@ private synchronized void addCatalogConnector(String catalogName, String connect
String informationSchemaId = makeInformationSchemaConnectorId(connectorId);
addConnectorInternal(ConnectorType.INFORMATION_SCHEMA, catalogName, informationSchemaId, new InformationSchemaConnector(informationSchemaId, catalogName, nodeManager, metadataManager));
String systemId = makeSystemTablesConnectorId(connectorId);
addConnectorInternal(ConnectorType.SYSTEM, catalogName, systemId, new LegacyTransactionConnector(systemId, new SystemConnector(systemId, nodeManager, connector.getSystemTables())));
addConnectorInternal(ConnectorType.SYSTEM, catalogName, systemId, new SystemConnector(
systemId,
nodeManager,
connector.getSystemTables(),
transactionId -> transactionManager.getTransactionHandle(transactionId, connectorId)));

// Register session and table properties once per catalog
metadataManager.getSessionPropertyManager().addConnectorSessionProperties(catalogName, connector.getSessionProperties());
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.transaction.ConnectorTransactionHandle;

import javax.inject.Inject;

Expand Down Expand Up @@ -62,7 +63,7 @@ public ConnectorTableMetadata getTableMetadata()
}

@Override
public RecordCursor cursor(ConnectorSession session, TupleDomain<Integer> constraint)
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
Builder table = InMemoryRecordSet.builder(CATALOG_TABLE);
for (Map.Entry<String, String> entry : metadata.getCatalogNames().entrySet()) {
Expand Down
@@ -0,0 +1,136 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.connector.system;

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.TransactionalConnectorPageSourceProvider;
import com.facebook.presto.spi.TransactionalConnectorSplitManager;
import com.facebook.presto.spi.transaction.ConnectorTransactionHandle;
import com.facebook.presto.spi.transaction.IsolationLevel;
import com.facebook.presto.spi.transaction.TransactionalConnectorMetadata;
import com.facebook.presto.transaction.InternalTransactionalConnector;
import com.facebook.presto.transaction.TransactionId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Objects.requireNonNull;

public class GlobalSystemConnector
implements InternalTransactionalConnector
{
public static final String NAME = "system";

private final String connectorId;
private final Set<SystemTable> systemTables;

public GlobalSystemConnector(String connectorId, Set<SystemTable> systemTables)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null"));
}

@Override
public ConnectorTransactionHandle beginTransaction(TransactionId transactionId, IsolationLevel isolationLevel, boolean readOnly)
{
return new GlobalSystemTransactionHandle(connectorId, transactionId);
}

@Override
public ConnectorHandleResolver getHandleResolver()
{
return new GlobalSystemHandleResolver(connectorId);
}

@Override
public TransactionalConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
{
return new TransactionalConnectorMetadata()
{
@Override
public List<String> listSchemaNames(ConnectorSession session)
{
return ImmutableList.of();
}

@Override
public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
return null;
}

@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
throw new UnsupportedOperationException();
}

@Override
public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull)
{
return ImmutableList.of();
}

@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new UnsupportedOperationException();
}

@Override
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
throw new UnsupportedOperationException();
}

@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
return ImmutableMap.of();
}
};
}

@Override
public TransactionalConnectorSplitManager getSplitManager()
{
return new TransactionalConnectorSplitManager() {};
}

@Override
public TransactionalConnectorPageSourceProvider getPageSourceProvider()
{
return (transactionHandle, session, split, columns) -> {
throw new UnsupportedOperationException();
};
}

@Override
public Set<SystemTable> getSystemTables()
{
return systemTables;
}
}
Expand Up @@ -13,10 +13,9 @@
*/
package com.facebook.presto.connector.system;

import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorFactory;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.transaction.TransactionalConnector;
import com.facebook.presto.spi.transaction.TransactionalConnectorFactory;
import com.google.common.collect.ImmutableSet;

import javax.inject.Inject;
Expand All @@ -26,29 +25,26 @@

import static java.util.Objects.requireNonNull;

public class
SystemConnectorFactory
implements ConnectorFactory
public class GlobalSystemConnectorFactory
implements TransactionalConnectorFactory
{
private final NodeManager nodeManager;
private final Set<SystemTable> tables;

@Inject
public SystemConnectorFactory(NodeManager nodeManager, Set<SystemTable> tables)
public GlobalSystemConnectorFactory(Set<SystemTable> tables)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.tables = ImmutableSet.copyOf(requireNonNull(tables, "tables is null"));
}

@Override
public String getName()
{
return SystemConnector.NAME;
return GlobalSystemConnector.NAME;
}

@Override
public Connector create(String connectorId, Map<String, String> config)
public TransactionalConnector create(String connectorId, Map<String, String> config)
{
return new SystemConnector(connectorId, nodeManager, tables);
return new GlobalSystemConnector(connectorId, tables);
}
}
@@ -0,0 +1,82 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.connector.system;

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.transaction.ConnectorTransactionHandle;

import static java.util.Objects.requireNonNull;

public class GlobalSystemHandleResolver
implements ConnectorHandleResolver
{
private final String connectorId;

public GlobalSystemHandleResolver(String connectorId)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
}

@Override
public boolean canHandle(ConnectorTableHandle tableHandle)
{
return false;
}

@Override
public boolean canHandle(ColumnHandle columnHandle)
{
return false;
}

@Override
public boolean canHandle(ConnectorSplit split)
{
return false;
}

@Override
public boolean canHandle(ConnectorTransactionHandle transactionHandle)
{
return (transactionHandle instanceof GlobalSystemTransactionHandle) &&
((GlobalSystemTransactionHandle) transactionHandle).getConnectorId().equals(connectorId);
}

@Override
public Class<? extends ConnectorTableHandle> getTableHandleClass()
{
throw new UnsupportedOperationException();
}

@Override
public Class<? extends ColumnHandle> getColumnHandleClass()
{
throw new UnsupportedOperationException();
}

@Override
public Class<? extends ConnectorSplit> getSplitClass()
{
throw new UnsupportedOperationException();
}

@Override
public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
{
return GlobalSystemTransactionHandle.class;
}
}
@@ -0,0 +1,81 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.connector.system;

import com.facebook.presto.spi.transaction.ConnectorTransactionHandle;
import com.facebook.presto.transaction.TransactionId;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class GlobalSystemTransactionHandle
implements ConnectorTransactionHandle
{
private final String connectorId;
private final TransactionId transactionId;

@JsonCreator
public GlobalSystemTransactionHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("transactionId") TransactionId transactionId)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.transactionId = requireNonNull(transactionId, "transactionId is null");
}

@JsonProperty
public String getConnectorId()
{
return connectorId;
}

@JsonProperty
public TransactionId getTransactionId()
{
return transactionId;
}

@Override
public int hashCode()
{
return Objects.hash(connectorId, transactionId);
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final GlobalSystemTransactionHandle other = (GlobalSystemTransactionHandle) obj;
return Objects.equals(this.connectorId, other.connectorId) &&
Objects.equals(this.transactionId, other.transactionId);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("connectorId", connectorId)
.add("transactionId", transactionId)
.toString();
}
}

0 comments on commit 1e1df56

Please sign in to comment.