Skip to content

Commit

Permalink
Introduce deletion-mode for topics (LangStream#418)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Sep 15, 2023
1 parent d6332e4 commit 58d7cfe
Show file tree
Hide file tree
Showing 27 changed files with 363 additions and 105 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ jobs:
- name: Start minikube
id: minikube
uses: medyagh/setup-minikube@latest
with:
cpus: 4
memory: 8192
kubernetes-version: 1.26.3
- uses: azure/setup-helm@v3
with:
version: v3.7.0
Expand All @@ -149,9 +153,7 @@ jobs:
run: |
chmod +x mvnw
uname -m
eval $(minikube docker-env)
./docker/build.sh
eval $(minikube docker-env -u)
./dev/prepare-minikube-for-e2e-tests.sh
./mvnw install -pl langstream-e2e-tests -am -DskipTests
./mvnw verify -pl langstream-e2e-tests -De2eTests
Expand Down
21 changes: 21 additions & 0 deletions dev/prepare-minikube-for-e2e-tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/bash
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Prepares a local minikube cluster for the e2e test suite.
# Abort on the first failing command so CI does not continue with a
# half-prepared cluster.
set -e

# Start a local cluster sized for the e2e tests (mirrors the CI workflow settings).
minikube start --memory=8192 --cpus=4 --kubernetes-version=v1.26.3

# Build the docker images directly into minikube's docker daemon so the cluster
# can use them without an external registry, then restore the host docker env.
eval $(minikube docker-env)
./docker/build.sh
eval $(minikube docker-env -u)
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public void deployAsset() throws Exception {
}
}
}

@Override
public void deleteAsset() throws Exception {
    // Asset deletion is not implemented for this manager; fail loudly with an
    // explanatory message instead of a bare UnsupportedOperationException.
    throw new UnsupportedOperationException("deleteAsset is not supported by this asset manager");
}
}

private static class CassandraKeyspaceAssetManager extends BaseCassandraAssetManager {
Expand Down Expand Up @@ -147,6 +152,11 @@ public void deployAsset() throws Exception {
}
}
}

@Override
public void deleteAsset() throws Exception {
    // Asset deletion is not implemented for this manager; fail loudly with an
    // explanatory message instead of a bare UnsupportedOperationException.
    throw new UnsupportedOperationException("deleteAsset is not supported by this asset manager");
}
}

private abstract static class BaseCassandraAssetManager implements AssetManager {
Expand Down Expand Up @@ -203,6 +213,11 @@ public void deployAsset() throws Exception {
}
}
}

@Override
public void deleteAsset() throws Exception {
    // Asset deletion is not implemented for this manager; fail loudly with an
    // explanatory message instead of a bare UnsupportedOperationException.
    throw new UnsupportedOperationException("deleteAsset is not supported by this asset manager");
}
}

private static CassandraDataSource buildDataSource(AssetDefinition assetDefinition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,23 @@ public class TopicDefinition {
public static final String CREATE_MODE_NONE = "none";
public static final String CREATE_MODE_CREATE_IF_NOT_EXISTS = "create-if-not-exists";

public static final String DELETE_MODE_NONE = "none";
public static final String DELETE_MODE_DELETE = "delete";

// Default constructor: by default a topic is neither created nor deleted by the
// runtime unless a mode is explicitly configured.
public TopicDefinition() {
    creationMode = CREATE_MODE_NONE;
    deletionMode = DELETE_MODE_NONE;
}

public static TopicDefinition fromName(String name) {
return new TopicDefinition(
name, CREATE_MODE_NONE, false, 0, null, null, Map.of(), Map.of());
name, CREATE_MODE_NONE, DELETE_MODE_NONE, false, 0, null, null, Map.of(), Map.of());
}

public TopicDefinition(
String name,
String creationMode,
String deletionMode,
boolean implicit,
int partitions,
SchemaDefinition keySchema,
Expand All @@ -53,6 +58,7 @@ public TopicDefinition(
this();
this.name = name;
this.creationMode = Objects.requireNonNullElse(creationMode, CREATE_MODE_NONE);
this.deletionMode = Objects.requireNonNullElse(deletionMode, DELETE_MODE_NONE);
this.implicit = implicit;
this.partitions = partitions;
this.keySchema = keySchema;
Expand All @@ -67,6 +73,9 @@ public TopicDefinition(
@JsonProperty("creation-mode")
private String creationMode;

@JsonProperty("deletion-mode")
private String deletionMode;

// Kafka Admin special configuration options
private Map<String, Object> config;
private Map<String, Object> options;
Expand All @@ -87,10 +96,21 @@ private void validateCreationMode() {
}
}

/** Validates that the configured deletion mode is one of the supported values. */
private void validateDeletionMode() {
    // Calling equals on the field preserves the original switch's NPE on a null mode.
    if (deletionMode.equals(DELETE_MODE_DELETE) || deletionMode.equals(DELETE_MODE_NONE)) {
        return;
    }
    throw new IllegalArgumentException("Invalid deletion mode: " + deletionMode);
}

public TopicDefinition copy() {
TopicDefinition copy = new TopicDefinition();
copy.setName(name);
copy.setCreationMode(creationMode);
copy.setDeletionMode(deletionMode);
copy.setImplicit(implicit);
copy.setPartitions(partitions);
copy.setKeySchema(keySchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ public interface AssetManager {

void deployAsset() throws Exception;

void deleteAsset() throws Exception;

default void close() throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public void deployAsset() throws Exception {
executeWithContextClassloader(agentCode -> agentCode.deployAsset());
}

@Override
public void deleteAsset() throws Exception {
    // Delegates to the wrapped asset manager through the same context-classloader
    // helper used by the other lifecycle methods (deployAsset, close).
    executeWithContextClassloader(agentCode -> agentCode.deleteAsset());
}

@Override
public void close() throws Exception {
executeWithContextClassloader(agentCode -> agentCode.close());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ private Topic buildImplicitTopicForDeadletterQueue(
new TopicDefinition(
name,
creationMode,
TopicDefinition.CREATE_MODE_NONE,
inputTopicDefinition.isImplicit(),
inputTopicDefinition.getPartitions(),
inputTopicDefinition.getKeySchema(),
Expand All @@ -372,18 +373,20 @@ protected Topic buildImplicitTopicForAgent(
AgentConfiguration agentConfiguration,
StreamingClusterRuntime streamingClusterRuntime) {
// connecting two agents requires an intermediate topic
String name = "agent-" + agentConfiguration.getId() + "-input";
final String name = "agent-" + agentConfiguration.getId() + "-input";
log.info(
"Automatically creating topic {} in order to connect as input for agent {}",
name,
agentConfiguration.getId());
// short circuit...the Pulsar Runtime works only with Pulsar Topics on the same Pulsar
// Cluster
String creationMode = TopicDefinition.CREATE_MODE_CREATE_IF_NOT_EXISTS;
final String creationMode = TopicDefinition.CREATE_MODE_CREATE_IF_NOT_EXISTS;
final String deletionMode = TopicDefinition.DELETE_MODE_NONE;
TopicDefinition topicDefinition =
new TopicDefinition(
name,
creationMode,
deletionMode,
true,
DEFAULT_PARTITIONS_FOR_IMPLICIT_TOPICS,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,27 +155,46 @@ public Object deploy(
}

/**
* Delete the application instance and all the resources associated with it.
* Undeploy the application and delete all the agents.
*
* @param physicalApplicationInstance the application instance
* @param tenant
* @param executionPlan the application plan
* @param codeStorageArchiveId the code storage archive id
*/
public void delete(
String tenant, ExecutionPlan physicalApplicationInstance, String codeStorageArchiveId) {
Application applicationInstance = physicalApplicationInstance.getApplication();
public void delete(String tenant, ExecutionPlan executionPlan, String codeStorageArchiveId) {
Application applicationInstance = executionPlan.getApplication();
ComputeClusterRuntime clusterRuntime =
registry.getClusterRuntime(applicationInstance.getInstance().computeCluster());
StreamingClusterRuntime streamingClusterRuntime =
registry.getStreamingClusterRuntime(
applicationInstance.getInstance().streamingCluster());
clusterRuntime.delete(
tenant,
physicalApplicationInstance,
executionPlan,
streamingClusterRuntime,
codeStorageArchiveId,
deployContext);
}

/**
 * Cleanup all the resources associated with an application.
 *
 * @param tenant the tenant owning the application (currently unused here)
 * @param executionPlan the application's execution plan
 */
public void cleanup(String tenant, ExecutionPlan executionPlan) {
    // Only streaming topics need cleanup today; tenant is kept for signature
    // symmetry with deploy/delete.
    cleanupTopics(executionPlan);
}

// Deletes the topics of the execution plan by looking up the topic-connections
// runtime that matches the application's streaming cluster and delegating to it.
private void cleanupTopics(ExecutionPlan executionPlan) {
    TopicConnectionsRuntime topicConnectionsRuntime =
            topicConnectionsRuntimeRegistry
                    .getTopicConnectionsRuntime(
                            executionPlan.getApplication().getInstance().streamingCluster())
                    .asTopicConnectionsRuntime();
    // The runtime decides per-topic whether deletion applies (e.g. deletion-mode).
    topicConnectionsRuntime.delete(executionPlan);
}

@Override
public void close() {
registry.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ private static void parsePipelineFile(String filename, String content, Applicati
new TopicDefinition(
topicDefinition.getName(),
topicDefinition.getCreationMode(),
topicDefinition.getDeletionMode(),
false,
topicDefinition.getPartitions(),
topicDefinition.getKeySchema(),
Expand Down Expand Up @@ -631,6 +632,9 @@ public static final class TopicDefinitionModel {
@JsonProperty("creation-mode")
private String creationMode;

@JsonProperty("deletion-mode")
private String deletionMode;

private SchemaDefinition schema;

private int partitions = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtensionContext;
Expand All @@ -79,6 +80,7 @@ public class BaseEndToEndTest implements TestWatcher {
public static final File TEST_LOGS_DIR = new File("target", "e2e-test-logs");
protected static final String TENANT_NAMESPACE_PREFIX = "ls-tenant-";
protected static final ObjectMapper MAPPER = new ObjectMapper(new YAMLFactory());
protected static final String KAFKA_NAMESPACE = "kafka-ns";

interface KubeServer {
void start();
Expand All @@ -98,14 +100,7 @@ public void start() {}
public void ensureImage(String image) {}

@Override
public void stop() {
try (final KubernetesClient client =
new KubernetesClientBuilder()
.withConfig(Config.fromKubeconfig(kubeServer.getKubeConfig()))
.build()) {
client.namespaces().withName(namespace).delete();
}
}
public void stop() {}

@Override
@SneakyThrows
Expand Down Expand Up @@ -305,6 +300,11 @@ private static void runProcess(String[] allArgs, boolean allowFailures)

// Convenience overload: executes the command in the default test namespace.
public static CompletableFuture<String> execInPod(
        String podName, String containerName, String... cmds) {
    return execInPodInNamespace(namespace, podName, containerName, cmds);
}

public static CompletableFuture<String> execInPodInNamespace(
String namespace, String podName, String containerName, String... cmds) {

final String cmd = String.join(" ", cmds);
log.info(
Expand Down Expand Up @@ -521,6 +521,12 @@ public void setupSingleTest() {
.serverSideApply();
}

@AfterEach
public void cleanupAfterEach() {
    // Reset cluster state between tests: drop all test namespaces and delete
    // every Kafka topic (rpk regex delete) so tests do not leak topics.
    cleanupAllEndToEndTestsNamespaces();
    execInKafkaPod("rpk topic delete -r \".*\"");
}

private static void cleanupAllEndToEndTestsNamespaces() {
client.namespaces().withLabel("app", "ls-test").delete();
client.namespaces().list().getItems().stream()
Expand Down Expand Up @@ -676,7 +682,7 @@ private static void installKafka() {
client.resource(
new NamespaceBuilder()
.withNewMetadata()
.withName("kafka-ns")
.withName(KAFKA_NAMESPACE)
.endMetadata()
.build())
.serverSideApply();
Expand Down Expand Up @@ -852,12 +858,19 @@ private static List<String> getAllUserNamespaces() {

private static void dumpResources(
String filePrefix, Class<? extends HasMetadata> clazz, List<String> namespaces) {
for (String namespace : namespaces) {
client.resources(clazz)
.inNamespace(namespace)
.list()
.getItems()
.forEach(resource -> dumpResource(filePrefix, resource));
try {
for (String namespace : namespaces) {
client.resources(clazz)
.inNamespace(namespace)
.list()
.getItems()
.forEach(resource -> dumpResource(filePrefix, resource));
}
} catch (Throwable t) {
log.warn(
"failed to dump resources of type {}: {}",
clazz.getSimpleName(),
t.getMessage());
}
}

Expand Down Expand Up @@ -928,4 +941,30 @@ protected static void dumpEvents(String filePrefix) {
log.error("failed to write events logs to file {}", outputFile, e);
}
}

/**
 * Returns the names of all topics currently present in the Kafka (Redpanda) broker,
 * as reported by {@code rpk topic list}.
 */
@SneakyThrows
protected static List<String> getAllTopicsFromKafka() {
    final String output = execInKafkaPod("rpk topic list");
    if (output == null) {
        throw new IllegalStateException("failed to get topics from kafka");
    }
    // The first line is the rpk table header; each following line starts with
    // the topic name (first space-separated token).
    return output.lines()
            .skip(1)
            .map(line -> line.split(" ")[0])
            .collect(Collectors.toList());
}

// Runs a shell command inside the Redpanda broker pod, waiting up to one minute
// for completion. Assumes the command has no quoted/space-containing arguments,
// since it is split naively on spaces.
@SneakyThrows
private static String execInKafkaPod(String cmd) {
    return execInPodInNamespace(KAFKA_NAMESPACE, "redpanda-0", "redpanda", cmd.split(" "))
            .get(1, TimeUnit.MINUTES);
}
}
Loading

0 comments on commit 58d7cfe

Please sign in to comment.