From 21ffbc022fe443483733dbca6abeb908cb2e0849 Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Fri, 5 Feb 2021 18:15:17 +0100 Subject: [PATCH] sub-taks(all): refactor StreamsExecutionEnvironment Changes: - add new methods registerTopology, addConfiguration and addStreamsConfiguration to LocalStreamsExecutionEnvironment class - update StreamsTopologyExecution to return an optional - add builder class for StreamsTopologyMeta --- .../api/StreamsExecutionEnvironment.java | 38 ++- .../api/StreamsExecutionEnvironmentAware.java | 2 +- .../azkarra/api/StreamsTopologyExecution.java | 7 +- .../azkarra/api/StreamsTopologyMeta.java | 109 +++++-- .../azkarra/api/config/Conf.java | 3 +- .../azkarra/api/config/MapConf.java | 8 +- .../azkarra/api/config/RocksDBConfig.java | 30 +- .../azkarra/api/streams/TopologyProvider.java | 26 ++ .../context/DefaultAzkarraContext.java | 194 ++++++------ .../runtime/env/LocalStreamsExecution.java | 72 ++--- .../env/LocalStreamsExecutionEnvironment.java | 289 ++++++++++++------ .../CompositeStreamsInterceptor.java | 2 +- .../streams/DefaultApplicationIdBuilder.java | 4 +- .../CloseKafkaStreamsOnThreadException.java | 4 +- .../LocalStreamsExecutionEnvironmentTest.java | 3 +- .../env/LocalStreamsExecutionTest.java | 21 +- 16 files changed, 507 insertions(+), 305 deletions(-) diff --git a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsExecutionEnvironment.java b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsExecutionEnvironment.java index 65c78013..7f81fd81 100644 --- a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsExecutionEnvironment.java +++ b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsExecutionEnvironment.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.streamthoughts.azkarra.api.config.Conf; -import io.streamthoughts.azkarra.api.config.RocksDBConfig; import io.streamthoughts.azkarra.api.errors.NotFoundException; import io.streamthoughts.azkarra.api.model.HasId; import io.streamthoughts.azkarra.api.model.HasName; @@ -29,6 +28,7 @@ import io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer; import org.apache.kafka.streams.KafkaStreams; +import java.io.Serializable; import java.time.Duration; import java.util.Collection; import java.util.Objects; @@ -37,7 +37,10 @@ import java.util.function.Supplier; /** - * A StreamsExecutionEnvironment manages the lifecycle of {@link org.apache.kafka.streams.Topology} instances. + * A {@code StreamsExecutionEnvironment} manages the execution and the lifecycle of one or many {@link KafkaStreams} + * instances that run either locally or remotely. + * + * @see KafkaStreamsContainer */ public interface StreamsExecutionEnvironment> extends HasName { @@ -107,7 +110,7 @@ default KafkaStreamsContainer getContainerById(final ContainerId id) { .findFirst() .orElseThrow(() -> new NotFoundException( - "Fail to find running KafkaStreams instance for container id '" + "Failed to find running KafkaStreams instance for container id '" + id + "' in environment '" + name() + "'") ); @@ -136,26 +139,32 @@ default KafkaStreamsContainer getContainerById(final ContainerId id) { Optional getApplicationById(final ApplicationId id); /** - * Sets this environment configuration. + * Adds a new configuration to this environment. + * This method can be invoked multiple time. The supplied configuration will override all prior configurations. + * + * @see #addConfiguration(Supplier) * * @return this {@link StreamsExecutionEnvironment} instance. */ - T setConfiguration(final Conf configuration); + default T addConfiguration(final Conf configuration) { + return addConfiguration(() -> configuration); + } /** - * Gets this environment configuration. + * Adds a new configuration to this environment. + * This method can be invoked multiple time. The supplied configuration will override all prior configurations. * - * @return the {@link Conf} instance. + * + * @return this {@link StreamsExecutionEnvironment} instance. */ - Conf getConfiguration(); + T addConfiguration(final Supplier configuration); /** - * Sets the {@link RocksDBConfig} streamsConfig used by topology persistent stores. + * Gets this environment configuration. * - * @param settings the {@link RocksDBConfig} instance. - * @return this {@link StreamsExecutionEnvironment} instance. + * @return the {@link Conf} instance. */ - T setRocksDBConfig(final RocksDBConfig settings); + Conf getConfiguration(); /** * Sets the {@link ApplicationIdBuilder} that should be used for building streams {@code application.id}. @@ -173,7 +182,8 @@ default KafkaStreamsContainer getContainerById(final ContainerId id) { Supplier getApplicationIdBuilder(); /** - * Adds streamsConfig that will be used in fallback if not present in defined environment streamsConfig. + * Adds settings to this environment that will be used in fallback if not present + * in the defined environment configuration. * * @param settings the {@link Conf} instance. * @return this {@link StreamsExecutionEnvironment} instance. @@ -321,7 +331,7 @@ default View describe() { * A {@code Environment} is used to describe the current state of a * {@link io.streamthoughts.azkarra.api.StreamsExecutionEnvironment} instance. */ - class View implements HasName { + class View implements HasName, Serializable { private final StreamsExecutionEnvironment environment; diff --git a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsExecutionEnvironmentAware.java b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsExecutionEnvironmentAware.java index b9f4a6b2..ce090ac7 100644 --- a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsExecutionEnvironmentAware.java +++ b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsExecutionEnvironmentAware.java @@ -25,5 +25,5 @@ public interface StreamsExecutionEnvironmentAware { * * @param environment the {@link StreamsExecutionEnvironment} instance */ - void setExecutionEnvironment(final StreamsExecutionEnvironment environment); + void setExecutionEnvironment(final StreamsExecutionEnvironment environment); } diff --git a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsTopologyExecution.java b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsTopologyExecution.java index 77844fee..fc854eac 100644 --- a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsTopologyExecution.java +++ b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsTopologyExecution.java @@ -19,6 +19,7 @@ package io.streamthoughts.azkarra.api; +import java.util.Optional; import java.util.concurrent.Callable; /** @@ -26,20 +27,20 @@ * * @see StreamsExecutionEnvironment */ -public interface StreamsTopologyExecution extends Callable { +public interface StreamsTopologyExecution extends Callable> { /** * Starts the streams-topology encapsulated by this object. * * @return the {@link ApplicationId}. */ - ApplicationId start(); + Optional start(); /** * {@inheritDoc} */ @Override - default ApplicationId call() { + default Optional call() { return start(); } } diff --git a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsTopologyMeta.java b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsTopologyMeta.java index ac53cd5d..1b27bb59 100644 --- a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsTopologyMeta.java +++ b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsTopologyMeta.java @@ -20,10 +20,12 @@ package io.streamthoughts.azkarra.api; import io.streamthoughts.azkarra.api.config.Conf; +import io.streamthoughts.azkarra.api.providers.TopologyDescriptor; import io.streamthoughts.azkarra.api.streams.TopologyProvider; import io.streamthoughts.azkarra.api.util.Version; import java.util.Objects; +import java.util.Optional; public class StreamsTopologyMeta { @@ -34,29 +36,28 @@ public class StreamsTopologyMeta { private final ClassLoader classLoader; private final Conf conf; - /** * Creates a new {@link StreamsTopologyMeta} instance. * - * @param name the name of the topology. - * @param version the version of the topology - * @param description the description of the topology. - * @param type the topology {@link Class}. - * @param classLoader the topology {@link ClassLoader}. - * @param conf the default {@link Conf} for the topology. + * @param name the name of the topology. + * @param version the version of the topology + * @param description the description of the topology. + * @param type the topology {@link Class}. + * @param classLoader the topology {@link ClassLoader}. + * @param conf the default {@link Conf} for the topology. */ - public StreamsTopologyMeta(final String name, - final Version version, - final String description, - final Class type, - final ClassLoader classLoader, - final Conf conf) { - this.name = name; - this.version = version; + private StreamsTopologyMeta(final String name, + final Version version, + final String description, + final Class type, + final ClassLoader classLoader, + final Conf conf) { + this.name = Objects.requireNonNull(name, "name should not be null"); + this.version = Objects.requireNonNull(version, "version should not be null"); + this.type = Objects.requireNonNull(type, "version should not be null"); + this.classLoader = Objects.requireNonNull(classLoader, "classLoader should not be null"); + this.conf = Objects.requireNonNull(conf, "conf should not be null"); this.description = description; - this.type = type; - this.classLoader = classLoader; - this.conf = conf; } public String name() { @@ -121,4 +122,76 @@ public String toString() { ", conf=" + conf + '}'; } + + /** + * Helper method to create a new {@link Builder} instance. + * + * @return a new {@link Builder} instance + */ + public static Builder create() { + return new Builder(); + } + + public static class Builder { + + private String name; + private Version version; + private String description; + private Class type; + private ClassLoader classLoader; + private Conf conf; + + public Builder from(final TopologyDescriptor descriptor) { + return this + .setName(descriptor.name()) + .setVersion(descriptor.version()) + .setDescription(descriptor.description()) + .setType(descriptor.type()) + .setClassLoader(descriptor.classLoader()) + .setConf(descriptor.configuration()); + } + + public Builder setName(final String name) { + this.name = Objects.requireNonNull(name, "name should not be null"); + return this; + } + + public Builder setVersion(final Version version) { + this.version = Objects.requireNonNull(version, "version should not be null"); + ; + return this; + } + + public Builder setDescription(final String description) { + this.description = description; + return this; + } + + public Builder setType(final Class type) { + this.type = Objects.requireNonNull(type, "type should not be null"); + ; + return this; + } + + public Builder setClassLoader(final ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + public Builder setConf(final Conf conf) { + this.conf = conf; + return this; + } + + public StreamsTopologyMeta build() { + return new StreamsTopologyMeta( + name, + version, + description, + type, + Optional.ofNullable(classLoader).orElse(type.getClassLoader()), + Optional.ofNullable(conf).orElse(Conf.empty()) + ); + } + } } diff --git a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/config/Conf.java b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/config/Conf.java index 448fc56b..833725a5 100644 --- a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/config/Conf.java +++ b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/config/Conf.java @@ -28,8 +28,9 @@ import java.util.Set; /** - * Class which can be used for configuring components. + * A {@code Conf} represents an immutable object which can be used for configuring components. * + * @see Configurable * @see io.streamthoughts.azkarra.api.AzkarraContext * @see io.streamthoughts.azkarra.api.StreamsExecutionEnvironment * @see io.streamthoughts.azkarra.api.streams.TopologyProvider diff --git a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/config/MapConf.java b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/config/MapConf.java index 94a09585..de7c466c 100644 --- a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/config/MapConf.java +++ b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/config/MapConf.java @@ -42,7 +42,7 @@ */ public class MapConf extends AbstractConf { - protected final Map parameters; + private final Map parameters; private final Conf fallback; @@ -52,7 +52,7 @@ public class MapConf extends AbstractConf { * @return a new {@link MapConf} instance. */ static MapConf empty() { - return new MapConf(Collections.emptyMap(), null, true); + return new MapConf(Collections.emptyMap()); } /** @@ -61,7 +61,7 @@ static MapConf empty() { * @return a new {@link MapConf} instance. */ static MapConf singletonConf(final String key, final Object value) { - return new MapConf(Collections.singletonMap(key, value), null, true); + return new MapConf(Collections.singletonMap(key, value)); } /** @@ -83,7 +83,7 @@ static MapConf singletonConf(final String key, final Object value) { this(parameters, null, explode); } - protected MapConf(final Map parameters, + private MapConf(final Map parameters, final Conf fallback, final boolean explode) { Objects.requireNonNull(parameters, "parameters cannot be null"); diff --git a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/config/RocksDBConfig.java b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/config/RocksDBConfig.java index 921f89ff..bf38e2dd 100644 --- a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/config/RocksDBConfig.java +++ b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/config/RocksDBConfig.java @@ -21,11 +21,12 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.function.Supplier; /** * Configuration class for setting internal RocksDB options. */ -public class RocksDBConfig { +public class RocksDBConfig implements Supplier { public static String ROCKS_DB_MAX_WRITE_BUFFER_NUMBER_CONFIG = "rocksdb.max.write.buffer.number"; public static String ROCKS_DB_WRITE_BUFFER_SIZE_CONFIG = "rocksdb.write.buffer.size"; @@ -35,12 +36,22 @@ public class RocksDBConfig { public static String ROCKS_DB_LOG_LEVEL_CONFIG = "rocksdb.log.level"; public static String ROCKS_DB_LOG_MAX_FILE_SIZE_CONFIG = "rocksdb.log.max.file.size"; - private Map configs = new HashMap<>(); + private final Map configs = new HashMap<>(); + /** + * Helper method to create a new {@link RocksDBConfig} object with statistics enable. + * + * @return a new {@link RocksDBConfig}. + */ public static RocksDBConfig withStatsEnable() { return new RocksDBConfig(true); } + /** + * Helper method to create a new {@link RocksDBConfig} object with statistics disable. + * + * @return a new {@link RocksDBConfig}. + */ public static RocksDBConfig withStatsDisable() { return new RocksDBConfig(false); } @@ -82,12 +93,19 @@ public RocksDBConfig withLogMaxFileSize(final int logMaxFileSize) { return this; } - public Conf conf() { - return new MapConf(configs); - } - + /** + * {@inheritDoc} + */ @Override public String toString() { return configs.toString(); } + + /** + * {@inheritDoc} + */ + @Override + public Conf get() { + return Conf.of(configs); + } } diff --git a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/streams/TopologyProvider.java b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/streams/TopologyProvider.java index 775b8926..216c0003 100644 --- a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/streams/TopologyProvider.java +++ b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/streams/TopologyProvider.java @@ -19,8 +19,11 @@ package io.streamthoughts.azkarra.api.streams; import io.streamthoughts.azkarra.api.providers.Provider; +import io.streamthoughts.azkarra.api.util.Version; import org.apache.kafka.streams.Topology; +import java.util.Objects; + /** * The default interface to supply a Kafka Streams {@link Topology} instance. */ @@ -40,4 +43,27 @@ public interface TopologyProvider extends Provider { * @return the {@link Topology} instance. */ Topology topology(); + + /** + * An helper method to create a static {@link TopologyProvider} returning the provided {@link Topology}. + * + * @param topology the {@link Topology} instance. + * @param version the topology's version. + * @return a new {@link TopologyProvider} instance. + */ + static TopologyProvider of(final Topology topology, final Version version) { + Objects.requireNonNull(topology, "topology should not be null"); + Objects.requireNonNull(version, "version should not be null"); + return new TopologyProvider() { + @Override + public String version() { + return version.toString(); + } + + @Override + public Topology topology() { + return topology; + } + }; + } } diff --git a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/context/DefaultAzkarraContext.java b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/context/DefaultAzkarraContext.java index 619b06f7..1a731b34 100644 --- a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/context/DefaultAzkarraContext.java +++ b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/context/DefaultAzkarraContext.java @@ -150,7 +150,7 @@ public static DefaultAzkarraContext create(final Conf configuration) { /** * Creates a new {@link DefaultAzkarraContext} instance. * - * @param configuration the context {@link Conf} instance. + * @param configuration the context {@link Conf} instance. */ private DefaultAzkarraContext(final Conf configuration, final ComponentFactory componentFactory) { Objects.requireNonNull(configuration, "configuration cannot be null"); @@ -167,50 +167,50 @@ private DefaultAzkarraContext(final Conf configuration, final ComponentFactory c private void initialize() { // Register all built-in StreamsLifeCycleInterceptor implementations. registerComponent( - AutoCreateTopicsInterceptor.class, - withConditions(onPropertyTrue(AUTO_CREATE_TOPICS_ENABLE_CONFIG)) + AutoCreateTopicsInterceptor.class, + withConditions(onPropertyTrue(AUTO_CREATE_TOPICS_ENABLE_CONFIG)) ); registerComponent( - MonitoringStreamsInterceptor.class, - withConditions(onPropertyTrue(MONITORING_STREAMS_INTERCEPTOR_ENABLE_CONFIG)) + MonitoringStreamsInterceptor.class, + withConditions(onPropertyTrue(MONITORING_STREAMS_INTERCEPTOR_ENABLE_CONFIG)) ); registerComponent( - KafkaBrokerReadyInterceptor.class, - withConditions(onPropertyTrue(KAFKA_READY_INTERCEPTOR_ENABLE_CONFIG)), - withOrder(Ordered.HIGHEST_ORDER) + KafkaBrokerReadyInterceptor.class, + withConditions(onPropertyTrue(KAFKA_READY_INTERCEPTOR_ENABLE_CONFIG)), + withOrder(Ordered.HIGHEST_ORDER) ); registerComponent( - WaitForSourceTopicsInterceptor.class, - withConditions(any( - onPropertyTrue(WAIT_FOR_TOPICS_ENABLE_CONFIG), - onPropertyTrue(ENABLE_WAIT_FOR_TOPICS__CONFIG)) - ), - withOrder(Ordered.LOWEST_ORDER) + WaitForSourceTopicsInterceptor.class, + withConditions(any( + onPropertyTrue(WAIT_FOR_TOPICS_ENABLE_CONFIG), + onPropertyTrue(ENABLE_WAIT_FOR_TOPICS__CONFIG)) + ), + withOrder(Ordered.LOWEST_ORDER) ); registerComponent( - StreamThreadExceptionHandler.class, - new DefaultStreamThreadExceptionHandlerFactory(), - withConditions(onMissingComponent(List.of(StreamThreadExceptionHandler.class))) + StreamThreadExceptionHandler.class, + new DefaultStreamThreadExceptionHandlerFactory(), + withConditions(onMissingComponent(List.of(StreamThreadExceptionHandler.class))) ); registerSingleton( - AzkarraStreamsService.class, - LocalAzkarraStreamsService::new, - withConditions(onMissingComponent(List.of(AzkarraStreamsService.class))) + AzkarraStreamsService.class, + LocalAzkarraStreamsService::new, + withConditions(onMissingComponent(List.of(AzkarraStreamsService.class))) ); registerSingleton( - StreamsExecutionEnvironmentFactory.class, - LocalStreamsExecutionEnvironmentFactory::new, - withConditions(onMissingComponent(List.of(StreamsExecutionEnvironmentFactory.class))) + StreamsExecutionEnvironmentFactory.class, + LocalStreamsExecutionEnvironmentFactory::new, + withConditions(onMissingComponent(List.of(StreamsExecutionEnvironmentFactory.class))) ); registerComponent( - StreamsExecutionEnvironmentAbstractFactory.class, - () -> new StreamsExecutionEnvironmentAbstractFactory(getAllComponents(StreamsExecutionEnvironmentFactory.class)) + StreamsExecutionEnvironmentAbstractFactory.class, + () -> new StreamsExecutionEnvironmentAbstractFactory(getAllComponents(StreamsExecutionEnvironmentFactory.class)) ); registerSingleton( - DefaultInteractiveQueryService.class, - new InteractiveQueryServiceModule(), - withConditions(onMissingComponent(List.of(InteractiveQueryService.class))) + DefaultInteractiveQueryService.class, + new InteractiveQueryServiceModule(), + withConditions(onMissingComponent(List.of(InteractiveQueryService.class))) ); } @@ -281,15 +281,15 @@ public AzkarraContext addExecutionEnvironment(final StreamsExecutionEnvironment< environments.put(env.name(), env); Map confAsMap = new TreeMap<>(env.getConfiguration().getConfAsMap()); final String configLogs = confAsMap.entrySet() - .stream() - .map(e -> e.getKey() + " = " + e.getValue()) - .collect(Collectors.joining("\n\t")); + .stream() + .map(e -> e.getKey() + " = " + e.getValue()) + .collect(Collectors.joining("\n\t")); LOG.info("Registered new streams environment for name '{}' and default config :\n\t{}", - env.name(), - configLogs + env.name(), + configLogs ); if (env instanceof AzkarraContextAware) - ((AzkarraContextAware)env).setAzkarraContext(this); + ((AzkarraContextAware) env).setAzkarraContext(this); if (state == State.STARTED) { env.start(); @@ -321,8 +321,8 @@ public Optional addTopology(final Class addTopology(final Class type, - final String environment, - final Executed executed) { + final String environment, + final Executed executed) { return addTopology(type.getName(), environment, executed); } @@ -344,15 +344,14 @@ public Optional addTopology(final String type, final String version, final String environment, final Executed executed) { - final TopologyRegistration registration = new TopologyRegistration(type, version, environment, executed); - - ApplicationId res = null; if (state == State.STARTED) - res = mayAddTopologyToEnvironment(registration); - else + return addTopologyToEnvironment(registration); + else { topologyRegistrations.add(registration); - return Optional.ofNullable(res); + return Optional.empty(); + } + } public void setState(final State state) { @@ -362,25 +361,24 @@ public void setState(final State state) { /** * Build and add a {@link TopologyProvider} to a target {@link StreamsExecutionEnvironment}. * - * @param registration the {@link TopologyRegistration} - * @return the {@link ApplicationId} instance if the environment is already started, - * otherwise {@code null}. + * @param registration the {@link TopologyRegistration} * + * @return the {@link ApplicationId} instance if the environment is already started, otherwise {@code null}. * @throws AzkarraContextException if no component is registered for the topology to register. * if target environment doesn't exists. */ - private ApplicationId mayAddTopologyToEnvironment(final TopologyRegistration registration) { + private Optional addTopologyToEnvironment(final TopologyRegistration registration) { Objects.requireNonNull(registration, "registration cannot be null"); - final StreamsExecutionEnvironment environment; + final StreamsExecutionEnvironment environment; if (registration.environment != null) { checkIfEnvironmentExists( - registration.environment, - String.format( - "Error while adding topology '%s', environment '%s' not found", - registration.type, - registration.environment - )); + registration.environment, + String.format( + "Error while adding topology '%s', environment '%s' not found", + registration.type, + registration.environment + )); environment = environments.get(registration.environment); } else { environment = getDefaultEnvironment(); @@ -397,33 +395,26 @@ private ApplicationId mayAddTopologyToEnvironment(final TopologyRegistration reg final var streamConfig = registration.executed.config(); final var conditionalContext = new ConfigConditionalContext<>( - streamConfig - .withFallback(environment.getConfiguration()) - .withFallback(getConfiguration()) + streamConfig + .withFallback(environment.getConfiguration()) + .withFallback(getConfiguration()) ); final Optional> opt = registration.version == null ? - componentFactory.findDescriptorByAlias(registration.type, conditionalContext, Qualifiers.byLatestVersion()) : - componentFactory.findDescriptorByAlias(registration.type, conditionalContext, Qualifiers.byVersion(registration.version)); + componentFactory.findDescriptorByAlias(registration.type, conditionalContext, Qualifiers.byLatestVersion()) : + componentFactory.findDescriptorByAlias(registration.type, conditionalContext, Qualifiers.byVersion(registration.version)); if (opt.isPresent()) { final TopologyDescriptor descriptor = new TopologyDescriptor<>(opt.get()); - final StreamsTopologyMeta meta = new StreamsTopologyMeta( - descriptor.name(), - descriptor.version(), - descriptor.description(), - descriptor.type(), - descriptor.classLoader(), - descriptor.configuration() - ); + final StreamsTopologyMeta meta = StreamsTopologyMeta.create().from(descriptor).build(); return environment.newTopologyExecution(meta, registration.executed).start(); } else { final String loggedVersion = registration.version != null ? registration.version : "latest"; throw new AzkarraContextException( - "Failed to register topology to environment '" + environment.name() + "'." - + " Cannot find any topology provider for type='" + registration.type - + "', version='" + loggedVersion +" '." + "Failed to register topology to environment '" + environment.name() + "'." + + " Cannot find any topology provider for type='" + registration.type + + "', version='" + loggedVersion + " '." ); } } @@ -434,7 +425,7 @@ private ApplicationId mayAddTopologyToEnvironment(final TopologyRegistration reg @Override public Set getTopologyDescriptors() { Collection> descriptors = - componentFactory.findAllDescriptorsByClass(TopologyProvider.class); + componentFactory.findAllDescriptorsByClass(TopologyProvider.class); return descriptors.stream().map(TopologyDescriptor::new).collect(Collectors.toSet()); } @@ -455,13 +446,13 @@ public Set getTopologyDescriptors(final StreamsExecutionEnvi .withFallback(getConfiguration()); var conditionalContext = new ConfigConditionalContext<>(componentConfig); Collection> descriptors = - componentFactory.findAllDescriptorsByClass( - TopologyProvider.class, - conditionalContext, Qualifiers.byAnyRestrictions( - Restriction.application(), - Restriction.env(env.name()) - ) - ); + componentFactory.findAllDescriptorsByClass( + TopologyProvider.class, + conditionalContext, Qualifiers.byAnyRestrictions( + Restriction.application(), + Restriction.env(env.name()) + ) + ); return descriptors.stream().map(TopologyDescriptor::new).collect(Collectors.toSet()); } @@ -489,10 +480,10 @@ public StreamsExecutionEnvironment getEnvironmentForName(final String name) { @Override public StreamsExecutionEnvironment getDefaultEnvironment() { return getAllEnvironments() - .stream() - .filter(env -> env.isDefault() || env.name().equals(DEFAULT_ENV_NAME)) - .findFirst() - .orElse(null); + .stream() + .filter(env -> env.isDefault() || env.name().equals(DEFAULT_ENV_NAME)) + .findFirst() + .orElse(null); } @@ -503,7 +494,7 @@ public StreamsExecutionEnvironment getDefaultEnvironment() { public void start() { if (state != State.CREATED) { throw new IllegalStateException( - "The context is either already started or already stopped, cannot re-start"); + "The context is either already started or already stopped, cannot re-start"); } LOG.info("Starting AzkarraContext"); componentFactory.init(getConfiguration()); @@ -542,9 +533,9 @@ public void stop(boolean cleanUp) { listener.onContextStop(this); } catch (Exception e) { LOG.error( - "Unexpected error happens while invoking listener '{}#onContextStop' : ", - listener.getClass().getName(), - e + "Unexpected error happens while invoking listener '{}#onContextStop' : ", + listener.getClass().getName(), + e ); } }); @@ -564,7 +555,7 @@ void preStart() { initDefaultStreamsExecutionEnvironment(); if (!topologyRegistrations.isEmpty()) { LOG.info("Adding all registered topologies to declared streams environments"); - topologyRegistrations.forEach(this::mayAddTopologyToEnvironment); + topologyRegistrations.forEach(this::addTopologyToEnvironment); } } @@ -576,25 +567,30 @@ private void checkIfEnvironmentExists(final String name, final String errorMessa private void initDefaultStreamsExecutionEnvironment() { final List> defaults = environments - .values() - .stream() - .filter(env -> env.isDefault() || env.name().equals(DEFAULT_ENV_NAME)) - .collect(Collectors.toList()); + .values() + .stream() + .filter(env -> env.isDefault() || env.name().equals(DEFAULT_ENV_NAME)) + .collect(Collectors.toList()); if (defaults.size() > 1) { final String strDefault = defaults - .stream() - .map(StreamsExecutionEnvironment::name) - .collect(Collectors.joining(",", "[", "]")); + .stream() + .map(StreamsExecutionEnvironment::name) + .collect(Collectors.joining(",", "[", "]")); throw new AzkarraContextException("Too many default environments " + strDefault); } + if (defaults.size() == 1) { + // Ensure the environment named 'default' is flagged as default. + defaults.get(0).isDefault(true); + } + if (defaults.isEmpty()) { LOG.warn("No default environment can be found, initializing a new one with name {}", DEFAULT_ENV_NAME); addExecutionEnvironment( - getComponent(StreamsExecutionEnvironmentFactory.class) - .create(DEFAULT_ENV_NAME) - .isDefault(true) + getComponent(StreamsExecutionEnvironmentFactory.class) + .create(DEFAULT_ENV_NAME) + .isDefault(true) ); } } @@ -685,9 +681,9 @@ public boolean equals(Object o) { if (!(o instanceof TopologyRegistration)) return false; TopologyRegistration that = (TopologyRegistration) o; return Objects.equals(type, that.type) && - Objects.equals(version, that.version) && - Objects.equals(environment, that.environment) && - Objects.equals(executed, that.executed); + Objects.equals(version, that.version) && + Objects.equals(environment, that.environment) && + Objects.equals(executed, that.executed); } /** diff --git a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecution.java b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecution.java index 0aae1164..b30e28ae 100644 --- a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecution.java +++ b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecution.java @@ -53,10 +53,10 @@ public class LocalStreamsExecution extends AbstractTopologyStreamsExecution start() { // Gets user-defined streams name or fallback on descriptor cannot be null). final var streamName = executed.nameOrElseGet(meta.name()); @@ -79,10 +79,10 @@ public ApplicationId start() { // Gets user-defined configuration and fallback on inherited configurations. final var streamsConfig = Conf.of( - executed.config(), // (1) Executed - environment.getConfiguration(), // (2) Environment - context.getConfiguration(), // (3) Context - meta.configuration() // (4) Streams (i.e: default) + executed.config(), // (1) Executed + environment.getConfiguration(), // (2) Environment + context.getConfiguration(), // (3) Context + meta.configuration() // (4) Streams (i.e: default) ); final var interceptors = executed.interceptors(); @@ -91,34 +91,34 @@ public ApplicationId start() { // Get and register all StreamsLifeCycleInterceptors component for any scopes: Application, Env, Streams interceptors.addAll(findLifecycleInterceptors( - streamsConfig, - Restriction.application(), - Restriction.env(environment.name()), - Restriction.streams(streamName)) + streamsConfig, + Restriction.application(), + Restriction.env(environment.name()), + Restriction.streams(streamName)) ); // Get and register KafkaStreamsFactory for one the scopes: Application, Env, Streams - final var factory = executed.factory() - .or(() -> findKafkaStreamsFactory(streamsConfig, Restriction.streams(streamName))) - .or(() -> findKafkaStreamsFactory(streamsConfig, Restriction.env(environment.name()))) - .or(() -> findKafkaStreamsFactory(streamsConfig, Restriction.application())) - .orElse(() -> KafkaStreamsFactory.DEFAULT); + final var factory = executed.factory() + .or(() -> findKafkaStreamsFactory(streamsConfig, Restriction.streams(streamName))) + .or(() -> findKafkaStreamsFactory(streamsConfig, Restriction.env(environment.name()))) + .or(() -> findKafkaStreamsFactory(streamsConfig, Restriction.application())) + .orElse(() -> KafkaStreamsFactory.DEFAULT); LOG.info( - "Registered new topology to environment '{}' for type='{}', version='{}', name='{}'.", - environment.name(), - meta.type().getName(), - meta.version(), - streamName + "Registered new topology to environment '{}' for type='{}', version='{}', name='{}'.", + environment.name(), + meta.type().getName(), + meta.version(), + streamName ); final var completedExecuted = Executed.as(streamName) - .withConfig(streamsConfig) - .withDescription(Optional.ofNullable(description).orElse("")) - .withInterceptors(interceptors) - .withKafkaStreamsFactory( - () -> new ClassLoaderAwareKafkaStreamsFactory(factory.get(), meta.classLoader()) - ); + .withConfig(streamsConfig) + .withDescription(Optional.ofNullable(description).orElse("")) + .withInterceptors(interceptors) + .withKafkaStreamsFactory( + () -> new ClassLoaderAwareKafkaStreamsFactory(factory.get(), meta.classLoader()) + ); return environment.addTopology(new ContextAwareTopologySupplier(context, meta), completedExecuted); } @@ -127,17 +127,17 @@ private List> findLifecycleInterceptors(fi final Restriction... restrictions) { final Qualifier qualifier = Qualifiers.byAnyRestrictions(restrictions); return context.getComponentFactory() - .getAllComponentProviders(StreamsLifecycleInterceptor.class, qualifier) - .stream() - .filter(ConfigConditionalContext.of(componentConfig)) - .map(provider -> new ContextAwareLifecycleInterceptorSupplier(context, provider)) - .collect(Collectors.toList()); + .getAllComponentProviders(StreamsLifecycleInterceptor.class, qualifier) + .stream() + .filter(ConfigConditionalContext.of(componentConfig)) + .map(provider -> new ContextAwareLifecycleInterceptorSupplier(context, provider)) + .collect(Collectors.toList()); } private Optional> findKafkaStreamsFactory(final Conf componentConfig, final Restriction restriction) { return new RestrictedComponentFactory(context.getComponentFactory()) - .findComponentByRestriction(KafkaStreamsFactory.class, componentConfig, restriction) - .map(gettable -> new ContextAwareKafkaStreamsFactorySupplier(context, gettable)); + .findComponentByRestriction(KafkaStreamsFactory.class, componentConfig, restriction) + .map(gettable -> new ContextAwareKafkaStreamsFactorySupplier(context, gettable)); } } diff --git a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironment.java b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironment.java index 04921f2c..47fd6ebf 100644 --- a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironment.java +++ b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironment.java @@ -30,7 +30,6 @@ import io.streamthoughts.azkarra.api.StreamsTopologyMeta; import io.streamthoughts.azkarra.api.annotations.VisibleForTesting; import io.streamthoughts.azkarra.api.config.Conf; -import io.streamthoughts.azkarra.api.config.RocksDBConfig; import io.streamthoughts.azkarra.api.errors.AlreadyExistsException; import io.streamthoughts.azkarra.api.errors.AzkarraException; import io.streamthoughts.azkarra.api.events.EventStream; @@ -43,6 +42,7 @@ import io.streamthoughts.azkarra.api.streams.errors.StreamThreadExceptionHandler; import io.streamthoughts.azkarra.api.streams.topology.TopologyDefinition; import io.streamthoughts.azkarra.api.streams.topology.TopologyMetadata; +import io.streamthoughts.azkarra.api.util.Version; import io.streamthoughts.azkarra.runtime.env.internal.BasicContainerId; import io.streamthoughts.azkarra.runtime.env.internal.EnvironmentAwareComponentSupplier; import io.streamthoughts.azkarra.runtime.streams.DefaultApplicationIdBuilder; @@ -63,6 +63,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -84,6 +85,8 @@ public class LocalStreamsExecutionEnvironment implements private static final Logger LOG = LoggerFactory.getLogger(LocalStreamsExecutionEnvironment.class); + public static final String STREAMS_CONFIG_PREFIX = "streams"; + /** * Static helper that can be used to creates a new {@link StreamsExecutionEnvironment} instance * using the empty configuration and a generated unique name. @@ -98,20 +101,18 @@ public static LocalStreamsExecutionEnvironment create() { * Static helper that can be used to creates a new {@link StreamsExecutionEnvironment} instance from * the specified env name and using the configuration. * - * @param envName the name to be used for identifying this environment. - * + * @param name the name to be used for identifying this environment. * @return a new {@link LocalStreamsExecutionEnvironment} instance. */ - public static LocalStreamsExecutionEnvironment create(final String envName) { - return create(Conf.empty(), envName); + public static LocalStreamsExecutionEnvironment create(final String name) { + return create(Conf.empty(), name); } /** * Static helper that can be used to creates a new {@link StreamsExecutionEnvironment} instance from * the specified {@link Conf} and using a generated env name. * - * @param settings the {@link Conf} instance. - * + * @param settings the {@link Conf} instance. * @return a new {@link LocalStreamsExecutionEnvironment} instance. */ public static LocalStreamsExecutionEnvironment create(final Conf settings) { @@ -123,12 +124,11 @@ public static LocalStreamsExecutionEnvironment create(final Conf settings) { * the specified {@link Conf} and env name. * * @param settings the {@link Conf} instance. - * @param envName the name to be used for identifying this environment. - * + * @param name the name to be used for identifying this environment. * @return a new {@link LocalStreamsExecutionEnvironment} instance. */ - public static LocalStreamsExecutionEnvironment create(final Conf settings, final String envName) { - return new LocalStreamsExecutionEnvironment(settings, envName); + public static LocalStreamsExecutionEnvironment create(final Conf settings, final String name) { + return new LocalStreamsExecutionEnvironment(settings, name); } private static final ThreadPerStreamsExecutor STREAMS_EXECUTOR = new ThreadPerStreamsExecutor(); @@ -143,6 +143,11 @@ public static LocalStreamsExecutionEnvironment create(final Conf settings, final */ private State state; + private final List> confSuppliers = new LinkedList<>(); + + /** + * The environment's configuration. + */ private Conf configuration; private final List stateListeners = new LinkedList<>(); @@ -174,28 +179,17 @@ public static LocalStreamsExecutionEnvironment create(final Conf settings, final /** * Creates a new {@link LocalStreamsExecutionEnvironment} instance. * - * @param configuration the default {@link Conf} instance. + * @param config the configuration of the environment. + * @param name the name of the environment. */ - private LocalStreamsExecutionEnvironment(final Conf configuration) { - this(configuration, EnvironmentNameGenerator.generate()); - } - - /** - * Creates a new {@link LocalStreamsExecutionEnvironment} instance. - * - * @param envName the environment name to be used. - */ - private LocalStreamsExecutionEnvironment(final Conf config, - final String envName) { - Objects.requireNonNull(config, "config cannot be null"); - Objects.requireNonNull(envName, "envName cannot be null"); - this.configuration = config; + private LocalStreamsExecutionEnvironment(final Conf config, final String name) { + this.name = Objects.requireNonNull(name, "name cannot be null"); + this.configuration = Objects.requireNonNull(config, "config cannot be null"); this.activeStreams = new HashMap<>(); this.interceptors = new LinkedList<>(); this.kafkaStreamsFactory = () -> KafkaStreamsFactory.DEFAULT; this.applicationIdBuilderSupplier = DefaultApplicationIdBuilder::new; this.topologies = new LinkedList<>(); - this.name = envName; setState(State.CREATED); } @@ -231,6 +225,10 @@ public boolean isDefault() { return isDefault; } + /** + * {@inheritDoc} + */ + @Override public LocalStreamsExecutionEnvironment isDefault(final boolean isDefault) { this.isDefault = isDefault; return this; @@ -248,13 +246,10 @@ public StreamsTopologyExecution newTopologyExecution(final StreamsTopologyMeta m * Adds a {@link KafkaStreams.StateListener} instance that will set to all {@link KafkaStreams} instance created * in this {@link StreamsExecutionEnvironment}. * - * @see KafkaStreams#setStateListener(KafkaStreams.StateListener). - * - * @param listener the {@link KafkaStreams.StateListener} instance. - * - * @throws IllegalStateException if this {@link StreamsExecutionEnvironment} instance is started. - * + * @param listener the {@link KafkaStreams.StateListener} instance. * @return this {@link StreamsExecutionEnvironment} instance. + * @throws IllegalStateException if this {@link StreamsExecutionEnvironment} instance is started. + * @see KafkaStreams#setStateListener(KafkaStreams.StateListener). */ public LocalStreamsExecutionEnvironment addStateListener(final KafkaStreams.StateListener listener) { Objects.requireNonNull(listener, "Cannot add empty listener"); @@ -266,13 +261,10 @@ public LocalStreamsExecutionEnvironment addStateListener(final KafkaStreams.Stat * Adds a {@link StateRestoreListener} instance that will set to all {@link KafkaStreams} instance created * in this {@link StreamsExecutionEnvironment}. * - * @see KafkaStreams#setGlobalStateRestoreListener(StateRestoreListener) . - * - * @param listener the {@link StateRestoreListener} instance. - * - * @throws IllegalStateException if this {@link StreamsExecutionEnvironment} instance is started. - * + * @param listener the {@link StateRestoreListener} instance. * @return this {@link StreamsExecutionEnvironment} instance. + * @throws IllegalStateException if this {@link StreamsExecutionEnvironment} instance is started. + * @see KafkaStreams#setGlobalStateRestoreListener(StateRestoreListener) . */ public LocalStreamsExecutionEnvironment addGlobalStateListener(final StateRestoreListener listener) { Objects.requireNonNull(listener, "Cannot add empty listener"); @@ -285,7 +277,7 @@ public LocalStreamsExecutionEnvironment addGlobalStateListener(final StateRestor * in this {@link StreamsExecutionEnvironment}. * The interceptors will be executed in the order in which they were added. * - * @param interceptor the {@link {@link StreamsLifecycleInterceptor}}. + * @param interceptor the {@link {@link StreamsLifecycleInterceptor}}. * @return this {@link StreamsExecutionEnvironment} instance. */ public LocalStreamsExecutionEnvironment addStreamsLifecycleInterceptor( @@ -298,9 +290,8 @@ public LocalStreamsExecutionEnvironment addStreamsLifecycleInterceptor( * Sets the {@link StreamThreadExceptionHandler} invoked when a StreamThread abruptly terminates * due to an uncaught exception. * - * @param handler the {@link StreamThreadExceptionHandler}. - * @return this {@link StreamsExecutionEnvironment} instance. - * + * @param handler the {@link StreamThreadExceptionHandler}. + * @return this {@link StreamsExecutionEnvironment} instance. * @see KafkaStreams#setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler) */ public LocalStreamsExecutionEnvironment setStreamThreadExceptionHandler( @@ -312,7 +303,7 @@ public LocalStreamsExecutionEnvironment setStreamThreadExceptionHandler( /** * Gets the {@link StreamThreadExceptionHandler}. * - * @return the {@link Supplier}, otherwise {@code null} if no handler is set. + * @return the {@link Supplier}, otherwise {@code null} if no handler is set. */ public Supplier getStreamThreadExceptionHandler() { return streamThreadExceptionHandler; @@ -340,10 +331,10 @@ public Set getContainerIds() { @Override public Set getApplicationIds() { return activeStreams.values() - .stream() - .map(KafkaStreamsContainer::applicationId) - .map(ApplicationId::new) - .collect(Collectors.toSet()); + .stream() + .map(KafkaStreamsContainer::applicationId) + .map(ApplicationId::new) + .collect(Collectors.toSet()); } /** @@ -353,11 +344,11 @@ public Set getApplicationIds() { public Optional getApplicationById(final ApplicationId id) { final List containers = getActiveContainersForApplication(id); if (containers.isEmpty()) return Optional.empty(); - var container = (LocalKafkaStreamsContainer)containers.get(0); + var container = (LocalKafkaStreamsContainer) containers.get(0); return Optional.of(new KafkaStreamsApplication( - name, - container.applicationId(), - container.allInstances() + name, + container.applicationId(), + container.allInstances() )); } @@ -365,11 +356,12 @@ public Optional getApplicationById(final ApplicationId * {@inheritDoc} */ @Override - public LocalStreamsExecutionEnvironment setConfiguration(final Conf configuration) { - this.configuration = configuration; + public LocalStreamsExecutionEnvironment addConfiguration(final Supplier configuration) { + confSuppliers.add(Objects.requireNonNull(configuration, "configuration should not be null")); return this; } + /** * {@inheritDoc} */ @@ -378,14 +370,15 @@ public Conf getConfiguration() { return configuration; } + /** - * {@inheritDoc} + * Helper method to add a configuration prefixed with 'streams.'. + * + * @param configuration the {@link Conf} to supply. + * @return {@code this}. */ - @Override - public LocalStreamsExecutionEnvironment setRocksDBConfig(final RocksDBConfig rocksDBConfig) { - Objects.requireNonNull(rocksDBConfig, "rocksDBConfig cannot be null"); - configuration = configuration.withFallback(Conf.of("streams", rocksDBConfig.conf())); - return this; + public LocalStreamsExecutionEnvironment addStreamsConfiguration(final Supplier configuration) { + return addConfiguration(() -> Conf.of(STREAMS_CONFIG_PREFIX, configuration.get())); } /** @@ -407,30 +400,101 @@ public Supplier getApplicationIdBuilder() { } /** - * Add a new {@link TopologyProvider} instance to this {@link StreamsExecutionEnvironment} to be started. + * Registers a new {@link TopologyProvider} instance to this {@link StreamsExecutionEnvironment}. + * A new {@link KafkaStreams} instance will be created and started for this topology + * when the environment will start. + * + * If the {@link LocalStreamsExecutionEnvironment} is already started, then a new {@link KafkaStreams} instance + * is immediately created. * - * @param provider the {@link TopologyProvider} supplier. + * @see #addTopology(Supplier) * - * @return this {@link ApplicationId} instance if the environment is already started, - * otherwise {@code null}. + * @param supplier the {@link TopologyProvider} supplier. + * @return {@code this}. */ - public ApplicationId addTopology(final Supplier provider) { - return addTopology(provider, new InternalExecuted()); + public LocalStreamsExecutionEnvironment registerTopology(final Supplier supplier) { + return registerTopology(supplier, new InternalExecuted()); + } /** - * Add a new {@link TopologyProvider} instance to this {@link StreamsExecutionEnvironment} to be started. + * Registers a new {@link TopologyProvider} instance to this {@link StreamsExecutionEnvironment}. + * A new {@link KafkaStreams} instance will be created and started for this topology + * when the environment will start. + * + * If the {@link LocalStreamsExecutionEnvironment} is already started, then a new {@link KafkaStreams} instance + * is immediately created. * - * @param provider the {@link TopologyProvider} supplier. - * @param executed the {@link Executed} instance. + * @see #addTopology(Supplier, Executed) * - * @return this {@link ApplicationId} instance if the environment is already started, - * otherwise {@code null}. + * @param topology the {@link Topology}. + * @param version the topology's {@link Version}. + * @param executed the topology's execution options. + * + * @return {@code this}. */ - public ApplicationId addTopology(final Supplier provider, final Executed executed) { - final TopologyDefinitionHolder internalProvider = new TopologyDefinitionHolder(provider, executed); + public LocalStreamsExecutionEnvironment registerTopology(final Topology topology, + final Version version, + final Executed executed) { + addTopology(() -> TopologyProvider.of(topology, version), executed); + return this; + } + + /** + * Registers a new {@link TopologyProvider} instance to this {@link StreamsExecutionEnvironment}. + * A new {@link KafkaStreams} instance will be created and started for this topology + * when the environment will start. + * + * If the {@link LocalStreamsExecutionEnvironment} is already started, then a new {@link KafkaStreams} instance + * is immediately created. + * + * @see #addTopology(Supplier, Executed) + * + * @param supplier the {@link TopologyProvider} supplier. + * @param executed the {@link Executed} instance. + * + * @return {@code this}. + */ + public LocalStreamsExecutionEnvironment registerTopology(final Supplier supplier, + final Executed executed) { + addTopology(supplier, executed); + return this; + } + + /** + * Adds a new {@link TopologyProvider} instance to this {@link StreamsExecutionEnvironment}. + * A new {@link KafkaStreams} instance will be created and started for this topology + * when the environment will start. + * + * If the {@link LocalStreamsExecutionEnvironment} is already started, then a new {@link KafkaStreams} instance + * is immediately created. + * + * @param supplier the {@link TopologyProvider} supplier. + * @return the {@link ApplicationId} instance if the environment is already started, + * otherwise {@link Optional#empty()}. + */ + public Optional addTopology(final Supplier supplier) { + return addTopology(supplier, new InternalExecuted()); + } + + /** + * Adds a new {@link TopologyProvider} instance to this {@link StreamsExecutionEnvironment}. + * A new {@link KafkaStreams} instance will be created and started for this topology + * when the environment will start. + * + * If the {@link LocalStreamsExecutionEnvironment} is already started, then a new {@link KafkaStreams} instance + * is immediately created. + * + * @param supplier the {@link TopologyProvider} supplier. + * @param executed the {@link Executed} instance. + * @return the {@link ApplicationId} instance if the environment is already started, + * otherwise {@link Optional#empty()}. + */ + public Optional addTopology(final Supplier supplier, + final Executed executed) { + final TopologyDefinitionHolder internalProvider = new TopologyDefinitionHolder(supplier, executed); topologies.add(internalProvider); - return state == State.STARTED ? start(internalProvider) : null; + return state == State.STARTED ? Optional.of(start(internalProvider)) : Optional.empty(); } /** @@ -440,16 +504,37 @@ public ApplicationId addTopology(final Supplier provider, fina public void start() throws IllegalStateException, AzkarraException { if (state != State.CREATED) { throw new IllegalStateException( - "The environment is either already started or already stopped, cannot re-start"); + "The environment is either already started or already stopped, cannot re-start"); } + initConfiguration(); topologies.forEach(this::start); setState(State.STARTED); } + @VisibleForTesting + void initConfiguration() { + if (!confSuppliers.isEmpty()) { + Conf newConfig = null; + final ListIterator> it = confSuppliers.listIterator(confSuppliers.size()); + while (it.hasPrevious()) { + Conf conf = supply(it.previous(), configuration); + if (newConfig == null) + newConfig = conf; + else + newConfig = newConfig.withFallback(conf); + } + configuration = newConfig; + } + } + private ApplicationId start(final TopologyDefinitionHolder topologyHolder) { final TopologyDefinition definition = topologyHolder.createTopologyDefinition(); - LOG.info("Building new Topology for name='{}', version='{}'", definition.getName(), definition.getVersion()); + LOG.info( + "Creating new container for topology with: name='{}', version='{}'", + definition.getName(), + definition.getVersion() + ); var topologyConfig = topologyHolder.getTopologyConfig(); @@ -460,7 +545,10 @@ private ApplicationId start(final TopologyDefinitionHolder topologyHolder) { topologyHolder.setContainerId(containerId); - var streamsConfig = topologyConfig.hasPath("streams") ? topologyConfig.getSubConf("streams") : Conf.empty(); + var streamsConfig = topologyConfig.hasPath(STREAMS_CONFIG_PREFIX) ? + topologyConfig.getSubConf(STREAMS_CONFIG_PREFIX) : + Conf.empty(); + var applicationIdConfig = Conf.of(StreamsConfig.APPLICATION_ID_CONFIG, applicationId.toString()); if (streamThreadExceptionHandler == null) @@ -551,11 +639,10 @@ public void terminate(final ApplicationId id, final Duration timeout) { * Close the {@link KafkaStreams} instance for the given identifier and wait up to the {@code timeout} * for the instance to be closed. * - * @param id the streams application identifier. - * @param cleanUp flag to indicate if local states must be cleanup. - * @param timeout the duration to wait for the streams to shutdown. - * @param remove if the instance should be removed from active streams. - * + * @param id the streams application identifier. + * @param cleanUp flag to indicate if local states must be cleanup. + * @param timeout the duration to wait for the streams to shutdown. + * @param remove if the instance should be removed from active streams. * @throws IllegalArgumentException if no streams instance exist for the given {@code id}. */ private void closeStreamsContainer(final ContainerId id, @@ -581,16 +668,16 @@ private void setState(final State started) { private void checkStreamsIsAlreadyRunningFor(final ApplicationId id) { if (!getActiveContainersForApplication(id).isEmpty()) { throw new AlreadyExistsException( - "A local KafkaStream instance is already registered with an application.id '" + id + "'"); + "A local KafkaStream instance is already registered with an application.id '" + id + "'"); } } private List getActiveContainersForApplication(final ApplicationId id) { return activeStreams.entrySet() - .stream() - .filter(it -> it.getKey().startWith(id)) - .map(Map.Entry::getValue) - .collect(Collectors.toList()); + .stream() + .filter(it -> it.getKey().startWith(id)) + .map(Map.Entry::getValue) + .collect(Collectors.toList()); } private void checkIsStarted() { @@ -612,7 +699,7 @@ public LocalStreamsExecutionEnvironment addFallbackConfiguration(final Conf fall * Sets the {@link KafkaStreamsFactory} that will be used to provide * the {@link KafkaStreams} to configure and start. * - * @param factory the {@link KafkaStreamsFactory} instance. + * @param factory the {@link KafkaStreamsFactory} instance. * @return this {@link StreamsExecutionEnvironment} instance. */ public LocalStreamsExecutionEnvironment setKafkaStreamsFactory(final Supplier factory) { @@ -624,8 +711,8 @@ private ApplicationId generateApplicationId(final TopologyDefinition definition, final Conf TopologyConfig) { var applicationIdBuilder = supply(applicationIdBuilderSupplier, TopologyConfig); return applicationIdBuilder.buildApplicationId( - new TopologyMetadata(definition.getName(), definition.getVersion(), definition.getDescription()), - TopologyConfig + new TopologyMetadata(definition.getName(), definition.getVersion(), definition.getDescription()), + TopologyConfig ); } @@ -680,8 +767,8 @@ class TopologyDefinitionHolder { /** * Creates a new {@link TopologyDefinitionHolder} instance. * - * @param supplier the supplier to supplier. - * @param executed the {@link Executed} instance. + * @param supplier the supplier to supplier. + * @param executed the {@link Executed} instance. */ TopologyDefinitionHolder(final Supplier supplier, final Executed executed) { @@ -695,9 +782,9 @@ public void setContainerId(final ContainerId containerId) { List getAllInterceptors() { return Stream - .concat(interceptors.stream(), executed.interceptors().stream()) - .map(i -> supply(i, getTopologyConfig())) - .collect(Collectors.toList()); + .concat(interceptors.stream(), executed.interceptors().stream()) + .map(i -> supply(i, getTopologyConfig())) + .collect(Collectors.toList()); } KafkaStreamsFactory getKafkaStreamsFactory() { @@ -709,14 +796,14 @@ Conf getTopologyConfig() { var ctxConfig = context != null ? context.getConfiguration() : Conf.empty(); var envConfig = LocalStreamsExecutionEnvironment.this.getConfiguration(); // Merged all configurations - return Conf.of(executed.config(), envConfig, envConfig, ctxConfig); + return Conf.of(executed.config(), envConfig, ctxConfig); } TopologyDefinition createTopologyDefinition() { return new InternalTopologyDefinition( - executed.name(), - executed.description(), - supply(supplier, getTopologyConfig()) + executed.name(), + executed.description(), + supply(supplier, getTopologyConfig()) ); } @@ -784,8 +871,8 @@ public Topology getTopology() { @Override public List getEventStreams() { return (provider instanceof EventStreamProvider) ? - ((EventStreamProvider)provider).eventStreams() : - Collections.emptyList(); + ((EventStreamProvider) provider).eventStreams() : + Collections.emptyList(); } } } \ No newline at end of file diff --git a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/interceptors/CompositeStreamsInterceptor.java b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/interceptors/CompositeStreamsInterceptor.java index 3ff383ca..c6f6d4ac 100644 --- a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/interceptors/CompositeStreamsInterceptor.java +++ b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/interceptors/CompositeStreamsInterceptor.java @@ -82,7 +82,7 @@ public void onStop(final StreamsLifecycleContext context, final StreamsLifecycle * {@inheritDoc} */ @Override - public void setExecutionEnvironment(final StreamsExecutionEnvironment environment) { + public void setExecutionEnvironment(final StreamsExecutionEnvironment environment) { for (StreamsLifecycleInterceptor interceptor : interceptors) { if (interceptor instanceof StreamsExecutionEnvironmentAware) { ((StreamsExecutionEnvironmentAware)interceptor).setExecutionEnvironment(environment); diff --git a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/streams/DefaultApplicationIdBuilder.java b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/streams/DefaultApplicationIdBuilder.java index 8e5bd3c3..ba1018bd 100644 --- a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/streams/DefaultApplicationIdBuilder.java +++ b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/streams/DefaultApplicationIdBuilder.java @@ -39,13 +39,13 @@ public class DefaultApplicationIdBuilder implements ApplicationIdBuilder, Stream private static final char[] AUTHORIZED_CHAR_SEPARATOR = {' ', '-', '_', '.'}; private static final String INTERNAL_ENV_NAME_PREFIX = "__"; - private StreamsExecutionEnvironment environment; + private StreamsExecutionEnvironment environment; /** * {@inheritDoc} */ @Override - public void setExecutionEnvironment(final StreamsExecutionEnvironment environment) { + public void setExecutionEnvironment(final StreamsExecutionEnvironment environment) { this.environment = environment; } diff --git a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/streams/errors/CloseKafkaStreamsOnThreadException.java b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/streams/errors/CloseKafkaStreamsOnThreadException.java index 51af2f85..29fd5929 100644 --- a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/streams/errors/CloseKafkaStreamsOnThreadException.java +++ b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/streams/errors/CloseKafkaStreamsOnThreadException.java @@ -38,7 +38,7 @@ public class CloseKafkaStreamsOnThreadException implements private static final Logger LOG = LoggerFactory.getLogger(CloseKafkaStreamsOnThreadException.class); - private StreamsExecutionEnvironment environment; + private StreamsExecutionEnvironment environment; /** * {@inheritDoc} @@ -62,7 +62,7 @@ public void handle(final KafkaStreamsContainer container, * {@inheritDoc} */ @Override - public void setExecutionEnvironment(final StreamsExecutionEnvironment environment) { + public void setExecutionEnvironment(final StreamsExecutionEnvironment environment) { this.environment = environment; } } diff --git a/azkarra-runtime/src/test/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironmentTest.java b/azkarra-runtime/src/test/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironmentTest.java index 2915d82f..31aa62d6 100644 --- a/azkarra-runtime/src/test/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironmentTest.java +++ b/azkarra-runtime/src/test/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironmentTest.java @@ -49,7 +49,8 @@ public void shouldFallbackToEnvCongWhenInitializingTopologies() { "default.prop", "value" ); - environment.setConfiguration(envConf); + environment.addConfiguration(envConf); + environment.initConfiguration(); Executed executed = Executed.as("dummy-topology", "a test topology") .withConfig(Conf.of( diff --git a/azkarra-runtime/src/test/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionTest.java b/azkarra-runtime/src/test/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionTest.java index 5d655edb..bbc86263 100644 --- a/azkarra-runtime/src/test/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionTest.java +++ b/azkarra-runtime/src/test/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionTest.java @@ -24,6 +24,7 @@ import io.streamthoughts.azkarra.api.config.Conf; import io.streamthoughts.azkarra.api.config.Configurable; import io.streamthoughts.azkarra.api.providers.TopologyDescriptor; +import io.streamthoughts.azkarra.api.util.Version; import io.streamthoughts.azkarra.runtime.context.DefaultAzkarraContext; import io.streamthoughts.azkarra.runtime.context.internal.ContextAwareTopologySupplier; import io.streamthoughts.azkarra.runtime.streams.topology.InternalExecuted; @@ -56,20 +57,14 @@ public void should_properly_merged_all_configs_when_adding_topology() { var mkDescriptor = mock(TopologyDescriptor.class); when(mkDescriptor.name()).thenReturn("test"); + when(mkDescriptor.version()).thenReturn(Version.parse("1.0")); when(mkDescriptor.configuration()).thenReturn(Conf.of("prop.descriptor", "desc.value")); when(mkDescriptor.type()).thenReturn(TopologyDescriptor.class); Executed executed = Executed.as("test-app").withConfig(Conf.of("prop.executed", "exec.value")); context.setConfiguration(Conf.of("prop.context", "value")); final LocalStreamsExecution execution = new LocalStreamsExecution( - new StreamsTopologyMeta( - mkDescriptor.name(), - mkDescriptor.version(), - mkDescriptor.description(), - mkDescriptor.type(), - mkDescriptor.classLoader(), - mkDescriptor.configuration() - ), + StreamsTopologyMeta.create().from(mkDescriptor).build(), executed, context, mkEnv @@ -98,6 +93,7 @@ public void should_properly_configure_interceptors_when_adding_topology() { var mkDescriptor = mock(TopologyDescriptor.class); when(mkDescriptor.name()).thenReturn("test"); + when(mkDescriptor.version()).thenReturn(Version.parse("1.0")); when(mkDescriptor.configuration()).thenReturn(Conf.empty()); when(mkDescriptor.type()).thenReturn(TopologyDescriptor.class); when(mkDescriptor.classLoader()).thenReturn(this.getClass().getClassLoader()); @@ -110,14 +106,7 @@ public void should_properly_configure_interceptors_when_adding_topology() { //Execute final LocalStreamsExecution execution = new LocalStreamsExecution( - new StreamsTopologyMeta( - mkDescriptor.name(), - mkDescriptor.version(), - mkDescriptor.description(), - mkDescriptor.type(), - mkDescriptor.classLoader(), - mkDescriptor.configuration() - ), + StreamsTopologyMeta.create().from(mkDescriptor).build(), Executed.as("test"), context, mkEnv