This library integrates Rama with external Apache Kafka clusters. It enables Kafka to be used as a source for Rama topologies and makes it easy to publish records to Kafka topics from topologies.
rama-kafka is available in the following Nexus repository:
<repository>
  <id>nexus-releases</id>
  <url>https://nexus.redplanetlabs.com/repository/maven-public-releases</url>
</repository>
The latest release is:
<dependency>
  <groupId>com.rpl</groupId>
  <artifactId>rama-kafka</artifactId>
  <version>0.9.0</version>
</dependency>
General information about integrating Rama with external systems can be found on this page.
Here's an example of using rama-kafka to consume from one Kafka topic and publish to another Kafka topic:
public class KafkaIntegrationExampleModule implements RamaModule {
  @Override
  public void define(Setup setup, Topologies topologies) {
    Map<String, Object> kafkaConfig = new HashMap<>();
    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.mycompany.com:9092,kafka2.mycompany.com:9092");
    kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    setup.declareObject("*kafka", new KafkaExternalDepot(kafkaConfig, "myTopic"));

    StreamTopology s = topologies.stream("s");
    s.source("*kafka").out("*tuple")
     .each(Ops.EXPAND, "*tuple").out("*key", "*value")
     .each((String k, Long v) -> new ProducerRecord<>("anotherTopic", k, v * 8),
           "*key", "*value").out("*producerRecord")
     .eachAsync(new KafkaAppend(), "*kafka", "*producerRecord");
  }
}
A topic on a remote Kafka cluster is declared in a Rama module by creating a KafkaExternalDepot and passing it to a declareObject call. In this example the topic "myTopic" on a Kafka cluster is associated with the var "*kafka". KafkaExternalDepot is parameterized with a map of configs, the same configs you would use to create a KafkaConsumer or KafkaProducer.
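Because the config map is passed through to the underlying Kafka clients, any standard client config can be supplied. As a rough sketch, connecting to a TLS-secured cluster might look like this (the hostname, port, and truststore path are placeholders):

Map<String, Object> kafkaConfig = new HashMap<>();
kafkaConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "kafka1.mycompany.com:9093"); // placeholder host/port
// Standard Kafka client security configs pass through unchanged:
kafkaConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
kafkaConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/kafka/truststore.jks"); // placeholder path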
KafkaExternalDepot uses the "bootstrap.servers" config to identify a Kafka cluster. Internally it creates only one KafkaConsumer client per task thread per Kafka cluster. So if you are consuming multiple Kafka topics from the same cluster in the same module, as long as each KafkaExternalDepot is declared with the same "bootstrap.servers" config, only one KafkaConsumer will be created for that cluster per task thread.
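For example, declaring two depots for different topics on the same cluster (topic names here are illustrative) still results in a single KafkaConsumer per task thread, since both share the same "bootstrap.servers" config:

// Both depots reuse kafkaConfig, so they share one consumer per task thread:
setup.declareObject("*orders", new KafkaExternalDepot(kafkaConfig, "orders"));
setup.declareObject("*payments", new KafkaExternalDepot(kafkaConfig, "payments"));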
The configs "enable.auto.commit"
and "group.id"
are not allowed to be specified as configs because Rama handles all offset management.
Serialization and deserialization of keys and values to and from a Kafka topic is handled by the Kafka clients; you specify the serializers and deserializers to use via the config map.
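For instance, to move raw bytes instead of strings and longs, the example's config could swap in Kafka's byte-array serdes:

kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());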
When used as a source for a topology, a KafkaExternalDepot emits two-element lists containing the key and value of each record. As shown above, Ops.EXPAND is useful for extracting the key and value from each emitted list. There's no difference in functionality between using a KafkaExternalDepot as a source versus a built-in depot (e.g. all "start from" options are supported).
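If you'd rather not use Ops.EXPAND, the emitted list can also be destructured manually. This is just a sketch equivalent to the expansion in the example above, using plain lambdas:

s.source("*kafka").out("*tuple")
 .each((List tuple) -> tuple.get(0), "*tuple").out("*key")    // first element is the record key
 .each((List tuple) -> tuple.get(1), "*tuple").out("*value"); // second element is the record value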
The KafkaAppend function publishes records to a Kafka topic from within a topology. It takes as input a KafkaExternalDepot and a ProducerRecord. The ProducerRecord contains the topic and partition information for publishing, as well as the key and value of the record.
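ProducerRecord has several constructors, so the partition can either be chosen by Kafka's partitioner from the key or set explicitly. For instance:

// Let Kafka's partitioner derive the partition from the key:
ProducerRecord<String, Long> rec = new ProducerRecord<>("anotherTopic", "some-key", 42L);
// Or target partition 3 explicitly:
ProducerRecord<String, Long> rec2 = new ProducerRecord<>("anotherTopic", 3, "some-key", 42L);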
KafkaAppend should be used with eachAsync and emits the RecordMetadata returned by Kafka.
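Since the RecordMetadata is emitted into the dataflow, it can be bound and used like any other value. For example (the "*meta" var name is illustrative):

.eachAsync(new KafkaAppend(), "*kafka", "*producerRecord").out("*meta")
.each(Ops.PRINTLN, "*meta"); // e.g. print the topic/partition/offset the record landed at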
By default, KafkaAppend in an eachAsync doesn't emit until Kafka has returned the RecordMetadata. This ties the success of the topology doing the append to the success of the record being published to Kafka: if the Kafka append fails, the topology will retry. If you don't need this guarantee and prefer at-most-once appends, you can parameterize the KafkaAppend constructor to tell it to emit without waiting to hear back from Kafka. For example:
.eachAsync(new KafkaAppend(false), "*kafka", "*producerRecord")
In this case the eachAsync call will emit null, and the topology will succeed without waiting for Kafka to acknowledge the append.