Skip to content

Commit

Permalink
Add ExecutorService for metric retrieval
Browse files Browse the repository at this point in the history
This service will add parallelism to the retrieval of metrics
from the index metrics table, decreasing the overall planning time.
A new property, index_short_circuit_cardinality_fetch, is used to enable
a new feature which causes the metrics retrieval to return early if one
of the cardinalities retrieved is less than a given threshold.  The
remaining tasks will complete as planned, caching the results in the
Guava cache, but the connector can use the smallest cardinality
column in the meantime.

We also add a polling duration because some cached results are
returned a few milliseconds before a significantly lower result.
By adding a sleep prior to polling the tasks, we are adding 'waves'
of result retrieval.  The results of any completed tasks are taken
and the smallest cardinality, if below the threshold, is used while
the other tasks complete.
  • Loading branch information
adamjshook authored and electrum committed May 9, 2017
1 parent 792e2d4 commit 5cd5fb6
Show file tree
Hide file tree
Showing 8 changed files with 385 additions and 262 deletions.
6 changes: 6 additions & 0 deletions presto-accumulo/pom.xml
Expand Up @@ -317,5 +317,11 @@
<artifactId>testng</artifactId> <artifactId>testng</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>
Expand Up @@ -101,17 +101,17 @@ public AccumuloClient(
Connector connector, Connector connector,
AccumuloConfig config, AccumuloConfig config,
ZooKeeperMetadataManager metaManager, ZooKeeperMetadataManager metaManager,
AccumuloTableManager tableManager) AccumuloTableManager tableManager,
IndexLookup indexLookup)
throws AccumuloException, AccumuloSecurityException throws AccumuloException, AccumuloSecurityException
{ {
this.connector = requireNonNull(connector, "connector is null"); this.connector = requireNonNull(connector, "connector is null");
this.username = requireNonNull(config, "config is null").getUsername(); this.username = requireNonNull(config, "config is null").getUsername();
this.metaManager = requireNonNull(metaManager, "metaManager is null"); this.metaManager = requireNonNull(metaManager, "metaManager is null");
this.tableManager = requireNonNull(tableManager, "tableManager is null"); this.tableManager = requireNonNull(tableManager, "tableManager is null");
this.auths = connector.securityOperations().getUserAuthorizations(username); this.indexLookup = requireNonNull(indexLookup, "indexLookup is null");


// Create the index lookup utility this.auths = connector.securityOperations().getUserAuthorizations(username);
this.indexLookup = new IndexLookup(connector, config, this.auths);
} }


public AccumuloTable createTable(ConnectorTableMetadata meta) public AccumuloTable createTable(ConnectorTableMetadata meta)
Expand Down Expand Up @@ -440,9 +440,6 @@ public void dropTable(AccumuloTable table)
{ {
SchemaTableName tableName = new SchemaTableName(table.getSchema(), table.getTable()); SchemaTableName tableName = new SchemaTableName(table.getSchema(), table.getTable());


// Drop cardinality cache from index lookup
indexLookup.dropCache(tableName.getSchemaName(), tableName.getTableName());

// Remove the table metadata from Presto // Remove the table metadata from Presto
if (metaManager.getTable(tableName) != null) { if (metaManager.getTable(tableName) != null) {
metaManager.deleteTableMetadata(tableName); metaManager.deleteTableMetadata(tableName);
Expand Down
Expand Up @@ -16,6 +16,8 @@
import com.facebook.presto.accumulo.conf.AccumuloConfig; import com.facebook.presto.accumulo.conf.AccumuloConfig;
import com.facebook.presto.accumulo.conf.AccumuloSessionProperties; import com.facebook.presto.accumulo.conf.AccumuloSessionProperties;
import com.facebook.presto.accumulo.conf.AccumuloTableProperties; import com.facebook.presto.accumulo.conf.AccumuloTableProperties;
import com.facebook.presto.accumulo.index.ColumnCardinalityCache;
import com.facebook.presto.accumulo.index.IndexLookup;
import com.facebook.presto.accumulo.io.AccumuloPageSinkProvider; import com.facebook.presto.accumulo.io.AccumuloPageSinkProvider;
import com.facebook.presto.accumulo.io.AccumuloRecordSetProvider; import com.facebook.presto.accumulo.io.AccumuloRecordSetProvider;
import com.facebook.presto.accumulo.metadata.AccumuloTable; import com.facebook.presto.accumulo.metadata.AccumuloTable;
Expand Down Expand Up @@ -96,6 +98,8 @@ public void configure(Binder binder)
binder.bind(AccumuloTableProperties.class).in(Scopes.SINGLETON); binder.bind(AccumuloTableProperties.class).in(Scopes.SINGLETON);
binder.bind(ZooKeeperMetadataManager.class).in(Scopes.SINGLETON); binder.bind(ZooKeeperMetadataManager.class).in(Scopes.SINGLETON);
binder.bind(AccumuloTableManager.class).in(Scopes.SINGLETON); binder.bind(AccumuloTableManager.class).in(Scopes.SINGLETON);
binder.bind(IndexLookup.class).in(Scopes.SINGLETON);
binder.bind(ColumnCardinalityCache.class).in(Scopes.SINGLETON);
binder.bind(Connector.class).toProvider(ConnectorProvider.class); binder.bind(Connector.class).toProvider(ConnectorProvider.class);


configBinder(binder).bindConfig(AccumuloConfig.class); configBinder(binder).bindConfig(AccumuloConfig.class);
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.session.PropertyMetadata; import com.facebook.presto.spi.session.PropertyMetadata;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import io.airlift.units.Duration;


import javax.inject.Inject; import javax.inject.Inject;


Expand All @@ -25,6 +26,7 @@
import static com.facebook.presto.spi.session.PropertyMetadata.doubleSessionProperty; import static com.facebook.presto.spi.session.PropertyMetadata.doubleSessionProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerSessionProperty; import static com.facebook.presto.spi.session.PropertyMetadata.integerSessionProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringSessionProperty; import static com.facebook.presto.spi.session.PropertyMetadata.stringSessionProperty;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;


/** /**
* Class contains all session-based properties for the Accumulo connector. * Class contains all session-based properties for the Accumulo connector.
Expand All @@ -44,6 +46,8 @@ public final class AccumuloSessionProperties
private static final String INDEX_LOWEST_CARDINALITY_THRESHOLD = "index_lowest_cardinality_threshold"; private static final String INDEX_LOWEST_CARDINALITY_THRESHOLD = "index_lowest_cardinality_threshold";
private static final String INDEX_METRICS_ENABLED = "index_metrics_enabled"; private static final String INDEX_METRICS_ENABLED = "index_metrics_enabled";
private static final String SCAN_USERNAME = "scan_username"; private static final String SCAN_USERNAME = "scan_username";
private static final String INDEX_SHORT_CIRCUIT_CARDINALITY_FETCH = "index_short_circuit_cardinality_fetch";
private static final String INDEX_CARDINALITY_CACHE_POLLING_DURATION = "index_cardinality_cache_polling_duration";


private final List<PropertyMetadata<?>> sessionProperties; private final List<PropertyMetadata<?>> sessionProperties;


Expand Down Expand Up @@ -94,7 +98,22 @@ public AccumuloSessionProperties()
true, true,
false); false);


sessionProperties = ImmutableList.of(s1, s2, s3, s4, s5, s6, s7, s8); PropertyMetadata<Boolean> s9 = booleanSessionProperty(
INDEX_SHORT_CIRCUIT_CARDINALITY_FETCH,
"Short circuit the retrieval of index metrics once any column is less than the lowest cardinality threshold. Default true",
true,
false);

PropertyMetadata<String> s10 = new PropertyMetadata<>(
INDEX_CARDINALITY_CACHE_POLLING_DURATION,
"Sets the cardinality cache polling duration for short circuit retrieval of index metrics. Default 10ms",
VARCHAR, String.class,
"10ms",
false,
duration -> Duration.valueOf(duration.toString()).toString(),
object -> object);

sessionProperties = ImmutableList.of(s1, s2, s3, s4, s5, s6, s7, s8, s9, s10);
} }


public List<PropertyMetadata<?>> getSessionProperties() public List<PropertyMetadata<?>> getSessionProperties()
Expand Down Expand Up @@ -132,6 +151,11 @@ public static double getIndexSmallCardThreshold(ConnectorSession session)
return session.getProperty(INDEX_LOWEST_CARDINALITY_THRESHOLD, Double.class); return session.getProperty(INDEX_LOWEST_CARDINALITY_THRESHOLD, Double.class);
} }


public static Duration getIndexCardinalityCachePollingDuration(ConnectorSession session)
{
return Duration.valueOf(session.getProperty(INDEX_CARDINALITY_CACHE_POLLING_DURATION, String.class));
}

public static boolean isIndexMetricsEnabled(ConnectorSession session) public static boolean isIndexMetricsEnabled(ConnectorSession session)
{ {
return session.getProperty(INDEX_METRICS_ENABLED, Boolean.class); return session.getProperty(INDEX_METRICS_ENABLED, Boolean.class);
Expand All @@ -141,4 +165,9 @@ public static String getScanUsername(ConnectorSession session)
{ {
return session.getProperty(SCAN_USERNAME, String.class); return session.getProperty(SCAN_USERNAME, String.class);
} }

public static boolean isIndexShortCircuitEnabled(ConnectorSession session)
{
return session.getProperty(INDEX_SHORT_CIRCUIT_CARDINALITY_FETCH, Boolean.class);
}
} }

0 comments on commit 5cd5fb6

Please sign in to comment.