Skip to content

Commit

Permalink
add comments about the mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
neuronull committed Apr 11, 2023
1 parent 12111d2 commit f1a45e8
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/sinks/pulsar/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ impl MetaDescriptive for PulsarRequest {
}

pub struct PulsarService<Exe: Executor> {
// NOTE: the reason for the Mutex here is because the `Producer` from the pulsar crate
// needs to be `mut`, and the `Service::call()` returns a Future.
producer: Arc<Mutex<MultiTopicProducer<Exe>>>,
}

Expand Down Expand Up @@ -121,6 +123,12 @@ impl<Exe: Executor> Service<PulsarRequest> for PulsarService<Exe> {
..Default::default()
};

// The locking if this mutex is not normal in `Service::call()` implementations, but we
// at least can limit the scope of the lock by placing it here, and reduce the
// possibility of performance impact by checking the `try_lock()` result in
// `poll_ready()`. This sink is already limited to sequential request handling due to
// the pulsar API, so this shouldn't impact performance from a concurrent requests
// standpoint.
let fut = producer.lock().await.send(topic, message).await;

match fut {
Expand Down

0 comments on commit f1a45e8

Please sign in to comment.