Skip to content

Commit

Permalink
Get table comments from RDBMS in bulk
Browse files Browse the repository at this point in the history
Improve speed of getting table comments in JDBC connectors manifold. Get
them all at once.
  • Loading branch information
findepi committed Mar 25, 2024
1 parent 7c5d56e commit 7eb6b09
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
Expand Down Expand Up @@ -205,6 +206,34 @@ public List<SchemaTableName> getTableNames(ConnectorSession session, Optional<St
}
}

@Override
public List<RelationCommentMetadata> getAllTableComments(ConnectorSession session, Optional<String> schema)
{
try (Connection connection = connectionFactory.openConnection(session)) {
ConnectorIdentity identity = session.getIdentity();
Optional<String> remoteSchema = schema.map(schemaName -> identifierMapping.toRemoteSchemaName(getRemoteIdentifiers(connection), identity, schemaName));
if (remoteSchema.isPresent() && !filterSchema(remoteSchema.get())) {
return ImmutableList.of();
}

try (ResultSet resultSet = getTables(connection, remoteSchema, Optional.empty())) {
ImmutableList.Builder<RelationCommentMetadata> list = ImmutableList.builder();
while (resultSet.next()) {
String remoteSchemaFromResultSet = getTableSchemaName(resultSet);
String tableSchema = identifierMapping.fromRemoteSchemaName(remoteSchemaFromResultSet);
String tableName = identifierMapping.fromRemoteTableName(remoteSchemaFromResultSet, resultSet.getString("TABLE_NAME"));
if (filterSchema(tableSchema)) {
list.add(RelationCommentMetadata.forRelation(new SchemaTableName(tableSchema, tableName), getTableComment(resultSet)));
}
}
return list.build();
}
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

@Override
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
Expand Down Expand Up @@ -80,11 +81,12 @@ public class CachingJdbcClient
private final IdentityCacheMapping identityMapping;

private final Cache<IdentityCacheKey, Set<String>> schemaNamesCache;
private final Cache<TableNamesCacheKey, List<SchemaTableName>> tableNamesCache;
private final Cache<TableListingCacheKey, List<SchemaTableName>> tableNamesCache;
private final Cache<TableHandlesByNameCacheKey, Optional<JdbcTableHandle>> tableHandlesByNameCache;
private final Cache<TableHandlesByQueryCacheKey, JdbcTableHandle> tableHandlesByQueryCache;
private final Cache<ProcedureHandlesByQueryCacheKey, JdbcProcedureHandle> procedureHandlesByQueryCache;
private final Cache<ColumnsCacheKey, List<JdbcColumnHandle>> columnsCache;
private final Cache<TableListingCacheKey, List<RelationCommentMetadata>> tableCommentsCache;
private final Cache<JdbcTableHandle, TableStatistics> statisticsCache;

@Inject
Expand Down Expand Up @@ -132,6 +134,7 @@ public CachingJdbcClient(
tableHandlesByQueryCache = buildCache(ticker, cacheMaximumSize, metadataCachingTtl);
procedureHandlesByQueryCache = buildCache(ticker, cacheMaximumSize, metadataCachingTtl);
columnsCache = buildCache(ticker, cacheMaximumSize, metadataCachingTtl);
tableCommentsCache = buildCache(ticker, cacheMaximumSize, metadataCachingTtl);
statisticsCache = buildCache(ticker, cacheMaximumSize, statisticsCachingTtl);
}

Expand Down Expand Up @@ -163,7 +166,7 @@ public Set<String> getSchemaNames(ConnectorSession session)
@Override
public List<SchemaTableName> getTableNames(ConnectorSession session, Optional<String> schema)
{
TableNamesCacheKey key = new TableNamesCacheKey(getIdentityKey(session), schema);
TableListingCacheKey key = new TableListingCacheKey(getIdentityKey(session), schema);
return get(tableNamesCache, key, () -> delegate.getTableNames(session, schema));
}

Expand All @@ -177,6 +180,12 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
return get(columnsCache, key, () -> delegate.getColumns(session, tableHandle));
}

@Override
public List<RelationCommentMetadata> getAllTableComments(ConnectorSession session, Optional<String> schema)
{
return get(tableCommentsCache, new TableListingCacheKey(getIdentityKey(session), schema), () -> delegate.getAllTableComments(session, schema));
}

@Override
public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle)
{
Expand Down Expand Up @@ -625,6 +634,7 @@ public void flushCache()
tableHandlesByNameCache.invalidateAll();
tableHandlesByQueryCache.invalidateAll();
columnsCache.invalidateAll();
tableCommentsCache.invalidateAll();
statisticsCache.invalidateAll();
}

Expand Down Expand Up @@ -656,6 +666,7 @@ private void invalidateTableCaches(SchemaTableName schemaTableName)
invalidateAllIf(tableHandlesByNameCache, key -> key.tableName.equals(schemaTableName));
tableHandlesByQueryCache.invalidateAll();
invalidateAllIf(tableNamesCache, key -> key.schemaName.equals(Optional.of(schemaTableName.getSchemaName())));
invalidateAllIf(tableCommentsCache, key -> key.schemaName.equals(Optional.of(schemaTableName.getSchemaName())));
invalidateAllIf(statisticsCache, key -> key.mayReference(schemaTableName));
}

Expand Down Expand Up @@ -743,9 +754,9 @@ private record ProcedureHandlesByQueryCacheKey(IdentityCacheKey identity, Proced
}
}

private record TableNamesCacheKey(IdentityCacheKey identity, Optional<String> schemaName)
private record TableListingCacheKey(IdentityCacheKey identity, Optional<String> schemaName)
{
private TableNamesCacheKey
private TableListingCacheKey
{
requireNonNull(identity, "identity is null");
requireNonNull(schemaName, "schemaName is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SchemaTableName;
Expand All @@ -71,6 +72,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -80,6 +82,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

import static com.google.common.base.Functions.identity;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -911,6 +914,12 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
return columns.buildOrThrow();
}

@Override
public Iterator<RelationCommentMetadata> streamRelationComments(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
{
return jdbcClient.getAllTableComments(session, schemaName).iterator();
}

@Override
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
Expand Down Expand Up @@ -105,6 +106,12 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
return delegate().getColumns(session, tableHandle);
}

@Override
public List<RelationCommentMetadata> getAllTableComments(ConnectorSession session, Optional<String> schema)
{
return delegate().getAllTableComments(session, schema);
}

@Override
public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
Expand Down Expand Up @@ -64,6 +65,8 @@ default boolean schemaExists(ConnectorSession session, String schema)

List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle tableHandle);

List<RelationCommentMetadata> getAllTableComments(ConnectorSession session, Optional<String> schema);

Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final class JdbcClientStats
private final JdbcApiStats dropTable = new JdbcApiStats();
private final JdbcApiStats finishInsertTable = new JdbcApiStats();
private final JdbcApiStats getColumns = new JdbcApiStats();
private final JdbcApiStats getAllTableComments = new JdbcApiStats();
private final JdbcApiStats getConnectionWithHandle = new JdbcApiStats();
private final JdbcApiStats getConnectionWithSplit = new JdbcApiStats();
private final JdbcApiStats getConnectionWithProcedure = new JdbcApiStats();
Expand Down Expand Up @@ -215,6 +216,13 @@ public JdbcApiStats getGetColumns()
return getColumns;
}

@Managed
@Nested
public JdbcApiStats getGetAllTableComments()
{
return getAllTableComments;
}

@Managed
@Nested
public JdbcApiStats getGetConnectionWithHandle()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
Expand Down Expand Up @@ -126,6 +127,12 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
return stats.getGetColumns().wrap(() -> delegate().getColumns(session, tableHandle));
}

@Override
public List<RelationCommentMetadata> getAllTableComments(ConnectorSession session, Optional<String> schema)
{
return stats.getGetAllTableComments().wrap(() -> delegate().getAllTableComments(session, schema));
}

@Override
public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle)
{
Expand Down

0 comments on commit 7eb6b09

Please sign in to comment.