Skip to content

Commit

Permalink
Fix BigQuery FTE leaving temporary tables behind
Browse files Browse the repository at this point in the history
also isolates temporary table names to have a common prefix for the same query id
  • Loading branch information
mwd410 authored and losipiuk committed Feb 24, 2023
1 parent 028e4ae commit 6e52968
Show file tree
Hide file tree
Showing 12 changed files with 313 additions and 62 deletions.
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.base;

import io.trino.spi.connector.ConnectorSession;

import java.util.HexFormat;
import java.util.concurrent.ThreadLocalRandom;

import static java.util.Objects.requireNonNull;

public class TemporaryTables
{
private static final HexFormat hexFormat = HexFormat.of();
public static final String TEMPORARY_TABLE_NAME_PREFIX = "tmp_trino_";

public static String temporaryTableNamePrefix(String queryId)
{
requireNonNull(queryId, "queryId is null");
return String.format("%s%s_", TEMPORARY_TABLE_NAME_PREFIX, hexFormat.toHexDigits(queryId.hashCode()));
}

public static String generateTemporaryTableName(String queryId)
{
requireNonNull(queryId, "queryId is null");
return temporaryTableNamePrefix(queryId) + hexFormat.toHexDigits(ThreadLocalRandom.current().nextInt());
}

public static String generateTemporaryTableName(ConnectorSession session)
{
requireNonNull(session, "session is null");
return generateTemporaryTableName(session.getQueryId());
}

private TemporaryTables()
{
}
}
Expand Up @@ -60,7 +60,6 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;
Expand All @@ -74,6 +73,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.base.TemporaryTables.generateTemporaryTableName;
import static io.trino.plugin.jdbc.CaseSensitivity.CASE_INSENSITIVE;
import static io.trino.plugin.jdbc.CaseSensitivity.CASE_SENSITIVE;
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
Expand Down Expand Up @@ -566,10 +566,10 @@ public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
// Create the temporary table
ColumnMetadata pageSinkIdColumn = getPageSinkIdColumn(
tableMetadata.getColumns().stream().map(ColumnMetadata::getName).toList());
return createTable(session, tableMetadata, generateTemporaryTableName(), Optional.of(pageSinkIdColumn));
return createTable(session, tableMetadata, generateTemporaryTableName(session), Optional.of(pageSinkIdColumn));
}
else {
return createTable(session, tableMetadata, generateTemporaryTableName());
return createTable(session, tableMetadata, generateTemporaryTableName(session));
}
}
catch (SQLException e) {
Expand Down Expand Up @@ -698,7 +698,7 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTabl
Optional.empty());
}

String remoteTemporaryTableName = identifierMapping.toRemoteTableName(identity, connection, remoteSchema, generateTemporaryTableName());
String remoteTemporaryTableName = identifierMapping.toRemoteTableName(identity, connection, remoteSchema, generateTemporaryTableName(session));
copyTableSchema(session, connection, catalog, remoteSchema, remoteTable, remoteTemporaryTableName, columnNames.build());

Optional<ColumnMetadata> pageSinkIdColumn = Optional.empty();
Expand Down Expand Up @@ -743,11 +743,6 @@ protected void copyTableSchema(ConnectorSession session, Connection connection,
}
}

/**
 * Builds a temporary table name: {@code tmp_trino_} followed by a random UUID
 * with its dashes removed.
 */
protected String generateTemporaryTableName()
{
    String dashlessUuid = UUID.randomUUID().toString().replace("-", "");
    return "tmp_trino_" + dashlessUuid;
}

@Override
public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle, Set<Long> pageSinkIds)
{
Expand Down Expand Up @@ -806,7 +801,7 @@ private RemoteTableName constructPageSinkIdsTable(ConnectorSession session, Conn
RemoteTableName pageSinkTable = new RemoteTableName(
Optional.ofNullable(handle.getCatalogName()),
Optional.ofNullable(handle.getSchemaName()),
generateTemporaryTableName());
generateTemporaryTableName(session));

int maxBatchSize = getWriteBatchSize(session);

Expand Down
Expand Up @@ -13,7 +13,7 @@
*/
package io.trino.plugin.bigquery;

import io.airlift.log.Logger;
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
Expand All @@ -32,16 +32,12 @@
import java.util.Set;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;

public class BigQueryConnector
implements Connector
{
private static final Logger log = Logger.get(BigQueryConnector.class);

private final BigQueryMetadata metadata;
private final BigQueryTransactionManager transactionManager;
private final BigQuerySplitManager splitManager;
private final BigQueryPageSourceProvider pageSourceProvider;
private final BigQueryPageSinkProvider pageSinkProvider;
Expand All @@ -50,14 +46,14 @@ public class BigQueryConnector

@Inject
public BigQueryConnector(
BigQueryMetadata metadata,
BigQueryTransactionManager transactionManager,
BigQuerySplitManager splitManager,
BigQueryPageSourceProvider pageSourceProvider,
BigQueryPageSinkProvider pageSinkProvider,
Set<ConnectorTableFunction> connectorTableFunctions,
Set<SessionPropertiesProvider> sessionPropertiesProviders)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
Expand All @@ -70,15 +66,25 @@ public BigQueryConnector(
@Override
public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
{
log.debug("beginTransaction(isolationLevel=%s, readOnly=%s)", isolationLevel, readOnly);
checkConnectorSupports(READ_COMMITTED, isolationLevel);
return BigQueryTransactionHandle.INSTANCE;
return transactionManager.beginTransaction(isolationLevel, readOnly, autoCommit);
}

/**
 * Returns the metadata that the transaction manager holds for the given transaction,
 * wrapped in a {@code ClassLoaderSafeConnectorMetadata} bound to this connector's class loader.
 */
@Override
public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transaction)
{
    ConnectorMetadata delegate = transactionManager.getMetadata(transaction);
    ClassLoader classLoader = getClass().getClassLoader();
    return new ClassLoaderSafeConnectorMetadata(delegate, classLoader);
}

/**
 * Commits the given transaction by delegating to the transaction manager.
 */
@Override
public void commit(ConnectorTransactionHandle transactionHandle)
{
transactionManager.commit(transactionHandle);
}

@Override
public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
public void rollback(ConnectorTransactionHandle transactionHandle)
{
return metadata;
transactionManager.rollback(transactionHandle);
}

@Override
Expand Down
Expand Up @@ -59,7 +59,8 @@ protected void setup(Binder binder)

// Connector implementation
binder.bind(BigQueryConnector.class).in(Scopes.SINGLETON);
binder.bind(BigQueryMetadata.class).in(Scopes.SINGLETON);
binder.bind(BigQueryMetadataFactory.class).to(DefaultBigQueryMetadataFactory.class).in(Scopes.SINGLETON);
binder.bind(BigQueryTransactionManager.class).in(Scopes.SINGLETON);
binder.bind(BigQuerySplitManager.class).in(Scopes.SINGLETON);
binder.bind(BigQueryPageSourceProvider.class).in(Scopes.SINGLETON);
binder.bind(BigQueryPageSinkProvider.class).in(Scopes.SINGLETON);
Expand Down
Expand Up @@ -81,7 +81,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -94,6 +94,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.base.TemporaryTables.generateTemporaryTableName;
import static io.trino.plugin.bigquery.BigQueryClient.buildColumnHandles;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_DATASET_ERROR;
Expand All @@ -106,6 +107,7 @@
import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable;
import static io.trino.plugin.bigquery.BigQueryUtil.quote;
import static io.trino.plugin.bigquery.BigQueryUtil.quoted;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
Expand All @@ -123,6 +125,7 @@ public class BigQueryMetadata
private static final String VIEW_DEFINITION_SYSTEM_TABLE_SUFFIX = "$view_definition";

private final BigQueryClientFactory bigQueryClientFactory;
private final AtomicReference<Runnable> rollbackAction = new AtomicReference<>();

@Inject
public BigQueryMetadata(BigQueryClientFactory bigQueryClientFactory)
Expand Down Expand Up @@ -400,6 +403,16 @@ public void dropSchema(ConnectorSession session, String schemaName)
client.dropSchema(DatasetId.of(projectId, remoteSchemaName));
}

/**
 * Installs the action to run on rollback. Only one action may be pending at a time.
 *
 * @throws IllegalStateException if a rollback action is already set
 */
private void setRollback(Runnable action)
{
    // Atomically install the action only when no other action is pending.
    boolean installed = rollbackAction.compareAndSet(null, action);
    checkState(installed, "rollback action is already set");
}

/**
 * Runs the pending rollback action, if any, and clears it so that it is not run twice.
 */
public void rollback()
{
    // getAndSet clears the slot atomically, so the action is taken by at most one caller.
    Runnable pending = rollbackAction.getAndSet(null);
    if (pending != null) {
        pending.run();
    }
}

@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
{
Expand Down Expand Up @@ -450,12 +463,24 @@ private BigQueryOutputTableHandle createTable(ConnectorSession session, Connecto
String projectId = client.getProjectId();
String remoteSchemaName = getRemoteSchemaName(client, projectId, schemaName);

Closer closer = Closer.create();
setRollback(() -> {
try {
closer.close();
}
catch (IOException e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
}
});

TableId tableId = createTable(client, projectId, remoteSchemaName, tableName, fields.build(), tableMetadata.getComment());
closer.register(() -> bigQueryClientFactory.create(session).dropTable(tableId));

Optional<String> temporaryTableName = pageSinkIdColumn.map(column -> {
tempFields.add(toField(column.getName(), column.getType(), column.getComment()));
String tempTableName = generateTemporaryTableName();
createTable(client, projectId, remoteSchemaName, tempTableName, tempFields.build(), tableMetadata.getComment());
String tempTableName = generateTemporaryTableName(session);
TableId tempTableId = createTable(client, projectId, remoteSchemaName, tempTableName, tempFields.build(), tableMetadata.getComment());
closer.register(() -> bigQueryClientFactory.create(session).dropTable(tempTableId));
return tempTableName;
});

Expand Down Expand Up @@ -538,8 +563,9 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
String projectId = table.asPlainTable().getRemoteTableName().getProjectId();
String schemaName = table.asPlainTable().getRemoteTableName().getDatasetName();

String temporaryTableName = generateTemporaryTableName();
createTable(client, projectId, schemaName, temporaryTableName, tempFields.build(), Optional.empty());
String temporaryTableName = generateTemporaryTableName(session);
TableId temporaryTableId = createTable(client, projectId, schemaName, temporaryTableName, tempFields.build(), Optional.empty());
setRollback(() -> bigQueryClientFactory.create(session).dropTable(temporaryTableId));

return new BigQueryInsertTableHandle(
table.asPlainTable().getRemoteTableName(),
Expand All @@ -566,7 +592,7 @@ private Optional<ConnectorOutputMetadata> finishInsert(
RemoteTableName pageSinkTable = new RemoteTableName(
targetTable.getProjectId(),
targetTable.getDatasetName(),
generateTemporaryTableName());
generateTemporaryTableName(session));
createTable(client, pageSinkTable.getProjectId(), pageSinkTable.getDatasetName(), pageSinkTable.getTableName(), ImmutableList.of(toField(pageSinkIdColumnName, TRINO_PAGE_SINK_ID_COLUMN_TYPE, null)), Optional.empty());
closer.register(() -> bigQueryClientFactory.create(session).dropTable(pageSinkTable.toTableId()));

Expand Down Expand Up @@ -791,9 +817,4 @@ private static ColumnMetadata buildPageSinkIdColumn(List<String> otherColumnName
}
return new ColumnMetadata(columnName, TRINO_PAGE_SINK_ID_COLUMN_TYPE);
}

/**
 * Builds a temporary table name: {@code tmp_trino_} followed by a random UUID
 * with its dashes removed.
 */
private static String generateTemporaryTableName()
{
    String dashlessUuid = UUID.randomUUID().toString().replace("-", "");
    return "tmp_trino_" + dashlessUuid;
}
}
@@ -0,0 +1,19 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.bigquery;

/**
 * Factory for {@link BigQueryMetadata} instances.
 */
public interface BigQueryMetadataFactory
{
/**
 * Creates a {@link BigQueryMetadata} for the given transaction handle.
 *
 * @param transaction the transaction handle the metadata is created for
 * @return the metadata instance
 */
BigQueryMetadata create(BigQueryTransactionHandle transaction);
}
Expand Up @@ -13,10 +13,62 @@
*/
package io.trino.plugin.bigquery;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.connector.ConnectorTransactionHandle;

public enum BigQueryTransactionHandle
import java.util.Objects;
import java.util.UUID;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class BigQueryTransactionHandle
implements ConnectorTransactionHandle
{
INSTANCE
private final UUID uuid;

/**
 * Creates a handle identified by a freshly generated random UUID.
 */
public BigQueryTransactionHandle()
{
this(UUID.randomUUID());
}

/**
 * Creates a handle with the given identifier; also serves as the JSON creator.
 *
 * @param uuid the identifier of this handle; must not be null
 * @throws NullPointerException if {@code uuid} is null
 */
@JsonCreator
public BigQueryTransactionHandle(@JsonProperty("uuid") UUID uuid)
{
    requireNonNull(uuid, "uuid is null");
    this.uuid = uuid;
}

/**
 * Returns the UUID identifying this handle; exposed as a JSON property.
 */
@JsonProperty
public UUID getUuid()
{
return uuid;
}

/**
 * Two handles are equal when they are of exactly the same class and carry equal UUIDs.
 */
@Override
public boolean equals(Object obj)
{
    if (obj == this) {
        return true;
    }
    if (obj == null || obj.getClass() != getClass()) {
        return false;
    }
    BigQueryTransactionHandle that = (BigQueryTransactionHandle) obj;
    return Objects.equals(this.uuid, that.uuid);
}

/**
 * Hash code derived solely from the UUID, the same field compared by {@code equals}.
 */
@Override
public int hashCode()
{
return Objects.hash(uuid);
}

/**
 * Returns a string representation that includes the handle's UUID.
 */
@Override
public String toString()
{
    var helper = toStringHelper(this);
    helper.add("uuid", uuid);
    return helper.toString();
}
}

0 comments on commit 6e52968

Please sign in to comment.