Skip to content

Commit

Permalink
fix(sql): optimize reading executions by pipeline id (#2949)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher authored May 31, 2019
1 parent 8835c07 commit ba3d06b
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,16 +268,35 @@ class SqlExecutionRepository(
pipelineConfigId: String,
criteria: ExecutionCriteria
): Observable<Execution> {
val select = jooq.selectExecutions(
PIPELINE,
conditions = {
it.where(field("config_id").eq(pipelineConfigId))
.statusIn(criteria.statuses)
},
seek = {
it.orderBy(field("id").desc()).limit(criteria.pageSize)
}
)
// When not filtering by status, provide an index hint to ensure use of `pipeline_config_id_idx` which
// fully satisfies the where clause and order by. Without, some lookups by config_id matching thousands
// of executions triggered costly full table scans.
val select = if (criteria.statuses.isEmpty() || criteria.statuses.size == ExecutionStatus.values().size) {
jooq.selectExecutions(
PIPELINE,
usingIndex = "pipeline_config_id_idx",
conditions = {
it.where(field("config_id").eq(pipelineConfigId))
.statusIn(criteria.statuses)
},
seek = {
it.orderBy(field("id").desc()).limit(criteria.pageSize)
}
)
} else {
// When filtering by status, the above index hint isn't ideal. In this case, `pipeline_config_status_idx`
// appears to be used reliably without hinting.
jooq.selectExecutions(
PIPELINE,
conditions = {
it.where(field("config_id").eq(pipelineConfigId))
.statusIn(criteria.statuses)
},
seek = {
it.orderBy(field("id").desc()).limit(criteria.pageSize)
}
)
}

return Observable.from(select.fetchExecutions())
}
Expand Down Expand Up @@ -791,6 +810,18 @@ class SqlExecutionRepository(
.let { conditions(it) }
.let { seek(it) }

private fun DSLContext.selectExecutions(
type: ExecutionType,
fields: List<Field<Any>> = selectFields(),
usingIndex: String,
conditions: (SelectJoinStep<Record>) -> SelectConnectByStep<out Record>,
seek: (SelectConnectByStep<out Record>) -> SelectForUpdateStep<out Record>
) =
select(fields)
.from(type.tableName.forceIndex(usingIndex))
.let { conditions(it) }
.let { seek(it) }

private fun DSLContext.selectExecution(type: ExecutionType, fields: List<Field<Any>> = selectFields()) =
select(fields)
.from(type.tableName)
Expand Down
3 changes: 3 additions & 0 deletions orca-sql/src/main/resources/db/changelog-master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ databaseChangeLog:
- include:
file: changelog/20190516-index-correlated-execution-ids.yml
relativeToChangelogFile: true
- include:
file: changelog/20190530-pipeline-config-index.yml
relativeToChangelogFile: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
databaseChangeLog:
- changeSet:
id: 20190530-update-pipeline-config-indices
author: afeldman
changes:
- createIndex:
indexName: pipeline_config_id_idx
tableName: pipelines
columns:
- column:
name: config_id
- column:
name: id
- dropIndex:
indexName: pipeline_config_idx
tableName: pipelines
rollback:
- createIndex:
indexName: pipeline_config_idx
tableName: pipelines
columns:
- column:
name: config_id
- dropIndex:
indexName: pipeline_config_id_idx
tableName: pipelines

0 comments on commit ba3d06b

Please sign in to comment.