Skip to content

Commit

Permalink
Add global extraConfig parameter to KafkaClients
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Apr 17, 2018
1 parent cc61bf1 commit 7dbd1eb
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 77 deletions.
16 changes: 15 additions & 1 deletion core/src/main/java/com/softwaremill/kmq/KafkaClients.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,18 @@

public class KafkaClients {
private final String bootstrapServers;
private final Map<String, Object> extraGlobalConfig;

public KafkaClients(String bootstrapServers) {
this(bootstrapServers, Collections.emptyMap());
}

/**
* @param extraGlobalConfig Extra Kafka parameter configuration, e.g. SSL
*/
public KafkaClients(String bootstrapServers, Map<String, Object> extraGlobalConfig) {
this.bootstrapServers = bootstrapServers;
this.extraGlobalConfig = extraGlobalConfig;
}

public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>> keySerializer,
Expand All @@ -37,6 +46,9 @@ public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>>
for (Map.Entry<String, Object> extraCfgEntry : extraConfig.entrySet()) {
props.put(extraCfgEntry.getKey(), extraCfgEntry.getValue());
}
for (Map.Entry<String, Object> extraCfgEntry : extraGlobalConfig.entrySet()) {
props.put(extraCfgEntry.getKey(), extraCfgEntry.getValue());
}

return new KafkaProducer<>(props);
}
Expand All @@ -60,10 +72,12 @@ public <K, V> KafkaConsumer<K, V> createConsumer(String groupId,
if (groupId != null) {
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
// extraConfig : configure the kafka parameters (ex: ssl, ...)
for (Map.Entry<String, Object> extraCfgEntry : extraConfig.entrySet()) {
props.put(extraCfgEntry.getKey(), extraCfgEntry.getValue());
}
for (Map.Entry<String, Object> extraCfgEntry : extraGlobalConfig.entrySet()) {
props.put(extraCfgEntry.getKey(), extraCfgEntry.getValue());
}

return new KafkaConsumer<>(props);
}
Expand Down
13 changes: 2 additions & 11 deletions core/src/main/java/com/softwaremill/kmq/KmqClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,15 @@ public KmqClient(KmqConfig config, KafkaClients clients,
Class<? extends Deserializer<K>> keyDeserializer,
Class<? extends Deserializer<V>> valueDeserializer,
long msgPollTimeout) {
this(config, clients, keyDeserializer, valueDeserializer, msgPollTimeout, Collections.emptyMap());
}

public KmqClient(KmqConfig config, KafkaClients clients,
Class<? extends Deserializer<K>> keyDeserializer,
Class<? extends Deserializer<V>> valueDeserializer,
long msgPollTimeout, Map<String, Object> extraConfig) {

this.config = config;
this.msgPollTimeout = msgPollTimeout;

this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer);
// Using the custom partitioner, each offset-partition will contain markers only from a single queue-partition.
// Adding the PARTITIONER_CLASS_CONFIG in extraConfig map, if extraConfig is not empty
this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer, extraConfig);
extraConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class);
this.markerProducer = clients.createProducer(
MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class,
extraConfig);
Collections.singletonMap(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class));

LOG.info(String.format("Subscribing to topic: %s, using group id: %s", config.getMsgTopic(), config.getMsgConsumerGroupId()));
msgConsumer.subscribe(Collections.singletonList(config.getMsgTopic()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
*/
public class RedeliveryTracker {
public static Closeable start(KafkaClients clients, KmqConfig config) {
return start(clients, config, Collections.emptyMap());
}

public static Closeable start(KafkaClients clients, KmqConfig config, Map<String, Object> extraConfig) {
return RedeliveryActors.start(clients, config, extraConfig);
return RedeliveryActors.start(clients, config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
import scala.collection.JavaConverters._
import scala.concurrent.duration._

class CommitMarkerOffsetsActor(markerTopic: String, clients: KafkaClients, extraConfig: java.util.Map[String, Object]) extends Actor with StrictLogging {
class CommitMarkerOffsetsActor(markerTopic: String, clients: KafkaClients) extends Actor with StrictLogging {

private val consumer = clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer], extraConfig)
private val consumer = clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer])

private var toCommit: Map[Partition, Offset] = Map()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer

import scala.collection.JavaConverters._

class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig, extraConfig: java.util.Map[String, Object]) extends Actor with StrictLogging {
class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig) extends Actor with StrictLogging {

private val OneSecond = 1000L

Expand All @@ -28,8 +28,8 @@ class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig, extraConfig:
override def preStart(): Unit = {
markerConsumer = clients.createConsumer(config.getRedeliveryConsumerGroupId,
classOf[MarkerKey.MarkerKeyDeserializer],
classOf[MarkerValue.MarkerValueDeserializer], extraConfig)
producer = clients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer], extraConfig)
classOf[MarkerValue.MarkerValueDeserializer])
producer = clients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer])

setupMarkerConsumer()
setupOffsetCommitting()
Expand Down Expand Up @@ -61,7 +61,7 @@ class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig, extraConfig:

private def partitionAssigned(p: Partition, endOffset: Offset): Unit = {
val redeliverActorProps = Props(
new RedeliverActor(p, new RetryingRedeliverer(new DefaultRedeliverer(p, producer, config, clients, extraConfig))))
new RedeliverActor(p, new RetryingRedeliverer(new DefaultRedeliverer(p, producer, config, clients))))
.withDispatcher("kmq.redeliver-dispatcher")
val redeliverActor = context.actorOf(
redeliverActorProps,
Expand All @@ -74,7 +74,7 @@ class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig, extraConfig:

private def setupOffsetCommitting(): Unit = {
commitMarkerOffsetsActor = context.actorOf(
Props(new CommitMarkerOffsetsActor(config.getMarkerTopic, clients, extraConfig)),
Props(new CommitMarkerOffsetsActor(config.getMarkerTopic, clients)),
"commit-marker-offsets")

commitMarkerOffsetsActor ! DoCommit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ trait Redeliverer {

class DefaultRedeliverer(
partition: Partition, producer: KafkaProducer[Array[Byte], Array[Byte]],
config: KmqConfig, clients: KafkaClients, extraConfig: java.util.Map[String, Object])
extends Redeliverer with StrictLogging {
config: KmqConfig, clients: KafkaClients) extends Redeliverer with StrictLogging {

private val SendTimeoutSeconds = 60L

private val tp = new TopicPartition(config.getMsgTopic, partition)

private val reader = {
val c = clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer], extraConfig)
val c = clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer])
c.assign(Collections.singleton(tp))
new SingleOffsetReader(tp, c)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,9 @@ import scala.collection.JavaConverters._

object RedeliveryActors extends StrictLogging {
def start(clients: KafkaClients, config: KmqConfig): Closeable = {
start(clients, config)
}

def start(clients: KafkaClients, config: KmqConfig, extraConfig: java.util.Map[String, Object] = Collections.emptyMap()): Closeable = {
val system = ActorSystem("kmq-redelivery")

val consumeMakersActor = system.actorOf(Props(new ConsumeMarkersActor(clients, config, extraConfig)), "consume-markers-actor")
val consumeMakersActor = system.actorOf(Props(new ConsumeMarkersActor(clients, config)), "consume-markers-actor")
consumeMakersActor ! DoConsume

logger.info("Started redelivery actors")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,21 @@ class StandaloneConfig {
static final KmqConfig KMQ_CONFIG = new KmqConfig("queue", "markers", "kmq_client",
"kmq_redelivery", Duration.ofSeconds(90).toMillis(), 1000);

/* EXAMPLE with extraConfig : SSL Encryption & SSL Authentication
Map extraConfig = new HashMap();
//configure the following three settings for SSL Encryption
extraConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/directory/kafka.client.truststore.jks");
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
// configure the following three settings for SSL Authentication
extraConfig.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/directory/kafka.client.keystore.jks");
extraConfig.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
extraConfig.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
static final KafkaClients KAFKA_CLIENTS = new KafkaClients("localhost:9092", extraConfig);
*/

static final KafkaClients KAFKA_CLIENTS = new KafkaClients("localhost:9092");
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,26 @@
import com.softwaremill.kmq.KmqClient;
import com.softwaremill.kmq.example.UncaughtExceptionHandling;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.config.SslConfigs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.util.Map;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import static com.softwaremill.kmq.example.standalone.StandaloneConfig.*;
import static com.softwaremill.kmq.example.standalone.StandaloneConfig.KAFKA_CLIENTS;
import static com.softwaremill.kmq.example.standalone.StandaloneConfig.KMQ_CONFIG;

class StandaloneProcessor {
private final static Logger LOG = LoggerFactory.getLogger(StandaloneProcessor.class);

public static void main(String[] args) throws InterruptedException, IOException {
public static void main(String[] args) {
UncaughtExceptionHandling.setup();

/* EXAMPLE with extraConfig : SSL Encryption & SSL Authentication
Map extraConfig = new HashMap();
//configure the following three settings for SSL Encryption
extraConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/directory/kafka.client.truststore.jks");
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
// configure the following three settings for SSL Authentication
extraConfig.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/directory/kafka.client.keystore.jks");
extraConfig.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
extraConfig.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
KmqClient<ByteBuffer, ByteBuffer> kmqClient = new KmqClient<>(KMQ_CONFIG, KAFKA_CLIENTS,
ByteBufferDeserializer.class, ByteBufferDeserializer.class, 100, extraConfig);
*/

KmqClient<ByteBuffer, ByteBuffer> kmqClient = new KmqClient<>(KMQ_CONFIG, KAFKA_CLIENTS,
ByteBufferDeserializer.class, ByteBufferDeserializer.class, 100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,20 @@

import com.softwaremill.kmq.RedeliveryTracker;
import com.softwaremill.kmq.example.UncaughtExceptionHandling;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static com.softwaremill.kmq.example.standalone.StandaloneConfig.*;
import static com.softwaremill.kmq.example.standalone.StandaloneConfig.KAFKA_CLIENTS;
import static com.softwaremill.kmq.example.standalone.StandaloneConfig.KMQ_CONFIG;

class StandaloneRedeliveryTracker {
private final static Logger LOG = LoggerFactory.getLogger(StandaloneRedeliveryTracker.class);

public static void main(String[] args) throws InterruptedException, IOException {
public static void main(String[] args) throws IOException {
UncaughtExceptionHandling.setup();

/* EXAMPLE with extraConfig : SSL Encryption & SSL Authentication
Map extraConfig = new HashMap();
//configure the following three settings for SSL Encryption
extraConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/directory/kafka.client.truststore.jks");
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
// configure the following three settings for SSL Authentication
extraConfig.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/directory/kafka.client.keystore.jks");
extraConfig.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
extraConfig.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
Closeable redelivery = RedeliveryTracker.start(KAFKA_CLIENTS, KMQ_CONFIG, scala.Option.apply(extraConfig));
*/

Closeable redelivery = RedeliveryTracker.start(KAFKA_CLIENTS, KMQ_CONFIG);
LOG.info("Redelivery tracker started");
Expand Down

0 comments on commit 7dbd1eb

Please sign in to comment.