Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pgmq metadata #78

Merged
merged 19 commits into from
Feb 4, 2023
6 changes: 3 additions & 3 deletions crates/pgmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn main() {
let msg1 = serde_json::json!({
"foo": "bar"
});
let msg_id1: i64 = queue.enqueue(&myqueue, &msg1).await.expect("Failed to enqueue message");
let msg_id1: i64 = queue.send(&myqueue, &msg1).await.expect("Failed to enqueue message");

// SEND A STRUCT
#[derive(Serialize, Debug, Deserialize)]
Expand All @@ -40,7 +40,7 @@ async fn main() {
let msg2 = MyMessage {
foo: "bar".to_owned(),
};
let msg_id2: i64 = queue.enqueue(&myqueue, &msg2).await.expect("Failed to enqueue message");
let msg_id2: i64 = queue.send(&myqueue, &msg2).await.expect("Failed to enqueue message");

// READ A MESSAGE as `serde_json::Value`
let vt: i32 = 30;
Expand All @@ -62,7 +62,7 @@ async fn main() {
```
## Sending messages

`queue.enqueue()` can be passed any type that implements `serde::Serialize`. This means you can prepare your messages as JSON or as a struct.
`queue.send()` can be passed any type that implements `serde::Serialize`. This means you can prepare your messages as JSON or as a struct.

## Reading messages
Reading a message will make it invisible (unavailable for consumption) for the duration of the visibility timeout (vt).
Expand Down
4 changes: 2 additions & 2 deletions crates/pgmq/examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn main() {
"foo": "bar"
});
let _msg_id1: i64 = queue
.enqueue(&myqueue, &msg1)
.send(&myqueue, &msg1)
.await
.expect("Failed to enqueue message");

Expand All @@ -31,7 +31,7 @@ async fn main() {
foo: "bar".to_owned(),
};
let _msg_id2: i64 = queue
.enqueue(&myqueue, &msg2)
.send(&myqueue, &msg2)
.await
.expect("Failed to enqueue message");

Expand Down
19 changes: 11 additions & 8 deletions crates/pgmq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
//! let msg1 = serde_json::json!({
//! "foo": "bar"
//! });
//! let msg_id1: i64 = queue.enqueue(&myqueue, &msg1).await.expect("Failed to enqueue message");
//! let msg_id1: i64 = queue.send(&myqueue, &msg1).await.expect("Failed to enqueue message");
//!
//! // SEND A STRUCT
//! #[derive(Serialize, Debug, Deserialize)]
Expand All @@ -40,7 +40,7 @@
//! let msg2 = MyMessage {
//! foo: "bar".to_owned(),
//! };
//! let msg_id2: i64 = queue.enqueue(&myqueue, &msg2).await.expect("Failed to enqueue message");
//! let msg_id2: i64 = queue.send(&myqueue, &msg2).await.expect("Failed to enqueue message");
//!
//! // READ A MESSAGE as `serde_json::Value`
//! let vt: i32 = 30;
Expand All @@ -62,7 +62,7 @@
//! ```
//! ## Sending messages
//!
//! `queue.enqueue()` can be passed any type that implements `serde::Serialize`. This means you can prepare your messages as JSON or as a struct.
//! `queue.send()` can be passed any type that implements `serde::Serialize`. This means you can prepare your messages as JSON or as a struct.
//!
//! ## Reading messages
//! Reading a message will make it invisible (unavailable for consumption) for the duration of the visibility timeout (vt).
Expand Down Expand Up @@ -131,15 +131,17 @@ impl PGMQueue {

/// Create a queue
pub async fn create(&self, queue_name: &str) -> Result<(), errors::PgmqError> {
let create = query::create(queue_name);
let index: String = query::create_index(queue_name);
sqlx::query(&create).execute(&self.connection).await?;
sqlx::query(&index).execute(&self.connection).await?;
let mut tx = self.connection.begin().await?;
let setup = query::init_queue(queue_name);
for q in setup {
sqlx::query(&q).execute(&mut tx).await?;
}
tx.commit().await?;
Comment on lines +134 to +139
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

creating a queue now does a bunch of things, all are no-ops if the data already exists. metadata table, queue table, queue archive table, create index, insert metadata to queue metadata table

Ok(())
}

/// Send a message to the queue
pub async fn enqueue<T: Serialize>(
pub async fn send<T: Serialize>(
&self,
queue_name: &str,
message: &T,
Expand Down Expand Up @@ -195,6 +197,7 @@ async fn fetch_one_message<T: for<'de> Deserialize<'de>>(
query: &str,
connection: &Pool<Postgres>,
) -> Result<Option<Message<T>>, errors::PgmqError> {
// explore: .fetch_optional()
let row: Result<PgRow, Error> = sqlx::query(query).fetch_one(connection).await;
match row {
Ok(row) => {
Expand Down
71 changes: 66 additions & 5 deletions crates/pgmq/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,66 @@
pub const TABLE_PREFIX: &str = r#"pgmq"#;

pub fn create(name: &str) -> String {
pub fn init_queue(name: &str) -> Vec<String> {
vec![
create_meta(),
create_queue(name),
create_index(name),
create_archive(name),
insert_meta(name),
]
}

pub fn create_queue(name: &str) -> String {
format!(
"
CREATE TABLE IF NOT EXISTS {TABLE_PREFIX}_{name} (
msg_id BIGSERIAL,
read_ct INT DEFAULT 0,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc'),
vt TIMESTAMP WITH TIME ZONE,
message JSON
);
"
)
}

pub fn create_archive(name: &str) -> String {
format!(
"
CREATE TABLE IF NOT EXISTS {TABLE_PREFIX}_{name}_archive (
msg_id BIGSERIAL,
read_ct INT DEFAULT 0,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc'),
deleted_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc'),
vt TIMESTAMP WITH TIME ZONE,
message JSON
);
"
)
}

pub fn create_meta() -> String {
format!(
"
CREATE TABLE IF NOT EXISTS {TABLE_PREFIX}_meta (
queue_name VARCHAR UNIQUE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc')
);
"
)
}

pub fn insert_meta(name: &str) -> String {
format!(
"
INSERT INTO {TABLE_PREFIX}_meta (queue_name)
VALUES ('{name}')
ON CONFLICT
DO NOTHING;
"
)
}

pub fn create_index(name: &str) -> String {
format!(
"
Expand Down Expand Up @@ -46,15 +95,17 @@ pub fn read(name: &str, vt: &i32) -> String {
"
WITH cte AS
(
SELECT msg_id, vt, message
SELECT *
FROM {TABLE_PREFIX}_{name}
WHERE vt <= now() at time zone 'utc'
ORDER BY msg_id ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
UPDATE {TABLE_PREFIX}_{name}
SET vt = (now() at time zone 'utc' + interval '{vt} seconds')
SET
vt = (now() at time zone 'utc' + interval '{vt} seconds'),
read_ct = read_ct + 1
WHERE msg_id = (select msg_id from cte)
RETURNING *;
"
Expand All @@ -70,12 +121,22 @@ pub fn delete(name: &str, msg_id: &i64) -> String {
)
}

pub fn archive(name: &str, msg_id: &i64) -> String {
format!(
"
INSERT INTO {TABLE_PREFIX}_{name}_archive
SELECT * FROM {TABLE_PREFIX}_{name}
WHERE msg_id = {msg_id};
"
)
}

pub fn pop(name: &str) -> String {
format!(
"
WITH cte AS
(
SELECT msg_id, vt, message
SELECT *
FROM {TABLE_PREFIX}_{name}
WHERE vt <= now() at time zone 'utc'
ORDER BY msg_id ASC
Expand All @@ -95,7 +156,7 @@ mod tests {

#[test]
fn test_create() {
let query = create("yolo");
let query = create_queue("yolo");
assert!(query.contains("pgmq_yolo"));
}

Expand Down
24 changes: 12 additions & 12 deletions crates/pgmq/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn test_lifecycle() {
let msg = serde_json::json!({
"foo": "bar"
});
let msg_id = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg_id = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg_id, 1);
let num_rows = rowcount(&test_queue, &queue.connection).await;

Expand Down Expand Up @@ -124,11 +124,11 @@ async fn test_fifo() {
let msg = serde_json::json!({
"foo": "bar1"
});
let msg_id1 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg_id1 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg_id1, 1);
let msg_id2 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg_id2 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg_id2, 2);
let msg_id3 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg_id3 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg_id3, 3);

let vt: i32 = 1;
Expand Down Expand Up @@ -183,7 +183,7 @@ async fn test_serde() {
foo: "bar".to_owned(),
num: rng.gen_range(0..100000),
};
let msg1 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg1 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg1, 1);

let msg_read = queue
Expand All @@ -200,7 +200,7 @@ async fn test_serde() {
"foo": "bar",
"num": rng.gen_range(0..100000)
});
let msg2 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg2 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg2, 2);

let msg_read = queue
Expand All @@ -219,7 +219,7 @@ async fn test_serde() {
"num": rng.gen_range(0..100000)
});

let msg3 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg3 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg3, 3);

let msg_read = queue
Expand All @@ -236,7 +236,7 @@ async fn test_serde() {
foo: "bar".to_owned(),
num: rng.gen_range(0..100000),
};
let msg4 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg4 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg4, 4);
let msg_read = queue
.read::<Value>(&test_queue, Some(&30_i32))
Expand All @@ -254,7 +254,7 @@ async fn test_serde() {
"foo": "bar".to_owned(),
"num": rng.gen_range(0..100000),
});
let msg5 = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg5 = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg5, 5);
let msg_read: crate::pgmq::Message = queue
.read(&test_queue, Some(&30_i32)) // no turbofish on this line
Expand All @@ -274,7 +274,7 @@ async fn test_pop() {
let test_queue = "test_pop_queue".to_owned();
let queue = init_queue(&test_queue).await;
let msg = MyMessage::default();
let msg = queue.enqueue(&test_queue, &msg).await.unwrap();
let msg = queue.send(&test_queue, &msg).await.unwrap();
assert_eq!(msg, 1);
let popped_msg = queue.pop::<MyMessage>(&test_queue).await.unwrap().unwrap();
assert_eq!(popped_msg.msg_id, 1);
Expand All @@ -291,7 +291,7 @@ async fn test_database_error_modes() {
.await
.expect("failed to connect to postgres");
// let's not create the queues and make sure we get an error
let msg_id = queue.enqueue("doesNotExist", &"foo").await;
let msg_id = queue.send("doesNotExist", &"foo").await;
assert!(msg_id.is_err());

// read from a queue that does not exist should error
Expand Down Expand Up @@ -321,7 +321,7 @@ async fn test_parsing_error_modes() {
let test_queue = "test_parsing_queue".to_owned();
let queue = init_queue(&test_queue).await;
let msg = MyMessage::default();
let _ = queue.enqueue(&test_queue, &msg).await.unwrap();
let _ = queue.send(&test_queue, &msg).await.unwrap();

// we sent MyMessage, so trying to parse into YoloMessage should error
let read_msg = queue.read::<YoloMessage>(&test_queue, Some(&10_i32)).await;
Expand Down
Loading