Skip to content

Commit

Permalink
Collect column statistics on table write: SPI
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed Aug 2, 2018
1 parent 05fbe72 commit 6df8ff2
Show file tree
Hide file tree
Showing 25 changed files with 412 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -102,7 +103,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments)
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
clearRollback();
return Optional.empty();
Expand Down Expand Up @@ -212,7 +213,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
}

@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments)
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
clearRollback();
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
Expand Down Expand Up @@ -173,7 +174,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments)
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
JdbcOutputTableHandle handle = (JdbcOutputTableHandle) tableHandle;
jdbcClient.commitCreateTable(handle);
Expand Down Expand Up @@ -203,7 +204,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
}

@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle tableHandle, Collection<Slice> fragments)
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
JdbcOutputTableHandle jdbcInsertHandle = (JdbcOutputTableHandle) tableHandle;
jdbcClient.finishInsertTable(jdbcInsertHandle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -159,7 +160,7 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
{
// Runs the begin/finish create-table pair back to back, with no layout (Optional.empty()).
// NOTE(review): ignoreExisting is not read anywhere in this body — confirm that is intended.
ConnectorOutputTableHandle outputTableHandle = beginCreateTable(session, tableMetadata, Optional.empty());
// Every collection handed to finishCreateTable is empty: the fragments, and in the
// four-argument form the computed statistics as well.
finishCreateTable(session, outputTableHandle, ImmutableList.of());
finishCreateTable(session, outputTableHandle, ImmutableList.of(), ImmutableList.of());
}

@Override
Expand Down Expand Up @@ -220,7 +221,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments)
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
BlackHoleOutputTableHandle blackHoleOutputTableHandle = (BlackHoleOutputTableHandle) tableHandle;
BlackHoleTableHandle table = blackHoleOutputTableHandle.getTable();
Expand All @@ -236,7 +237,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
}

// Completes an insert without doing any work: none of the arguments (including the
// fragments and the computed statistics) are read, and no output metadata is returned.
@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments)
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void tableIsCreatedAfterCommits()

assertThatNoTableIsCreated();

metadata.finishCreateTable(SESSION, table, ImmutableList.of());
metadata.finishCreateTable(SESSION, table, ImmutableList.of(), ImmutableList.of());

List<SchemaTableName> tables = metadata.listTables(SESSION, Optional.empty());
assertTrue(tables.size() == 1, "Expected only one table.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -312,7 +313,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
}

// Completes a create-table-with-data without doing any work: none of the arguments
// (including the fragments and the computed statistics) are read, and the result is
// always an empty Optional.
@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments)
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
return Optional.empty();
}
Expand All @@ -339,7 +340,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
}

// Completes an insert without doing any work: none of the arguments (including the
// fragments and the computed statistics) are read, and the result is always an
// empty Optional.
@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments)
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import com.facebook.presto.spi.security.GrantInfo;
import com.facebook.presto.spi.security.Privilege;
import com.facebook.presto.spi.security.PrivilegeInfo;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
Expand Down Expand Up @@ -875,7 +876,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments)
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
HiveOutputTableHandle handle = (HiveOutputTableHandle) tableHandle;

Expand Down Expand Up @@ -1089,7 +1090,7 @@ public HiveInsertTableHandle beginInsert(ConnectorSession session, ConnectorTabl
}

@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments)
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
HiveInsertTableHandle handle = (HiveInsertTableHandle) insertHandle;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ protected void doTestMismatchSchemaTable(
sink.appendPage(dataBefore.toPage());
Collection<Slice> fragments = getFutureValue(sink.finish());

metadata.finishInsert(session, insertTableHandle, fragments);
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());

transaction.commit();
}
Expand Down Expand Up @@ -1113,7 +1113,7 @@ protected void doTestMismatchSchemaTable(
sink.appendPage(dataAfter.toPage());
Collection<Slice> fragments = getFutureValue(sink.finish());

metadata.finishInsert(session, insertTableHandle, fragments);
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());

transaction.commit();

Expand Down Expand Up @@ -2323,7 +2323,7 @@ private void doTestBucketSortedTables(SchemaTableName table)
}

// finish creating table
metadata.finishCreateTable(session, outputHandle, fragments);
metadata.finishCreateTable(session, outputHandle, fragments, ImmutableList.of());

transaction.commit();
}
Expand Down Expand Up @@ -2725,7 +2725,7 @@ private void createDummyTable(SchemaTableName tableName)
List<ColumnMetadata> columns = ImmutableList.of(new ColumnMetadata("dummy", createUnboundedVarcharType()));
ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(tableName, columns, createTableProperties(TEXTFILE));
ConnectorOutputTableHandle handle = metadata.beginCreateTable(session, tableMetadata, Optional.empty());
metadata.finishCreateTable(session, handle, ImmutableList.of());
metadata.finishCreateTable(session, handle, ImmutableList.of(), ImmutableList.of());

transaction.commit();
}
Expand Down Expand Up @@ -2945,7 +2945,7 @@ protected void doCreateTable(SchemaTableName tableName, HiveStorageFormat storag
}

// commit the table
metadata.finishCreateTable(session, outputHandle, fragments);
metadata.finishCreateTable(session, outputHandle, fragments, ImmutableList.of());

transaction.commit();
}
Expand Down Expand Up @@ -3097,7 +3097,7 @@ private void doInsert(HiveStorageFormat storageFormat, SchemaTableName tableName
sink.appendPage(CREATE_TABLE_DATA.toPage());
sink.appendPage(CREATE_TABLE_DATA.toPage());
Collection<Slice> fragments = getFutureValue(sink.finish());
metadata.finishInsert(session, insertTableHandle, fragments);
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());

// statistics, visible from within transaction
HiveBasicStatistics tableStatistics = getBasicStatisticsForTable(transaction, tableName);
Expand Down Expand Up @@ -3307,7 +3307,7 @@ private void doInsertIntoNewPartition(HiveStorageFormat storageFormat, SchemaTab
ConnectorPageSink sink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, insertTableHandle);
sink.appendPage(CREATE_TABLE_PARTITIONED_DATA_2ND.toPage());
Collection<Slice> fragments = getFutureValue(sink.finish());
metadata.finishInsert(session, insertTableHandle, fragments);
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());

// verify all temp files start with the unique prefix
HdfsContext context = new HdfsContext(session, tableName.getSchemaName(), tableName.getTableName());
Expand Down Expand Up @@ -3422,7 +3422,7 @@ private void doInsertIntoExistingPartition(HiveStorageFormat storageFormat, Sche
sink.appendPage(CREATE_TABLE_PARTITIONED_DATA.toPage());
sink.appendPage(CREATE_TABLE_PARTITIONED_DATA.toPage());
Collection<Slice> fragments = getFutureValue(sink.finish());
metadata.finishInsert(session, insertTableHandle, fragments);
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());

// verify all temp files start with the unique prefix
HdfsContext context = new HdfsContext(session, tableName.getSchemaName(), tableName.getTableName());
Expand Down Expand Up @@ -3566,7 +3566,7 @@ private String insertData(SchemaTableName tableName, MaterializedResult data)
Collection<Slice> fragments = getFutureValue(sink.finish());

// commit the insert
metadata.finishInsert(session, insertTableHandle, fragments);
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());
transaction.commit();
}

Expand Down Expand Up @@ -4456,7 +4456,7 @@ private void doTestTransactionDeleteInsert(
rollbackIfEquals(tag, ROLLBACK_AFTER_APPEND_PAGE);
Collection<Slice> fragments = getFutureValue(sink.finish());
rollbackIfEquals(tag, ROLLBACK_AFTER_SINK_FINISH);
metadata.finishInsert(session, insertTableHandle, fragments);
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());
rollbackIfEquals(tag, ROLLBACK_AFTER_FINISH_INSERT);

assertEquals(tag, COMMIT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ private void createTable(SchemaTableName tableName, HiveStorageFormat storageFor
Collection<Slice> fragments = getFutureValue(sink.finish());

// commit the table
metadata.finishCreateTable(session, outputHandle, fragments);
metadata.finishCreateTable(session, outputHandle, fragments, ImmutableList.of());

transaction.commit();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.security.GrantInfo;
import com.facebook.presto.spi.security.Privilege;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spi.type.TypeSignature;
Expand Down Expand Up @@ -167,10 +169,15 @@ public interface Metadata
/**
* Finish a table creation with data after the data is written.
*
* @param session the session the table creation runs in
* @param tableHandle handle of the table being created
* @param fragments fragments collected from the write
* @param computedStatistics statistics computed for the write (only in the four-argument form);
* presumably those requested through getStatisticsCollectionMetadata — TODO confirm
* @return optional output metadata for the finished write
*/
Optional<ConnectorOutputMetadata> finishCreateTable(Session session, OutputTableHandle tableHandle, Collection<Slice> fragments);
Optional<ConnectorOutputMetadata> finishCreateTable(Session session, OutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics);

Optional<NewTableLayout> getInsertLayout(Session session, TableHandle target);

/**
* Describes statistics that must be collected during a write.
*
* @param session the session the write runs in
* @param catalogName name of the catalog that is the target of the write
* @param tableMetadata metadata of the table being written
* @return description of the statistics to collect
*/
TableStatisticsMetadata getStatisticsCollectionMetadata(Session session, String catalogName, ConnectorTableMetadata tableMetadata);

/**
* Start a SELECT/UPDATE/INSERT/DELETE query
*/
Expand All @@ -190,7 +197,7 @@ public interface Metadata
/**
* Finish insert query
*
* @param session the session the insert runs in
* @param tableHandle handle of the insert being finished
* @param fragments fragments collected from the write
* @param computedStatistics statistics computed for the write (only in the four-argument form);
* presumably those requested through getStatisticsCollectionMetadata — TODO confirm
* @return optional output metadata for the finished insert
*/
Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments);
Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics);

/**
* Get the row ID column handle used with UpdatablePageSource.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.security.GrantInfo;
import com.facebook.presto.spi.security.Privilege;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spi.type.TypeSignature;
Expand Down Expand Up @@ -576,6 +578,15 @@ public Optional<NewTableLayout> getInsertLayout(Session session, TableHandle tab
.map(layout -> new NewTableLayout(connectorId, catalogMetadata.getTransactionHandleFor(connectorId), layout));
}

@Override
public TableStatisticsMetadata getStatisticsCollectionMetadata(Session session, String catalogName, ConnectorTableMetadata tableMetadata)
{
    // Look the catalog up through the write path, then hand the request to that catalog's
    // connector metadata, using a session scoped to the catalog's connector id.
    CatalogMetadata catalog = getCatalogMetadataForWrite(session, catalogName);
    return catalog.getMetadata().getStatisticsCollectionMetadata(
            session.toConnectorSession(catalog.getConnectorId()),
            tableMetadata);
}

@Override
public Optional<NewTableLayout> getNewTableLayout(Session session, String catalogName, ConnectorTableMetadata tableMetadata)
{
Expand Down Expand Up @@ -638,11 +649,11 @@ public OutputTableHandle beginCreateTable(Session session, String catalogName, C
}

// Delegates to the metadata of the connector identified by the handle. The connector
// receives a connector-scoped session, the unwrapped connector handle and the fragments;
// the four-argument form also forwards computedStatistics unchanged.
@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(Session session, OutputTableHandle tableHandle, Collection<Slice> fragments)
public Optional<ConnectorOutputMetadata> finishCreateTable(Session session, OutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
ConnectorId connectorId = tableHandle.getConnectorId();
ConnectorMetadata metadata = getMetadata(session, connectorId);
return metadata.finishCreateTable(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle(), fragments);
return metadata.finishCreateTable(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle(), fragments, computedStatistics);
}

@Override
Expand All @@ -657,11 +668,11 @@ public InsertTableHandle beginInsert(Session session, TableHandle tableHandle)
}

// Delegates to the metadata of the connector identified by the handle. The connector
// receives a connector-scoped session, the unwrapped connector handle and the fragments;
// the four-argument form also forwards computedStatistics unchanged.
@Override
public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments)
public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
ConnectorId connectorId = tableHandle.getConnectorId();
ConnectorMetadata metadata = getMetadata(session, connectorId);
return metadata.finishInsert(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle(), fragments);
return metadata.finishInsert(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle(), fragments, computedStatistics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2529,10 +2529,10 @@ private static TableFinisher createTableFinisher(Session session, TableFinishNod
WriterTarget target = node.getTarget();
return fragments -> {
if (target instanceof CreateHandle) {
return metadata.finishCreateTable(session, ((CreateHandle) target).getHandle(), fragments);
return metadata.finishCreateTable(session, ((CreateHandle) target).getHandle(), fragments, ImmutableList.of());
}
else if (target instanceof InsertHandle) {
return metadata.finishInsert(session, ((InsertHandle) target).getHandle(), fragments);
return metadata.finishInsert(session, ((InsertHandle) target).getHandle(), fragments, ImmutableList.of());
}
else if (target instanceof DeleteHandle) {
metadata.finishDelete(session, ((DeleteHandle) target).getHandle(), fragments);
Expand Down

0 comments on commit 6df8ff2

Please sign in to comment.