Skip to content

Commit

Permalink
fix(query-orchestrator): Athena got swamped by fetch schema requests
Browse files Browse the repository at this point in the history
Persist pre-aggregation tables schema cache for an hour and refresh it on reset
  • Loading branch information
paveltiunov committed May 7, 2019
1 parent 0296379 commit d8b5440
Showing 1 changed file with 27 additions and 9 deletions.
36 changes: 27 additions & 9 deletions packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js
Expand Up @@ -54,16 +54,34 @@ class PreAggregationLoadCache {
this.cacheDriver = preAggregations.cacheDriver;
}

async tablesFromCache(schema) {
let tables = await this.cacheDriver.get(this.tablesRedisKey());
async tablesFromCache(schema, forceRenew) {
let tables = forceRenew ? null : await this.cacheDriver.get(this.tablesRedisKey());
if (!tables) {
const client = await this.driverFactory();
tables = await client.getTablesQuery(schema);
await this.cacheDriver.set(this.tablesRedisKey(), tables, 120);
if (this.fetchTablesPromise) {
tables = await this.fetchTablesPromise;
} else {
this.fetchTablesPromise = this.fetchTables(schema);
try {
tables = await this.fetchTablesPromise;
} finally {
this.fetchTablesPromise = null;
}
}
}
return tables;
}

async fetchTables(schema) {
const client = await this.driverFactory();
const newTables = await client.getTablesQuery(schema);
await this.cacheDriver.set(
this.tablesRedisKey(),
newTables,
this.preAggregations.options.preAggregationsSchemaCacheDuration || 60 * 60
);
return newTables;
}

tablesRedisKey() {
return `SQL_PRE_AGGREGATIONS_TABLES_${this.redisPrefix}`;
}
Expand Down Expand Up @@ -103,11 +121,11 @@ class PreAggregationLoadCache {
return queue.getQueryStage(stageQueryKey, undefined, this.queryStageState);
}

async reset() {
async reset(schema) {
this.tables = undefined;
this.queryStageState = undefined;
this.versionEnries = undefined;
await this.cacheDriver.remove(this.tablesRedisKey());
await this.tablesFromCache(schema, true);
}
}

Expand Down Expand Up @@ -180,7 +198,7 @@ class PreAggregationLoader {
};

const mostRecentTargetTableName = async () => {
await this.loadCache.reset();
await this.loadCache.reset(this.preAggregation.preAggregationsSchema);
return this.targetTableName(
getVersionEntryByContentVersion(
await this.loadCache.getVersionEntries(this.preAggregation.preAggregationsSchema)
Expand Down Expand Up @@ -293,7 +311,7 @@ class PreAggregationLoader {
.filter(t => tablesToSave.indexOf(t) === -1);
this.logger('Dropping orphaned tables', { tablesToDrop: JSON.stringify(toDrop) });
await Promise.all(toDrop.map(table => client.dropTable(table)));
await this.loadCache.reset();
await this.loadCache.reset(this.preAggregation.preAggregationsSchema);
}

flushUsedTables() {
Expand Down

0 comments on commit d8b5440

Please sign in to comment.