Skip to content

Commit

Permalink
Merge 89c751c into 275a734
Browse files Browse the repository at this point in the history
  • Loading branch information
Red1L committed Dec 14, 2017
2 parents 275a734 + 89c751c commit 160ebc1
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 68 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ services:
- zookeeper:zk
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "testStreams:1:1,testClients:1:1"
KAFKA_ZOOKEEPER_CONNECT: "zk:2181"
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,28 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package org.seedstack.kafka.internal;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.name.Names;

import java.text.MessageFormat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import javax.inject.Inject;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.seedstack.kafka.KafkaConfig;
import org.seedstack.kafka.spi.MessageConsumer;
import org.seedstack.seed.Logging;
import org.seedstack.seed.SeedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,9 +45,11 @@ public class MessageConsumerPoller<K, V> implements Runnable {
private ReentrantReadWriteLock consumerLock = new ReentrantReadWriteLock();
@Inject
private Injector injector;
@Logging
private Logger logger;

MessageConsumerPoller(KafkaConfig.ConsumerConfig consumerConfig, MessageConsumer<K, V> messageConsumer,
String messageConsumerName) {
String messageConsumerName) {
this.messageConsumer = messageConsumer;
this.consumerConfig = consumerConfig;
this.messageConsumerName = messageConsumerName;
Expand Down Expand Up @@ -72,7 +78,7 @@ public void run() {
messageConsumer.onException(e);
}
}
} catch (Exception e) {
} catch (Throwable e) {
if (consumer != null) {
consumer.close();
}
Expand All @@ -94,20 +100,17 @@ synchronized void start() {
synchronized void stop() {
LOGGER.debug(STOPPING_TO_POLL, messageConsumerName);
if (active.getAndSet(false)) {
Consumer consumer = getConsumer();
if (consumer != null) {
getConsumer().close();
try {
consumerLock.readLock().tryLock(10, TimeUnit.SECONDS);
if (consumer != null) {
consumer.close();
}
consumerLock.readLock().unlock();
} catch (Exception e) {
logger.warn(MessageFormat.format("Failed to close Kafka consumer [{0}], the polling thread has been interrupted nonetheless", messageConsumerName));
} finally {
thread.interrupt();
}
thread.interrupt();
}
}

private Consumer getConsumer() {
consumerLock.readLock().lock();
try {
return consumer;
} finally {
consumerLock.readLock().unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,26 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package org.seedstack.kafka.internal;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.name.Names;
import java.util.regex.Pattern;
import javax.inject.Inject;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.seedstack.kafka.KafkaConfig;
import org.seedstack.kafka.spi.MessageStream;
import org.seedstack.kafka.spi.Stream;
import org.seedstack.seed.SeedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.regex.Pattern;

import static com.google.common.base.Preconditions.checkNotNull;

class MessageStreamHandler<K, V> implements Runnable {
private static final String STOPPING_TO_POLL = "Stopping to poll messages for Kafka kStreams {}";
private static final String STARTING_TO_POLL_MESSAGES_FOR_MESSAGE_CONSUMER_LISTENER = "Starting to poll messages " +
Expand All @@ -47,17 +47,17 @@ class MessageStreamHandler<K, V> implements Runnable {
public void run() {
LOGGER.debug(STARTING_TO_POLL_MESSAGES_FOR_MESSAGE_CONSUMER_LISTENER, getMessageStreamName());
try {
KStreamBuilder builder = new KStreamBuilder();
StreamsBuilder builder = new StreamsBuilder();
KStream<K, V> kStream;
if (streamConfig.getTopicPattern() != null) {
kStream = builder.stream(Pattern.compile(streamConfig.getTopicPattern()));
} else {
kStream = builder.stream(streamConfig.getTopics().toArray(new String[streamConfig.getTopics().size()]));
kStream = builder.stream(streamConfig.getTopics());
}
messageStream.onStream(kStream);
kStreams = new KafkaStreams(builder, streamConfig.getProperties());
kStreams = new KafkaStreams(builder.build(), streamConfig.getProperties());
kStreams.start();
} catch (Exception e) {
} catch (Throwable e) {
if (kStreams != null) {
kStreams.close();
}
Expand Down
80 changes: 40 additions & 40 deletions src/test/java/org/seedstack/kafka/clients/KafkaClientsIT.java
Original file line number Diff line number Diff line change
@@ -1,40 +1,40 @@
/**
* Copyright (c) 2013-2016, The SeedStack authors <http://seedstack.org>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package org.seedstack.kafka.clients;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.fest.assertions.Assertions;
import org.junit.Test;
import org.seedstack.seed.Logging;
import org.seedstack.seed.it.AbstractSeedIT;
import org.slf4j.Logger;

public class KafkaClientsIT extends AbstractSeedIT {
private static final String TOPIC = "testClients";
static CountDownLatch count = new CountDownLatch(2);
@Inject
@Named("kafkaProducerTest")
private Producer<Integer, String> producer;
@Logging
private Logger logger;

@Test
public void consumerTest() throws InterruptedException, IOException {
producer.send(new ProducerRecord<>(TOPIC, 1, "test"));
producer.send(new ProducerRecord<>(TOPIC, 2, "test2"));
producer.close();
Assertions.assertThat(count.await(10, TimeUnit.SECONDS)).isTrue();
}
}
/**
* Copyright (c) 2013-2016, The SeedStack authors <http://seedstack.org>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package org.seedstack.kafka.clients;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.fest.assertions.Assertions;
import org.junit.Test;
import org.seedstack.seed.Logging;
import org.seedstack.seed.it.AbstractSeedIT;
import org.slf4j.Logger;

public class KafkaClientsIT extends AbstractSeedIT {
private static final String TOPIC = "testClients";
static CountDownLatch count = new CountDownLatch(2);
@Inject
@Named("kafkaProducerTest")
private Producer<Integer, String> producer;
@Logging
private Logger logger;

@Test
public void consumerTest() throws InterruptedException, IOException {
producer.send(new ProducerRecord<>(TOPIC, 1, "test"));
producer.send(new ProducerRecord<>(TOPIC, 2, "test2"));
producer.close();
Assertions.assertThat(count.await(20, TimeUnit.SECONDS)).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void streamTest() {
try {
producer.send(new ProducerRecord<>(TOPIC, 3, "test3"));
producer.send(new ProducerRecord<>(TOPIC, 4, "test4"));
Assertions.assertThat(count.await(10, TimeUnit.SECONDS)).isTrue();
Assertions.assertThat(count.await(20, TimeUnit.SECONDS)).isTrue();
producer.close();
} catch (Exception e) {
Fail.fail(e.getMessage(), e);
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ kafka:
bootstrap.servers: localhost:9092
key.deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset: earliest
producers:
kafkaProducerTest:
properties:
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="ERROR">
<root level="WARN">
<appender-ref ref="STDOUT" />
</root>
</configuration>

0 comments on commit 160ebc1

Please sign in to comment.