diff --git a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/AzkarraStreamsService.java b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/AzkarraStreamsService.java index f5fda234..a533fe0b 100644 --- a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/AzkarraStreamsService.java +++ b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/AzkarraStreamsService.java @@ -18,14 +18,17 @@ */ package io.streamthoughts.azkarra.api; +import io.streamthoughts.azkarra.api.components.Qualifier; import io.streamthoughts.azkarra.api.config.Conf; import io.streamthoughts.azkarra.api.errors.InvalidStreamsStateException; +import io.streamthoughts.azkarra.api.errors.NoSuchComponentException; import io.streamthoughts.azkarra.api.errors.NotFoundException; import io.streamthoughts.azkarra.api.model.Environment; import io.streamthoughts.azkarra.api.model.Metric; import io.streamthoughts.azkarra.api.model.MetricGroup; import io.streamthoughts.azkarra.api.model.StreamsStatus; import io.streamthoughts.azkarra.api.model.StreamsTopologyGraph; +import io.streamthoughts.azkarra.api.model.TopologyAndAliases; import io.streamthoughts.azkarra.api.monad.Tuple; import io.streamthoughts.azkarra.api.providers.TopologyDescriptor; import io.streamthoughts.azkarra.api.query.Queried; @@ -35,10 +38,13 @@ import io.streamthoughts.azkarra.api.streams.ApplicationId; import io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer; import io.streamthoughts.azkarra.api.streams.ServerMetadata; +import io.streamthoughts.azkarra.api.streams.TopologyProvider; import io.streamthoughts.azkarra.api.streams.consumer.ConsumerGroupOffsets; +import io.streamthoughts.azkarra.api.util.Version; import org.apache.kafka.streams.StreamsConfig; import java.util.Collection; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; @@ -119,6 +125,44 @@ ApplicationId startStreamsTopology(final String topologyType, */ Set getTopologyProviders(); + /** + * Gets the list of all topologies. + * + * @return the list {@link TopologyAndAliases}. + */ + List getAllTopologies(); + + /** + * Gets the {@link TopologyDescriptor} for the specified alias and version. + * + * @param alias the topology alias. + * @param version the topology version. + * @return the {@link TopologyDescriptor}. + * @throws NoSuchComponentException if no topology exist for the given parameters. + * @throws IllegalArgumentException if the component for the given parameters is not a Topology. + */ + TopologyDescriptor getTopologyByAliasAndVersion(final String alias, final String version); + + /** + * Gets the {@link TopologyDescriptor} for the specified alias and qualifier. + * + * @param alias the topology alias. + * @param qualifier the topology qualifier. + * @return the {@link TopologyDescriptor}. + * @throws NoSuchComponentException if no topology exist for the given parameters. + * @throws IllegalArgumentException if the component for the given parameters is not a Topology. + */ + TopologyDescriptor getTopologyByAliasAndQualifiers(final String alias, + final Qualifier qualifier); + + /** + * Gets all versions of {@link TopologyDescriptor} for the specified alias. + * + * @param alias the topology alias. + * @throws NoSuchComponentException if no topology exist for the given parameters. + */ + List getTopologyVersionsByAlias(final String alias); + /** * Gets all metrics for the specified streams application. * diff --git a/azkarra-server/src/main/java/io/streamthoughts/azkarra/http/handler/TopologyListHandler.java b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/errors/NoSuchComponentException.java similarity index 52% rename from azkarra-server/src/main/java/io/streamthoughts/azkarra/http/handler/TopologyListHandler.java rename to azkarra-api/src/main/java/io/streamthoughts/azkarra/api/errors/NoSuchComponentException.java index 7ab7b9e5..1f151f29 100644 --- a/azkarra-server/src/main/java/io/streamthoughts/azkarra/http/handler/TopologyListHandler.java +++ b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/errors/NoSuchComponentException.java @@ -16,28 +16,29 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.streamthoughts.azkarra.http.handler; -import io.streamthoughts.azkarra.http.ExchangeHelper; -import io.streamthoughts.azkarra.api.AzkarraStreamsService; -import io.undertow.server.HttpServerExchange; +package io.streamthoughts.azkarra.api.errors; -public class TopologyListHandler extends AbstractStreamHttpHandler { +import io.streamthoughts.azkarra.api.components.Qualifier; - /** - * Creates a new {@link TopologyListHandler} instance. - * - * @param service the {@link AzkarraStreamsService} instance. - */ - public TopologyListHandler(final AzkarraStreamsService service) { - super(service); +public class NoSuchComponentException extends NotFoundException { + + public static NoSuchComponentException forAlias(final String alias) { + return new NoSuchComponentException( + "No such component for alias '" + alias + "'"); + } + + public static NoSuchComponentException forAliasAndQualifier(final String alias, final Qualifier qualifier) { + return new NoSuchComponentException( + "No such component for alias '" + alias + "' and qualifier '" + qualifier + "'"); } /** - * {@inheritDoc} + * Creates a new {@link NotFoundException} instance. + * + * @param message the error message. */ - @Override - public void handleRequest(final HttpServerExchange exchange) { - ExchangeHelper.sendJsonResponse(exchange, service.getTopologyProviders()); + public NoSuchComponentException(final String message) { + super(message); } } diff --git a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/model/TopologyAndAliases.java b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/model/TopologyAndAliases.java new file mode 100644 index 00000000..847782b2 --- /dev/null +++ b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/model/TopologyAndAliases.java @@ -0,0 +1,67 @@ +/* + * Copyright 2019-2020 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.streamthoughts.azkarra.api.model; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; +import java.util.Set; + +public class TopologyAndAliases { + + private String type; + private Set aliases; + + public TopologyAndAliases(final String type, + final Set aliases) { + this.type = Objects.requireNonNull(type, "type cannot be null"); + this.aliases = Objects.requireNonNull(aliases, "aliases cannot be null");; + } + + @JsonProperty("type") + public String type() { + return type; + } + + @JsonProperty("aliases") + public Set aliases() { + return aliases; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof TopologyAndAliases)) return false; + TopologyAndAliases that = (TopologyAndAliases) o; + return Objects.equals(type, that.type) && + Objects.equals(aliases, that.aliases); + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + return Objects.hash(type, aliases); + } +} diff --git a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/service/AbstractAzkarraStreamsService.java b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/service/AbstractAzkarraStreamsService.java new file mode 100644 index 00000000..374514e4 --- /dev/null +++ b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/service/AbstractAzkarraStreamsService.java @@ -0,0 +1,137 @@ +/* + * Copyright 2019-2020 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.streamthoughts.azkarra.runtime.service; + +import io.streamthoughts.azkarra.api.AzkarraContext; +import io.streamthoughts.azkarra.api.AzkarraStreamsService; +import io.streamthoughts.azkarra.api.components.ComponentDescriptor; +import io.streamthoughts.azkarra.api.components.ComponentFactory; +import io.streamthoughts.azkarra.api.components.Qualifier; +import io.streamthoughts.azkarra.api.components.qualifier.Qualifiers; +import io.streamthoughts.azkarra.api.errors.NoSuchComponentException; +import io.streamthoughts.azkarra.api.model.TopologyAndAliases; +import io.streamthoughts.azkarra.api.providers.TopologyDescriptor; +import io.streamthoughts.azkarra.api.streams.TopologyProvider; +import io.streamthoughts.azkarra.api.util.Version; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +public abstract class AbstractAzkarraStreamsService implements AzkarraStreamsService { + + protected final AzkarraContext context; + /** + * Creates a new {@link AbstractAzkarraStreamsService} instance. + * @param context the {@link AzkarraContext} instance. + */ + AbstractAzkarraStreamsService(final AzkarraContext context) { + this.context = Objects.requireNonNull(context, "context cannot be null"); + } + + /** + * {@inheritDoc} + */ + @Override + public Set getTopologyProviders() { + return context.topologyProviders(); + } + + public List getAllTopologies() { + Map> topologies = new HashMap<>(); + var factory = context.getComponentFactory(); + var descriptors = factory.findAllDescriptorsByClass(TopologyProvider.class); + + for (ComponentDescriptor descriptor : descriptors) { + TopologyDescriptor topology = new TopologyDescriptor<>(descriptor); + Set aliases = topologies.get(topology.className()); + if (aliases == null) { + aliases = topology.aliases(); + } else { + aliases.retainAll(topology.aliases()); + } + topologies.put(topology.className(), aliases); + } + + return topologies.entrySet() + .stream() + .map(entry -> new TopologyAndAliases(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); + } + + /** + * {@inheritDoc} + */ + @Override + public TopologyDescriptor getTopologyByAliasAndVersion(final String alias, + final String version) { + Objects.requireNonNull(alias, "alias cannot be null"); + Objects.requireNonNull(version, "version cannot be null"); + + Qualifier qualifier = version.equalsIgnoreCase("latest") + ? Qualifiers.byLatestVersion() : Qualifiers.byVersion(version); + return getTopologyByAliasAndQualifiers(alias, qualifier); + } + + /** + * {@inheritDoc} + */ + @Override + @SuppressWarnings("unchecked") + public TopologyDescriptor getTopologyByAliasAndQualifiers(final String alias, + final Qualifier qualifier) { + ComponentFactory factory = context.getComponentFactory(); + var optional = factory.findDescriptorByAlias(alias, qualifier); + if (optional.isEmpty()) { + throw NoSuchComponentException.forAliasAndQualifier(alias, qualifier); + } + + var descriptor = optional.get(); + if (! TopologyProvider.class.isAssignableFrom(descriptor.type())) { + throw new NoSuchComponentException( + "Component for alias '"+ alias + "' and '" + qualifier + "' is not a sub-type of TopologyProvider"); + } + return new TopologyDescriptor(descriptor); + } + + + /** + * {@inheritDoc} + */ + @Override + public List getTopologyVersionsByAlias(final String alias) { + Objects.requireNonNull(alias, "alias cannot be null"); + ComponentFactory factory = context.getComponentFactory(); + Collection> descriptors = factory.findAllDescriptorsByAlias(alias); + if (descriptors.isEmpty()) { + throw NoSuchComponentException.forAlias(alias); + } + + return descriptors + .stream() + .map(ComponentDescriptor::version) + .sorted() + .collect(Collectors.toList()); + } +} diff --git a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/service/LocalAzkarraStreamsService.java b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/service/LocalAzkarraStreamsService.java index 474a2402..f0cbfdf3 100644 --- a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/service/LocalAzkarraStreamsService.java +++ b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/service/LocalAzkarraStreamsService.java @@ -32,7 +32,6 @@ import io.streamthoughts.azkarra.api.model.StreamsStatus; import io.streamthoughts.azkarra.api.model.StreamsTopologyGraph; import io.streamthoughts.azkarra.api.monad.Tuple; -import io.streamthoughts.azkarra.api.providers.TopologyDescriptor; import io.streamthoughts.azkarra.api.query.DistributedQuery; import io.streamthoughts.azkarra.api.query.Queried; import io.streamthoughts.azkarra.api.query.QueryParams; @@ -67,9 +66,7 @@ /** * The default {@link AzkarraStreamsService} implementations. */ -public class LocalAzkarraStreamsService implements AzkarraStreamsService { - - private final AzkarraContext context; +public class LocalAzkarraStreamsService extends AbstractAzkarraStreamsService { private RemoteQueryClient remoteQueryClient; @@ -81,10 +78,8 @@ public class LocalAzkarraStreamsService implements AzkarraStreamsService { */ public LocalAzkarraStreamsService(final AzkarraContext context, final RemoteQueryClient remoteQueryClient) { - Objects.requireNonNull(context, "context cannot be null"); - Objects.requireNonNull(remoteQueryClient, "remoteQueryClient cannot be null"); - this.context = context;; - this.remoteQueryClient = remoteQueryClient; + super(context); + this.remoteQueryClient = Objects.requireNonNull(remoteQueryClient, "remoteQueryClient cannot be null"); } /** @@ -154,14 +149,6 @@ public ApplicationId startStreamsTopology(final String topologyType, return context.addTopology(topologyType, topologyVersion, env, executed); } - /** - * {@inheritDoc} - */ - @Override - public Set getTopologyProviders() { - return context.topologyProviders(); - } - /** * {@inheritDoc} */ diff --git a/azkarra-server/pom.xml b/azkarra-server/pom.xml index f202b5a8..cc06c05f 100644 --- a/azkarra-server/pom.xml +++ b/azkarra-server/pom.xml @@ -31,7 +31,6 @@ ${project.parent.basedir} 4.1.2 4.9.0 - 2.30.1 azkarra-server diff --git a/azkarra-server/src/main/java/io/streamthoughts/azkarra/http/handler/AbstractStreamHttpHandler.java b/azkarra-server/src/main/java/io/streamthoughts/azkarra/http/handler/AbstractStreamHttpHandler.java index d94b29f4..36b37651 100644 --- a/azkarra-server/src/main/java/io/streamthoughts/azkarra/http/handler/AbstractStreamHttpHandler.java +++ b/azkarra-server/src/main/java/io/streamthoughts/azkarra/http/handler/AbstractStreamHttpHandler.java @@ -23,7 +23,7 @@ import java.util.Objects; -abstract class AbstractStreamHttpHandler implements HttpHandler { +public abstract class AbstractStreamHttpHandler implements HttpHandler { protected final AzkarraStreamsService service; @@ -32,7 +32,7 @@ abstract class AbstractStreamHttpHandler implements HttpHandler { * * @param service the {@link AzkarraStreamsService} instance. */ - AbstractStreamHttpHandler(final AzkarraStreamsService service) { + public AbstractStreamHttpHandler(final AzkarraStreamsService service) { Objects.requireNonNull(service, "service cannot be null"); this.service = service; } diff --git a/azkarra-server/src/main/java/io/streamthoughts/azkarra/http/routes/ApiTopologyRoutes.java b/azkarra-server/src/main/java/io/streamthoughts/azkarra/http/routes/ApiTopologyRoutes.java index 92929400..64b0a5be 100644 --- a/azkarra-server/src/main/java/io/streamthoughts/azkarra/http/routes/ApiTopologyRoutes.java +++ b/azkarra-server/src/main/java/io/streamthoughts/azkarra/http/routes/ApiTopologyRoutes.java @@ -19,12 +19,19 @@ package io.streamthoughts.azkarra.http.routes; import io.streamthoughts.azkarra.api.AzkarraStreamsService; +import io.streamthoughts.azkarra.api.util.Version; import io.streamthoughts.azkarra.http.APIVersions; -import io.streamthoughts.azkarra.http.handler.TopologyListHandler; +import io.streamthoughts.azkarra.http.ExchangeHelper; +import io.streamthoughts.azkarra.http.handler.AbstractStreamHttpHandler; import io.streamthoughts.azkarra.http.spi.RoutingHandlerProvider; import io.undertow.Handlers; +import io.undertow.server.HttpHandler; +import io.undertow.server.HttpServerExchange; import io.undertow.server.RoutingHandler; +import java.util.function.Consumer; +import java.util.stream.Collectors; + /** * This class defines all routes for API '/topologies'. */ @@ -40,9 +47,44 @@ public ApiTopologyRoutes() { * {@inheritDoc} */ @Override - public RoutingHandler handler(final AzkarraStreamsService service){ + public RoutingHandler handler(final AzkarraStreamsService service) { return Handlers.routing() - .get(APIVersions.PATH_V1 + "/topologies", new TopologyListHandler(service)); + .get(APIVersions.PATH_V1 + "/topologies", + handlerFor(service, exchange -> { + ExchangeHelper.sendJsonResponse(exchange, service.getAllTopologies()); + }) + ) + .get(APIVersions.PATH_V1 + "/topologies/{type}/versions", + handlerFor(service, exchange -> { + var type = ExchangeHelper.getQueryParam(exchange, "type"); + var versions = service.getTopologyVersionsByAlias(type) + .stream() + .map(Version::toString) + .collect(Collectors.toSet()); + ExchangeHelper.sendJsonResponse(exchange, versions); + }) + ) + .get(APIVersions.PATH_V1 + "/topologies/{type}/versions/{version}", + handlerFor(service, exchange -> { + var type = ExchangeHelper.getQueryParam(exchange, "type"); + var version = ExchangeHelper.getQueryParam(exchange, "version"); + ExchangeHelper.sendJsonResponse(exchange, + service.getTopologyByAliasAndVersion(type, version) + ); + }) + ); + } + + private HttpHandler handlerFor(final AzkarraStreamsService service, + final Consumer handler) { + return new AbstractStreamHttpHandler(service) { + @Override + public void handleRequest(HttpServerExchange exchange) { + handler.accept(exchange); + } + }; } + + } diff --git a/azkarra-ui/src/components/TopologyList.vue b/azkarra-ui/src/components/TopologyList.vue index c11262c3..27e17567 100644 --- a/azkarra-ui/src/components/TopologyList.vue +++ b/azkarra-ui/src/components/TopologyList.vue @@ -26,24 +26,31 @@ Type - Version - Description Aliases + Versions -