Skip to content

Commit

Permalink
pgmq metadata (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHend authored Feb 4, 2023
1 parent e111577 commit fb8631c
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 37 deletions.
6 changes: 3 additions & 3 deletions crates/pgmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn main() {
let msg1 = serde_json::json!({
"foo": "bar"
});
let msg_id1: i64 = queue.enqueue(&myqueue, &msg1).await.expect("Failed to enqueue message");
let msg_id1: i64 = queue.send(&myqueue, &msg1).await.expect("Failed to enqueue message");

// SEND A STRUCT
#[derive(Serialize, Debug, Deserialize)]
Expand All @@ -40,7 +40,7 @@ async fn main() {
let msg2 = MyMessage {
foo: "bar".to_owned(),
};
let msg_id2: i64 = queue.enqueue(&myqueue, &msg2).await.expect("Failed to enqueue message");
let msg_id2: i64 = queue.send(&myqueue, &msg2).await.expect("Failed to enqueue message");

// READ A MESSAGE as `serde_json::Value`
let vt: i32 = 30;
Expand All @@ -62,7 +62,7 @@ async fn main() {
```
## Sending messages

`queue.enqueue()` can be passed any type that implements `serde::Serialize`. This means you can prepare your messages as JSON or as a struct.
`queue.send()` can be passed any type that implements `serde::Serialize`. This means you can prepare your messages as JSON or as a struct.

## Reading messages
Reading a message will make it invisible (unavailable for consumption) for the duration of the visibility timeout (vt).
Expand Down
4 changes: 2 additions & 2 deletions crates/pgmq/examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn main() {
"foo": "bar"
});
let _msg_id1: i64 = queue
.enqueue(&myqueue, &msg1)
.send(&myqueue, &msg1)
.await
.expect("Failed to enqueue message");

Expand All @@ -31,7 +31,7 @@ async fn main() {
foo: "bar".to_owned(),
};
let _msg_id2: i64 = queue
.enqueue(&myqueue, &msg2)
.send(&myqueue, &msg2)
.await
.expect("Failed to enqueue message");

Expand Down
19 changes: 11 additions & 8 deletions crates/pgmq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
//! let msg1 = serde_json::json!({
//! "foo": "bar"
//! });
//! let msg_id1: i64 = queue.enqueue(&myqueue, &msg1).await.expect("Failed to enqueue message");
//! let msg_id1: i64 = queue.send(&myqueue, &msg1).await.expect("Failed to enqueue message");
//!
//! // SEND A STRUCT
//! #[derive(Serialize, Debug, Deserialize)]
Expand All @@ -40,7 +40,7 @@
//! let msg2 = MyMessage {
//! foo: "bar".to_owned(),
//! };
//! let msg_id2: i64 = queue.enqueue(&myqueue, &msg2).await.expect("Failed to enqueue message");
//! let msg_id2: i64 = queue.send(&myqueue, &msg2).await.expect("Failed to enqueue message");
//!
//! // READ A MESSAGE as `serde_json::Value`
//! let vt: i32 = 30;
Expand All @@ -62,7 +62,7 @@
//! ```
//! ## Sending messages
//!
//! `queue.enqueue()` can be passed any type that implements `serde::Serialize`. This means you can prepare your messages as JSON or as a struct.
//! `queue.send()` can be passed any type that implements `serde::Serialize`. This means you can prepare your messages as JSON or as a struct.
//!
//! ## Reading messages
//! Reading a message will make it invisible (unavailable for consumption) for the duration of the visibility timeout (vt).
Expand Down Expand Up @@ -131,15 +131,17 @@ impl PGMQueue {

/// Create a queue
pub async fn create(&self, queue_name: &str) -> Result<(), errors::PgmqError> {
let create = query::create(queue_name);
let index: String = query::create_index(queue_name);
sqlx::query(&create).execute(&self.connection).await?;
sqlx::query(&index).execute(&self.connection).await?;
let mut tx = self.connection.begin().await?;
let setup = query::init_queue(queue_name);
for q in setup {
sqlx::query(&q).execute(&mut tx).await?;
}
tx.commit().await?;
Ok(())
}

/// Send a message to the queue
pub async fn enqueue<T: Serialize>(
pub async fn send<T: Serialize>(
&self,
queue_name: &str,
message: &T,
Expand Down Expand Up @@ -195,6 +197,7 @@ async fn fetch_one_message<T: for<'de> Deserialize<'de>>(
query: &str,
connection: &Pool<Postgres>,
) -> Result<Option<Message<T>>, errors::PgmqError> {
// explore: .fetch_optional()
let row: Result<PgRow, Error> = sqlx::query(query).fetch_one(connection).await;
match row {
Ok(row) => {
Expand Down
71 changes: 66 additions & 5 deletions crates/pgmq/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,66 @@
pub const TABLE_PREFIX: &str = r#"pgmq"#;

pub fn create(name: &str) -> String {
pub fn init_queue(name: &str) -> Vec<String> {
vec![
create_meta(),
create_queue(name),
create_index(name),
create_archive(name),
insert_meta(name),
]
}

pub fn create_queue(name: &str) -> String {
format!(
"
CREATE TABLE IF NOT EXISTS {TABLE_PREFIX}_{name} (
msg_id BIGSERIAL,
read_ct INT DEFAULT 0,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc'),
vt TIMESTAMP WITH TIME ZONE,
message JSON
);
"
)
}

pub fn create_archive(name: &str) -> String {
format!(
"
CREATE TABLE IF NOT EXISTS {TABLE_PREFIX}_{name}_archive (
msg_id BIGSERIAL,
read_ct INT DEFAULT 0,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc'),
deleted_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc'),
vt TIMESTAMP WITH TIME ZONE,
message JSON
);
"
)
}

pub fn create_meta() -> String {
format!(
"
CREATE TABLE IF NOT EXISTS {TABLE_PREFIX}_meta (
queue_name VARCHAR UNIQUE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc')
);
"
)
}

pub fn insert_meta(name: &str) -> String {
format!(
"
INSERT INTO {TABLE_PREFIX}_meta (queue_name)
VALUES ('{name}')
ON CONFLICT
DO NOTHING;
"
)
}

pub fn create_index(name: &str) -> String {
format!(
"
Expand Down Expand Up @@ -46,15 +95,17 @@ pub fn read(name: &str, vt: &i32) -> String {
"
WITH cte AS
(
SELECT msg_id, vt, message
SELECT *
FROM {TABLE_PREFIX}_{name}
WHERE vt <= now() at time zone 'utc'
ORDER BY msg_id ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
UPDATE {TABLE_PREFIX}_{name}
SET vt = (now() at time zone 'utc' + interval '{vt} seconds')
SET
vt = (now() at time zone 'utc' + interval '{vt} seconds'),
read_ct = read_ct + 1
WHERE msg_id = (select msg_id from cte)
RETURNING *;
"
Expand All @@ -70,12 +121,22 @@ pub fn delete(name: &str, msg_id: &i64) -> String {
)
}

pub fn archive(name: &str, msg_id: &i64) -> String {
format!(
"
INSERT INTO {TABLE_PREFIX}_{name}_archive
SELECT * FROM {TABLE_PREFIX}_{name}
WHERE msg_id = {msg_id};
"
)
}

pub fn pop(name: &str) -> String {
format!(
"
WITH cte AS
(
SELECT msg_id, vt, message
SELECT *
FROM {TABLE_PREFIX}_{name}
WHERE vt <= now() at time zone 'utc'
ORDER BY msg_id ASC
Expand All @@ -95,7 +156,7 @@ mod tests {

#[test]
fn test_create() {
let query = create("yolo");
let query = create_queue("yolo");
assert!(query.contains("pgmq_yolo"));
}

Expand Down
24 changes: 12 additions & 12 deletions crates/pgmq/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn test_lifecycle() {
let msg = serde_json::json!({
"foo": "bar"
});
let msg_id = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg_id = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg_id, 1);
let num_rows = rowcount(&test_queue, &queue.connection).await;

Expand Down Expand Up @@ -124,11 +124,11 @@ async fn test_fifo() {
let msg = serde_json::json!({
"foo": "bar1"
});
let msg_id1 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg_id1 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg_id1, 1);
let msg_id2 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg_id2 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg_id2, 2);
let msg_id3 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg_id3 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg_id3, 3);

let vt: i32 = 1;
Expand Down Expand Up @@ -183,7 +183,7 @@ async fn test_serde() {
foo: "bar".to_owned(),
num: rng.gen_range(0..100000),
};
let msg1 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg1 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg1, 1);

let msg_read = queue
Expand All @@ -200,7 +200,7 @@ async fn test_serde() {
"foo": "bar",
"num": rng.gen_range(0..100000)
});
let msg2 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg2 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg2, 2);

let msg_read = queue
Expand All @@ -219,7 +219,7 @@ async fn test_serde() {
"num": rng.gen_range(0..100000)
});

let msg3 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg3 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg3, 3);

let msg_read = queue
Expand All @@ -236,7 +236,7 @@ async fn test_serde() {
foo: "bar".to_owned(),
num: rng.gen_range(0..100000),
};
let msg4 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg4 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg4, 4);
let msg_read = queue
.read::<Value>(&test_queue, Some(&30_i32))
Expand All @@ -254,7 +254,7 @@ async fn test_serde() {
"foo": "bar".to_owned(),
"num": rng.gen_range(0..100000),
});
let msg5 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg5 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg5, 5);
let msg_read: crate::pgmq::Message = queue
.read(&test_queue, Some(&30_i32)) // no turbofish on this line
Expand All @@ -274,7 +274,7 @@ async fn test_pop() {
let test_queue = "test_pop_queue".to_owned();
let queue = init_queue(&test_queue).await;
let msg = MyMessage::default();
let msg = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg, 1);
let popped_msg = queue.pop::<MyMessage>(&test_queue).await.unwrap().unwrap();
assert_eq!(popped_msg.msg_id, 1);
Expand All @@ -291,7 +291,7 @@ async fn test_database_error_modes() {
.await
.expect("failed to connect to postgres");
// let's not create the queues and make sure we get an error
let msg_id = queue.enqueue("doesNotExist", &"foo").await;
let msg_id = queue.send("doesNotExist", &"foo").await;
assert!(msg_id.is_err());

// read from a queue that does not exist should error
Expand Down Expand Up @@ -321,7 +321,7 @@ async fn test_parsing_error_modes() {
let test_queue = "test_parsing_queue".to_owned();
let queue = init_queue(&test_queue).await;
let msg = MyMessage::default();
let _ = queue.enqueue(&test_queue, &msg).await.unwrap();
let _ = queue.send(&test_queue, &msg).await.unwrap();

// we sent MyMessage, so trying to parse into YoloMessage should error
let read_msg = queue.read::<YoloMessage>(&test_queue, Some(&10_i32)).await;
Expand Down
Loading

0 comments on commit fb8631c

Please sign in to comment.