feat(sink): implement pulsar sink #12286
Conversation
src/connector/src/sink/pulsar.rs (Outdated)

#[serde(
    rename = "properties.retry.max",
    default = "_default_max_retries",
    deserialize_with = "deserialize_u32_from_string"
)]
You may use serde_as instead, like this:

risingwave/src/connector/src/common.rs, line 157 in 59bb645:

#[serde_as(as = "Option<DisplayFromStr>")]
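For illustration, a minimal sketch of that serde_as style applied to the retry field; the struct name and the Option-wrapped field are assumptions, not the PR's exact code:

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

// Hypothetical config struct; only the retry field is shown.
#[serde_as]
#[derive(Debug, Deserialize)]
pub struct PulsarConfig {
    // A string value like "3" is parsed into 3u32 via DisplayFromStr;
    // a missing key deserializes to None thanks to serde's default.
    #[serde(rename = "properties.retry.max", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub max_retry_num: Option<u32>,
}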
ACK
src/connector/src/sink/pulsar.rs (Outdated)

)));
}

// TODO: validate pulsar connection
You can do as the source enumerator does here, but you only need to know whether the call succeeds:
risingwave/src/connector/src/source/pulsar/enumerator/client.rs
Lines 80 to 108 in 59bb645
async fn list_splits(&mut self) -> anyhow::Result<Vec<PulsarSplit>> {
    let offset = self.start_offset.clone();
    // MessageId is only used when recovering from a State
    assert!(!matches!(offset, PulsarEnumeratorOffset::MessageId(_)));
    let topic_partitions = self
        .client
        .lookup_partitioned_topic_number(&self.topic.to_string())
        .await
        .map_err(|e| anyhow!(e))?;
    let splits = if topic_partitions > 0 {
        // partitioned topic
        (0..topic_partitions as i32)
            .map(|p| PulsarSplit {
                topic: self.topic.sub_topic(p).unwrap(),
                start_offset: offset.clone(),
            })
            .collect_vec()
    } else {
        // non partitioned topic
        vec![PulsarSplit {
            topic: self.topic.clone(),
            start_offset: offset.clone(),
        }]
    };
    Ok(splits)
}
Producer in pulsar-rs has a check_connection method. However, calling it on a valid producer client that can send messages successfully causes a connection error for no reason. Therefore I just build a producer client to validate the connection; the topic/url/token are validated while building the producer.
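A rough sketch of that validate-by-construction approach, assuming pulsar-rs's builder API; the helper name and the simplified error handling are illustrative:

use pulsar::{Pulsar, TokioExecutor};

// Hypothetical helper: building the producer performs the topic lookup,
// so a bad url/topic/token fails here; the producer itself is discarded.
async fn validate_pulsar_connection(url: &str, topic: &str) -> Result<(), pulsar::Error> {
    let pulsar: Pulsar<TokioExecutor> =
        Pulsar::builder(url, TokioExecutor).build().await?;
    let _producer = pulsar.producer().with_topic(topic).build().await?;
    Ok(())
}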
Rest LGTM
src/connector/src/sink/pulsar.rs
Outdated
break; | ||
} | ||
// error upon sending | ||
Err(e) => { |
The logic in KafkaSinkWriter here is to retry only when the queue is full or the message times out. But according to the error type in pulsar, we have the variants below:
pub enum Error {
    Connection(ConnectionError),
    Consumer(ConsumerError),
    Producer(ProducerError),
    ServiceDiscovery(ServiceDiscoveryError),
    Authentication(AuthenticationError),
    Custom(String),
    Executor,
}
In which cases (errors) we should retry may be considered in the future. Personally, I'd prefer adding Producer & Consumer error checking.
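A sketch of what that check could look like; which variants to retry on is exactly the open question above, so treat the chosen set as illustrative:

use pulsar::Error;

// Retry only on Producer/Consumer errors, per the suggestion; other
// variants (e.g. Authentication) are treated as fatal.
fn should_retry(e: &Error) -> bool {
    matches!(e, Error::Producer(_) | Error::Consumer(_))
}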
ACK
// a SendFuture holding the message receipt
// or an error after sending is returned
Ok(send_future) => {
    // Check if send_future_buffer is greater than the preset limit
The limit is set to 65536 for Kafka because rdkafka has the corresponding property (the number of messages that will be batched before sending). Do we have the same property in the pulsar crate? (I have not found one.) See the screenshot below or refer to the original PR for details. cc @hzxa21.
I suggest creating batch.num and batch.size in the config fields, with the same default values (10k and 1MB) as rdkafka. linger cannot be specified in pulsar-rs.
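Sketching the proposed fields under those defaults; the struct and field names follow the suggestion here and may differ from the merged code:

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

fn default_batch_num() -> u32 {
    10_000
}

fn default_batch_size() -> u32 {
    1 << 20 // 1MB, matching rdkafka's default
}

#[serde_as]
#[derive(Debug, Deserialize)]
pub struct PulsarProducerConfig {
    // Number of messages batched before sending.
    #[serde(rename = "properties.batch.num", default = "default_batch_num")]
    #[serde_as(as = "DisplayFromStr")]
    pub batch_num: u32,

    // Maximum byte size of a batch.
    #[serde(rename = "properties.batch.size", default = "default_batch_size")]
    #[serde_as(as = "DisplayFromStr")]
    pub batch_size: u32,
}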
One question: since the maximum number and size of messages in a batch are handled by the kafka/pulsar client, why should we check the 65536 threshold of buffered DeliveryFuture/SendFutures?
> One question: since the maximum number and size of messages in a batch are handled by the kafka/pulsar client, why should we check the 65536 threshold of buffered DeliveryFuture/SendFutures?

The batched messages in rdkafka or pulsar are not essentially the same as the buffered futures in RW's connector.
In general, after a message is successfully handed to the client, we buffer a corresponding future. This is to ensure the message is eventually delivered to the downstream (i.e., the Kafka broker) by rdkafka or pulsar; without it, the delivery status of the messages cannot be known.
If the messages are eventually delivered, there's nothing to do. If not, we directly roll back to the latest checkpoint when any error is returned along the way.
Also, when a barrier is triggered by the sink coordinator, we group-commit all the currently buffered futures to ensure every message is delivered; if any is not, we also roll back to the latest checkpoint.
This is why we need an extra threshold for the buffered futures: it ensures an efficient interval for checking the status of all sent but not-yet-delivered messages.
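To make the pattern concrete, a simplified sketch of the buffered-future mechanism described above; the type, field, and constant names are illustrative, not the PR's exact code:

use anyhow::anyhow;
use pulsar::producer::SendFuture;

const SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;

struct SendFutureBuffer {
    futures: Vec<SendFuture>,
}

impl SendFutureBuffer {
    // Buffer the receipt future of a just-sent message; once the
    // threshold is reached, await everything so any undelivered message
    // surfaces as an error (triggering a rollback to the last checkpoint).
    async fn push(&mut self, fut: SendFuture) -> anyhow::Result<()> {
        self.futures.push(fut);
        if self.futures.len() >= SEND_FUTURE_BUFFER_MAX_SIZE {
            for fut in std::mem::take(&mut self.futures) {
                fut.await.map_err(|e| anyhow!(e))?;
            }
        }
        Ok(())
    }
}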
    Ok(())
}

async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata> {
In the pulsar crate we have a send_batch function to ensure all batched messages are sent. Should we add a send_batch before commit_inner? In that case we would not need to await unnecessary futures. cc @tabVersion.
/// sends the current batch of messages
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn send_batch(&mut self) -> Result<(), Error> {
    match &mut self.inner {
        ProducerInner::Single(p) => p.send_batch().await,
        ProducerInner::Partitioned(p) => {
            try_join_all(p.producers.iter_mut().map(|p| p.send_batch()))
                .await
                .map(drop)
        }
    }
}
In my understanding, send_batch ensures the messages in the current batch are sent, but it does not ensure the receipt of the message held by each SendFuture (returned by producer.send). If messages are sent in batches, we do indeed need to call send_batch first during commit, but we still need to await all SendFutures to confirm the receipts.
This is to speed up the later awaits, since this send_batch already flushes the batched messages out to the broker. So I think it's good to add one before the actual group await.
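Putting the two points together, a hedged sketch of what the commit path could look like; the writer struct, its fields, and commit_inner's shape are assumptions drawn from this discussion, not the final code:

use anyhow::anyhow;
use pulsar::{producer::SendFuture, Producer, TokioExecutor};

struct PulsarSinkWriter {
    producer: Producer<TokioExecutor>,
    send_future_buffer: Vec<SendFuture>,
}

impl PulsarSinkWriter {
    // Flush the client-side batch first so the buffered receipts can
    // resolve quickly, then await every receipt before the checkpoint
    // is acknowledged; any error rolls back to the latest checkpoint.
    async fn commit_inner(&mut self) -> anyhow::Result<()> {
        self.producer.send_batch().await.map_err(|e| anyhow!(e))?;
        for fut in std::mem::take(&mut self.send_future_buffer) {
            fut.await.map_err(|e| anyhow!(e))?;
        }
        Ok(())
    }
}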
LGTM, Thanks for the effort!
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
WIP: implement pulsar sink
TODO:
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.