Skip to content

Commit

Permalink
馃敡 Add support for multiple configuration contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
helpermethod committed Aug 20, 2021
1 parent ca861db commit 2c110e1
Show file tree
Hide file tree
Showing 25 changed files with 321 additions and 43 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,25 @@ It is recommended to install the bash/zsh completion script _kcctl_completion_:
. kcctl_completion
```

## Quickstart

Before you can start using _kcctl_ you need to create a configuration context.
A configuration context is a set of configuration parameters, grouped
by a name.
To create a configuration context named `local`, with the Kafka Connect cluster URL set to
`http://localhost:8083`, issue the following command

```shell script
kcctl set-context local --cluster http://localhost:8083
```

:exclamation: Note that certain commands will require additional parameters, like `bootstrap-servers` and
`offset-topic`.

Type `kcctl info` to display some information about the Kafka connect cluster.
The command will use the currently active context, `local` in this case, to
resolve the cluster URL.

## Usage

Display the help to learn about using _kcctl_:
Expand Down
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.javacrumbs.json-unit</groupId>
<artifactId>json-unit-assertj</artifactId>
<version>2.27.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/dev/morling/kccli/command/ApplyCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ApplyCommand implements Callable<Integer> {
@Override
public Integer call() throws Exception {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

if (!file.exists()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ConnectorNamesCompletionCandidateCommand implements Runnable {
@Override
public void run() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

List<String> connectors = kafkaConnectApi.getConnectors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class DeleteConnectorCommand implements Runnable {
@Override
public void run() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

kafkaConnectApi.deleteConnector(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class DescribeConnectorCommand implements Callable<Integer> {
@Override
public Integer call() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

if (includeTasksConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class GetConnectorsCommand implements Runnable {
@Override
public void run() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

List<String> connectors = kafkaConnectApi.getConnectors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class GetContextCommand implements Runnable {

@Override
public void run() {
String clusterUri = context.getCluster().toASCIIString();
String clusterUri = context.getContext().getCluster().toASCIIString();
System.out.println("Current context is set to " + clusterUri);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class GetLoggerCommand implements Runnable {
@Override
public void run() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

String[][] data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class GetLoggersCommand implements Runnable {
@Override
public void run() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

ObjectNode connectorLoggers = kafkaConnectApi.getLoggers("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class GetPluginsCommand implements Runnable {
@Override
public void run() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

List<ConnectorPlugin> connectorPlugins = kafkaConnectApi.getConnectorPlugins();
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/dev/morling/kccli/command/InfoCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ public class InfoCommand implements Runnable {
@Override
public void run() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

KafkaConnectInfo workerInfo = kafkaConnectApi.getWorkerInfo();
System.out.println("URL: " + context.getCluster());
System.out.println("URL: " + context.getContext().getCluster());
System.out.println("Version: " + workerInfo.version);
System.out.println("Commit: " + workerInfo.commit);
System.out.println("Kafka Cluster ID: " + workerInfo.kafka_cluster_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class LoggerNamesCompletionCandidateCommand implements Runnable {
@Override
public void run() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

ObjectNode connectorLoggers = kafkaConnectApi.getLoggers("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class PatchConnectorCommand implements Callable<Integer> {
public Integer call() throws JsonProcessingException {

KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

Map<String, String> connectorParameters = kafkaConnectApi.getConnectorConfig(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class PatchLogLevelCommand implements Callable {
@Override
public Object call() throws Exception {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

ObjectMapper mapper = new ObjectMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class PauseConnectorCommand implements Runnable {
@Override
public void run() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

kafkaConnectApi.pauseConnector(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class RestartConnectorCommand implements Runnable {
@Override
public void run() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

kafkaConnectApi.restartConnector(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class RestartTaskCommand implements Runnable {
@Override
public void run() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

String[] parts = name.split("\\/");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class ResumeConnectorCommand implements Runnable {
@Override
public void run() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

kafkaConnectApi.resumeConnector(name);
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/dev/morling/kccli/command/SetContextCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,32 @@
*/
package dev.morling.kccli.command;

import dev.morling.kccli.service.Context;
import dev.morling.kccli.util.ConfigurationContext;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

import java.net.URI;

@Command(name = "set-context", description = "Set the context to cluster provided in arguments")
public class SetContextCommand implements Runnable {
@Parameters(index = "0", description = "Context name")
String contextName;

@Option(names = { "--cluster" }, description = "URL of the Kafka Connect cluster to connect to", required = true)
@Option(names = {"--cluster"}, defaultValue = "http://localhost:8083", description = "URL of the Kafka Connect cluster to connect to")
String cluster;

@Option(names = {"--bootstrap-servers"}, defaultValue = "localhost:9092", description = "Comma-separated list of Kafka broker URLs")
String bootstrapServers;

@Option(names = {"--offset-topic"}, description = "Name of the offset topic")
String offsetTopic;

@Override
public void run() {
ConfigurationContext context = new ConfigurationContext();
context.setConfiguration(cluster);
System.out.println("Successfully set context to " + cluster);
context.setContext(contextName, new Context(URI.create(cluster), bootstrapServers, offsetTopic));
System.out.println("Using context " + contextName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class TaskNamesCompletionCandidateCommand implements Runnable {
@Override
public void run() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getCluster())
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);

List<String> connectors = kafkaConnectApi.getConnectors();
Expand Down
51 changes: 51 additions & 0 deletions src/main/java/dev/morling/kccli/service/Configuration.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2021 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.kccli.service;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.LinkedHashMap;
import java.util.Map;

import static com.fasterxml.jackson.annotation.JsonCreator.Mode.PROPERTIES;

public class Configuration {
private final String currentContext;
private final Map<String, Context> configurationContexts = new LinkedHashMap<>();

public Configuration(@JsonProperty("currentContext") String currentContext) {
this.currentContext = currentContext;
}

public String getCurrentContext() {
return currentContext;
}

@JsonAnyGetter
public Map<String, Context> configurationContexts() {
return configurationContexts;
}

@JsonAnySetter
public Configuration addConfigurationContext(String contextName, Context configuration) {
configurationContexts.put(contextName, configuration);

return this;
}
}
48 changes: 48 additions & 0 deletions src/main/java/dev/morling/kccli/service/Context.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2021 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.kccli.service;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.net.URI;

public class Context {
private final URI cluster;
private final String bootstrapServers;
private final String offsetTopic;

public Context(
@JsonProperty("cluster") URI cluster,
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("offsetTopic") String offsetTopic
) {
this.cluster = cluster;
this.bootstrapServers = bootstrapServers;
this.offsetTopic = offsetTopic;
}

public URI getCluster() {
return cluster;
}

public String getBootstrapServers() {
return bootstrapServers;
}

public String getOffsetTopic() {
return offsetTopic;
}
}

0 comments on commit 2c110e1

Please sign in to comment.