Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions include/cppkafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,20 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
* \return The topic partition list
*/
TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const;

/**
* \brief Gets the offsets committed for the given topic/partition list with a timeout
*
* This translates into a call to rd_kafka_committed
*
* \param topic_partitions The topic/partition list to be queried
*
* \param timeout The timeout for this operation. Supersedes the default consumer timeout.
*
* \return The topic partition list
*/
TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions,
std::chrono::milliseconds timeout) const;

/**
* \brief Gets the offset positions for the given topic/partition list
Expand Down
85 changes: 83 additions & 2 deletions include/cppkafka/kafka_handle_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,20 @@ class CPPKAFKA_API KafkaHandleBase {
* \return A pair of watermark offsets {low, high}
*/
OffsetTuple query_offsets(const TopicPartition& topic_partition) const;

/**
* \brief Queries the offset for the given topic/partition with a given timeout
*
* This translates into a call to rd_kafka_query_watermark_offsets
*
* \param topic_partition The topic/partition to be queried
*
* \param timeout The timeout for this operation. This supersedes the default handle timeout.
*
* \return A pair of watermark offsets {low, high}
*/
OffsetTuple query_offsets(const TopicPartition& topic_partition,
std::chrono::milliseconds timeout) const;

/**
* \brief Gets the rdkafka handle
Expand Down Expand Up @@ -177,6 +191,20 @@ class CPPKAFKA_API KafkaHandleBase {
* \return The metadata
*/
Metadata get_metadata(bool all_topics = true) const;

/**
* \brief Gets metadata for brokers, topics, partitions, etc with a timeout
*
* This translates into a call to rd_kafka_metadata
*
* \param all_topics Whether to fetch metadata about all topics or only locally known ones
*
* \param timeout The timeout for this operation. Supersedes the default handle timeout.
*
* \return The metadata
*/
Metadata get_metadata(bool all_topics,
std::chrono::milliseconds timeout) const;

/**
* \brief Gets general metadata but only fetches metadata for the given topic rather than
Expand All @@ -189,6 +217,21 @@ class CPPKAFKA_API KafkaHandleBase {
* \return The topic metadata
*/
TopicMetadata get_metadata(const Topic& topic) const;

/**
* \brief Gets general metadata but only fetches metadata for the given topic rather than
* all of them. Uses a timeout to limit the operation execution time.
*
* This translates into a call to rd_kafka_metadata
*
* \param topic The topic to fetch information for
*
* \param timeout The timeout for this operation. Supersedes the default handle timeout.
*
* \return The topic metadata
*/
TopicMetadata get_metadata(const Topic& topic,
std::chrono::milliseconds timeout) const;

/**
* \brief Gets the consumer group information
Expand All @@ -198,13 +241,34 @@ class CPPKAFKA_API KafkaHandleBase {
* \return The group information
*/
GroupInformation get_consumer_group(const std::string& name);

/**
* \brief Gets the consumer group information with a timeout
*
* \param name The name of the consumer group to look up
*
* \param timeout The timeout for this operation. Supersedes the default handle timeout.
*
* \return The group information
*/
GroupInformation get_consumer_group(const std::string& name,
std::chrono::milliseconds timeout);

/**
* \brief Gets all consumer groups
*
* \return A list of consumer groups
*/
GroupInformationList get_consumer_groups();

/**
* \brief Gets all consumer groups with a timeout
*
* \param timeout The timeout for this operation. Supersedes the default handle timeout.
*
* \return A list of consumer groups
*/
GroupInformationList get_consumer_groups(std::chrono::milliseconds timeout);

/**
* \brief Gets topic/partition offsets based on timestamps
Expand All @@ -216,6 +280,20 @@ class CPPKAFKA_API KafkaHandleBase {
* \return A topic partition list
*/
TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const;

/**
* \brief Gets topic/partition offsets based on timestamps with a timeout
*
* This translates into a call to rd_kafka_offsets_for_times
*
* \param queries A map from topic/partition to the timestamp to be used
*
* \param timeout The timeout for this operation. This supersedes the default handle timeout.
*
* \return A topic partition list
*/
TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries,
std::chrono::milliseconds timeout) const;

/**
* \brief Get the kafka handle name
Expand Down Expand Up @@ -283,8 +361,11 @@ class CPPKAFKA_API KafkaHandleBase {
using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;

Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
Metadata get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const;
GroupInformationList fetch_consumer_groups(const char* name);
Metadata get_metadata(bool all_topics,
rd_kafka_topic_t* topic_ptr,
std::chrono::milliseconds timeout) const;
GroupInformationList fetch_consumer_groups(const char* name,
std::chrono::milliseconds timeout);
void save_topic_config(const std::string& topic_name, TopicConfiguration config);

std::chrono::milliseconds timeout_ms_;
Expand Down
8 changes: 7 additions & 1 deletion src/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,15 @@ KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_p

TopicPartitionList Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) const {
    // Forward to the timeout-taking overload, using the consumer's configured default timeout.
    return get_offsets_committed(topic_partitions, get_timeout());
}

TopicPartitionList
Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions,
                                milliseconds timeout) const {
    // Fetch the committed offsets for the given topic/partition list.
    //
    // Translates into rd_kafka_committed. `timeout` supersedes the consumer's
    // default timeout for this call only.
    // (Fixed: the span contained both the pre-diff line using get_timeout() and
    // the post-diff line using `timeout`; only the latter is kept.)
    TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
    rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(),
                                                   static_cast<int>(timeout.count()));
    check_error(error, topic_list_handle.get());
    return convert(topic_list_handle);
}
Expand Down
62 changes: 48 additions & 14 deletions src/kafka_handle_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,24 +108,40 @@ Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration config)

KafkaHandleBase::OffsetTuple KafkaHandleBase::query_offsets(const TopicPartition& topic_partition) const {
    // Forward to the timeout-taking overload with the handle's default timeout.
    return query_offsets(topic_partition, timeout_ms_);
}

KafkaHandleBase::OffsetTuple
KafkaHandleBase::query_offsets(const TopicPartition& topic_partition,
                               milliseconds timeout) const {
    // Query the broker for the watermark offsets of this topic/partition.
    //
    // Translates into rd_kafka_query_watermark_offsets; `timeout` supersedes the
    // default handle timeout. Returns {low, high}.
    // (Fixed: a stale pre-diff line `const int timeout = ...timeout_ms_...` was
    // left in the span, shadowing the parameter; only the post-diff line remains.)
    int64_t low;
    int64_t high;
    const string& topic = topic_partition.get_topic();
    const int partition = topic_partition.get_partition();
    const int timeout_ms = static_cast<int>(timeout.count());
    rd_kafka_resp_err_t result = rd_kafka_query_watermark_offsets(handle_.get(), topic.data(),
                                                                  partition, &low, &high,
                                                                  timeout_ms);
    check_error(result);
    return make_tuple(low, high);
}

Metadata KafkaHandleBase::get_metadata(bool all_topics) const {
    // No specific topic; use the handle-wide default timeout.
    // (Fixed: the span contained both the pre-diff two-argument call and the
    // post-diff three-argument call; only the latter is kept.)
    return get_metadata(all_topics, nullptr, timeout_ms_);
}

Metadata KafkaHandleBase::get_metadata(bool all_topics, milliseconds timeout) const {
    // No specific topic requested, so pass a null topic handle through to the
    // private overload that performs the rd_kafka_metadata call.
    return get_metadata(all_topics, nullptr, timeout);
}

TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
    // Delegate to the timeout-taking overload with the default handle timeout.
    // (Fixed: the span contained both the pre-diff body calling
    // get_metadata(false, topic.get_handle()) and the post-diff delegation;
    // only the post-diff delegation is kept.)
    return get_metadata(topic, timeout_ms_);
}

TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic,
milliseconds timeout) const {
Metadata md = get_metadata(false, topic.get_handle(), timeout);
auto topics = md.get_topics();
if (topics.empty()) {
throw ElementNotFound("topic metadata", topic.get_name());
Expand All @@ -134,29 +150,44 @@ TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
}

GroupInformation KafkaHandleBase::get_consumer_group(const string& name) {
    // Delegate to the timeout-taking overload with the default handle timeout.
    // (Fixed: the span contained both the pre-diff body calling
    // fetch_consumer_groups(name.c_str()) and the post-diff delegation;
    // only the post-diff delegation is kept.)
    return get_consumer_group(name, timeout_ms_);
}

GroupInformation KafkaHandleBase::get_consumer_group(const string& name, milliseconds timeout) {
    // Look up this group only; rdkafka reports the result as a (possibly empty) list.
    auto groups = fetch_consumer_groups(name.c_str(), timeout);
    if (groups.empty()) {
        throw ElementNotFound("consumer group information", name);
    }
    return move(groups.front());
}

vector<GroupInformation> KafkaHandleBase::get_consumer_groups() {
    // Delegate to the timeout-taking overload with the default handle timeout.
    // (Fixed: the span contained both the pre-diff body calling
    // fetch_consumer_groups(nullptr) and the post-diff delegation;
    // only the post-diff delegation is kept.)
    return get_consumer_groups(timeout_ms_);
}

vector<GroupInformation> KafkaHandleBase::get_consumer_groups(milliseconds timeout) {
    // A null group name asks for every group rather than a specific one.
    return fetch_consumer_groups(nullptr, timeout);
}

TopicPartitionList KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const {
    // Forward to the timeout-taking overload with the handle's default timeout.
    return get_offsets_for_times(queries, timeout_ms_);
}

TopicPartitionList
KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries,
                                       milliseconds timeout) const {
    // Resolve topic/partition offsets from timestamps.
    //
    // Translates into rd_kafka_offsets_for_times; `timeout` supersedes the
    // default handle timeout.
    // (Fixed: stale pre-diff lines using timeout_ms_ were left interleaved with
    // the post-diff lines using `timeout`; only the post-diff lines are kept.)
    TopicPartitionList topic_partitions;
    for (const auto& query : queries) {
        const TopicPartition& topic_partition = query.first;
        // Encode the requested timestamp in the offset field, as rdkafka expects.
        topic_partitions.emplace_back(topic_partition.get_topic(), topic_partition.get_partition(),
                                      query.second.count());
    }
    TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
    const int timeout_ms = static_cast<int>(timeout.count());
    rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(),
                                                            timeout_ms);
    check_error(result, topic_list_handle.get());
    return convert(topic_list_handle);
}
Expand Down Expand Up @@ -193,19 +224,22 @@ Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf
return Topic(topic);
}

Metadata KafkaHandleBase::get_metadata(bool all_topics,
                                       rd_kafka_topic_t* topic_ptr,
                                       milliseconds timeout) const {
    // Private worker: fetch metadata via rd_kafka_metadata, optionally scoped to
    // a single topic handle (topic_ptr may be null for broker-wide metadata).
    // (Fixed: the span contained both the pre-diff signature/body using
    // timeout_ms_ and the post-diff versions using `timeout`; only the
    // post-diff code is kept.)
    const rd_kafka_metadata_t* metadata;
    const int timeout_ms = static_cast<int>(timeout.count());
    rd_kafka_resp_err_t error = rd_kafka_metadata(get_handle(), !!all_topics,
                                                  topic_ptr, &metadata, timeout_ms);
    check_error(error);
    // Metadata takes ownership of the raw pointer returned by rdkafka.
    return Metadata(metadata);
}

vector<GroupInformation> KafkaHandleBase::fetch_consumer_groups(const char* name) {
vector<GroupInformation> KafkaHandleBase::fetch_consumer_groups(const char* name,
milliseconds timeout) {
const rd_kafka_group_list* list = nullptr;
const int timeout = static_cast<int>(timeout_ms_.count());
auto result = rd_kafka_list_groups(get_handle(), name, &list, timeout);
const int timeout_ms = static_cast<int>(timeout.count());
auto result = rd_kafka_list_groups(get_handle(), name, &list, timeout_ms);
check_error(result);

// Wrap this in a unique_ptr so it gets auto deleted
Expand Down