Skip to content

Commit

Permalink
feat(core & webserver): bring lightweight version of namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
brian-mulier-p committed Jul 5, 2024
1 parent 4eaccd2 commit da500b3
Show file tree
Hide file tree
Showing 9 changed files with 357 additions and 0 deletions.
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ tasks.named('check') {
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
}

tasks.named('testCodeCoverageReport') {
dependsOn ':core:copyGradleProperties'
dependsOn ':ui:assembleFrontend'
}

/**********************************************************************************************************************\
* Sonar
**********************************************************************************************************************/
Expand Down
22 changes: 22 additions & 0 deletions core/src/main/java/io/kestra/core/models/namespaces/Namespace.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.kestra.core.models.namespaces;

import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
import lombok.experimental.SuperBuilder;

@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@EqualsAndHashCode
public class Namespace implements NamespaceInterface {
@NotNull
@Pattern(regexp="^[a-z0-9][a-z0-9._-]*")
protected String id;

@NotNull
@Builder.Default
boolean deleted = false;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.kestra.core.models.namespaces;

import io.kestra.core.models.DeletedInterface;

public interface NamespaceInterface extends DeletedInterface {
String getId();

default String uid() {
return this.getId();
}
}
15 changes: 15 additions & 0 deletions core/src/main/java/io/kestra/core/utils/NamespaceUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.core.utils;

import java.util.*;

public abstract class NamespaceUtils {
public static List<String> asTree(String namespace) {
List<String> split = Arrays.asList(namespace.split("\\."));
List<String> terms = new ArrayList<>();
for (int i = 0; i < split.size(); i++) {
terms.add(String.join(".", split.subList(0, i + 1)));
}

return terms;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package io.kestra.webserver.controllers.api;

import io.kestra.core.models.namespaces.Namespace;
import io.kestra.core.models.topologies.FlowTopologyGraph;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.webserver.models.namespaces.NamespaceWithDisabled;
import io.kestra.webserver.responses.PagedResults;
import io.kestra.webserver.utils.PageableUtils;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import io.micronaut.http.annotation.*;
import io.micronaut.http.exceptions.HttpStatusException;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.validation.Validated;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import jakarta.inject.Inject;

import java.util.*;
import java.util.stream.Collectors;

@Validated
@Controller("/api/v1/namespaces")
public class NamespaceController implements NamespaceControllerInterface<Namespace, NamespaceWithDisabled> {
@Inject
private TenantService tenantService;

@Inject
private FlowRepositoryInterface flowRepository;

@Inject
private FlowTopologyService flowTopologyService;

@Get(uri = "{id}")
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Namespaces"}, summary = "Get a namespace")
public Namespace index(
@Parameter(description = "The namespace id") @PathVariable String id
) {
return Namespace.builder().id(id).build();
}

@Get(uri = "/search")
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Namespaces"}, summary = "Search for namespaces")
public PagedResults<NamespaceWithDisabled> find(
@Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query,
@Parameter(description = "The current page") @QueryValue(defaultValue = "1") int page,
@Parameter(description = "The current page size") @QueryValue(defaultValue = "10") int size,
@Parameter(description = "The sort of current page") @Nullable @QueryValue List<String> sort,
@Parameter(description = "Return only existing namespace") @Nullable @QueryValue(value = "existing", defaultValue = "false") Boolean existingOnly
) throws HttpStatusException {
List<String> distinctNamespaces = flowRepository.findDistinctNamespace(tenantService.resolveTenant()).stream()
.flatMap(n -> NamespaceUtils.asTree(n).stream())
.sorted()
.distinct()
.collect(Collectors.toList());

if (query != null) {
distinctNamespaces = distinctNamespaces
.stream()
.filter(s -> s.toLowerCase(Locale.ROOT).contains(query.toLowerCase(Locale.ROOT)))
.collect(Collectors.toList());
}

var total = distinctNamespaces.size();

Pageable pageable = PageableUtils.from(page, size, sort);

if (sort != null) {
Sort.Order.Direction direction = pageable.getSort().getOrderBy().getFirst().getDirection();

if (direction.equals(Sort.Order.Direction.ASC)) {
Collections.sort(distinctNamespaces);
} else {
Collections.reverse(distinctNamespaces);
}
}

if (distinctNamespaces.size() > pageable.getSize()) {
distinctNamespaces = distinctNamespaces.subList(
(int) pageable.getOffset() - pageable.getSize(),
Math.min((int) pageable.getOffset(), distinctNamespaces.size())
);
}

return PagedResults.of(new ArrayListTotal<>(
distinctNamespaces
.stream()
.<NamespaceWithDisabled>map(s -> NamespaceWithDisabled.builder()
.id(s)
.disabled(true)
.build()
).toList(),
total
));
}

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "{namespace}/dependencies")
@Operation(tags = {"Flows"}, summary = "Get flow dependencies")
public FlowTopologyGraph dependencies(
@Parameter(description = "The flow namespace") @PathVariable String namespace,
@Parameter(description = "if true, list only destination dependencies, otherwise list also source dependencies") @QueryValue(defaultValue = "false") boolean destinationOnly
) {
return flowTopologyService.namespaceGraph(tenantService.resolveTenant(), namespace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.kestra.webserver.controllers.api;

import io.kestra.core.models.namespaces.NamespaceInterface;
import io.kestra.core.models.topologies.FlowTopologyGraph;
import io.kestra.webserver.models.namespaces.DisabledInterface;
import io.kestra.webserver.responses.PagedResults;
import io.micronaut.http.exceptions.HttpStatusException;

import java.util.List;

public interface NamespaceControllerInterface<N extends NamespaceInterface, D extends NamespaceInterface & DisabledInterface> {
N index(String id);

PagedResults<D> find(String query, int page, int size, List<String> sort, Boolean existingOnly) throws HttpStatusException;

FlowTopologyGraph dependencies(String namespace, boolean destinationOnly);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.kestra.webserver.models.namespaces;

public interface DisabledInterface {
boolean isDisabled();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.kestra.webserver.models.namespaces;

import io.kestra.core.models.namespaces.Namespace;
import io.micronaut.core.annotation.Introspected;
import lombok.*;
import lombok.experimental.SuperBuilder;

@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class NamespaceWithDisabled extends Namespace implements DisabledInterface {
boolean disabled;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package io.kestra.webserver.controllers.api;

import com.devskiller.friendly_id.FriendlyId;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.namespaces.Namespace;
import io.kestra.core.models.topologies.FlowNode;
import io.kestra.core.models.topologies.FlowRelation;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.topologies.FlowTopologyGraph;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.plugin.core.log.Log;
import io.kestra.webserver.models.namespaces.NamespaceWithDisabled;
import io.kestra.webserver.responses.PagedResults;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@KestraTest
public class NamespaceControllerTest {
@Inject
@Client("/")
private ReactorHttpClient client;

@Inject
private FlowRepositoryInterface flowRepository;

@Inject
private FlowTopologyRepositoryInterface flowTopologyRepository;

@BeforeEach
void reset() {
flowRepository.findAllForAllTenants().forEach(flowRepository::delete);
}

@Test
void get() {
flow("my.ns");
Namespace namespace = client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/namespaces/my.ns"),
Namespace.class
);

assertThat(namespace.getId(), is("my.ns"));
assertThat(namespace.isDeleted(), is(false));
}

@SuppressWarnings("unchecked")
@Test
void list() {
flow("my.ns");
flow("my.ns.flow");
flow("another.ns");

PagedResults<NamespaceWithDisabled> list = client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/namespaces/search"),
Argument.of(PagedResults.class, NamespaceWithDisabled.class)
);
assertThat(list.getTotal(), is(5L));
assertThat(list.getResults().size(), is(5));
assertThat(list.getResults(), everyItem(hasProperty("disabled", is(true))));
assertThat(list.getResults().map(NamespaceWithDisabled::getId), containsInAnyOrder(
"my", "my.ns", "my.ns.flow",
"another", "another.ns"
));


list = client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/namespaces/search?size=2&sort=id:desc"),
Argument.of(PagedResults.class, NamespaceWithDisabled.class)
);
assertThat(list.getTotal(), is(5L));
assertThat(list.getResults().size(), is(2));
assertThat(list.getResults().getFirst().getId(), is("my.ns.flow"));
assertThat(list.getResults().get(1).getId(), is("my.ns"));

list = client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/namespaces/search?page=2&size=2&sort=id:desc"),
Argument.of(PagedResults.class, NamespaceWithDisabled.class)
);
assertThat(list.getTotal(), is(5L));
assertThat(list.getResults().size(), is(2));
assertThat(list.getResults().getFirst().getId(), is("my"));
assertThat(list.getResults().get(1).getId(), is("another.ns"));

list = client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/namespaces/search?q=ns"),
Argument.of(PagedResults.class, NamespaceWithDisabled.class)
);
assertThat(list.getTotal(), is(3L));
assertThat(list.getResults().size(), is(3));
}

@Test
void namespaceTopology() {
flowTopologyRepository.save(createSimpleFlowTopology("flow-a", "flow-b"));
flowTopologyRepository.save(createSimpleFlowTopology("flow-a", "flow-c"));

FlowTopologyGraph retrieve = client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/namespaces/topology.namespace/dependencies"),
Argument.of(FlowTopologyGraph.class)
);

assertThat(retrieve.getNodes().size(), is(3));
assertThat(retrieve.getEdges().size(), is(2));
}

protected Flow flow(String namespace) {
Flow flow = Flow.builder()
.id("flow-" + FriendlyId.createFriendlyId())
.namespace(namespace)
.tasks(List.of(
Log.builder()
.id("log")
.type(Log.class.getName())
.message("Hello")
.build()
))
.build();
return flowRepository.create(flow, flow.generateSource(), flow);
}

protected FlowTopology createSimpleFlowTopology(String flowA, String flowB) {
return FlowTopology.builder()
.relation(FlowRelation.FLOW_TASK)
.source(FlowNode.builder()
.id(flowA)
.namespace("topology.namespace")
.uid(flowA)
.build()
)
.destination(FlowNode.builder()
.id(flowB)
.namespace("topology.namespace")
.uid(flowB)
.build()
)
.build();
}

}

0 comments on commit da500b3

Please sign in to comment.