// KafkaStreamsRuntimeConfig.java (from a fork of quarkusio/quarkus)
package io.quarkus.kafka.streams.runtime;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
/**
 * Run-time configuration of the {@code kafka-streams} config root.
 * <p>
 * Every public field below is a configuration item; the Javadoc on each field
 * describes the corresponding property.
 */
@ConfigRoot(name = "kafka-streams", phase = ConfigPhase.RUN_TIME)
public class KafkaStreamsRuntimeConfig {

    /**
     * The unique identifier of this Kafka Streams application.
     * When not set explicitly, the value of quarkus.application.name is used.
     */
    @ConfigItem(defaultValue = "${quarkus.application.name}")
    public String applicationId;

    /**
     * The Kafka bootstrap server(s), given as a comma-separated list of host:port pairs.
     */
    // NOTE(review): the default port here is 9012, whereas Kafka brokers conventionally
    // listen on 9092 - confirm that this default is intentional before relying on it.
    @ConfigItem(defaultValue = "localhost:9012")
    public List<InetSocketAddress> bootstrapServers;

    /**
     * The unique identifier of this application instance, typically of the form host:port.
     */
    @ConfigItem
    public Optional<String> applicationServer;

    /**
     * The topic names, given as a comma-separated list.
     * The pipeline is started only once all of these topics exist in the Kafka cluster.
     */
    @ConfigItem
    public List<String> topics;

    /**
     * The property key under which the schema registry url is passed on.
     *
     * Different registry implementations / instances expect the registry url
     * under a different property key, e.g.
     *
     * Red Hat / Apicurio - apicurio.registry.url
     * Confluent - schema.registry.url
     */
    @ConfigItem(defaultValue = "schema.registry.url")
    public String schemaRegistryKey;

    /**
     * The url of the schema registry.
     */
    @ConfigItem
    public Optional<String> schemaRegistryUrl;

    /**
     * The SASL JAAS configuration.
     */
    public SaslConfig sasl;

    /**
     * The Kafka SSL configuration.
     */
    public SslConfig ssl;

    /**
     * Renders all configuration items as {@code KafkaStreamsRuntimeConfig{name=value, ...}}.
     * The two plain {@code String} items (application id and schema registry key) are
     * wrapped in single quotes; every other item is rendered via its own {@code toString()}.
     *
     * @return the textual representation of this configuration
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("KafkaStreamsRuntimeConfig{");
        text.append("applicationId='").append(applicationId).append('\'');
        text.append(", bootstrapServers=").append(bootstrapServers);
        text.append(", applicationServer=").append(applicationServer);
        text.append(", topics=").append(topics);
        text.append(", schemaRegistryKey='").append(schemaRegistryKey).append('\'');
        text.append(", schemaRegistryUrl=").append(schemaRegistryUrl);
        text.append(", sasl=").append(sasl);
        text.append(", ssl=").append(ssl);
        text.append('}');
        return text.toString();
    }

    /**
     * Returns the configured topic names with leading and trailing whitespace removed,
     * in the same order as they appear in {@link #topics}.
     *
     * @return a new list holding the trimmed topic names
     */
    public List<String> getTrimmedTopics() {
        return topics.stream()
                .map(topicName -> topicName.trim())
                .collect(Collectors.toList());
    }
}