Skip to content
Permalink
Browse files

Add "force" flag to cluster delete REST endpoint, allowing registered…

… clusters with schedules and repairs runs to be forcibly deleted.

Forcibly deleting a cluster entails first deleting those schedules, repair runs, and repair units belonging to the cluster, before also deleting the cluster.

 ref: #793
  • Loading branch information
michaelsembwever committed Nov 12, 2019
1 parent 5c4d811 commit 848e48bec6b2afe42cd9857abe1533e7d9f776d7
@@ -20,6 +20,7 @@
import io.cassandrareaper.AppContext;
import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.core.RepairRun;
import io.cassandrareaper.jmx.ClusterFacade;
import io.cassandrareaper.jmx.JmxProxy;
import io.cassandrareaper.resources.view.ClusterStatus;
@@ -307,28 +308,42 @@ private Cluster updateClusterSeeds(Cluster cluster, String seedHosts) throws Rea
/**
* Delete a Cluster object with given name.
*
* <p>Cluster can be only deleted when it hasn't any running RepairRuns or active RepairSchedule instances under
* <p>Cluster can only be forced deleted when it has any RepairRuns or RepairSchedule instances associated to it.
*/
@DELETE
@Path("/{cluster_name}")
public Response deleteCluster(@PathParam("cluster_name") String clusterName) {
public Response deleteCluster(
@PathParam("cluster_name") String clusterName,
@QueryParam("force") Optional<Boolean> force) {

LOG.info("delete cluster {}", clusterName);
try {
if (!context.storage.getRepairSchedulesForCluster(clusterName).isEmpty()) {
return Response.status(Response.Status.CONFLICT)
.entity("cluster \"" + clusterName + "\" cannot be deleted, as it has repair schedules")
.build();
if (!force.orElse(Boolean.FALSE)) {
if (!context.storage.getRepairSchedulesForCluster(clusterName).isEmpty()) {
return Response.status(Response.Status.CONFLICT)
.entity("cluster \"" + clusterName + "\" cannot be deleted, as it has repair schedules")
.build();
}
if (!context.storage.getRepairRunsForCluster(clusterName, Optional.empty()).isEmpty()) {
return Response.status(Response.Status.CONFLICT)
.entity("cluster \"" + clusterName + "\" cannot be deleted, as it has repair runs")
.build();
}
if (!context.storage.getEventSubscriptions(clusterName).isEmpty()) {
return Response.status(Response.Status.CONFLICT)
.entity("cluster \"" + clusterName + "\" cannot be deleted, as it has diagnostic events subscriptions")
.build();
}
}
if (!context.storage.getRepairRunsForCluster(clusterName, Optional.empty()).isEmpty()) {
if (context.storage.getRepairRunsWithState(RepairRun.RunState.RUNNING)
.stream()
.anyMatch(run -> "clusterName".equals(run.getClusterName()))) {

return Response.status(Response.Status.CONFLICT)
.entity("cluster \"" + clusterName + "\" cannot be deleted, as it has repair runs")
.entity("cluster \"" + clusterName + "\" cannot be deleted, as it has running repairs. Stop them first.")
.build();
}
if (!context.storage.getEventSubscriptions(clusterName).isEmpty()) {
return Response.status(Response.Status.CONFLICT)
.entity("cluster \"" + clusterName + "\" cannot be deleted, as it has diagnostic events subscriptions")
.build();
}

context.storage.deleteCluster(clusterName);
return Response.accepted().build();
} catch (IllegalArgumentException ex) {
@@ -148,6 +148,7 @@ public RepairUnit load(UUID repairUnitId) throws Exception {
private PreparedStatement getRepairRunForUnitPrepStmt;
private PreparedStatement deleteRepairRunPrepStmt;
private PreparedStatement deleteRepairRunByClusterPrepStmt;
private PreparedStatement deleteRepairRunByClusterByIdPrepStmt;
private PreparedStatement deleteRepairRunByUnitPrepStmt;
private PreparedStatement insertRepairUnitPrepStmt;
private PreparedStatement getRepairUnitPrepStmt;
@@ -168,11 +169,10 @@ public RepairUnit load(UUID repairUnitId) throws Exception {
private PreparedStatement getRepairScheduleByClusterAndKsPrepStmt;
private PreparedStatement insertRepairScheduleByClusterAndKsPrepStmt;
private PreparedStatement deleteRepairSchedulePrepStmt;
private PreparedStatement deleteRepairScheduleByClusterAndKsPrepStmt;
private PreparedStatement deleteRepairScheduleByClusterAndKsByIdPrepStmt;
private PreparedStatement takeLeadPrepStmt;
private PreparedStatement renewLeadPrepStmt;
private PreparedStatement releaseLeadPrepStmt;
private PreparedStatement forceReleaseLeadPrepStmt;
private PreparedStatement getRunningReapersCountPrepStmt;
private PreparedStatement saveHeartbeatPrepStmt;
private PreparedStatement storeNodeMetricsPrepStmt;
@@ -344,6 +344,8 @@ private void prepareStatements() {
getRepairRunForUnitPrepStmt = session.prepare("SELECT * FROM repair_run_by_unit WHERE repair_unit_id = ?");
deleteRepairRunPrepStmt = session.prepare("DELETE FROM repair_run WHERE id = ?");
deleteRepairRunByClusterPrepStmt
= session.prepare("DELETE FROM repair_run_by_cluster WHERE cluster_name = ?");
deleteRepairRunByClusterByIdPrepStmt
= session.prepare("DELETE FROM repair_run_by_cluster WHERE id = ? and cluster_name = ?");
deleteRepairRunByUnitPrepStmt = session.prepare("DELETE FROM repair_run_by_unit "
+ "WHERE id = ? and repair_unit_id= ?");
@@ -408,7 +410,7 @@ private void prepareStatements() {
"SELECT repair_schedule_id FROM repair_schedule_by_cluster_and_keyspace "
+ "WHERE cluster_name = ? and keyspace_name = ?");
deleteRepairSchedulePrepStmt = session.prepare("DELETE FROM repair_schedule_v1 WHERE id = ?");
deleteRepairScheduleByClusterAndKsPrepStmt = session.prepare(
deleteRepairScheduleByClusterAndKsByIdPrepStmt = session.prepare(
"DELETE FROM repair_schedule_by_cluster_and_keyspace "
+ "WHERE cluster_name = ? and keyspace_name = ? and repair_schedule_id = ?");
prepareLeaderElectionStatements(timeUdf);
@@ -461,7 +463,6 @@ private void prepareLeaderElectionStatements(final String timeUdf) {
"UPDATE leader USING TTL ? SET reaper_instance_id = ?, reaper_instance_host = ?,"
+ " last_heartbeat = " + timeUdf + "(now()) WHERE leader_id = ? IF reaper_instance_id = ?");
releaseLeadPrepStmt = session.prepare("DELETE FROM leader WHERE leader_id = ? IF reaper_instance_id = ?");
forceReleaseLeadPrepStmt = session.prepare("DELETE FROM leader WHERE leader_id = ?");
}

private void prepareMetricStatements() {
@@ -617,20 +618,21 @@ private Cluster parseCluster(Row row) throws IOException {

@Override
public Cluster deleteCluster(String clusterName) {
assert getRepairSchedulesForCluster(clusterName).isEmpty()
: StringUtils.join(getRepairSchedulesForCluster(clusterName));
getRepairSchedulesForCluster(clusterName).forEach(schedule -> deleteRepairSchedule(schedule.getId()));
session.executeAsync(deleteRepairRunByClusterPrepStmt.bind(clusterName));

assert getRepairRunsForCluster(clusterName, Optional.of(Integer.MAX_VALUE)).isEmpty()
: StringUtils.join(getRepairRunsForCluster(clusterName, Optional.of(Integer.MAX_VALUE)));
getEventSubscriptions(clusterName)
.stream()
.filter(subscription -> subscription.getId().isPresent())
.forEach(subscription -> deleteEventSubscription(subscription.getId().get()));

Statement stmt = new SimpleStatement(SELECT_REPAIR_UNIT);
stmt.setIdempotent(Boolean.TRUE);
stmt.setIdempotent(true);
ResultSet results = session.execute(stmt);
for (Row row : results) {
if (row.getString("cluster_name").equals(clusterName)) {
UUID id = row.getUUID("id");
assert getRepairRunsForUnit(id).isEmpty() : StringUtils.join(getRepairRunsForUnit(id));
session.execute(deleteRepairUnitPrepStmt.bind(id));
session.executeAsync(deleteRepairUnitPrepStmt.bind(id));
}
}
Cluster cluster = getCluster(clusterName);
@@ -858,7 +860,7 @@ public boolean updateRepairRun(RepairRun repairRun) {
Optional<RepairRun> repairRun = getRepairRun(id);
if (repairRun.isPresent()) {
session.execute(deleteRepairRunByUnitPrepStmt.bind(id, repairRun.get().getRepairUnitId()));
session.execute(deleteRepairRunByClusterPrepStmt.bind(id, repairRun.get().getClusterName()));
session.execute(deleteRepairRunByClusterByIdPrepStmt.bind(id, repairRun.get().getClusterName()));
}
session.execute(deleteRepairRunPrepStmt.bind(id));
return repairRun;
@@ -1307,15 +1309,15 @@ public boolean updateRepairSchedule(RepairSchedule newRepairSchedule) {
RepairUnit repairUnit = getRepairUnit(repairSchedule.get().getRepairUnitId());

session.execute(
deleteRepairScheduleByClusterAndKsPrepStmt.bind(
deleteRepairScheduleByClusterAndKsByIdPrepStmt.bind(
repairUnit.getClusterName(), repairUnit.getKeyspaceName(), repairSchedule.get().getId()));

session.execute(
deleteRepairScheduleByClusterAndKsPrepStmt.bind(
deleteRepairScheduleByClusterAndKsByIdPrepStmt.bind(
repairUnit.getClusterName(), " ", repairSchedule.get().getId()));

session.execute(
deleteRepairScheduleByClusterAndKsPrepStmt.bind(
deleteRepairScheduleByClusterAndKsByIdPrepStmt.bind(
" ", repairUnit.getKeyspaceName(), repairSchedule.get().getId()));

session.execute(deleteRepairSchedulePrepStmt.bind(repairSchedule.get().getId()));
@@ -124,11 +124,13 @@ public Cluster getCluster(String clusterName) {

@Override
public Cluster deleteCluster(String clusterName) {
assert getRepairSchedulesForCluster(clusterName).isEmpty()
: StringUtils.join(getRepairSchedulesForCluster(clusterName));
getRepairSchedulesForCluster(clusterName).forEach(schedule -> deleteRepairSchedule(schedule.getId()));
getRepairRunIdsForCluster(clusterName).forEach(runId -> deleteRepairRun(runId));

assert getRepairRunsForCluster(clusterName, Optional.of(Integer.MAX_VALUE)).isEmpty()
: StringUtils.join(getRepairRunsForCluster(clusterName, Optional.of(Integer.MAX_VALUE)));
getEventSubscriptions(clusterName)
.stream()
.filter(subscription -> subscription.getId().isPresent())
.forEach(subscription -> deleteEventSubscription(subscription.getId().get()));

repairUnits.values().stream()
.filter((unit) -> unit.getClusterName().equals(clusterName))
@@ -143,12 +143,13 @@ public Cluster getCluster(String clusterName) {

@Override
public Cluster deleteCluster(String clusterName) {
getRepairSchedulesForCluster(clusterName).forEach(schedule -> deleteRepairSchedule(schedule.getId()));
getRepairRunIdsForCluster(clusterName).forEach(runId -> deleteRepairRun(runId));

assert getRepairSchedulesForCluster(clusterName).isEmpty()
: StringUtils.join(getRepairSchedulesForCluster(clusterName));

assert getRepairRunsForCluster(clusterName, Optional.of(Integer.MAX_VALUE)).isEmpty()
: StringUtils.join(getRepairRunsForCluster(clusterName, Optional.of(Integer.MAX_VALUE)));
getEventSubscriptions(clusterName)
.stream()
.filter(subscription -> subscription.getId().isPresent())
.forEach(subscription -> deleteEventSubscription(subscription.getId().get()));

Cluster result = null;
try (Handle h = jdbi.open()) {
@@ -34,6 +34,7 @@
import io.cassandrareaper.storage.postgresql.DiagEventSubscriptionMapper;

import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@@ -791,18 +792,6 @@ public void all_added_schedules_are_deleted_for_the_last_added_cluster() throws

@And("^deleting cluster called \"([^\"]*)\" fails$")
public void deleting_cluster_called_fails(String clusterName) throws Throwable {
synchronized (BasicSteps.class) {
callAndExpect(
"DELETE",
"/cluster/" + clusterName,
EMPTY_PARAMS,
Optional.empty(),
Response.Status.CONFLICT);
}
}

@And("^deleting the last added cluster fails$")
public void deleting_the_last_added_cluster_fails() throws Throwable {
synchronized (BasicSteps.class) {
await().with().pollInterval(POLL_INTERVAL).atMost(1, MINUTES).until(() -> {
try {
@@ -821,41 +810,18 @@ public void deleting_the_last_added_cluster_fails() throws Throwable {
}
}

@And("^cluster called \"([^\"]*)\" is deleted$")
public void cluster_called_is_deleted(String clusterName) throws Throwable {
synchronized (BasicSteps.class) {
callAndExpect(
"DELETE",
"/cluster/" + clusterName,
EMPTY_PARAMS,
Optional.<String>empty(),
Response.Status.ACCEPTED,
Response.Status.NOT_FOUND);
@And("^the last added cluster is (force |)deleted$")
public void cluster_called_is_deleted(String force) throws Throwable {

await().with().pollInterval(POLL_INTERVAL).atMost(1, MINUTES).until(() -> {
try {
callAndExpect(
"GET",
"/cluster/" + clusterName,
EMPTY_PARAMS,
Optional.<String>empty(),
Response.Status.NOT_FOUND);
} catch (AssertionError ex) {
LOG.warn("GET /cluster/" + TestContext.TEST_CLUSTER + " failed: " + ex.getMessage());
return false;
}
return true;
});
}
}
Optional<Map<String,String>> params = "force ".equals(force)
? Optional.of(Collections.singletonMap("force", "true"))
: EMPTY_PARAMS;

@And("^the last added cluster is deleted$")
public void cluster_called_is_deleted() throws Throwable {
synchronized (BasicSteps.class) {
callAndExpect(
"DELETE",
"/cluster/" + TestContext.TEST_CLUSTER,
EMPTY_PARAMS,
params,
Optional.<String>empty(),
Response.Status.ACCEPTED,
Response.Status.NOT_FOUND);
@@ -481,6 +481,43 @@ public void addingAClusterAutomaticallySetupSchedulingRepairsWhenEnabled() throw
assertEquals(1, mocks.context.storage.getRepairSchedulesForClusterAndKeyspace(CLUSTER_NAME, "keyspace1").size());
}

@Test
public void testClusterDeleting() throws Exception {
final MockObjects mocks = initMocks();
when(mocks.jmxProxy.getLiveNodes()).thenReturn(Arrays.asList(SEED_HOST));
when(mocks.jmxProxy.getKeyspaces()).thenReturn(Lists.newArrayList("keyspace1"));

when(mocks.jmxProxy.getTablesForKeyspace("keyspace1"))
.thenReturn(Sets.newHashSet(Table.builder().withName("table1").withCompactionStrategy(STCS).build()));

mocks.context.config = TestRepairConfiguration.defaultConfigBuilder()
.withAutoScheduling(
TestRepairConfiguration.defaultAutoSchedulingConfigBuilder()
.thatIsEnabled()
.withTimeBeforeFirstSchedule(Duration.ofMinutes(1))
.build())
.build();

ClusterResource clusterResource = new ClusterResource(mocks.context, Executors.newFixedThreadPool(2));

Response response = clusterResource
.addOrUpdateCluster(mocks.uriInfo, Optional.of(SEED_HOST), Optional.of(Cluster.DEFAULT_JMX_PORT));

assertEquals(HttpStatus.CREATED_201, response.getStatus());
assertEquals(1, mocks.context.storage.getAllRepairSchedules().size());
assertEquals(1, mocks.context.storage.getRepairSchedulesForClusterAndKeyspace(CLUSTER_NAME, "keyspace1").size());

assertEquals(HttpStatus.CONFLICT_409, clusterResource.deleteCluster(CLUSTER_NAME, Optional.empty()).getStatus());

assertEquals(
HttpStatus.CONFLICT_409,
clusterResource.deleteCluster(CLUSTER_NAME, Optional.of(Boolean.FALSE)).getStatus());

assertEquals(
HttpStatus.ACCEPTED_202,
clusterResource.deleteCluster(CLUSTER_NAME, Optional.of(Boolean.TRUE)).getStatus());
}

@Test
public void testParseSeedHost() {
String seedHostStringList = "127.0.0.1 , 127.0.0.2, 127.0.0.3";
@@ -14,8 +14,10 @@

Feature: Access Control

Background:
Given cluster seed host "127.0.0.1" points to cluster with name "test"

Scenario Outline: Request to protected resource is redirected to login page when accessed without login
Given that we are going to use "127.0.0.1@test" as cluster seed host
When a <path> <request> is made
Then the response was redirected to the login page
Examples:
@@ -24,23 +26,20 @@ Feature: Access Control
| GET | /webui/index.html |

Scenario Outline: Request to public resource is allowed without login
Given that we are going to use "127.0.0.1@test" as cluster seed host
When a <path> <request> is made
Then a "OK" response is returned
Examples:
| path | request |
| GET | /webui/login.html |

Scenario Outline: Request to ping resource is allowed but not healthy
Given that we are going to use "127.0.0.1@test" as cluster seed host
When a <path> <request> is made
Then a "NOT_ACCEPTABLE" response is returned
Examples:
| path | request |
| GET | /ping |

Scenario Outline: Request to protected resource without login returns forbidden
Given that we are going to use "127.0.0.1@test" as cluster seed host
When a <path> <request> is made
Then a "FORBIDDEN" response is returned
Examples:

0 comments on commit 848e48b

Please sign in to comment.
You can’t perform that action at this time.