Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes #4 update producer and consumer config to support all Kafka configurations #5

Merged
merged 1 commit into from Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -51,15 +51,9 @@ public void onStartup() {
}

private Consumer<byte[], byte[]> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());

// Create the consumer using props.
final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
final Map<String, Object> properties = config.getProperties();
// Create the consumer using properties.
final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
// Subscribe to the topic.
String topic = config.getTopic();
if(topic.contains(",")) {
Expand Down
19 changes: 8 additions & 11 deletions kafka-sidecar/src/main/resources/config/kafka-consumer.yml
@@ -1,20 +1,17 @@
bootstrapServers: localhost:9092
# ensures poll is called frequently by keeping each batch small
maxPollRecords: 30
isolationLevel: read_committed
enableAutoCommit: false
autoOffsetReset: earliest
autoCommitIntervalMs: 1000
keyDeserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
valueDeserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
# Generic Kafka Consumer Configuration
properties:
bootstrap.servers: localhost:9092
key.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
enable.auto.commit: false
auto.offset.reset: earliest
group.id: group1

# Callback Consumer Specific Configuration
# The topic that is going to be consumed. For callback consumer only in the kafka-sidecar.
# If two or more topics are going to be subscribed, concatenate them with commas, without spaces.
topic: test1,test2
# topic: test1
# The default consumer group. For callback consumer only in the kafka-sidecar.
groupId: group1
# Waiting period in millisecond to poll another batch
waitPeriod: 10000

Expand Down
35 changes: 19 additions & 16 deletions kafka-sidecar/src/main/resources/config/kafka-producer.yml
@@ -1,17 +1,20 @@
---
acks: all
retries: 3
batchSize: 16384
lingerMs: 1
bufferMemory: 33554432
keySerializer: org.apache.kafka.common.serialization.ByteArraySerializer
valueSerializer: org.apache.kafka.common.serialization.ByteArraySerializer
keyDeSerializer: org.apache.kafka.common.serialization.ByteArraySerializer
valueDeSerializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
sessionTimeout: 30000
autoOffsetreset: earliest
enableAutoCommit: true
bootstrapServers: localhost:9092
enableIdempotence: ${kafka-producer.enableIdempotence:true}
injectOpenTracing: false
injectCallerId: true
# Generic configuration for Kafka producer.
properties:
key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
acks: ${kafka-producer.acks:all}
bootstrap.servers: localhost:9092
buffer.memory: 33554432
retries: ${kafka-producer.retries:3}
batch.size: 16384
linger.ms: 1
max.in.flight.requests.per.connection: ${kafka-producer.max.in.flight.requests.per.connection:5}
enable.idempotence: ${kafka-producer.enable.idempotence:true}

# The default topic for the producer. Only certain producer implementation will use it.
topic: portal-event
# Whether OpenTracing is enabled. If OpenTracing is used, the traceability, correlation and metrics handlers should not be in the chain.
injectOpenTracing: ${client.injectOpenTracing:false}
# inject serviceId as callerId into the http header for metrics to collect the caller. The serviceId is from server.yml
injectCallerId: ${client.injectCallerId:false}
8 changes: 4 additions & 4 deletions kafka-sidecar/src/main/resources/config/service.yml
Expand Up @@ -21,14 +21,14 @@ singletons:
- com.networknt.server.StartupHookProvider:
- com.networknt.mesh.kafka.ProducerStartupHook
- com.networknt.mesh.kafka.ConsumerStartupHook
# - com.networknt.mesh.kafka.CallbackConsumerStartupHook
- com.networknt.mesh.kafka.KsqldbConsumerStartupHook
- com.networknt.mesh.kafka.CallbackConsumerStartupHook
# - com.networknt.mesh.kafka.KsqldbConsumerStartupHook
# ShutdownHookProvider implementations, there are one to many and they are called in the same sequence defined.
- com.networknt.server.ShutdownHookProvider:
- com.networknt.mesh.kafka.ProducerShutdownHook
- com.networknt.mesh.kafka.ConsumerShutdownHook
# - com.networknt.mesh.kafka.CallbackConsumerShutdownHook
- com.networknt.mesh.kafka.KsqldbConsumerShutdownHook
- com.networknt.mesh.kafka.CallbackConsumerShutdownHook
# - com.networknt.mesh.kafka.KsqldbConsumerShutdownHook
- com.networknt.kafka.producer.NativeLightProducer:
- com.networknt.kafka.producer.SidecarProducer

Expand Down