diff --git a/omniqueue/src/backends/azure_queue_storage.rs b/omniqueue/src/backends/azure_queue_storage.rs index ff935d5..d2cca92 100644 --- a/omniqueue/src/backends/azure_queue_storage.rs +++ b/omniqueue/src/backends/azure_queue_storage.rs @@ -7,7 +7,9 @@ use azure_storage_queues::{ }; use serde::Serialize; -use crate::{queue::Acker, Delivery, QueueBackend, QueueError, Result}; +use crate::{ + builder::Static, queue::Acker, Delivery, QueueBackend, QueueBuilder, QueueError, Result, +}; fn get_client(cfg: &AqsConfig) -> QueueClient { let storage_credentials = @@ -27,14 +29,25 @@ fn get_client(cfg: &AqsConfig) -> QueueClient { #[non_exhaustive] pub struct AqsBackend; +impl AqsBackend { + /// Creates a new Azure Storage Queue builder with the given configuration. + pub fn builder(config: AqsConfig) -> QueueBuilder { + QueueBuilder::new(config) + } +} + +const DEFAULT_RECV_TIMEOUT: Duration = Duration::from_secs(180); +const DEFAULT_EMPTY_RECV_DELAY: Duration = Duration::from_millis(200); + #[derive(Clone, Debug, Eq, PartialEq)] pub struct AqsConfig { pub queue_name: String, - pub empty_receive_delay: std::time::Duration, - pub message_ttl: std::time::Duration, + pub empty_receive_delay: Option, + pub message_ttl: Duration, pub storage_account: String, pub access_key: String, pub cloud_uri: Option, + pub receive_timeout: Option, } impl QueueBackend for AqsBackend { @@ -157,6 +170,7 @@ impl AqsConsumer { pub async fn receive(&mut self) -> Result { self.client .get_messages() + .visibility_timeout(self.config.receive_timeout.unwrap_or(DEFAULT_RECV_TIMEOUT)) .await .map_err(QueueError::generic) .and_then(|m| m.messages.into_iter().next().ok_or(QueueError::NoData)) @@ -169,13 +183,18 @@ impl AqsConsumer { deadline: Duration, ) -> Result> { let end = std::time::Instant::now() + deadline; - let mut interval = tokio::time::interval(self.config.empty_receive_delay); + let mut interval = tokio::time::interval( + self.config + .empty_receive_delay + .unwrap_or(DEFAULT_EMPTY_RECV_DELAY), + ); loop { interval.tick().await; let msgs = self .client .get_messages() .number_of_messages(max_messages.try_into().unwrap_or(u8::MAX)) + .visibility_timeout(self.config.receive_timeout.unwrap_or(DEFAULT_RECV_TIMEOUT)) .await .map_err(QueueError::generic) .map(|m| { diff --git a/omniqueue/tests/it/azure_queue_storage.rs b/omniqueue/tests/it/azure_queue_storage.rs index 3d3674e..eeb2079 100644 --- a/omniqueue/tests/it/azure_queue_storage.rs +++ b/omniqueue/tests/it/azure_queue_storage.rs @@ -12,13 +12,19 @@ use omniqueue::{ use serde::{Deserialize, Serialize}; async fn create_queue_get_a_pair() -> (AqsProducer, AqsConsumer) { + create_queue_get_a_pair_with_receive_timeout(None).await +} + +async fn create_queue_get_a_pair_with_receive_timeout( + receive_timeout: Option, +) -> (AqsProducer, AqsConsumer) { let queue_name: String = std::iter::repeat_with(fastrand::lowercase) .take(8) .collect(); let cfg = AqsConfig { queue_name, - empty_receive_delay: Duration::from_millis(1), + empty_receive_delay: None, message_ttl: Duration::from_secs(90), storage_account: azure_storage::EMULATOR_ACCOUNT.to_string(), access_key: azure_storage::EMULATOR_ACCOUNT_KEY.to_string(), @@ -26,6 +32,7 @@ async fn create_queue_get_a_pair() -> (AqsProducer, AqsConsumer) { "http://localhost:10001/{}", azure_storage::EMULATOR_ACCOUNT )), + receive_timeout, }; let storage_credentials = @@ -221,3 +228,34 @@ async fn test_empty_recv_all() { assert!(now.elapsed() > deadline); assert!(d.is_empty()); } + +#[tokio::test] +async fn test_receive_timeout() { + let (producer, mut consumer) = + create_queue_get_a_pair_with_receive_timeout(Some(Duration::from_secs(2))).await; + + let payload = "test123"; + producer.send_raw(payload).await.unwrap(); + + let mut d = consumer.receive().await.unwrap(); + assert_eq!( + payload, + &String::from_utf8(d.take_payload().unwrap()).unwrap() + ); + + tokio::time::sleep(Duration::from_secs(2) + Duration::from_millis(100)).await; + + let mut d = consumer.receive().await.unwrap(); + assert_eq!( + payload, + &String::from_utf8(d.take_payload().unwrap()).unwrap() + ); + d.ack().await.unwrap(); + + tokio::time::sleep(Duration::from_secs(2) + Duration::from_millis(100)).await; + + match consumer.receive().await { + Err(QueueError::NoData) => {} + _ => panic!("Unexpected result"), + } +}