-
Notifications
You must be signed in to change notification settings - Fork 2
/
queue.rs
77 lines (62 loc) · 1.67 KB
/
queue.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use async_broadcast::{broadcast, Receiver, Sender};
use log::warn;
use tonic::Status;
use uuid::Uuid;
use crate::messages::Message;
/// Identifier attached to each [`ChannelReceiver`]; an alias for [`Uuid`].
pub type ChannelId = Uuid;
/// Label carried by a [`Queue`]; an alias for [`String`].
pub type QueueLabel = String;
/// A labelled broadcast channel.
///
/// Holds both halves of an `async_broadcast` channel whose items are
/// `Result<Message, Status>`. The sender half is cloned to produce
/// [`BroadcastEnd`] handles and the receiver half is cloned to produce
/// [`ChannelReceiver`] handles.
pub struct Queue {
/// Label given to the queue at construction time.
label: QueueLabel,
/// Sender half of the channel; cloned by `get_broadcast_end`.
w: Sender<Result<Message, Status>>,
/// Receiver half of the channel; cloned by `duplicate_channel`.
/// Nothing in this file reads messages from it directly.
r: Receiver<Result<Message, Status>>,
}
/// A receiving handle on a [`Queue`], as produced by `Queue::duplicate_channel`.
pub struct ChannelReceiver {
/// Identifier of this handle; `duplicate_channel` fills it with a fresh v4 UUID.
pub id: ChannelId,
/// Optional topic supplied by the caller of `duplicate_channel`.
/// It is stored as-is; nothing in this file interprets it.
pub topic: Option<String>,
/// Receiver on which the queue's `Result<Message, Status>` items arrive.
pub receiver: Receiver<Result<Message, Status>>,
}
/// A sending handle on a [`Queue`], as produced by `Queue::get_broadcast_end`.
pub struct BroadcastEnd {
/// Clone of the queue's sender half.
sender: Sender<Result<Message, Status>>,
}
impl Queue {
    /// Creates a queue labelled `label`, backed by a broadcast channel of
    /// capacity `k`.
    ///
    /// # Panics
    ///
    /// `async_broadcast::broadcast` is documented to panic when the capacity
    /// is zero, so `k` must be positive.
    pub fn new(k: usize, label: String) -> Self {
        let (w, mut r) = broadcast(k);
        // The queue keeps `r` but never reads from it. Per the async_broadcast
        // docs, overflow mode lets a full channel accept a new message by
        // discarding the oldest one, so the idle receiver does not stall senders.
        r.set_overflow(true);
        Self { label, w, r }
    }

    /// Returns the label this queue was created with.
    pub fn get_label(&self) -> &str {
        &self.label
    }

    /// Creates a new receiving handle on this queue.
    ///
    /// The handle gets a freshly generated v4 UUID as its `id` and carries
    /// `topic` unchanged.
    ///
    /// NOTE(review): the receiver is a *clone* of the queue's own receiver.
    /// Per the async_broadcast docs a clone has the same messages queued as
    /// its source, so a new handle may observe messages still buffered from
    /// before it was created. Confirm this is intended.
    pub fn duplicate_channel(&mut self, topic: Option<String>) -> ChannelReceiver {
        let mut receiver = self.r.clone();
        receiver.set_overflow(true);
        ChannelReceiver {
            id: Uuid::new_v4(),
            topic,
            receiver,
        }
    }

    /// Creates a new sending handle on this queue.
    pub fn get_broadcast_end(&self) -> BroadcastEnd {
        BroadcastEnd {
            sender: self.w.clone(),
        }
    }

    /// Consumes the queue and closes its channel.
    ///
    /// `close` is invoked on both halves; the booleans they return are
    /// intentionally ignored.
    pub fn destroy(self) {
        self.w.close();
        self.r.close();
    }
}
impl BroadcastEnd {
    /// Attempts to publish `msg` on the queue without waiting.
    ///
    /// The message is wrapped in `Ok` and handed to `try_broadcast`. Any value
    /// returned on success is discarded.
    ///
    /// # Errors
    ///
    /// If `try_broadcast` fails, the underlying error is logged at `warn`
    /// level and the fixed string `"Broadcast error"` is returned.
    pub fn broadcast(&self, msg: Message) -> Result<(), String> {
        match self.sender.try_broadcast(Ok(msg)) {
            Ok(_) => Ok(()),
            Err(err) => {
                warn!("Failed to broadcast message: {err}");
                Err("Broadcast error".to_string())
            }
        }
    }

    /// Consumes this handle and closes the underlying channel, returning
    /// whatever the sender's `close` reports.
    pub fn close(self) -> bool {
        self.sender.close()
    }
}