Skip to content

Commit

Permalink
Add check_topic_metadata kafka source configuration flag
Browse files Browse the repository at this point in the history
Signed-off-by: Darach Ennis <darach@gmail.com>
  • Loading branch information
darach authored and Licenser committed Oct 13, 2021
1 parent 2d3013c commit cd4bc86
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -4,6 +4,7 @@

### New features

- Add `check_topic_metadata` configuration flag to kafka source to bypass topic metadata fetch
- Add support for `FLOAT4` and `FLOAT8` serialization/deserialization to postgres connectors
- Add `std::path::try_default` fn
- Experimental ARM support
Expand Down
63 changes: 40 additions & 23 deletions src/source/kafka.rs
Expand Up @@ -79,6 +79,8 @@ pub struct Config {
/// This should not be used when you expect persistent errors (e.g. if the message content is malformed and will lead to repeated errors)
#[serde(default = "default_retry_failed_events")]
pub retry_failed_events: bool,
#[serde(default = "default_check_topic_metadata")]
pub check_topic_metadata: bool,

/// Optional rdkafka configuration
///
Expand All @@ -98,6 +100,11 @@ fn default_retry_failed_events() -> bool {
true
}

/// Serde default for the `check_topic_metadata` configuration flag.
///
/// Returns `true` so that a configuration which omits the flag keeps the
/// legacy (backwards compatible) behaviour of checking topic metadata before
/// subscribing. Set the flag to `false` explicitly to bypass that check.
fn default_check_topic_metadata() -> bool {
    true
}

// `Config` overrides nothing here; it relies entirely on what `ConfigImpl` provides.
impl ConfigImpl for Config {}

pub struct Kafka {
Expand Down Expand Up @@ -554,30 +561,40 @@ impl Source for Int {

let mut good_topics = Vec::new();
for topic in topics {
match consumer.fetch_metadata(Some(topic), Duration::from_secs(1)) {
Ok(m) => {
let errors: Vec<_> = m
.topics()
.iter()
.map(rdkafka::metadata::MetadataTopic::error)
.collect();
match errors.as_slice() {
[None] => good_topics.push(topic),
[Some(e)] => error!(
"[Source::{}] Kafka error for topic '{}': {:?}. Not subscribing!",
self.onramp_id, topic, e
),
_ => error!(
"[Source::{}] Unknown kafka error for topic '{}'. Not subscribing!",
self.onramp_id, topic
),
if self.config.check_topic_metadata {
// Unless explicitly configured otherwise - we preserve the legacy behaviour
// - We misuse topic metadata to validate topic existence, which isn't ideal
// - At the time of writing - Oct 2021 - this is currently manifesting as a production
// issue that is affecting our systems and we are bypassing this check as a countermeasure
//
match consumer.fetch_metadata(Some(topic), Duration::from_secs(1)) {
Ok(m) => {
let errors: Vec<_> = m
.topics()
.iter()
.map(rdkafka::metadata::MetadataTopic::error)
.collect();
match errors.as_slice() {
[None] => good_topics.push(topic),
[Some(e)] => error!(
"[Source::{}] Kafka error for topic '{}': {:?}. Not subscribing!",
self.onramp_id, topic, e
),
_ => error!(
"[Source::{}] Unknown kafka error for topic '{}'. Not subscribing!",
self.onramp_id, topic
),
}
}
}
Err(e) => error!(
"[Source::{}] Kafka error for topic '{}': {}. Not subscribing!",
self.onramp_id, topic, e
),
};
Err(e) => error!(
"[Source::{}] Kafka error for topic '{}': {}. Not subscribing!",
self.onramp_id, topic, e
),
};
} else {
// By configuration, we presume topics are good
good_topics.push(topic);
}
}

// bail out if there is no topic left to subscribe to
Expand Down

0 comments on commit cd4bc86

Please sign in to comment.