From ba57b1468661dce8cd31511d15881c95a3685fb4 Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Wed, 3 Mar 2021 13:54:49 +0100 Subject: [PATCH] refactor(api/runtime): set default environment name for LocalStreamsExecutionEnvironment --- .../api/StreamsExecutionEnvironment.java | 2 ++ .../env/LocalStreamsExecutionEnvironment.java | 18 +++++------------- ...ocalStreamsExecutionEnvironmentFactory.java | 2 +- 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsExecutionEnvironment.java b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsExecutionEnvironment.java index b4d3fc7a..22572a78 100644 --- a/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsExecutionEnvironment.java +++ b/azkarra-api/src/main/java/io/streamthoughts/azkarra/api/StreamsExecutionEnvironment.java @@ -44,6 +44,8 @@ */ public interface StreamsExecutionEnvironment> extends HasName { + String ENVIRONMENT_DEFAULT_NAME = "default"; + /** * Gets the type of this {@link StreamsExecutionEnvironment}. * diff --git a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironment.java b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironment.java index 531172bc..c0536b4b 100644 --- a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironment.java +++ b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironment.java @@ -76,7 +76,7 @@ import java.util.stream.Stream; /** - * The default {@link StreamsExecutionEnvironment} implementation. + * A {@link StreamsExecutionEnvironment} implementation that runs and manages {@link KafkaStreams} instance locally. */ public class LocalStreamsExecutionEnvironment implements StreamsExecutionEnvironment, @@ -106,7 +106,7 @@ public static LocalStreamsExecutionEnvironment create() { * @return a new {@link LocalStreamsExecutionEnvironment} instance. */ public static LocalStreamsExecutionEnvironment create(final String name) { - return create(Conf.empty(), name); + return create(name, Conf.empty()); } /** @@ -117,7 +117,7 @@ public static LocalStreamsExecutionEnvironment create(final String name) { * @return a new {@link LocalStreamsExecutionEnvironment} instance. */ public static LocalStreamsExecutionEnvironment create(final Conf settings) { - return create(settings, EnvironmentNameGenerator.generate()); + return create(ENVIRONMENT_DEFAULT_NAME, settings); } /** @@ -128,7 +128,7 @@ public static LocalStreamsExecutionEnvironment create(final Conf settings) { * @param name the name to be used for identifying this environment. * @return a new {@link LocalStreamsExecutionEnvironment} instance. */ - public static LocalStreamsExecutionEnvironment create(final Conf settings, final String name) { + public static LocalStreamsExecutionEnvironment create(final String name, final Conf settings) { return new LocalStreamsExecutionEnvironment(settings, name); } @@ -194,6 +194,7 @@ private LocalStreamsExecutionEnvironment(final Conf config, final String name) { this.applicationIdBuilderSupplier = DefaultApplicationIdBuilder::new; this.topologies = new LinkedList<>(); this.containerIdBuilder = new DefaultContainerIdBuilder(); + this.isDefault = name.equalsIgnoreCase(ENVIRONMENT_DEFAULT_NAME); setState(State.CREATED); } @@ -727,15 +728,6 @@ public void setAzkarraContext(final AzkarraContext context) { this.context = context; } - private static final class EnvironmentNameGenerator { - - private static final AtomicInteger NUM = new AtomicInteger(1); - - static String generate() { - return String.format("__streams_env_%02d", NUM.getAndIncrement()); - } - } - /** * Inner {@link Executor} which is used for starting {@link KafkaStreams} instance. * One new {@link Thread} is created per streams instance. diff --git a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironmentFactory.java b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironmentFactory.java index c230ea99..9583ae26 100644 --- a/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironmentFactory.java +++ b/azkarra-runtime/src/main/java/io/streamthoughts/azkarra/runtime/env/LocalStreamsExecutionEnvironmentFactory.java @@ -46,7 +46,7 @@ public class LocalStreamsExecutionEnvironmentFactory */ @Override public LocalStreamsExecutionEnvironment create(final String name, final Conf conf) { - return initialize(LocalStreamsExecutionEnvironment.create(conf, name)); + return initialize(LocalStreamsExecutionEnvironment.create(name, conf)); } /**