Skip to content

Commit

Permalink
feat(all): refactor and add new endpoints for topologies
Browse files Browse the repository at this point in the history
This commit adds new REST endpoints to query available topologies:

 - GET /api/v1/topologies
 - GET /api/v1/topologies/:type/versions
 - GET /api/v1/topologies/:type/versions/:version
  • Loading branch information
fhussonnois committed Oct 19, 2020
1 parent acb82f5 commit 4f67fc6
Show file tree
Hide file tree
Showing 11 changed files with 379 additions and 61 deletions.
Expand Up @@ -18,14 +18,17 @@
*/
package io.streamthoughts.azkarra.api;

import io.streamthoughts.azkarra.api.components.Qualifier;
import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.errors.InvalidStreamsStateException;
import io.streamthoughts.azkarra.api.errors.NoSuchComponentException;
import io.streamthoughts.azkarra.api.errors.NotFoundException;
import io.streamthoughts.azkarra.api.model.Environment;
import io.streamthoughts.azkarra.api.model.Metric;
import io.streamthoughts.azkarra.api.model.MetricGroup;
import io.streamthoughts.azkarra.api.model.StreamsStatus;
import io.streamthoughts.azkarra.api.model.StreamsTopologyGraph;
import io.streamthoughts.azkarra.api.model.TopologyAndAliases;
import io.streamthoughts.azkarra.api.monad.Tuple;
import io.streamthoughts.azkarra.api.providers.TopologyDescriptor;
import io.streamthoughts.azkarra.api.query.Queried;
Expand All @@ -35,10 +38,13 @@
import io.streamthoughts.azkarra.api.streams.ApplicationId;
import io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer;
import io.streamthoughts.azkarra.api.streams.ServerMetadata;
import io.streamthoughts.azkarra.api.streams.TopologyProvider;
import io.streamthoughts.azkarra.api.streams.consumer.ConsumerGroupOffsets;
import io.streamthoughts.azkarra.api.util.Version;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
Expand Down Expand Up @@ -119,6 +125,44 @@ ApplicationId startStreamsTopology(final String topologyType,
*/
Set<TopologyDescriptor> getTopologyProviders();

/**
* Gets the list of all topologies.
*
* @return the list {@link TopologyAndAliases}.
*/
List<TopologyAndAliases> getAllTopologies();

/**
* Gets the {@link TopologyDescriptor} for the specified alias and version.
*
* @param alias the topology alias.
* @param version the topology version.
* @return the {@link TopologyDescriptor}.
* @throws NoSuchComponentException if no topology exist for the given parameters.
* @throws IllegalArgumentException if the component for the given parameters is not a Topology.
*/
TopologyDescriptor getTopologyByAliasAndVersion(final String alias, final String version);

/**
* Gets the {@link TopologyDescriptor} for the specified alias and qualifier.
*
* @param alias the topology alias.
* @param qualifier the topology qualifier.
* @return the {@link TopologyDescriptor}.
* @throws NoSuchComponentException if no topology exist for the given parameters.
* @throws IllegalArgumentException if the component for the given parameters is not a Topology.
*/
TopologyDescriptor getTopologyByAliasAndQualifiers(final String alias,
final Qualifier<? extends TopologyProvider> qualifier);

/**
* Gets all versions of {@link TopologyDescriptor} for the specified alias.
*
* @param alias the topology alias.
* @throws NoSuchComponentException if no topology exist for the given parameters.
*/
List<Version> getTopologyVersionsByAlias(final String alias);

/**
* Gets all metrics for the specified streams application.
*
Expand Down
Expand Up @@ -16,28 +16,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.azkarra.http.handler;

import io.streamthoughts.azkarra.http.ExchangeHelper;
import io.streamthoughts.azkarra.api.AzkarraStreamsService;
import io.undertow.server.HttpServerExchange;
package io.streamthoughts.azkarra.api.errors;

public class TopologyListHandler extends AbstractStreamHttpHandler {
import io.streamthoughts.azkarra.api.components.Qualifier;

/**
* Creates a new {@link TopologyListHandler} instance.
*
* @param service the {@link AzkarraStreamsService} instance.
*/
public TopologyListHandler(final AzkarraStreamsService service) {
super(service);
public class NoSuchComponentException extends NotFoundException {

public static NoSuchComponentException forAlias(final String alias) {
return new NoSuchComponentException(
"No such component for alias '" + alias + "'");
}

public static NoSuchComponentException forAliasAndQualifier(final String alias, final Qualifier qualifier) {
return new NoSuchComponentException(
"No such component for alias '" + alias + "' and qualifier '" + qualifier + "'");
}

/**
* {@inheritDoc}
* Creates a new {@link NotFoundException} instance.
*
* @param message the error message.
*/
@Override
public void handleRequest(final HttpServerExchange exchange) {
ExchangeHelper.sendJsonResponse(exchange, service.getTopologyProviders());
public NoSuchComponentException(final String message) {
super(message);
}
}
@@ -0,0 +1,67 @@
/*
* Copyright 2019-2020 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.streamthoughts.azkarra.api.model;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;
import java.util.Set;

public class TopologyAndAliases {

private String type;
private Set<String> aliases;

public TopologyAndAliases(final String type,
final Set<String> aliases) {
this.type = Objects.requireNonNull(type, "type cannot be null");
this.aliases = Objects.requireNonNull(aliases, "aliases cannot be null");;
}

@JsonProperty("type")
public String type() {
return type;
}

@JsonProperty("aliases")
public Set<String> aliases() {
return aliases;
}

/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof TopologyAndAliases)) return false;
TopologyAndAliases that = (TopologyAndAliases) o;
return Objects.equals(type, that.type) &&
Objects.equals(aliases, that.aliases);
}

/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
return Objects.hash(type, aliases);
}
}
@@ -0,0 +1,137 @@
/*
* Copyright 2019-2020 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.streamthoughts.azkarra.runtime.service;

import io.streamthoughts.azkarra.api.AzkarraContext;
import io.streamthoughts.azkarra.api.AzkarraStreamsService;
import io.streamthoughts.azkarra.api.components.ComponentDescriptor;
import io.streamthoughts.azkarra.api.components.ComponentFactory;
import io.streamthoughts.azkarra.api.components.Qualifier;
import io.streamthoughts.azkarra.api.components.qualifier.Qualifiers;
import io.streamthoughts.azkarra.api.errors.NoSuchComponentException;
import io.streamthoughts.azkarra.api.model.TopologyAndAliases;
import io.streamthoughts.azkarra.api.providers.TopologyDescriptor;
import io.streamthoughts.azkarra.api.streams.TopologyProvider;
import io.streamthoughts.azkarra.api.util.Version;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public abstract class AbstractAzkarraStreamsService implements AzkarraStreamsService {

protected final AzkarraContext context;
/**
* Creates a new {@link AbstractAzkarraStreamsService} instance.
* @param context the {@link AzkarraContext} instance.
*/
AbstractAzkarraStreamsService(final AzkarraContext context) {
this.context = Objects.requireNonNull(context, "context cannot be null");
}

/**
* {@inheritDoc}
*/
@Override
public Set<TopologyDescriptor> getTopologyProviders() {
return context.topologyProviders();
}

public List<TopologyAndAliases> getAllTopologies() {
Map<String, Set<String>> topologies = new HashMap<>();
var factory = context.getComponentFactory();
var descriptors = factory.findAllDescriptorsByClass(TopologyProvider.class);

for (ComponentDescriptor<TopologyProvider> descriptor : descriptors) {
TopologyDescriptor<?> topology = new TopologyDescriptor<>(descriptor);
Set<String> aliases = topologies.get(topology.className());
if (aliases == null) {
aliases = topology.aliases();
} else {
aliases.retainAll(topology.aliases());
}
topologies.put(topology.className(), aliases);
}

return topologies.entrySet()
.stream()
.map(entry -> new TopologyAndAliases(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
}

/**
* {@inheritDoc}
*/
@Override
public TopologyDescriptor getTopologyByAliasAndVersion(final String alias,
final String version) {
Objects.requireNonNull(alias, "alias cannot be null");
Objects.requireNonNull(version, "version cannot be null");

Qualifier<? extends TopologyProvider> qualifier = version.equalsIgnoreCase("latest")
? Qualifiers.byLatestVersion() : Qualifiers.byVersion(version);
return getTopologyByAliasAndQualifiers(alias, qualifier);
}

/**
* {@inheritDoc}
*/
@Override
@SuppressWarnings("unchecked")
public TopologyDescriptor getTopologyByAliasAndQualifiers(final String alias,
final Qualifier<? extends TopologyProvider> qualifier) {
ComponentFactory factory = context.getComponentFactory();
var optional = factory.findDescriptorByAlias(alias, qualifier);
if (optional.isEmpty()) {
throw NoSuchComponentException.forAliasAndQualifier(alias, qualifier);
}

var descriptor = optional.get();
if (! TopologyProvider.class.isAssignableFrom(descriptor.type())) {
throw new NoSuchComponentException(
"Component for alias '"+ alias + "' and '" + qualifier + "' is not a sub-type of TopologyProvider");
}
return new TopologyDescriptor(descriptor);
}


/**
* {@inheritDoc}
*/
@Override
public List<Version> getTopologyVersionsByAlias(final String alias) {
Objects.requireNonNull(alias, "alias cannot be null");
ComponentFactory factory = context.getComponentFactory();
Collection<ComponentDescriptor<Object>> descriptors = factory.findAllDescriptorsByAlias(alias);
if (descriptors.isEmpty()) {
throw NoSuchComponentException.forAlias(alias);
}

return descriptors
.stream()
.map(ComponentDescriptor::version)
.sorted()
.collect(Collectors.toList());
}
}
Expand Up @@ -32,7 +32,6 @@
import io.streamthoughts.azkarra.api.model.StreamsStatus;
import io.streamthoughts.azkarra.api.model.StreamsTopologyGraph;
import io.streamthoughts.azkarra.api.monad.Tuple;
import io.streamthoughts.azkarra.api.providers.TopologyDescriptor;
import io.streamthoughts.azkarra.api.query.DistributedQuery;
import io.streamthoughts.azkarra.api.query.Queried;
import io.streamthoughts.azkarra.api.query.QueryParams;
Expand Down Expand Up @@ -67,9 +66,7 @@
/**
* The default {@link AzkarraStreamsService} implementations.
*/
public class LocalAzkarraStreamsService implements AzkarraStreamsService {

private final AzkarraContext context;
public class LocalAzkarraStreamsService extends AbstractAzkarraStreamsService {

private RemoteQueryClient remoteQueryClient;

Expand All @@ -81,10 +78,8 @@ public class LocalAzkarraStreamsService implements AzkarraStreamsService {
*/
public LocalAzkarraStreamsService(final AzkarraContext context,
final RemoteQueryClient remoteQueryClient) {
Objects.requireNonNull(context, "context cannot be null");
Objects.requireNonNull(remoteQueryClient, "remoteQueryClient cannot be null");
this.context = context;;
this.remoteQueryClient = remoteQueryClient;
super(context);
this.remoteQueryClient = Objects.requireNonNull(remoteQueryClient, "remoteQueryClient cannot be null");
}

/**
Expand Down Expand Up @@ -154,14 +149,6 @@ public ApplicationId startStreamsTopology(final String topologyType,
return context.addTopology(topologyType, topologyVersion, env, executed);
}

/**
* {@inheritDoc}
*/
@Override
public Set<TopologyDescriptor> getTopologyProviders() {
return context.topologyProviders();
}

/**
* {@inheritDoc}
*/
Expand Down
1 change: 0 additions & 1 deletion azkarra-server/pom.xml
Expand Up @@ -31,7 +31,6 @@
<checkstyle.config.location>${project.parent.basedir}</checkstyle.config.location>
<rest.assured.version>4.1.2</rest.assured.version>
<okhttp.version>4.9.0</okhttp.version>
<jersey.version>2.30.1</jersey.version>
</properties>

<artifactId>azkarra-server</artifactId>
Expand Down

0 comments on commit 4f67fc6

Please sign in to comment.