Skip to content

Commit

Permalink
Make info_schema connector transactional
Browse files Browse the repository at this point in the history
  • Loading branch information
erichwang committed Dec 28, 2015
1 parent 04c5515 commit 03a92db
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 31 deletions.
Expand Up @@ -175,7 +175,7 @@ private synchronized void addCatalogConnector(String catalogName, String connect
{
addConnectorInternal(ConnectorType.STANDARD, catalogName, connectorId, connector);
String informationSchemaId = makeInformationSchemaConnectorId(connectorId);
addConnectorInternal(ConnectorType.INFORMATION_SCHEMA, catalogName, informationSchemaId, new LegacyTransactionConnector(informationSchemaId, new InformationSchemaConnector(informationSchemaId, catalogName, nodeManager, metadataManager)));
addConnectorInternal(ConnectorType.INFORMATION_SCHEMA, catalogName, informationSchemaId, new InformationSchemaConnector(informationSchemaId, catalogName, nodeManager, metadataManager));
String systemId = makeSystemTablesConnectorId(connectorId);
addConnectorInternal(ConnectorType.SYSTEM, catalogName, systemId, new LegacyTransactionConnector(systemId, new SystemConnector(systemId, nodeManager, connector.getSystemTables())));

Expand Down
Expand Up @@ -14,22 +14,26 @@
package com.facebook.presto.connector.informationSchema;

import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorPageSourceProvider;
import com.facebook.presto.spi.ConnectorSplitManager;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.TransactionalConnectorPageSourceProvider;
import com.facebook.presto.spi.TransactionalConnectorSplitManager;
import com.facebook.presto.spi.transaction.ConnectorTransactionHandle;
import com.facebook.presto.spi.transaction.IsolationLevel;
import com.facebook.presto.spi.transaction.TransactionalConnectorMetadata;
import com.facebook.presto.transaction.InternalTransactionalConnector;
import com.facebook.presto.transaction.TransactionId;

import static java.util.Objects.requireNonNull;

public class InformationSchemaConnector
implements Connector
implements InternalTransactionalConnector
{
private final String connectorId;
private final ConnectorHandleResolver handleResolver;
private final ConnectorMetadata metadata;
private final ConnectorSplitManager splitManager;
private final ConnectorPageSourceProvider pageSourceProvider;
private final TransactionalConnectorMetadata metadata;
private final TransactionalConnectorSplitManager splitManager;
private final TransactionalConnectorPageSourceProvider pageSourceProvider;

public InformationSchemaConnector(String connectorId, String catalogName, NodeManager nodeManager, Metadata metadata)
{
Expand All @@ -38,32 +42,39 @@ public InformationSchemaConnector(String connectorId, String catalogName, NodeMa
requireNonNull(nodeManager, "nodeManager is null");
requireNonNull(metadata, "metadata is null");

this.connectorId = connectorId;
this.handleResolver = new InformationSchemaHandleResolver(connectorId);
this.metadata = new InformationSchemaMetadata(connectorId, catalogName);
this.splitManager = new InformationSchemaSplitManager(nodeManager);
this.pageSourceProvider = new InformationSchemaPageSourceProvider(metadata);
}

@Override
public ConnectorTransactionHandle beginTransaction(TransactionId transactionId, IsolationLevel isolationLevel, boolean readOnly)
{
return new InformationSchemaTransactionHandle(connectorId, transactionId);
}

@Override
public ConnectorHandleResolver getHandleResolver()
{
return handleResolver;
}

@Override
public ConnectorMetadata getMetadata()
public TransactionalConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
{
return metadata;
}

@Override
public ConnectorSplitManager getSplitManager()
public TransactionalConnectorSplitManager getSplitManager()
{
return splitManager;
}

@Override
public ConnectorPageSourceProvider getPageSourceProvider()
public TransactionalConnectorPageSourceProvider getPageSourceProvider()
{
return pageSourceProvider;
}
Expand Down
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.transaction.ConnectorTransactionHandle;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -51,6 +52,13 @@ public boolean canHandle(ConnectorSplit split)
((InformationSchemaSplit) split).getConnectorId().equals(connectorId);
}

@Override
public boolean canHandle(ConnectorTransactionHandle transactionHandle)
{
return (transactionHandle instanceof InformationSchemaTransactionHandle) &&
((InformationSchemaTransactionHandle) transactionHandle).getConnectorId().equals(connectorId);
}

@Override
public Class<? extends ConnectorTableHandle> getTableHandleClass()
{
Expand All @@ -68,4 +76,10 @@ public Class<? extends ConnectorSplit> getSplitClass()
{
return InformationSchemaSplit.class;
}

@Override
public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
{
return InformationSchemaTransactionHandle.class;
}
}
Expand Up @@ -15,12 +15,12 @@

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.transaction.TransactionalConnectorMetadata;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

Expand All @@ -42,7 +42,7 @@
import static java.util.Objects.requireNonNull;

public class InformationSchemaMetadata
implements ConnectorMetadata
implements TransactionalConnectorMetadata
{
public static final String INFORMATION_SCHEMA = "information_schema";

Expand Down Expand Up @@ -161,7 +161,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn

ConnectorTableMetadata tableMetadata = TABLES.get(informationSchemaTableHandle.getSchemaTableName());

return toInformationSchemaColumnHandles(informationSchemaTableHandle.getConnectorId(), tableMetadata);
return toInformationSchemaColumnHandles(connectorId, tableMetadata);
}

@Override
Expand Down
Expand Up @@ -28,25 +28,24 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorPageSourceProvider;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.FixedPageSource;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TransactionalConnectorPageSourceProvider;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.predicate.NullableValue;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.transaction.ConnectorTransactionHandle;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.airlift.slice.Slice;

import javax.inject.Inject;

import java.lang.invoke.MethodHandle;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -71,20 +70,19 @@
import static java.util.Objects.requireNonNull;

public class InformationSchemaPageSourceProvider
implements ConnectorPageSourceProvider
implements TransactionalConnectorPageSourceProvider
{
private final Metadata metadata;

@Inject
public InformationSchemaPageSourceProvider(Metadata metadata)
{
this.metadata = requireNonNull(metadata, "metadata is null");
}

@Override
public ConnectorPageSource createPageSource(ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns)
public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns)
{
InternalTable table = getInternalTable(session, split, columns);
InternalTable table = getInternalTable(transactionHandle, session, split, columns);

List<Integer> channels = new ArrayList<>();
for (ColumnHandle column : columns) {
Expand All @@ -104,8 +102,9 @@ public ConnectorPageSource createPageSource(ConnectorSession session, ConnectorS
return new FixedPageSource(pages.build());
}

private InternalTable getInternalTable(ConnectorSession connectorSession, ConnectorSplit connectorSplit, List<ColumnHandle> columns)
private InternalTable getInternalTable(ConnectorTransactionHandle transactionHandle, ConnectorSession connectorSession, ConnectorSplit connectorSplit, List<ColumnHandle> columns)
{
InformationSchemaTransactionHandle transaction = checkType(transactionHandle, InformationSchemaTransactionHandle.class, "transaction");
InformationSchemaSplit split = checkType(connectorSplit, InformationSchemaSplit.class, "split");

requireNonNull(columns, "columns is null");
Expand All @@ -114,6 +113,7 @@ private InternalTable getInternalTable(ConnectorSession connectorSession, Connec
Map<String, NullableValue> filters = split.getFilters();

Session session = Session.builder(metadata.getSessionPropertyManager())
.setTransactionId(transaction.getTransactionId())
.setQueryId(new QueryId(connectorSession.getQueryId()))
.setIdentity(connectorSession.getIdentity())
.setSource("information_schema")
Expand Down
Expand Up @@ -41,6 +41,7 @@ public InformationSchemaSplit(
@JsonProperty("tableHandle") InformationSchemaTableHandle tableHandle,
@JsonProperty("filters") Map<String, NullableValue> filters,
@JsonProperty("addresses") List<HostAddress> addresses)

{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
Expand Down
Expand Up @@ -18,20 +18,19 @@
import com.facebook.presto.spi.ConnectorPartitionResult;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitManager;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.TransactionalConnectorSplitManager;
import com.facebook.presto.spi.predicate.NullableValue;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.transaction.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;

import javax.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -41,18 +40,17 @@
import static java.util.Objects.requireNonNull;

public class InformationSchemaSplitManager
implements ConnectorSplitManager
implements TransactionalConnectorSplitManager
{
private final NodeManager nodeManager;

@Inject
public InformationSchemaSplitManager(NodeManager nodeManager)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
}

@Override
public ConnectorPartitionResult getPartitions(ConnectorSession session, ConnectorTableHandle table, TupleDomain<ColumnHandle> tupleDomain)
public ConnectorPartitionResult getPartitions(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableHandle table, TupleDomain<ColumnHandle> tupleDomain)
{
requireNonNull(tupleDomain, "tupleDomain is null");
InformationSchemaTableHandle informationSchemaTableHandle = checkType(table, InformationSchemaTableHandle.class, "table");
Expand All @@ -66,7 +64,7 @@ public ConnectorPartitionResult getPartitions(ConnectorSession session, Connecto
}

@Override
public ConnectorSplitSource getPartitionSplits(ConnectorSession session, ConnectorTableHandle table, List<ConnectorPartition> partitions)
public ConnectorSplitSource getPartitionSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableHandle table, List<ConnectorPartition> partitions)
{
requireNonNull(partitions, "partitions is null");
if (partitions.isEmpty()) {
Expand Down
@@ -0,0 +1,81 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.connector.informationSchema;

import com.facebook.presto.spi.transaction.ConnectorTransactionHandle;
import com.facebook.presto.transaction.TransactionId;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class InformationSchemaTransactionHandle
implements ConnectorTransactionHandle
{
private final String connectorId;
private final TransactionId transactionId;

@JsonCreator
public InformationSchemaTransactionHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("transactionId") TransactionId transactionId)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.transactionId = requireNonNull(transactionId, "transactionId is null");
}

@JsonProperty
public String getConnectorId()
{
return connectorId;
}

@JsonProperty
public TransactionId getTransactionId()
{
return transactionId;
}

@Override
public int hashCode()
{
return Objects.hash(connectorId, transactionId);
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final InformationSchemaTransactionHandle other = (InformationSchemaTransactionHandle) obj;
return Objects.equals(this.connectorId, other.connectorId) &&
Objects.equals(this.transactionId, other.transactionId);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("connectorId", connectorId)
.add("transactionId", transactionId)
.toString();
}
}
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.transaction;

import com.facebook.presto.spi.transaction.ConnectorTransactionHandle;
import com.facebook.presto.spi.transaction.IsolationLevel;
import com.facebook.presto.spi.transaction.TransactionalConnector;

public interface InternalTransactionalConnector
extends TransactionalConnector
{
default ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
{
throw new UnsupportedOperationException();
}

ConnectorTransactionHandle beginTransaction(TransactionId transactionId, IsolationLevel isolationLevel, boolean readOnly);
}

0 comments on commit 03a92db

Please sign in to comment.