add an example of producer batching usage #78

Closed
Geal opened this issue May 5, 2020 · 1 comment

Geal commented May 5, 2020

I just implemented message batching in TopicProducer in 182d450 and found that using it can be a bit tricky. Calling send() on a producer returns a future of CommandSendReceipt. When batching, that future does not resolve until the batch has been sent, so we cannot call producer.send(...).await in a loop, as the round trip example does, because we would be stuck at the first send(): https://github.com/wyyerd/pulsar-rs/blob/182d45071c02e6f5885dcfa8293bd2a299178d6c/examples/round_trip.rs#L74-L81
Instead, we need to collect the receipt futures and await them all at once:

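use futures::future::join_all;

let producer = pulsar.create_producer(
    "test",
    Some("my-producer".to_string()),
    producer::ProducerOptions {
        batch_size: Some(5),
        ..Default::default()
    }).await.unwrap();

let mut counter = 0usize;
let mut receipts = Vec::new();
loop {
    // The async block is lazy: nothing runs until join_all polls it below,
    // so building it here does not block on the batch.
    let receipt = async {
        producer.send(
            TestData {
                data: "data".to_string(),
            },
        ).await
    };
    // Store the future instead of calling .await.unwrap() on it here.
    receipts.push(receipt);
    counter += 1;
    if counter % 5 == 0 {
        println!("sent {} messages", counter);
        break;
    }
}

// Await every receipt at once; they resolve once the batch of 5 has been sent.
println!("received receipts: {:?}", join_all(receipts).await);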
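
To make the stall concrete, here is a minimal std-only sketch of the same behaviour. It does not use the pulsar-rs API: `BatchingProducer`, `Receipt`, `poll_once` and `demo` are hypothetical names invented for illustration, and the mock simply assumes that a receipt becomes ready once `batch_size` messages have been queued. Polling the first receipt on its own stays pending, while collecting all five receipts first lets every one of them resolve.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a batching producer (not the pulsar-rs type).
struct BatchingProducer {
    batch_size: usize,
    pending: usize,              // messages buffered in the current batch
    queued: usize,               // total messages accepted so far
    flushed: Rc<RefCell<usize>>, // total messages flushed so far
}

/// Future that becomes ready once the message numbered `seq` is flushed.
struct Receipt {
    seq: usize,
    flushed: Rc<RefCell<usize>>,
}

impl Future for Receipt {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
        // Simplification: no waker is registered, because this sketch only
        // polls by hand. A real future must arrange to be woken.
        if *self.flushed.borrow() > self.seq {
            Poll::Ready(self.seq)
        } else {
            Poll::Pending
        }
    }
}

impl BatchingProducer {
    fn new(batch_size: usize) -> Self {
        BatchingProducer {
            batch_size,
            pending: 0,
            queued: 0,
            flushed: Rc::new(RefCell::new(0)),
        }
    }

    /// Buffers one message and flushes the batch when it is full.
    fn send(&mut self, _msg: &str) -> Receipt {
        let seq = self.queued;
        self.queued += 1;
        self.pending += 1;
        if self.pending >= self.batch_size {
            *self.flushed.borrow_mut() += self.pending;
            self.pending = 0;
        }
        Receipt { seq, flushed: Rc::clone(&self.flushed) }
    }
}

/// Waker that does nothing; enough for polling a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once without an executor.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

/// Returns whether the first receipt was pending right after the first
/// send, and the sequence numbers resolved once the batch was full.
fn demo() -> (bool, Vec<usize>) {
    let mut producer = BatchingProducer::new(5);

    // Awaiting here would never complete: the batch holds 1 of 5 messages.
    let mut first = producer.send("data");
    let first_pending = poll_once(&mut first).is_pending();

    // Collect the remaining receipts instead of awaiting each one.
    let mut receipts = vec![first];
    for _ in 1..5 {
        receipts.push(producer.send("data"));
    }

    // The fifth send flushed the batch, so every receipt is ready now.
    let resolved = receipts
        .iter_mut()
        .filter_map(|r| match poll_once(r) {
            Poll::Ready(seq) => Some(seq),
            Poll::Pending => None,
        })
        .collect();
    (first_pending, resolved)
}
```

Calling `demo()` from a `main` function shows the first receipt pending and then all five resolving together, which is the reason the snippet above gathers the futures and hands them to `join_all` rather than awaiting inside the loop.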

Maybe there's a better way to represent this in the API?

Geal commented Jul 2, 2020

Done in 81f0bbb, after changing the message-sending API in 024dd08.

Geal closed this as completed Jul 2, 2020