Skip to content

Commit

Permalink
refactor(all): refactor StreamsExecutionEnvironment interface to supp…
Browse files Browse the repository at this point in the history
…ort additional implementations

This commit adds some significant changes to the azkarra API:
- add new interface ApplicationConfigEntryLoader

Breaking changes:
- rename DefaultStreamsExecutionEnvironment to LocalStreamsExecutionEnvironment
- remove methods from StreamsExecutionEnvironment interface
- rename KafkaStreamsContainerBuilder to LocalKafkaStreamsContainerBuilder and move it to module 'runtime'
- rename DefaultKafkaStreamsContainer to LocalKafkaStreamsContainer and move it to module 'runtime'
- add new LocalKafkaStreamsContainer#topology() method
  • Loading branch information
fhussonnois committed Jan 28, 2021
1 parent ae4b9be commit 8259fe1
Show file tree
Hide file tree
Showing 71 changed files with 2,527 additions and 1,130 deletions.
Expand Up @@ -96,7 +96,7 @@ public interface AzkarraContext extends ConfigurableComponentFactory, ComponentR
* @throws AlreadyExistsException if a {@link StreamsExecutionEnvironment}
* is already registered for the given name.
*/
AzkarraContext addExecutionEnvironment(final StreamsExecutionEnvironment environment)
AzkarraContext addExecutionEnvironment(final StreamsExecutionEnvironment<?> environment)
throws AlreadyExistsException;

/**
Expand Down Expand Up @@ -168,45 +168,47 @@ ApplicationId addTopology(final String type,
*
* @return a set of {@link TopologyDescriptor}.
*/
Set<TopologyDescriptor> topologyProviders();
Set<TopologyDescriptor> getTopologyDescriptors();

/**
* Gets all topologies registered into this {@link AzkarraContext} which are available for the given environment.
*
* @param environmentName the {@link StreamsExecutionEnvironment}
* @return a set of {@link TopologyDescriptor}.
*/
Set<TopologyDescriptor> getTopologyDescriptors(final String environmentName);


/**
* Gets all topologies registered into this {@link AzkarraContext} which are available for the given environment.
*
* @param env the {@link StreamsExecutionEnvironment}
* @return a set of {@link TopologyDescriptor}.
*/
Set<TopologyDescriptor> topologyProviders(final StreamsExecutionEnvironment env);
Set<TopologyDescriptor> getTopologyDescriptors(final StreamsExecutionEnvironment<?> env);

/**
* Gets all {@link StreamsExecutionEnvironment} registered to this context.
*
* @return a list of {@link StreamsExecutionEnvironment} instance.
*/
List<StreamsExecutionEnvironment> environments();
List<StreamsExecutionEnvironment<?>> getAllEnvironments();

/**
* Gets the {@link StreamsExecutionEnvironment} for the specified name or create a new one.
* Gets the {@link StreamsExecutionEnvironment} for the specified name.
*
* @param envName the environment name.
* @return a {@link StreamsExecutionEnvironment} instance with the specified name.
*/
StreamsExecutionEnvironment getEnvironmentForNameOrCreate(final String envName);
StreamsExecutionEnvironment<?> getEnvironmentForName(final String envName);

/**
* Gets the default {@link StreamsExecutionEnvironment}.
* This method can return {@code null} while the context is not started.
*
* @return a {@link StreamsExecutionEnvironment} instance.
*/
StreamsExecutionEnvironment defaultExecutionEnvironment();

/**
* Gets the topology for the specified class name or alias.
*
* @param type the topology type.
* @return the {@link TopologyDescriptor}.
*/
TopologyDescriptor getTopology(final String type);
StreamsExecutionEnvironment<?> getDefaultEnvironment();

/**
* Starts this {@link AzkarraContext} instance.
Expand Down
Expand Up @@ -18,9 +18,27 @@
*/
package io.streamthoughts.azkarra.api;

public interface AzkarraContextListener {
import io.streamthoughts.azkarra.api.components.Ordered;

void onContextStart(final AzkarraContext context);
/**
* @see AzkarraContext
*/
public interface AzkarraContextListener extends Ordered {


/**
* {@inheritDoc}
*/
@Override
default int order() {
return Ordered.LOWEST_ORDER - 100;
}

default void onContextStart(final AzkarraContext context) {

}

default void onContextStop(final AzkarraContext context) {

void onContextStop(final AzkarraContext context);
}
}
Expand Up @@ -20,7 +20,6 @@

import io.streamthoughts.azkarra.api.components.Qualifier;
import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.errors.InvalidStreamsStateException;
import io.streamthoughts.azkarra.api.errors.NoSuchComponentException;
import io.streamthoughts.azkarra.api.errors.NotFoundException;
import io.streamthoughts.azkarra.api.model.Environment;
Expand All @@ -31,10 +30,6 @@
import io.streamthoughts.azkarra.api.model.TopologyAndAliases;
import io.streamthoughts.azkarra.api.monad.Tuple;
import io.streamthoughts.azkarra.api.providers.TopologyDescriptor;
import io.streamthoughts.azkarra.api.query.Queried;
import io.streamthoughts.azkarra.api.query.QueryParams;
import io.streamthoughts.azkarra.api.query.internal.Query;
import io.streamthoughts.azkarra.api.query.result.QueryResult;
import io.streamthoughts.azkarra.api.streams.ApplicationId;
import io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer;
import io.streamthoughts.azkarra.api.streams.ServerMetadata;
Expand Down Expand Up @@ -201,13 +196,21 @@ TopologyDescriptor getTopologyByAliasAndQualifiers(final String alias,
*/
Set<Environment> getAllEnvironments();

/**
* Gets all supported environment types.
*
* @return the set of the environment types.
*/
Set<String> getSupportedEnvironmentTypes();

/**
* Adds a new environment to this application.
*
* @param name the environment name.
* @param type the environment type.
* @param conf the environment configuration.
*/
void addNewEnvironment(final String name, final Conf conf);
void addNewEnvironment(final String name, final String type, final Conf conf);

/**
* Gets all local and remote streams instances for the specified streams application.
Expand Down
Expand Up @@ -23,20 +23,24 @@
import io.streamthoughts.azkarra.api.streams.ApplicationId;
import io.streamthoughts.azkarra.api.streams.ApplicationIdBuilder;
import io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer;
import io.streamthoughts.azkarra.api.streams.KafkaStreamsFactory;
import io.streamthoughts.azkarra.api.streams.TopologyProvider;
import io.streamthoughts.azkarra.api.streams.errors.StreamThreadExceptionHandler;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateRestoreListener;

import java.time.Duration;
import java.util.Collection;
import java.util.Set;
import java.util.function.Supplier;

/**
* A StreamsExecutionEnvironment manages the lifecycle of {@link org.apache.kafka.streams.Topology} instances.
*/
public interface StreamsExecutionEnvironment {
public interface StreamsExecutionEnvironment<T extends StreamsExecutionEnvironment<T>> {

/**
* Gets the type of this {@link StreamsExecutionEnvironment}.
*
* @return the string type.
*/
String type();

/**
* Gets the name of this {@link StreamsExecutionEnvironment}.
Expand All @@ -46,80 +50,47 @@ public interface StreamsExecutionEnvironment {
String name();

/**
* Gets the state f this {@link StreamsExecutionEnvironment}.
* Gets the state of this {@link StreamsExecutionEnvironment}.
* @return the {@link State}.
*/
State state();

/**
* Adds a {@link KafkaStreams.StateListener} instance that will set to all {@link KafkaStreams} instance created
* in this {@link StreamsExecutionEnvironment}.
*
* @see KafkaStreams#setStateListener(KafkaStreams.StateListener).
*
* @param listener the {@link KafkaStreams.StateListener} instance.
* Check whether this {@link StreamsExecutionEnvironment} is marked as default.
*
* @throws IllegalStateException if this {@link StreamsExecutionEnvironment} instance is started.
*
* @return this {@link StreamsExecutionEnvironment} instance.
* @return {@code true} if this {@link StreamsExecutionEnvironment} is marked as default.
*/
StreamsExecutionEnvironment addStateListener(final KafkaStreams.StateListener listener);
boolean isDefault();

/**
* Adds a {@link StateRestoreListener} instance that will set to all {@link KafkaStreams} instance created
* in this {@link StreamsExecutionEnvironment}.
*
* @see KafkaStreams#setGlobalStateRestoreListener(StateRestoreListener) .
*
* @param listener the {@link StateRestoreListener} instance.
*
* @throws IllegalStateException if this {@link StreamsExecutionEnvironment} instance is started.
* Creates a new {@link StreamsTopologyExecution} to be applied on this {@link StreamsExecutionEnvironment}.
*
* @return this {@link StreamsExecutionEnvironment} instance.
*/
StreamsExecutionEnvironment addGlobalStateListener(final StateRestoreListener listener);

/**
* Adds a streams interceptor that will set to all {@link KafkaStreams} instance created
* in this {@link StreamsExecutionEnvironment}.
* The interceptors will be executed in the order in which they were added.
*
* @param interceptor the {@link {@link StreamsLifecycleInterceptor}}.
* @return this {@link StreamsExecutionEnvironment} instance.
* @param meta the {@link StreamsTopologyMeta} to executed.
* @param executed the execution options.
* @return the new {@link StreamsTopologyExecution} instance.
*/
StreamsExecutionEnvironment addStreamsLifecycleInterceptor(final Supplier<StreamsLifecycleInterceptor> interceptor);
StreamsTopologyExecution newTopologyExecution(final StreamsTopologyMeta meta, final Executed executed);

/**
* Sets the {@link StreamThreadExceptionHandler} invoked when a StreamThread abruptly terminates
* due to an uncaught exception.
*
* @param handler the {@link StreamThreadExceptionHandler}.
* @return this {@link StreamsExecutionEnvironment} instance.
* Returns all containers for active Kafka Streams applications.
*
* @see KafkaStreams#setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)
*/
StreamsExecutionEnvironment setStreamThreadExceptionHandler(final Supplier<StreamThreadExceptionHandler> handler);

/**
* Gets the {@link StreamThreadExceptionHandler}.
*
* @return the {@link Supplier<StreamThreadExceptionHandler>}, otherwise {@code null} if no handler is set.
* @return a collection of {@link KafkaStreamsContainer} applications.
*/
Supplier<StreamThreadExceptionHandler> getStreamThreadExceptionHandler();
Collection<KafkaStreamsContainer> applications();

/**
* Returns all {@link KafkaStreams} started applications.
* Returns all ids for active Kafka Streams applications.
*
* @return a collection of {@link KafkaStreamsContainer} applications.
*/
Collection<KafkaStreamsContainer> applications();
Set<String> applicationIds();

/**
* Sets this environment configuration.
*
* @return this {@link StreamsExecutionEnvironment} instance.
*/
StreamsExecutionEnvironment setConfiguration(final Conf configuration);
T setConfiguration(final Conf configuration);

/**
* Gets this environment configuration.
Expand All @@ -135,15 +106,15 @@ public interface StreamsExecutionEnvironment {
*
* @return this {@link StreamsExecutionEnvironment} instance.
*/
StreamsExecutionEnvironment setRocksDBConfig(final RocksDBConfig settings);
T setRocksDBConfig(final RocksDBConfig settings);

/**
* Sets the {@link ApplicationIdBuilder} that should be used for building streams {@code application.id}.
*
* @param supplier the {@link ApplicationIdBuilder} instance supplier.
* @return this {@link StreamsExecutionEnvironment} instance.
*/
StreamsExecutionEnvironment setApplicationIdBuilder(final Supplier<ApplicationIdBuilder> supplier);
T setApplicationIdBuilder(final Supplier<ApplicationIdBuilder> supplier);

/**
* Gets the {@link ApplicationIdBuilder}.
Expand All @@ -158,37 +129,7 @@ public interface StreamsExecutionEnvironment {
* @param settings the {@link Conf} instance.
* @return this {@link StreamsExecutionEnvironment} instance.
*/
StreamsExecutionEnvironment addFallbackConfiguration(final Conf settings);

/**
* Sets the {@link KafkaStreamsFactory} that will be used to provide
* the {@link KafkaStreams} to configure and start.
*
* @param kafkaStreamsFactory the {@link KafkaStreamsFactory} instance.
* @return this {@link StreamsExecutionEnvironment} instance.
*/
StreamsExecutionEnvironment setKafkaStreamsFactory(final Supplier<KafkaStreamsFactory> kafkaStreamsFactory);

/**
* Add a new {@link TopologyProvider} instance to this {@link StreamsExecutionEnvironment} to be started.
*
* @param provider the {@link TopologyProvider} supplier.
*
* @return this {@link ApplicationId} instance if the environment is already started,
* otherwise {@code null}.
*/
ApplicationId addTopology(final Supplier<TopologyProvider> provider);

/**
* Add a new {@link TopologyProvider} instance to this {@link StreamsExecutionEnvironment} to be started.
*
* @param provider the {@link TopologyProvider} supplier.
* @param executed the {@link Executed} instance.
*
* @return this {@link ApplicationId} instance if the environment is already started,
* otherwise {@code null}.
*/
ApplicationId addTopology(final Supplier<TopologyProvider> provider, final Executed executed);
T addFallbackConfiguration(final Conf settings);

/**
* Starts this {@link StreamsExecutionEnvironment} instance.
Expand Down
@@ -0,0 +1,53 @@
/*
* Copyright 2019-2021 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.streamthoughts.azkarra.api;

import io.streamthoughts.azkarra.api.config.Conf;

public interface StreamsExecutionEnvironmentFactory<T extends StreamsExecutionEnvironment<T>> {

/**
* Creates a new StreamsExecutionEnvironment from the specified name and an empty configuration.
*
* @param name the environment name.
* @return a new {@link StreamsExecutionEnvironment} of type {@link T}.
*/
default T create(final String name) {
return create(name, Conf.empty());
}

/**
* Creates a new StreamsExecutionEnvironment from the specified name and configuration.
*
* @param name the environment name.
* @param conf the environment configuration.
* @return a new {@link StreamsExecutionEnvironment} of type {@link T}.
*/
T create(final String name, final Conf conf);

/**
* Returns the string type associated with the {@link StreamsExecutionEnvironment} that can
* be created from this factory.
*
* @return the type of the {@link StreamsExecutionEnvironment}.
*/
String type();

}
Expand Up @@ -39,10 +39,10 @@ default String applicationId() {
}

/**
* @see KafkaStreamsContainer#topologyDescription()
* @see KafkaStreamsContainer#getTopology()
*/
default TopologyDescription topologyDescription() {
return container().topologyDescription();
return container().getTopology().describe();
}

/**
Expand Down

0 comments on commit 8259fe1

Please sign in to comment.