Skip to content

Commit

Permalink
Updates in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
jaymell committed Apr 10, 2024
1 parent fcffaba commit db98f2e
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 5 deletions.
27 changes: 23 additions & 4 deletions omniqueue/src/backends/azure_queue_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use azure_storage_queues::{
};
use serde::Serialize;

use crate::{queue::Acker, Delivery, QueueBackend, QueueError, Result};
use crate::{
builder::Static, queue::Acker, Delivery, QueueBackend, QueueBuilder, QueueError, Result,
};

fn get_client(cfg: &AqsConfig) -> QueueClient {
let storage_credentials =
Expand All @@ -27,14 +29,25 @@ fn get_client(cfg: &AqsConfig) -> QueueClient {
#[non_exhaustive]
pub struct AqsBackend;

impl AqsBackend {
/// Creates a new Azure Storage Queue builder with the given configuration.
pub fn builder(config: AqsConfig) -> QueueBuilder<Self, Static> {
QueueBuilder::new(config)
}
}

const DEFAULT_RECV_TIMEOUT: Duration = Duration::from_secs(180);
const DEFAULT_EMPTY_RECV_DELAY: Duration = Duration::from_millis(200);

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AqsConfig {
pub queue_name: String,
pub empty_receive_delay: std::time::Duration,
pub message_ttl: std::time::Duration,
pub empty_receive_delay: Option<Duration>,
pub message_ttl: Duration,
pub storage_account: String,
pub access_key: String,
pub cloud_uri: Option<String>,
pub receive_timeout: Option<Duration>,
}

impl QueueBackend for AqsBackend {
Expand Down Expand Up @@ -157,6 +170,7 @@ impl AqsConsumer {
pub async fn receive(&mut self) -> Result<Delivery> {
self.client
.get_messages()
.visibility_timeout(self.config.receive_timeout.unwrap_or(DEFAULT_RECV_TIMEOUT))
.await
.map_err(QueueError::generic)
.and_then(|m| m.messages.into_iter().next().ok_or(QueueError::NoData))
Expand All @@ -169,13 +183,18 @@ impl AqsConsumer {
deadline: Duration,
) -> Result<Vec<Delivery>> {
let end = std::time::Instant::now() + deadline;
let mut interval = tokio::time::interval(self.config.empty_receive_delay);
let mut interval = tokio::time::interval(
self.config
.empty_receive_delay
.unwrap_or(DEFAULT_EMPTY_RECV_DELAY),
);
loop {
interval.tick().await;
let msgs = self
.client
.get_messages()
.number_of_messages(max_messages.try_into().unwrap_or(u8::MAX))
.visibility_timeout(self.config.receive_timeout.unwrap_or(DEFAULT_RECV_TIMEOUT))
.await
.map_err(QueueError::generic)
.map(|m| {
Expand Down
40 changes: 39 additions & 1 deletion omniqueue/tests/it/azure_queue_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,27 @@ use omniqueue::{
use serde::{Deserialize, Serialize};

async fn create_queue_get_a_pair() -> (AqsProducer, AqsConsumer) {
create_queue_get_a_pair_with_receive_timeout(None).await
}

async fn create_queue_get_a_pair_with_receive_timeout(
receive_timeout: Option<Duration>,
) -> (AqsProducer, AqsConsumer) {
let queue_name: String = std::iter::repeat_with(fastrand::lowercase)
.take(8)
.collect();

let cfg = AqsConfig {
queue_name,
empty_receive_delay: Duration::from_millis(1),
empty_receive_delay: None,
message_ttl: Duration::from_secs(90),
storage_account: azure_storage::EMULATOR_ACCOUNT.to_string(),
access_key: azure_storage::EMULATOR_ACCOUNT_KEY.to_string(),
cloud_uri: Some(format!(
"http://localhost:10001/{}",
azure_storage::EMULATOR_ACCOUNT
)),
receive_timeout,
};

let storage_credentials =
Expand Down Expand Up @@ -221,3 +228,34 @@ async fn test_empty_recv_all() {
assert!(now.elapsed() > deadline);
assert!(d.is_empty());
}

#[tokio::test]
async fn test_receive_timeout() {
let (producer, mut consumer) =
create_queue_get_a_pair_with_receive_timeout(Some(Duration::from_secs(2))).await;

let payload = "test123";
producer.send_raw(payload).await.unwrap();

let mut d = consumer.receive().await.unwrap();
assert_eq!(
payload,
&String::from_utf8(d.take_payload().unwrap()).unwrap()
);

tokio::time::sleep(Duration::from_secs(2) + Duration::from_millis(100)).await;

let mut d = consumer.receive().await.unwrap();
assert_eq!(
payload,
&String::from_utf8(d.take_payload().unwrap()).unwrap()
);
d.ack().await.unwrap();

tokio::time::sleep(Duration::from_secs(2) + Duration::from_millis(100)).await;

match consumer.receive().await {
Err(QueueError::NoData) => {}
_ => panic!("Unexpected result"),
}
}

0 comments on commit db98f2e

Please sign in to comment.