-
Notifications
You must be signed in to change notification settings - Fork 85
/
Copy pathbasic.rs
157 lines (140 loc) · 5.12 KB
/
basic.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use pgmq::{errors::PgmqError, Message, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
// Initialize a connection to Postgres
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
// Create a queue
println!("Creating a queue 'my_queue'");
let my_queue = "my_basic_queue".to_owned();
queue
.create(&my_queue)
.await
.expect("Failed to create queue");
// Send a message as JSON
let json_message = serde_json::json!({
"foo": "bar"
});
println!("Enqueueing a JSON message: {json_message}");
let json_message_id: i64 = queue
.send(&my_queue, &json_message)
.await
.expect("Failed to enqueue message");
// Messages can also be sent from structs
#[derive(Serialize, Debug, Deserialize)]
struct MyMessage {
foo: String,
}
let struct_message = MyMessage {
foo: "bar".to_owned(),
};
println!("Enqueueing a struct message: {struct_message:?}");
let struct_message_id: i64 = queue
.send(&my_queue, &struct_message)
.await
.expect("Failed to enqueue message");
// Use a visibility timeout of 30 seconds.
//
// Messages that are not deleted within the
// visibility timeout will return to the queue.
let visibility_timeout_seconds: i32 = 30;
// Read the JSON message
let received_json_message: Message<Value> = queue
.read::<Value>(&my_queue, Some(visibility_timeout_seconds))
.await
.unwrap()
.expect("No messages in the queue");
println!("Received a message: {received_json_message:?}");
// Compare message IDs
assert_eq!(received_json_message.msg_id, json_message_id);
// Read the struct message
let received_struct_message: Message<MyMessage> = queue
.read::<MyMessage>(&my_queue, Some(visibility_timeout_seconds))
.await
.unwrap()
.expect("No messages in the queue");
println!("Received a message: {received_struct_message:?}");
assert_eq!(received_struct_message.msg_id, struct_message_id);
// Delete the messages to remove them from the queue
let _ = queue
.delete(&my_queue, received_json_message.msg_id)
.await
.expect("Failed to delete message");
let _ = queue
.delete(&my_queue, received_struct_message.msg_id)
.await
.expect("Failed to delete message");
println!("Deleted the messages from the queue");
// No messages are remaining
let no_message: Option<Message<Value>> = queue
.read::<Value>(&my_queue, Some(visibility_timeout_seconds))
.await
.unwrap();
assert!(no_message.is_none());
// We can also send and receive messages in batches
// Send a batch of JSON messages
let json_message_batch = vec![
serde_json::json!({"foo": "bar1"}),
serde_json::json!({"foo": "bar2"}),
serde_json::json!({"foo": "bar3"}),
];
println!("Enqueuing a batch of messages: {json_message_batch:?}");
let json_message_batch_ids = queue
.send_batch(&my_queue, &json_message_batch)
.await
.expect("Failed to enqueue messages");
// Receive a batch of JSON messages
let batch_size = 3;
let batch: Vec<Message<Value>> = queue
.read_batch::<Value>(&my_queue, Some(visibility_timeout_seconds), batch_size)
.await
.unwrap()
.expect("no messages in the queue!");
println!("Received a batch of messages: {batch:?}");
for (_, message) in batch.iter().enumerate() {
assert!(json_message_batch_ids.contains(&message.msg_id));
let _ = queue
.delete(&my_queue, message.msg_id)
.await
.expect("Failed to delete message");
println!("Deleted message {}", message.msg_id);
}
// Send a batch of struct messages
let struct_message_batch = vec![
MyMessage {
foo: "bar1".to_owned(),
},
MyMessage {
foo: "bar2".to_owned(),
},
MyMessage {
foo: "bar3".to_owned(),
},
];
println!("Enqueuing a batch of messages: {struct_message_batch:?}");
let struct_message_batch_ids = queue
.send_batch(&my_queue, &struct_message_batch)
.await
.expect("Failed to enqueue messages");
// Receive a batch of struct messages
let batch_size = 3;
let batch: Vec<Message<MyMessage>> = queue
.read_batch::<MyMessage>(&my_queue, Some(visibility_timeout_seconds), batch_size)
.await
.unwrap()
.expect("no messages in the queue!");
println!("Received a batch of messages: {batch:?}");
for (_, message) in batch.iter().enumerate() {
assert!(struct_message_batch_ids.contains(&message.msg_id));
let _ = queue
.delete(&my_queue, message.msg_id)
.await
.expect("Failed to delete message");
println!("Deleted message {}", message.msg_id);
}
Ok(())
}