Skip to content

Commit

Permalink
Use index table repair_run_by_cluster_v2 to obtain UUIDs for cluste…
Browse files Browse the repository at this point in the history
…r and state and then query the main `repair_runs` table using the found UUIDs.
  • Loading branch information
Miles-Garnsey committed Jan 13, 2023
1 parent d61b8d6 commit def2799
Showing 1 changed file with 34 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ private void prepareStatements() {
getRepairRunForClusterPrepStmt = session.prepare(
"SELECT * FROM repair_run_by_cluster_v2 WHERE cluster_name = ? limit ?");
getRepairRunForClusterWhereStatusPrepStmt = session.prepare(
"SELECT * FROM repair_run_by_cluster_v2 WHERE cluster_name = ? AND repair_run_state = ? limit ?");
"SELECT id FROM repair_run_by_cluster_v2 WHERE cluster_name = ? AND repair_run_state = ? limit ?");
getRepairRunForUnitPrepStmt = session.prepare("SELECT * FROM repair_run_by_unit WHERE repair_unit_id = ?");
deleteRepairRunPrepStmt = session.prepare("DELETE FROM repair_run WHERE id = ?");
deleteRepairRunByClusterPrepStmt
Expand Down Expand Up @@ -962,29 +962,52 @@ public Collection<RepairRun> getRepairRunsForCluster(String clusterName, Optiona

@Override
public List<RepairRun> getRepairRunsForClusterPrioritiseRunning(String clusterName, Optional<Integer> limit) {
List<ResultSetFuture> repairRunFutures = Lists.<ResultSetFuture>newArrayList();
List<ResultSetFuture> repairUuidFuturesByState = Lists.<ResultSetFuture>newArrayList();
// We've set up the RunState enum so that values are declared in order of "interestingness",
// we iterate over the table via the secondary index according to that ordering.
for (RunState state : RepairRun.RunState.values()) {
repairRunFutures.add(
repairUuidFuturesByState.add(
// repairUUIDFutures will be a List of resultSetFutures, each of which contains a ResultSet of
// UUIDs for one status.
session
.executeAsync(getRepairRunForClusterWhereStatusPrepStmt
.bind(clusterName, state.toString(), limit.orElse(MAX_RETURNED_REPAIR_RUNS)
)
)
);
}
List<RepairRun> flattenedRows = Lists.<RepairRun>newArrayList();
repairRunFutures
List<UUID> flattenedUuids = Lists.<UUID>newArrayList();
// Flatten the UUIDs from each status down into a single array and trim.
for (ResultSetFuture idResSetFuture : repairUuidFuturesByState) {
idResSetFuture
.getUninterruptibly()
.forEach(
row -> flattenedUuids.add(row.getUUID("id"))
);
}
flattenedUuids.subList(0, limit.orElse(MAX_RETURNED_REPAIR_RUNS));
// Run an async query on each UUID in the flattened list, against the main repair_run table with
// all columns required as an input to `buildRepairRunFromRow`.
List<ResultSetFuture> repairRunFutures = Lists.<ResultSetFuture>newArrayList();
flattenedUuids.forEach(uuid ->
repairRunFutures.add(
session
.executeAsync(getRepairRunPrepStmt.bind(uuid)
)
)
);
// Defuture the repair_run rows and build the strongly typed RepairRun objects from the contents.
return repairRunFutures
.stream()
.forEach(
resSet -> resSet.getUninterruptibly().forEach(
row -> flattenedRows.add(buildRepairRunFromRow(row, row.getUUID("id")))
)
);
return flattenedRows.subList(0, limit.orElse(MAX_RETURNED_REPAIR_RUNS));
.map(
row -> {
Row extractedRow = row.getUninterruptibly().one();
return buildRepairRunFromRow(extractedRow, extractedRow.getUUID("id"));
}
).collect(Collectors.toList());
}


@Override
public Collection<RepairRun> getRepairRunsForUnit(UUID repairUnitId) {
List<ResultSetFuture> repairRunFutures = Lists.<ResultSetFuture>newArrayList();
Expand Down

0 comments on commit def2799

Please sign in to comment.