Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for stopping connectors #9095

Merged
merged 3 commits into from
Sep 12, 2023
Merged

Conversation

mimaison
Copy link
Contributor

@mimaison mimaison commented Sep 8, 2023

Type of change

  • Enhancement / new feature

Description

This implements Proposal 54

It adds a new field for connectors, state, that can be set to running (the default), paused or stopped. The existing pause field is now deprecated and if both fields are set, state takes precedence.

The proposal also mentions handling Kafka Connect < 3.5 and downgrade stop to pause in this case. From what I understand the next version of Strimzi will only support Kafka >= 3.5 so the PUT /connector/<CONN>/stop should be available. For that reason I've not added logic to handle Kafka Connect < 3.5.

Checklist

Please go through this checklist and make sure all applicable tasks have been done

  • Write tests
  • Make sure all tests pass
  • Update documentation
  • Check RBAC rights for Kubernetes / OpenShift roles
  • Try your changes from Pod inside your Kubernetes and OpenShift cluster, not just locally
  • Reference relevant issue(s) and close them after merging
  • Update CHANGELOG.md
  • Supply screenshots for visual changes, such as Grafana dashboards

@@ -707,11 +745,11 @@ protected boolean hasRestartAnnotation(CustomResource resource, String connector
}

/**
* Return the ID of the connector task to be restarted if the provided KafkaConnector resource instance has the strimzio.io/restart-task annotation
* Return the ID of the connector task to be restarted if the provided KafkaConnector resource instance has the strimzi.io/restart-task annotation
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 2 changes are unrelated. Let me know if you prefer to fix them in a separate PR.

Copy link
Member

@scholzj scholzj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some nits. But LGTM otherwise. Also, it looks like the CI failure is related to this:

[ERROR] Failures: 
[ERROR] io.strimzi.operator.cluster.operator.assembly.KafkaConnectApiIT.test(VertxTestContext)
[ERROR]   Run 1: KafkaConnectApiIT.test(VertxTestContext) io.vertx.core.impl.NoStackTraceThrowable: Unexpected status code 204 for GET request to localhost:37969/connectors/test/stop
[ERROR]   Run 2: KafkaConnectApiIT.test(VertxTestContext) io.vertx.core.impl.NoStackTraceThrowable: Unexpected status code 204 for GET request to localhost:46515/connectors/test/stop
[ERROR]   Run 3: KafkaConnectApiIT.test(VertxTestContext) io.vertx.core.impl.NoStackTraceThrowable: Unexpected status code 204 for GET request to localhost:34987/connectors/test/stop

switch (state) {
case "RUNNING" -> {
if (effectiveState == ConnectorState.PAUSED) {
LOGGER.debugCr(reconciliation, "Pausing connector {}", connectorName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to log all the state changes on an INFO level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed them to INFO level

ConnectorState desiredState = connectorSpec.getState();
@SuppressWarnings("deprecation")
Boolean shouldPause = connectorSpec.getPause();
ConnectorState effectiveState = desiredState != null ? desiredState :
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can find better names here. I first assumed effectiveState would be the current one. Maybe targetState? Or maybe its just me who got confused by it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed the variable to targetState

@mimaison
Copy link
Contributor Author

In KafkaConnectApiImpl, in most calls to the Connect REST API, in case of errors, we seem to wrap the error into a ConnectRestException when failing the future. For example: https://github.com/strimzi/strimzi-kafka-operator/blob/main/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectApiImpl.java#L92

This was not done in [pauseResume()](https://github.com/strimzi/strimzi-kafka-operator/blob/main/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectApiImpl.java#L302) (now renamed to updateState()). Is there a specific reason or should we make it consistent?

@ppatierno
Copy link
Member

Is there a specific reason or should we make it consistent?

@mimaison I don't know I am afraid. I noticed there are a couple of other places where the wrapping doesn't happen (i.e. one in delete and one in list).

@scholzj
Copy link
Member

scholzj commented Sep 11, 2023

@tombentley wrote this if I remember it correctly. Maybe he would have an idea? Obviously, it is a long time ago.

@scholzj scholzj added this to the 0.38.0 milestone Sep 11, 2023
Copy link
Member

@scholzj scholzj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks.

@scholzj
Copy link
Member

scholzj commented Sep 11, 2023

/azp run regression

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

Copy link
Member

@ppatierno ppatierno left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks!

@mimaison
Copy link
Contributor Author

For the Future String/ConnectRestException inconsistency, I propose fixing that in a follow up PR.

@scholzj
Copy link
Member

scholzj commented Sep 12, 2023

@mimaison Can you please do one more thing and add a record to the CHANGELOG.md?

This implements Proposal 54: https://github.com/strimzi/proposals/blob/main/054-stopping-kafka-connect-connectors.md

It adds a new field for connectors, state, that can be set to running (the default), paused or stopped. The existing pause field is now deprecated and if both fields are set, state takes precendence.

Signed-off-by: Mickael Maison <mickael.maison@gmail.com>
Signed-off-by: Mickael Maison <mickael.maison@gmail.com>
Signed-off-by: Mickael Maison <mickael.maison@gmail.com>
@mimaison
Copy link
Contributor Author

@scholzj Done!

@scholzj scholzj merged commit 09d0f30 into strimzi:main Sep 12, 2023
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants