Skip to content

Commit

Permalink
udpate producer and consumer (#217)
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio committed Apr 24, 2024
1 parent 551d00e commit 55c742d
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ let environment = Environment::builder().build().await?;
let producer = environment.producer().name("myproducer").build("mystream").await?;
for i in 0..10 {
producer
.send(Message::builder().body(format!("message{}", i)).build())
.send_with_confirm(Message::builder().body(format!("message{}", i)).build())
.await?;
}
producer.close().await?;
Expand All @@ -111,10 +111,13 @@ let environment = Environment::builder().build().await?;
let mut consumer = environment.consumer().build("mystream").await?;
let handle = consumer.handle();
task::spawn(async move {
while let Some(delivery) = consumer.next().await {
println!("Got message {:?}",delivery);
}
});
while let Some(delivery) = consumer.next().await {
let d = delivery.unwrap();
println!("Got message: {:#?} with offset: {}",
d.message().data().map(|data| String::from_utf8(data.to_vec()).unwrap()),
d.offset(),);
}
});
// wait 10 second and then close the consumer
sleep(Duration::from_secs(10)).await;
handle.close().await?;
Expand Down

0 comments on commit 55c742d

Please sign in to comment.