Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Cruise Control system tests #9574

Merged
merged 6 commits into from
Jan 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters;
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlEndpoints;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.resources.crd.KafkaResource;
import io.strimzi.systemtest.resources.crd.KafkaTopicResource;
import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils;
import io.strimzi.test.TestUtils;
import io.strimzi.test.executor.ExecResult;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -35,49 +35,55 @@ public class CruiseControlUtils {
public static final String CRUISE_CONTROL_MODEL_TRAINING_SAMPLES_TOPIC = "strimzi.cruisecontrol.modeltrainingsamples"; // partitions 32 , rf - 2
public static final String CRUISE_CONTROL_PARTITION_METRICS_SAMPLES_TOPIC = "strimzi.cruisecontrol.partitionmetricsamples"; // partitions 32 , rf - 2

private static final int CRUISE_CONTROL_DEFAULT_PORT = 9090;
private static final int CRUISE_CONTROL_METRICS_PORT = 9404;
public static final int CRUISE_CONTROL_DEFAULT_PORT = 9090;
public static final int CRUISE_CONTROL_METRICS_PORT = 9404;

private static final String CONTAINER_NAME = "cruise-control";

private CruiseControlUtils() { }

public enum SupportedHttpMethods {
public enum HttpMethod {
GET,
POST
}

public enum SupportedSchemes {
public enum Scheme {
HTTP,
HTTPS
}

public static String callApi(String namespaceName, SupportedHttpMethods method, CruiseControlEndpoints endpoint, SupportedSchemes scheme, Boolean withCredentials) {
return callApi(namespaceName, method, endpoint, scheme, withCredentials, "");
}
public static class ApiResult {
private String responseText;
private int responseCode;

@SuppressWarnings("Regexp")
@SuppressFBWarnings("DM_CONVERT_CASE")
public static String callApi(String namespaceName, SupportedHttpMethods method, CruiseControlEndpoints endpoint, SupportedSchemes scheme, Boolean withCredentials, String endpointSuffix) {
String ccPodName = PodUtils.getFirstPodNameContaining(namespaceName, CONTAINER_NAME);
String args = " -k ";
public ApiResult(ExecResult execResult) {
this.responseText = execResult.out();
this.responseCode = responseCode(execResult.out());
}

if (withCredentials) {
args = " --cacert /etc/cruise-control/cc-certs/cruise-control.crt"
+ " --user admin:$(cat /opt/cruise-control/api-auth-config/cruise-control.apiAdminPassword) ";
private int responseCode(String responseText) {
responseText = responseText.replaceAll("\n", "");
return Integer.parseInt(responseText.substring(responseText.length() - 3));
}

public String getResponseText() {
return responseText;
}

return cmdKubeClient(namespaceName).execInPodContainer(Level.DEBUG, ccPodName, CONTAINER_NAME, "/bin/bash", "-c",
"curl -X " + method.name() + args + " " + scheme + "://localhost:" + CRUISE_CONTROL_DEFAULT_PORT + endpoint.toString() + endpointSuffix).out();
public int getResponseCode() {
return responseCode;
}
}

@SuppressWarnings("Regexp")
@SuppressFBWarnings("DM_CONVERT_CASE")
public static String callApi(String namespaceName, SupportedHttpMethods method, String endpoint) {
public static ApiResult callApi(String namespaceName, HttpMethod method, Scheme scheme, int port, String endpoint, String endpointParameters, boolean withCredentials) {
String ccPodName = PodUtils.getFirstPodNameContaining(namespaceName, CONTAINER_NAME);
String args = " -k -w \"%{http_code}\" ";

if (withCredentials) {
args += " --user admin:$(cat /opt/cruise-control/api-auth-config/cruise-control.apiAdminPassword) ";
}

return cmdKubeClient(namespaceName).execInPodContainer(Level.DEBUG, ccPodName, CONTAINER_NAME, "/bin/bash", "-c",
"curl -X" + method.name() + " localhost:" + CRUISE_CONTROL_METRICS_PORT + endpoint).out();
String curl = "curl -X " + method.name() + " " + args + " " + scheme + "://localhost:" + port + endpoint + endpointParameters;
return new ApiResult(cmdKubeClient(namespaceName).execInPodContainer(Level.DEBUG, ccPodName, CONTAINER_NAME, "/bin/bash", "-c", curl));
}

@SuppressWarnings("BooleanExpressionComplexity")
Expand Down Expand Up @@ -166,17 +172,6 @@ public static Properties getKafkaCruiseControlMetricsReporterConfiguration(Strin
return cruiseControlProperties;
}

public static void waitForRebalanceEndpointIsReady(String namespaceName) {
TestUtils.waitFor("rebalance endpoint to be ready",
TestConstants.API_CRUISE_CONTROL_POLL, TestConstants.API_CRUISE_CONTROL_TIMEOUT, () -> {
String response = callApi(namespaceName, SupportedHttpMethods.POST, CruiseControlEndpoints.REBALANCE, SupportedSchemes.HTTPS, true);
LOGGER.debug("API response: {}", response);
return !response.contains("Error processing POST request '/rebalance' due to: " +
"'com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException: " +
"com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException: ");
});
}

/**
* Returns user defined network capacity value without KiB/s suffix.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,11 @@
*/
package io.strimzi.systemtest.cruisecontrol;

import io.strimzi.api.kafka.model.rebalance.KafkaRebalance;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlEndpoints;
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlUserTaskStatus;
import io.strimzi.systemtest.AbstractST;
import io.strimzi.systemtest.Environment;
import io.strimzi.systemtest.annotations.KRaftWithoutUTONotSupported;
import io.strimzi.systemtest.annotations.ParallelNamespaceTest;
import io.strimzi.systemtest.storage.TestStorage;
import io.strimzi.systemtest.templates.crd.KafkaRebalanceTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTemplates;
import io.strimzi.systemtest.utils.kafkaUtils.KafkaRebalanceUtils;
import io.strimzi.systemtest.utils.specific.CruiseControlUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -30,9 +22,9 @@
import static io.strimzi.systemtest.TestConstants.ACCEPTANCE;
import static io.strimzi.systemtest.TestConstants.CRUISE_CONTROL;
import static io.strimzi.systemtest.TestConstants.REGRESSION;
import static io.strimzi.systemtest.utils.specific.CruiseControlUtils.CRUISE_CONTROL_DEFAULT_PORT;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;

@Tag(REGRESSION)
Expand All @@ -44,91 +36,6 @@ public class CruiseControlApiST extends AbstractST {
private static final String CRUISE_CONTROL_NAME = "Cruise Control";
private final String cruiseControlApiClusterName = "cruise-control-api-cluster-name";

@ParallelNamespaceTest
@KRaftWithoutUTONotSupported()
void testCruiseControlBasicAPIRequests(ExtensionContext extensionContext) {
final TestStorage testStorage = new TestStorage(extensionContext);

resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaWithCruiseControl(testStorage.getClusterName(), 3, 3).build());

LOGGER.info("----> CRUISE CONTROL DEPLOYMENT STATE ENDPOINT <----");

String response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.POST, CruiseControlEndpoints.STATE, CruiseControlUtils.SupportedSchemes.HTTPS, true);

assertThat(response, is("Unrecognized endpoint in request '/state'\n" +
"Supported POST endpoints: [ADD_BROKER, REMOVE_BROKER, FIX_OFFLINE_REPLICAS, REBALANCE, STOP_PROPOSAL_EXECUTION, PAUSE_SAMPLING, " +
"RESUME_SAMPLING, DEMOTE_BROKER, ADMIN, REVIEW, TOPIC_CONFIGURATION, RIGHTSIZE, REMOVE_DISKS]\n"));

response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.GET, CruiseControlEndpoints.STATE, CruiseControlUtils.SupportedSchemes.HTTPS, true);

LOGGER.info("Verifying that {} REST API is available", CRUISE_CONTROL_NAME);

assertThat(response, not(containsString("404")));
assertThat(response, containsString("RUNNING"));
assertThat(response, containsString("NO_TASK_IN_PROGRESS"));

// https://github.com/strimzi/strimzi-kafka-operator/issues/8864
if (!Environment.isUnidirectionalTopicOperatorEnabled()) {
CruiseControlUtils.verifyThatCruiseControlTopicsArePresent(testStorage.getNamespaceName());
}
LOGGER.info("----> KAFKA REBALANCE <----");

response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.GET, CruiseControlEndpoints.REBALANCE, CruiseControlUtils.SupportedSchemes.HTTPS, true);

assertThat(response, is("Unrecognized endpoint in request '/rebalance'\n" +
"Supported GET endpoints: [BOOTSTRAP, TRAIN, LOAD, PARTITION_LOAD, PROPOSALS, STATE, KAFKA_CLUSTER_STATE, USER_TASKS, REVIEW_BOARD, PERMISSIONS]\n"));

LOGGER.info("Waiting for CC will have for enough metrics to be recorded to make a proposal ");
CruiseControlUtils.waitForRebalanceEndpointIsReady(testStorage.getNamespaceName());

response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.POST, CruiseControlEndpoints.REBALANCE, CruiseControlUtils.SupportedSchemes.HTTPS, true);

// all goals stats that contains
assertCCGoalsInResponse(response);

assertThat(response, containsString("Cluster load after rebalance"));

LOGGER.info("----> EXECUTION OF STOP PROPOSAL <----");

response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.GET, CruiseControlEndpoints.STOP, CruiseControlUtils.SupportedSchemes.HTTPS, true);

assertThat(response, is("Unrecognized endpoint in request '/stop_proposal_execution'\n" +
"Supported GET endpoints: [BOOTSTRAP, TRAIN, LOAD, PARTITION_LOAD, PROPOSALS, STATE, KAFKA_CLUSTER_STATE, USER_TASKS, REVIEW_BOARD, PERMISSIONS]\n"));

response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.POST, CruiseControlEndpoints.STOP, CruiseControlUtils.SupportedSchemes.HTTPS, true);

assertThat(response, containsString("Proposal execution stopped."));

LOGGER.info("----> USER TASKS <----");

response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.POST, CruiseControlEndpoints.USER_TASKS, CruiseControlUtils.SupportedSchemes.HTTPS, true);

assertThat(response, is("Unrecognized endpoint in request '/user_tasks'\n" +
"Supported POST endpoints: [ADD_BROKER, REMOVE_BROKER, FIX_OFFLINE_REPLICAS, REBALANCE, STOP_PROPOSAL_EXECUTION, PAUSE_SAMPLING, " +
"RESUME_SAMPLING, DEMOTE_BROKER, ADMIN, REVIEW, TOPIC_CONFIGURATION, RIGHTSIZE, REMOVE_DISKS]\n"));

response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.GET, CruiseControlEndpoints.USER_TASKS, CruiseControlUtils.SupportedSchemes.HTTPS, true);

assertThat(response, containsString("GET"));
assertThat(response, containsString(CruiseControlEndpoints.STATE.toString()));
assertThat(response, containsString("POST"));
assertThat(response, containsString(CruiseControlEndpoints.REBALANCE.toString()));
assertThat(response, containsString(CruiseControlEndpoints.STOP.toString()));
assertThat(response, containsString(CruiseControlUserTaskStatus.COMPLETED.toString()));


LOGGER.info("Verifying that {} REST API doesn't allow HTTP requests", CRUISE_CONTROL_NAME);

response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.GET, CruiseControlEndpoints.STATE, CruiseControlUtils.SupportedSchemes.HTTP, false);
assertThat(response, not(containsString("RUNNING")));
assertThat(response, not(containsString("NO_TASK_IN_PROGRESS")));

LOGGER.info("Verifying that {} REST API doesn't allow unauthenticated requests", CRUISE_CONTROL_NAME);

response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.GET, CruiseControlEndpoints.STATE, CruiseControlUtils.SupportedSchemes.HTTPS, false);
assertThat(response, containsString("401 Unauthorized"));
}

@ParallelNamespaceTest
void testCruiseControlBasicAPIRequestsWithSecurityDisabled(ExtensionContext extensionContext) {
final TestStorage testStorage = new TestStorage(extensionContext);
Expand All @@ -146,81 +53,15 @@ void testCruiseControlBasicAPIRequestsWithSecurityDisabled(ExtensionContext exte
.build());

LOGGER.info("----> CRUISE CONTROL DEPLOYMENT STATE ENDPOINT <----");

String response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.GET, CruiseControlEndpoints.STATE, CruiseControlUtils.SupportedSchemes.HTTP, false);
CruiseControlUtils.ApiResult response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.HttpMethod.GET,
CruiseControlUtils.Scheme.HTTP, CRUISE_CONTROL_DEFAULT_PORT, CruiseControlEndpoints.STATE.toString(), "", false);
String responseText = response.getResponseText();
int responseCode = response.getResponseCode();

LOGGER.info("Verifying that {} REST API is available using HTTP request without credentials", CRUISE_CONTROL_NAME);

assertThat(response, not(containsString("404")));
assertThat(response, containsString("RUNNING"));
assertThat(response, containsString("NO_TASK_IN_PROGRESS"));
}

@ParallelNamespaceTest
void testCruiseControlAPIForScalingBrokersUpAndDown(ExtensionContext extensionContext) {
final TestStorage testStorage = new TestStorage(extensionContext);

resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaWithCruiseControl(testStorage.getClusterName(), 5, 3).build());

LOGGER.info("Checking if we are able to execute GET request on {} and {} endpoints", CruiseControlEndpoints.ADD_BROKER, CruiseControlEndpoints.REMOVE_BROKER);

String response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.GET, CruiseControlEndpoints.ADD_BROKER, CruiseControlUtils.SupportedSchemes.HTTPS, true);

assertThat(response, is("Unrecognized endpoint in request '/add_broker'\n" +
"Supported GET endpoints: [BOOTSTRAP, TRAIN, LOAD, PARTITION_LOAD, PROPOSALS, STATE, KAFKA_CLUSTER_STATE, USER_TASKS, REVIEW_BOARD, PERMISSIONS]\n"));

response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.GET, CruiseControlEndpoints.REMOVE_BROKER, CruiseControlUtils.SupportedSchemes.HTTPS, true);

assertThat(response, is("Unrecognized endpoint in request '/remove_broker'\n" +
"Supported GET endpoints: [BOOTSTRAP, TRAIN, LOAD, PARTITION_LOAD, PROPOSALS, STATE, KAFKA_CLUSTER_STATE, USER_TASKS, REVIEW_BOARD, PERMISSIONS]\n"));

LOGGER.info("Waiting for CC will have for enough metrics to be recorded to make a proposal ");
CruiseControlUtils.waitForRebalanceEndpointIsReady(testStorage.getNamespaceName());

response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.POST, CruiseControlEndpoints.ADD_BROKER, CruiseControlUtils.SupportedSchemes.HTTPS, true, "?brokerid=3,4");

assertCCGoalsInResponse(response);
assertThat(response, containsString("Cluster load after adding broker [3, 4]"));

response = CruiseControlUtils.callApi(testStorage.getNamespaceName(), CruiseControlUtils.SupportedHttpMethods.POST, CruiseControlEndpoints.REMOVE_BROKER, CruiseControlUtils.SupportedSchemes.HTTPS, true, "?brokerid=3,4");

assertCCGoalsInResponse(response);
assertThat(response, containsString("Cluster load after removing broker [3, 4]"));
}

@ParallelNamespaceTest
void testKafkaRebalanceAutoApprovalMechanism(ExtensionContext extensionContext) {
final TestStorage testStorage = new TestStorage(extensionContext);

resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaWithCruiseControl(testStorage.getClusterName(), 3, 3).build());

// KafkaRebalance with auto-approval
resourceManager.createResourceWithWait(extensionContext, KafkaRebalanceTemplates.kafkaRebalance(testStorage.getClusterName())
.editMetadata()
.addToAnnotations(Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL, "true")
.endMetadata()
.build());

KafkaRebalanceUtils.doRebalancingProcessWithAutoApproval(new Reconciliation("test", KafkaRebalance.RESOURCE_KIND,
testStorage.getNamespaceName(), testStorage.getClusterName()), testStorage.getNamespaceName(), testStorage.getClusterName());
}

private void assertCCGoalsInResponse(String response) {
assertThat(response, containsString("RackAwareGoal"));
assertThat(response, containsString("ReplicaCapacityGoal"));
assertThat(response, containsString("DiskCapacityGoal"));
assertThat(response, containsString("NetworkInboundCapacityGoal"));
assertThat(response, containsString("NetworkOutboundCapacityGoal"));
assertThat(response, containsString("CpuCapacityGoal"));
assertThat(response, containsString("ReplicaDistributionGoal"));
assertThat(response, containsString("DiskUsageDistributionGoal"));
assertThat(response, containsString("NetworkInboundUsageDistributionGoal"));
assertThat(response, containsString("NetworkOutboundUsageDistributionGoal"));
assertThat(response, containsString("CpuUsageDistributionGoal"));
assertThat(response, containsString("TopicReplicaDistributionGoal"));
assertThat(response, containsString("LeaderReplicaDistributionGoal"));
assertThat(response, containsString("LeaderBytesInDistributionGoal"));
assertThat(response, containsString("PreferredLeaderElectionGoal"));
assertThat(responseCode, is(200));
assertThat(responseText, containsString("RUNNING"));
assertThat(responseText, containsString("NO_TASK_IN_PROGRESS"));
}

@BeforeAll
Expand Down