fix(runtime): fix breaking change due to random container id
fhussonnois committed Mar 2, 2021
1 parent 689a9e3 commit b390c62
Showing 5 changed files with 152 additions and 58 deletions.
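In short: before this commit the container id was derived from the application id plus a random suffix, so it changed on every start, which is the breaking change referenced in the commit message. The commit introduces a pluggable ContainerIdBuilder whose default implementation keeps the id deterministic. A condensed before/after excerpt of the relevant lines from the diff below (quoted for orientation, not a standalone snippet):

// Before (removed): a random 4-character suffix was appended each time the topology started.
var containerId = BasicContainerId.create(applicationId).randomize(4);

// After (added): the id comes from the new DefaultContainerIdBuilder, which uses the
// 'container.id' config value when present and otherwise falls back to the application id.
topologyHolder.setContainerId(containerIdBuilder.buildContainerId(applicationId, metadata, topologyConfig));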
@@ -31,9 +31,10 @@ public interface ApplicationIdBuilder {
* Builds the {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG} for
* the specified topology and configuration.
*
* @param metadata the {@link TopologyMetadata} instance.
* @param metadata the topology's metadata.
* @param config the topology's configuration.
*
* @return a new {@link ApplicationId} instance.
*/
ApplicationId buildApplicationId(final TopologyMetadata metadata, final Conf streamsConfig);
ApplicationId buildApplicationId(final TopologyMetadata metadata, final Conf config);
}
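The interface above is the whole contract a custom application-id strategy has to satisfy. A minimal sketch of an implementation, assuming a hypothetical config key 'my.application.id'; the import of ApplicationIdBuilder itself is omitted because its package is not shown in this hunk:

import io.streamthoughts.azkarra.api.ApplicationId;
import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.streams.topology.TopologyMetadata;
// import for ApplicationIdBuilder omitted: its package is not visible in this diff.

// Illustrative only: reads the application id from a hypothetical config key instead of
// deriving it from the topology metadata.
public class FixedApplicationIdBuilder implements ApplicationIdBuilder {

    private static final String MY_APP_ID_CONFIG = "my.application.id"; // hypothetical key

    @Override
    public ApplicationId buildApplicationId(final TopologyMetadata metadata, final Conf config) {
        // Falls back to a constant when the hypothetical key is absent.
        final String id = config.hasPath(MY_APP_ID_CONFIG)
                ? config.getString(MY_APP_ID_CONFIG)
                : "my-default-app";
        return new ApplicationId(id);
    }
}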
@@ -0,0 +1,43 @@
/*
* Copyright 2021 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.azkarra.runtime.env;

import io.streamthoughts.azkarra.api.ApplicationId;
import io.streamthoughts.azkarra.api.ContainerId;
import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer;
import io.streamthoughts.azkarra.api.streams.topology.TopologyMetadata;

/**
* A {@code ContainerIdBuilder} is used to build a {@link ContainerId}.
*/
public interface ContainerIdBuilder {

/**
* Builds the identifier that will be used to identify the {@link KafkaStreamsContainer} running the topology.
*
* @param applicationId the topology's {@link ApplicationId}.
* @param metadata the topology's metadata.
* @param config the topology's configuration.
*
* @return a new {@link ContainerId} instance.
*/
ContainerId buildContainerId(final ApplicationId applicationId,
final TopologyMetadata metadata,
final Conf config);
}
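The new extension point receives the already-built ApplicationId together with the topology's metadata and configuration. Note that this commit wires DefaultContainerIdBuilder directly into the environment constructor (see further below), so the following is only an illustration of the contract; the class name and the assumption that BasicContainerId's String constructor is accessible outside its package are mine, not the project's:

import io.streamthoughts.azkarra.api.ApplicationId;
import io.streamthoughts.azkarra.api.ContainerId;
import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.streams.topology.TopologyMetadata;
import io.streamthoughts.azkarra.runtime.env.ContainerIdBuilder;
import io.streamthoughts.azkarra.runtime.env.internal.BasicContainerId;

import java.net.InetAddress;
import java.net.UnknownHostException;

// Illustrative builder: a deterministic id per host, instead of the random suffix removed by this commit.
public class HostAwareContainerIdBuilder implements ContainerIdBuilder {

    @Override
    public ContainerId buildContainerId(final ApplicationId applicationId,
                                        final TopologyMetadata metadata,
                                        final Conf config) {
        try {
            // Stable across restarts on the same machine.
            final String host = InetAddress.getLocalHost().getHostName();
            // Assumes the BasicContainerId(String) constructor is accessible here,
            // as it is used by DefaultContainerIdBuilder in this commit.
            return new BasicContainerId(applicationId.id() + "-" + host);
        } catch (UnknownHostException e) {
            // Fall back to the application id alone, like DefaultContainerIdBuilder does.
            return BasicContainerId.create(applicationId);
        }
    }
}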
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 StreamThoughts.
* Copyright 2019-2021 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -44,6 +44,7 @@
import io.streamthoughts.azkarra.api.streams.topology.TopologyMetadata;
import io.streamthoughts.azkarra.api.util.Version;
import io.streamthoughts.azkarra.runtime.env.internal.BasicContainerId;
import io.streamthoughts.azkarra.runtime.env.internal.DefaultContainerIdBuilder;
import io.streamthoughts.azkarra.runtime.env.internal.EnvironmentAwareComponentSupplier;
import io.streamthoughts.azkarra.runtime.streams.DefaultApplicationIdBuilder;
import io.streamthoughts.azkarra.runtime.streams.LocalKafkaStreamsContainer;
@@ -101,7 +102,7 @@ public static LocalStreamsExecutionEnvironment create() {
* Static helper that can be used to create a new {@link StreamsExecutionEnvironment} instance from
* the specified env name and using the configuration.
*
* @param name the name to be used for identifying this environment.
* @param name the name to be used for identifying this environment.
* @return a new {@link LocalStreamsExecutionEnvironment} instance.
*/
public static LocalStreamsExecutionEnvironment create(final String name) {
@@ -123,8 +124,8 @@ public static LocalStreamsExecutionEnvironment create(final Conf settings) {
* Static helper that can be used to create a new {@link StreamsExecutionEnvironment} instance from
* the specified {@link Conf} and env name.
*
* @param settings the {@link Conf} instance.
* @param name the name to be used for identifying this environment.
* @param settings the {@link Conf} instance.
* @param name the name to be used for identifying this environment.
* @return a new {@link LocalStreamsExecutionEnvironment} instance.
*/
public static LocalStreamsExecutionEnvironment create(final Conf settings, final String name) {
@@ -176,6 +177,8 @@ public static LocalStreamsExecutionEnvironment create(final Conf settings, final

private boolean isDefault;

private final ContainerIdBuilder containerIdBuilder;

/**
* Creates a new {@link LocalStreamsExecutionEnvironment} instance.
*
@@ -190,6 +193,7 @@ private LocalStreamsExecutionEnvironment(final Conf config, final String name) {
this.kafkaStreamsFactory = () -> KafkaStreamsFactory.DEFAULT;
this.applicationIdBuilderSupplier = DefaultApplicationIdBuilder::new;
this.topologies = new LinkedList<>();
this.containerIdBuilder = new DefaultContainerIdBuilder();
setState(State.CREATED);
}

@@ -375,7 +379,7 @@ public Conf getConfiguration() {
* Helper method to add a configuration prefixed with 'streams.'.
*
* @param configuration the {@link Conf} to supply.
* @return {@code this}.
* @return {@code this}.
*/
public LocalStreamsExecutionEnvironment addStreamsConfiguration(final Supplier<Conf> configuration) {
return addConfiguration(() -> Conf.of(STREAMS_CONFIG_PREFIX, configuration.get()));
@@ -403,14 +407,13 @@ public Supplier<ApplicationIdBuilder> getApplicationIdBuilder() {
Registers a new {@link TopologyProvider} instance with this {@link StreamsExecutionEnvironment}.
* A new {@link KafkaStreams} instance will be created and started for this topology
* when the environment starts.
*
* <p>
* If the {@link LocalStreamsExecutionEnvironment} is already started, then a new {@link KafkaStreams} instance
* is immediately created.
*
* @see #addTopology(Supplier)
*
* @param supplier the {@link TopologyProvider} supplier.
* @return {@code this}.
* @return {@code this}.
* @see #addTopology(Supplier)
*/
public LocalStreamsExecutionEnvironment registerTopology(final Supplier<TopologyProvider> supplier) {
return registerTopology(supplier, new InternalExecuted());
@@ -421,17 +424,15 @@ public LocalStreamsExecutionEnvironment registerTopology(final Supplier<Topology
* Registers a new {@link TopologyProvider} instance with this {@link StreamsExecutionEnvironment}.
* A new {@link KafkaStreams} instance will be created and started for this topology
* when the environment starts.
*
* <p>
* If the {@link LocalStreamsExecutionEnvironment} is already started, then a new {@link KafkaStreams} instance
* is immediately created.
*
* @see #addTopology(Supplier, Executed)
*
* @param topology the {@link Topology}.
* @param version the topology's {@link Version}.
* @param executed the topology's execution options.
*
* @return {@code this}.
* @return {@code this}.
* @see #addTopology(Supplier, Executed)
*/
public LocalStreamsExecutionEnvironment registerTopology(final Topology topology,
final Version version,
@@ -444,16 +445,14 @@ public LocalStreamsExecutionEnvironment registerTopology(final Topology topology
* Registers a new {@link TopologyProvider} instance with this {@link StreamsExecutionEnvironment}.
* A new {@link KafkaStreams} instance will be created and started for this topology
* when the environment starts.
*
* <p>
* If the {@link LocalStreamsExecutionEnvironment} is already started, then a new {@link KafkaStreams} instance
* is immediately created.
*
* @see #addTopology(Supplier, Executed)
*
* @param supplier the {@link TopologyProvider} supplier.
* @param executed the {@link Executed} instance.
*
* @return {@code this}.
* @return {@code this}.
* @see #addTopology(Supplier, Executed)
*/
public LocalStreamsExecutionEnvironment registerTopology(final Supplier<TopologyProvider> supplier,
final Executed executed) {
@@ -465,12 +464,12 @@ public LocalStreamsExecutionEnvironment registerTopology(final Supplier<Topology
* Adds a new {@link TopologyProvider} instance to this {@link StreamsExecutionEnvironment}.
* A new {@link KafkaStreams} instance will be created and started for this topology
* when the environment starts.
*
* <p>
* If the {@link LocalStreamsExecutionEnvironment} is already started, then a new {@link KafkaStreams} instance
* is immediately created.
*
* @param supplier the {@link TopologyProvider} supplier.
* @return the {@link ApplicationId} instance if the environment is already started,
* @return the {@link ApplicationId} instance if the environment is already started,
* otherwise {@link Optional#empty()}.
*/
public Optional<ApplicationId> addTopology(final Supplier<TopologyProvider> supplier) {
@@ -481,13 +480,13 @@ public Optional<ApplicationId> addTopology(final Supplier<TopologyProvider> supp
* Adds a new {@link TopologyProvider} instance to this {@link StreamsExecutionEnvironment}.
* A new {@link KafkaStreams} instance will be created and started for this topology
* when the environment starts.
*
* <p>
* If the {@link LocalStreamsExecutionEnvironment} is already started, then a new {@link KafkaStreams} instance
* is immediately created.
*
* @param supplier the {@link TopologyProvider} supplier.
* @param executed the {@link Executed} instance.
* @return the {@link ApplicationId} instance if the environment is already started,
* @return the {@link ApplicationId} instance if the environment is already started,
* otherwise {@link Optional#empty()}.
*/
public Optional<ApplicationId> addTopology(final Supplier<TopologyProvider> supplier,
@@ -531,43 +530,50 @@ private ApplicationId start(final TopologyDefinitionHolder topologyHolder) {
final TopologyDefinition definition = topologyHolder.createTopologyDefinition();

LOG.info(
"Creating new container for topology with: name='{}', version='{}'",
definition.getName(),
definition.getVersion()
"Creating new container for topology with: name='{}', version='{}'",
definition.getName(),
definition.getVersion()
);

final TopologyMetadata metadata = new TopologyMetadata(
definition.getName(),
definition.getVersion(),
definition.getDescription()
);

var topologyConfig = topologyHolder.getTopologyConfig();

var applicationId = generateApplicationId(definition, topologyHolder.getTopologyConfig());
var applicationId = generateApplicationId(metadata, topologyConfig);
checkStreamsIsAlreadyRunningFor(applicationId);

var containerId = BasicContainerId.create(applicationId).randomize(4);
topologyHolder.setContainerId(containerIdBuilder.buildContainerId(
applicationId,
metadata,
topologyConfig
));

topologyHolder.setContainerId(containerId);
if (streamThreadExceptionHandler == null)
streamThreadExceptionHandler = CloseKafkaStreamsOnThreadException::new;

var threadExceptionHandler = supply(streamThreadExceptionHandler, topologyConfig);

var streamsConfig = topologyConfig.hasPath(STREAMS_CONFIG_PREFIX) ?
topologyConfig.getSubConf(STREAMS_CONFIG_PREFIX) :
Conf.empty();

var applicationIdConfig = Conf.of(StreamsConfig.APPLICATION_ID_CONFIG, applicationId.toString());

if (streamThreadExceptionHandler == null)
streamThreadExceptionHandler = CloseKafkaStreamsOnThreadException::new;

var threadExceptionHandler = supply(streamThreadExceptionHandler, topologyConfig);

var kafkaStreamsContainer = LocalKafkaStreamsContainer.newBuilder()
.withContainerId(containerId.id())
.withStateListeners(stateListeners)
.withRestoreListeners(restoreListeners)
.withStreamThreadExceptionHandlers(List.of(threadExceptionHandler))
.withStreamsConfig(Conf.of(applicationIdConfig, streamsConfig))
.withTopologyDefinition(definition)
.withKafkaStreamsFactory(topologyHolder.getKafkaStreamsFactory())
.withInterceptors(topologyHolder.getAllInterceptors())
.build();

activeStreams.put(containerId, kafkaStreamsContainer);
.withContainerId(topologyHolder.getContainerId().id())
.withStateListeners(stateListeners)
.withRestoreListeners(restoreListeners)
.withStreamThreadExceptionHandlers(List.of(threadExceptionHandler))
.withStreamsConfig(Conf.of(applicationIdConfig, streamsConfig))
.withTopologyDefinition(definition)
.withKafkaStreamsFactory(topologyHolder.getKafkaStreamsFactory())
.withInterceptors(topologyHolder.getAllInterceptors())
.build();

activeStreams.put(topologyHolder.getContainerId(), kafkaStreamsContainer);
kafkaStreamsContainer.start(STREAMS_EXECUTOR);
return applicationId;
}
@@ -673,10 +679,10 @@ private void checkStreamsIsAlreadyRunningFor(final ApplicationId id) {
}

private List<KafkaStreamsContainer> getActiveContainersForApplication(final ApplicationId id) {
return activeStreams.entrySet()
final String unwrapped = id.id();
return activeStreams.values()
.stream()
.filter(it -> it.getKey().startWith(id))
.map(Map.Entry::getValue)
.filter(it -> it.applicationId().equals(unwrapped))
.collect(Collectors.toList());
}

@@ -707,13 +713,10 @@ public LocalStreamsExecutionEnvironment setKafkaStreamsFactory(final Supplier<Ka
return this;
}

private ApplicationId generateApplicationId(final TopologyDefinition definition,
private ApplicationId generateApplicationId(final TopologyMetadata metadata,
final Conf TopologyConfig) {
var applicationIdBuilder = supply(applicationIdBuilderSupplier, TopologyConfig);
return applicationIdBuilder.buildApplicationId(
new TopologyMetadata(definition.getName(), definition.getVersion(), definition.getDescription()),
TopologyConfig
);
return applicationIdBuilder.buildApplicationId(metadata, TopologyConfig);
}

/**
@@ -776,7 +779,11 @@ class TopologyDefinitionHolder {
this.executed = new InternalExecuted(executed);
}

public void setContainerId(final ContainerId containerId) {
ContainerId getContainerId() {
return containerId;
}

void setContainerId(final ContainerId containerId) {
this.containerId = containerId;
}

@@ -0,0 +1,43 @@
/*
* Copyright 2021 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.azkarra.runtime.env.internal;

import io.streamthoughts.azkarra.api.ApplicationId;
import io.streamthoughts.azkarra.api.ContainerId;
import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.streams.topology.TopologyMetadata;
import io.streamthoughts.azkarra.runtime.env.ContainerIdBuilder;

public class DefaultContainerIdBuilder implements ContainerIdBuilder {

public static final String CONTAINER_ID_CONFIG = "container.id";

/**
* {@inheritDoc}
*/
@Override
public ContainerId buildContainerId(final ApplicationId applicationId,
final TopologyMetadata metadata,
final Conf config) {
if (config.hasPath(CONTAINER_ID_CONFIG))
return new BasicContainerId(config.getString(CONTAINER_ID_CONFIG));

return BasicContainerId.create(applicationId);
}
}
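Put differently, the default builder above restores a deterministic container id: either the value of the new 'container.id' setting, or the application id itself. A usage sketch follows; the environment name, the placement of the setting in the environment configuration, MyTopologyProvider, and the package of LocalStreamsExecutionEnvironment are assumptions for illustration and are not confirmed by this diff:

import io.streamthoughts.azkarra.api.config.Conf;
// Package of LocalStreamsExecutionEnvironment assumed; it is not shown in this diff.
import io.streamthoughts.azkarra.runtime.env.LocalStreamsExecutionEnvironment;
import io.streamthoughts.azkarra.runtime.env.internal.DefaultContainerIdBuilder;

public class ContainerIdExample {

    public static void main(String[] args) {
        // Pin the container id explicitly instead of letting it default to the application id.
        // Whether the environment-level Conf is merged into the topology config passed to the
        // builder is assumed here, not shown in this diff.
        final Conf conf = Conf.of(DefaultContainerIdBuilder.CONTAINER_ID_CONFIG, "orders-container-0");

        final LocalStreamsExecutionEnvironment env =
                LocalStreamsExecutionEnvironment.create(conf, "dev");

        // MyTopologyProvider is a placeholder for any TopologyProvider implementation.
        env.registerTopology(MyTopologyProvider::new);

        // Starting the environment (not shown in this diff) creates the KafkaStreams container
        // whose id is resolved by DefaultContainerIdBuilder.
    }
}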
@@ -138,7 +138,7 @@ public KafkaStreamsApplication getStreamsApplicationById(final String id) {
.stream()
.flatMap(env -> env.getApplicationById(new ApplicationId(id)).stream())
.findFirst()
.orElseThrow(() -> new NotFoundException("Failed to found KafkaStreams application for id " + id));
.orElseThrow(() -> new NotFoundException("Failed to find KafkaStreams application for id " + id));
}

/**
