Skip to content

Commit

Permalink
refactor(api/runtime): set default environment name for LocalStreamsE…
Browse files Browse the repository at this point in the history
…xecutionEnvironment
  • Loading branch information
fhussonnois committed Mar 3, 2021
1 parent d63ad8e commit ba57b14
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 14 deletions.
Expand Up @@ -44,6 +44,8 @@
*/
public interface StreamsExecutionEnvironment<T extends StreamsExecutionEnvironment<T>> extends HasName {

String ENVIRONMENT_DEFAULT_NAME = "default";

/**
* Gets the type of this {@link StreamsExecutionEnvironment}.
*
Expand Down
Expand Up @@ -76,7 +76,7 @@
import java.util.stream.Stream;

/**
* The default {@link StreamsExecutionEnvironment} implementation.
* A {@link StreamsExecutionEnvironment} implementation that runs and manages {@link KafkaStreams} instance locally.
*/
public class LocalStreamsExecutionEnvironment implements
StreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>,
Expand Down Expand Up @@ -106,7 +106,7 @@ public static LocalStreamsExecutionEnvironment create() {
* @return a new {@link LocalStreamsExecutionEnvironment} instance.
*/
public static LocalStreamsExecutionEnvironment create(final String name) {
return create(Conf.empty(), name);
return create(name, Conf.empty());
}

/**
Expand All @@ -117,7 +117,7 @@ public static LocalStreamsExecutionEnvironment create(final String name) {
* @return a new {@link LocalStreamsExecutionEnvironment} instance.
*/
public static LocalStreamsExecutionEnvironment create(final Conf settings) {
return create(settings, EnvironmentNameGenerator.generate());
return create(ENVIRONMENT_DEFAULT_NAME, settings);
}

/**
Expand All @@ -128,7 +128,7 @@ public static LocalStreamsExecutionEnvironment create(final Conf settings) {
* @param name the name to be used for identifying this environment.
* @return a new {@link LocalStreamsExecutionEnvironment} instance.
*/
public static LocalStreamsExecutionEnvironment create(final Conf settings, final String name) {
public static LocalStreamsExecutionEnvironment create(final String name, final Conf settings) {
return new LocalStreamsExecutionEnvironment(settings, name);
}

Expand Down Expand Up @@ -194,6 +194,7 @@ private LocalStreamsExecutionEnvironment(final Conf config, final String name) {
this.applicationIdBuilderSupplier = DefaultApplicationIdBuilder::new;
this.topologies = new LinkedList<>();
this.containerIdBuilder = new DefaultContainerIdBuilder();
this.isDefault = name.equalsIgnoreCase(ENVIRONMENT_DEFAULT_NAME);
setState(State.CREATED);
}

Expand Down Expand Up @@ -727,15 +728,6 @@ public void setAzkarraContext(final AzkarraContext context) {
this.context = context;
}

private static final class EnvironmentNameGenerator {

private static final AtomicInteger NUM = new AtomicInteger(1);

static String generate() {
return String.format("__streams_env_%02d", NUM.getAndIncrement());
}
}

/**
* Inner {@link Executor} which is used for starting {@link KafkaStreams} instance.
* One new {@link Thread} is created per streams instance.
Expand Down
Expand Up @@ -46,7 +46,7 @@ public class LocalStreamsExecutionEnvironmentFactory
*/
@Override
public LocalStreamsExecutionEnvironment create(final String name, final Conf conf) {
return initialize(LocalStreamsExecutionEnvironment.create(conf, name));
return initialize(LocalStreamsExecutionEnvironment.create(name, conf));
}

/**
Expand Down

0 comments on commit ba57b14

Please sign in to comment.