Skip to content

Commit

Permalink
sub-taks(all): refactor StreamsExecutionEnvironment
Browse files Browse the repository at this point in the history
Changes:
- add new methods registerTopology, addConfiguration and addStreamsConfiguration to LocalStreamsExecutionEnvironment class
- update StreamsTopologyExecution to return an optional
- add builder class for StreamsTopologyMeta
  • Loading branch information
fhussonnois committed Feb 23, 2021
1 parent 554c4ae commit 21ffbc0
Show file tree
Hide file tree
Showing 16 changed files with 507 additions and 305 deletions.
Expand Up @@ -20,7 +20,6 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.config.RocksDBConfig;
import io.streamthoughts.azkarra.api.errors.NotFoundException;
import io.streamthoughts.azkarra.api.model.HasId;
import io.streamthoughts.azkarra.api.model.HasName;
Expand All @@ -29,6 +28,7 @@
import io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer;
import org.apache.kafka.streams.KafkaStreams;

import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
Expand All @@ -37,7 +37,10 @@
import java.util.function.Supplier;

/**
* A StreamsExecutionEnvironment manages the lifecycle of {@link org.apache.kafka.streams.Topology} instances.
* A {@code StreamsExecutionEnvironment} manages the execution and the lifecycle of one or many {@link KafkaStreams}
* instances that run either locally or remotely.
*
* @see KafkaStreamsContainer
*/
public interface StreamsExecutionEnvironment<T extends StreamsExecutionEnvironment<T>> extends HasName {

Expand Down Expand Up @@ -107,7 +110,7 @@ default KafkaStreamsContainer getContainerById(final ContainerId id) {
.findFirst()
.orElseThrow(() ->
new NotFoundException(
"Fail to find running KafkaStreams instance for container id '"
"Failed to find running KafkaStreams instance for container id '"
+ id
+ "' in environment '" + name() + "'")
);
Expand Down Expand Up @@ -136,26 +139,32 @@ default KafkaStreamsContainer getContainerById(final ContainerId id) {
Optional<KafkaStreamsApplication> getApplicationById(final ApplicationId id);

/**
* Sets this environment configuration.
* Adds a new configuration to this environment.
* This method can be invoked multiple time. The supplied configuration will override all prior configurations.
*
* @see #addConfiguration(Supplier)
*
* @return this {@link StreamsExecutionEnvironment} instance.
*/
T setConfiguration(final Conf configuration);
default T addConfiguration(final Conf configuration) {
return addConfiguration(() -> configuration);
}

/**
* Gets this environment configuration.
* Adds a new configuration to this environment.
* This method can be invoked multiple time. The supplied configuration will override all prior configurations.
*
* @return the {@link Conf} instance.
*
* @return this {@link StreamsExecutionEnvironment} instance.
*/
Conf getConfiguration();
T addConfiguration(final Supplier<Conf> configuration);

/**
* Sets the {@link RocksDBConfig} streamsConfig used by topology persistent stores.
* Gets this environment configuration.
*
* @param settings the {@link RocksDBConfig} instance.
* @return this {@link StreamsExecutionEnvironment} instance.
* @return the {@link Conf} instance.
*/
T setRocksDBConfig(final RocksDBConfig settings);
Conf getConfiguration();

/**
* Sets the {@link ApplicationIdBuilder} that should be used for building streams {@code application.id}.
Expand All @@ -173,7 +182,8 @@ default KafkaStreamsContainer getContainerById(final ContainerId id) {
Supplier<ApplicationIdBuilder> getApplicationIdBuilder();

/**
* Adds streamsConfig that will be used in fallback if not present in defined environment streamsConfig.
* Adds settings to this environment that will be used in fallback if not present
* in the defined environment configuration.
*
* @param settings the {@link Conf} instance.
* @return this {@link StreamsExecutionEnvironment} instance.
Expand Down Expand Up @@ -321,7 +331,7 @@ default View describe() {
* A {@code Environment} is used to describe the current state of a
* {@link io.streamthoughts.azkarra.api.StreamsExecutionEnvironment} instance.
*/
class View implements HasName {
class View implements HasName, Serializable {

private final StreamsExecutionEnvironment<?> environment;

Expand Down
Expand Up @@ -25,5 +25,5 @@ public interface StreamsExecutionEnvironmentAware {
*
* @param environment the {@link StreamsExecutionEnvironment} instance
*/
void setExecutionEnvironment(final StreamsExecutionEnvironment environment);
void setExecutionEnvironment(final StreamsExecutionEnvironment<?> environment);
}
Expand Up @@ -19,27 +19,28 @@

package io.streamthoughts.azkarra.api;

import java.util.Optional;
import java.util.concurrent.Callable;

/**
* TopologyExecution provide a way for topology to be executed in an specific environment.
*
* @see StreamsExecutionEnvironment
*/
public interface StreamsTopologyExecution extends Callable<ApplicationId> {
public interface StreamsTopologyExecution extends Callable<Optional<ApplicationId>> {

/**
* Starts the streams-topology encapsulated by this object.
*
* @return the {@link ApplicationId}.
*/
ApplicationId start();
Optional<ApplicationId> start();

/**
* {@inheritDoc}
*/
@Override
default ApplicationId call() {
default Optional<ApplicationId> call() {
return start();
}
}
Expand Up @@ -20,10 +20,12 @@
package io.streamthoughts.azkarra.api;

import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.providers.TopologyDescriptor;
import io.streamthoughts.azkarra.api.streams.TopologyProvider;
import io.streamthoughts.azkarra.api.util.Version;

import java.util.Objects;
import java.util.Optional;

public class StreamsTopologyMeta {

Expand All @@ -34,29 +36,28 @@ public class StreamsTopologyMeta {
private final ClassLoader classLoader;
private final Conf conf;


/**
* Creates a new {@link StreamsTopologyMeta} instance.
*
* @param name the name of the topology.
* @param version the version of the topology
* @param description the description of the topology.
* @param type the topology {@link Class}.
* @param classLoader the topology {@link ClassLoader}.
* @param conf the default {@link Conf} for the topology.
* @param name the name of the topology.
* @param version the version of the topology
* @param description the description of the topology.
* @param type the topology {@link Class}.
* @param classLoader the topology {@link ClassLoader}.
* @param conf the default {@link Conf} for the topology.
*/
public StreamsTopologyMeta(final String name,
final Version version,
final String description,
final Class<TopologyProvider> type,
final ClassLoader classLoader,
final Conf conf) {
this.name = name;
this.version = version;
private StreamsTopologyMeta(final String name,
final Version version,
final String description,
final Class<TopologyProvider> type,
final ClassLoader classLoader,
final Conf conf) {
this.name = Objects.requireNonNull(name, "name should not be null");
this.version = Objects.requireNonNull(version, "version should not be null");
this.type = Objects.requireNonNull(type, "version should not be null");
this.classLoader = Objects.requireNonNull(classLoader, "classLoader should not be null");
this.conf = Objects.requireNonNull(conf, "conf should not be null");
this.description = description;
this.type = type;
this.classLoader = classLoader;
this.conf = conf;
}

public String name() {
Expand Down Expand Up @@ -121,4 +122,76 @@ public String toString() {
", conf=" + conf +
'}';
}

/**
* Helper method to create a new {@link Builder} instance.
*
* @return a new {@link Builder} instance
*/
public static Builder create() {
return new Builder();
}

public static class Builder {

private String name;
private Version version;
private String description;
private Class<TopologyProvider> type;
private ClassLoader classLoader;
private Conf conf;

public Builder from(final TopologyDescriptor<TopologyProvider> descriptor) {
return this
.setName(descriptor.name())
.setVersion(descriptor.version())
.setDescription(descriptor.description())
.setType(descriptor.type())
.setClassLoader(descriptor.classLoader())
.setConf(descriptor.configuration());
}

public Builder setName(final String name) {
this.name = Objects.requireNonNull(name, "name should not be null");
return this;
}

public Builder setVersion(final Version version) {
this.version = Objects.requireNonNull(version, "version should not be null");
;
return this;
}

public Builder setDescription(final String description) {
this.description = description;
return this;
}

public Builder setType(final Class<TopologyProvider> type) {
this.type = Objects.requireNonNull(type, "type should not be null");
;
return this;
}

public Builder setClassLoader(final ClassLoader classLoader) {
this.classLoader = classLoader;
return this;
}

public Builder setConf(final Conf conf) {
this.conf = conf;
return this;
}

public StreamsTopologyMeta build() {
return new StreamsTopologyMeta(
name,
version,
description,
type,
Optional.ofNullable(classLoader).orElse(type.getClassLoader()),
Optional.ofNullable(conf).orElse(Conf.empty())
);
}
}
}
Expand Up @@ -28,8 +28,9 @@
import java.util.Set;

/**
* Class which can be used for configuring components.
* A {@code Conf} represents an immutable object which can be used for configuring components.
*
* @see Configurable
* @see io.streamthoughts.azkarra.api.AzkarraContext
* @see io.streamthoughts.azkarra.api.StreamsExecutionEnvironment
* @see io.streamthoughts.azkarra.api.streams.TopologyProvider
Expand Down
Expand Up @@ -42,7 +42,7 @@
*/
public class MapConf extends AbstractConf {

protected final Map<String, ?> parameters;
private final Map<String, ?> parameters;

private final Conf fallback;

Expand All @@ -52,7 +52,7 @@ public class MapConf extends AbstractConf {
* @return a new {@link MapConf} instance.
*/
static MapConf empty() {
return new MapConf(Collections.emptyMap(), null, true);
return new MapConf(Collections.emptyMap());
}

/**
Expand All @@ -61,7 +61,7 @@ static MapConf empty() {
* @return a new {@link MapConf} instance.
*/
static MapConf singletonConf(final String key, final Object value) {
return new MapConf(Collections.singletonMap(key, value), null, true);
return new MapConf(Collections.singletonMap(key, value));
}

/**
Expand All @@ -83,7 +83,7 @@ static MapConf singletonConf(final String key, final Object value) {
this(parameters, null, explode);
}

protected MapConf(final Map<String, ?> parameters,
private MapConf(final Map<String, ?> parameters,
final Conf fallback,
final boolean explode) {
Objects.requireNonNull(parameters, "parameters cannot be null");
Expand Down
Expand Up @@ -21,11 +21,12 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
* Configuration class for setting internal RocksDB options.
*/
public class RocksDBConfig {
public class RocksDBConfig implements Supplier<Conf> {

public static String ROCKS_DB_MAX_WRITE_BUFFER_NUMBER_CONFIG = "rocksdb.max.write.buffer.number";
public static String ROCKS_DB_WRITE_BUFFER_SIZE_CONFIG = "rocksdb.write.buffer.size";
Expand All @@ -35,12 +36,22 @@ public class RocksDBConfig {
public static String ROCKS_DB_LOG_LEVEL_CONFIG = "rocksdb.log.level";
public static String ROCKS_DB_LOG_MAX_FILE_SIZE_CONFIG = "rocksdb.log.max.file.size";

private Map<String, String> configs = new HashMap<>();
private final Map<String, Object> configs = new HashMap<>();

/**
* Helper method to create a new {@link RocksDBConfig} object with statistics enable.
*
* @return a new {@link RocksDBConfig}.
*/
public static RocksDBConfig withStatsEnable() {
return new RocksDBConfig(true);
}

/**
* Helper method to create a new {@link RocksDBConfig} object with statistics disable.
*
* @return a new {@link RocksDBConfig}.
*/
public static RocksDBConfig withStatsDisable() {
return new RocksDBConfig(false);
}
Expand Down Expand Up @@ -82,12 +93,19 @@ public RocksDBConfig withLogMaxFileSize(final int logMaxFileSize) {
return this;
}

public Conf conf() {
return new MapConf(configs);
}

/**
* {@inheritDoc}
*/
@Override
public String toString() {
return configs.toString();
}

/**
* {@inheritDoc}
*/
@Override
public Conf get() {
return Conf.of(configs);
}
}

0 comments on commit 21ffbc0

Please sign in to comment.