Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Disable automatic committing
This keeps CS and Kafka in sync by making sure all consumed messages have
been committed only after CS successfully applies them.
  • Loading branch information
markus456 committed Apr 3, 2019
1 parent 1514cef commit 3442823
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 0 deletions.
1 change: 1 addition & 0 deletions kafka-avro-adapter/src/controller.cpp
Expand Up @@ -48,6 +48,7 @@ void Controller::run()
if (m_queue.size() > 0 && should_flush())
{
m_producer.write(m_queue);
m_consumer.commit();
m_last_flush = Clock::now();
}
}
Expand Down
11 changes: 11 additions & 0 deletions kafka-avro-adapter/src/kafka_consumer.cpp
Expand Up @@ -142,6 +142,7 @@ KafkaConsumer::KafkaConsumer(const Options& options):
std::string errstr;
conf->set("metadata.broker.list", m_options.broker, errstr);
conf->set("group.id", m_options.group, errstr);
conf->set("enable.auto.commit", "false", errstr);

if (!m_options.debug.empty())
{
Expand Down Expand Up @@ -194,3 +195,13 @@ Result KafkaConsumer::read()

return rval;
}

void KafkaConsumer::commit()
{
RdKafka::ErrorCode err = m_consumer->commitSync();

if (err != RdKafka::ERR_NO_ERROR)
{
throw AdapterError(RdKafka::err2str(err));
}
}
1 change: 1 addition & 0 deletions kafka-avro-adapter/src/kafka_consumer.h
Expand Up @@ -28,6 +28,7 @@ class KafkaConsumer
KafkaConsumer(const Options& options);
~KafkaConsumer();
Result read();
void commit();

private:
const Options& m_options;
Expand Down

0 comments on commit 3442823

Please sign in to comment.