/
client.rs
117 lines (111 loc) · 3.98 KB
/
client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// SPDX-License-Identifier: Apache-2.0 AND MIT
//! `Client` and `Connection` structs
use std::default::Default;
/// A [non-consuming] [Connection] builder.
///
/// A `Client` stores the `lapin::ConnectionProperties` used to open
/// connections. `Client::connect` borrows the client and clones those
/// properties, so the same `Client` can be used for more than one connection.
///
/// [Connection]: struct.Connection.html
/// [non-consuming]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html#non-consuming-builders-(preferred):
pub struct Client {
/// Connection properties; a clone is handed to `lapin::Connection::connect`.
props: lapin::ConnectionProperties,
}
impl Client {
pub fn new() -> Self {
Self {
..Default::default()
}
}
pub async fn connect(&self, uri: &str) -> crate::Result<Connection> {
let c = lapin::Connection::connect(uri, self.props.clone())
.await
.map_err(crate::Error::from)?;
Ok(Connection(c))
}
}
impl Default for Client {
    /// Build a `Client` whose properties are
    /// `lapin::ConnectionProperties::default()`.
    fn default() -> Self {
        let props = lapin::ConnectionProperties::default();
        Self { props }
    }
}
/// A [non-consuming] [ProducerBuilder] and [ConsumerBuilder] builder.
///
/// This is a newtype around a `lapin::Connection`. It derives `Clone`, and
/// the builder constructors on this type pass a clone of the handle to every
/// builder they create.
///
/// [ProducerBuilder]: ../produce/struct.ProducerBuilder.html
/// [ConsumerBuilder]: ../consume/struct.ConsumerBuilder.html
/// [non-consuming]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html#non-consuming-builders-(preferred):
#[derive(Clone)]
pub struct Connection(lapin::Connection);
/// Options consumed by `Connection::queue` when it declares a queue, declares
/// an exchange and binds the two together.
///
/// The exchange and bind fields are only used when the exchange passed to
/// `Connection::queue` is not `crate::DEFAULT_EXCHANGE`; for the default
/// exchange only the queue fields are read.
#[derive(Clone)]
pub struct QueueOptions {
/// Exchange kind passed to `exchange_declare`.
pub kind: lapin::ExchangeKind,
/// Options passed to `exchange_declare`.
pub ex_opts: lapin::options::ExchangeDeclareOptions,
/// Argument table passed to `exchange_declare`.
pub ex_field: lapin::types::FieldTable,
/// Options passed to `queue_declare`.
pub queue_opts: lapin::options::QueueDeclareOptions,
/// Argument table passed to `queue_declare`.
pub queue_field: lapin::types::FieldTable,
/// Options passed to `queue_bind`.
pub bind_opts: lapin::options::QueueBindOptions,
/// Argument table passed to `queue_bind`.
pub bind_field: lapin::types::FieldTable,
}
impl Connection {
    /// Build a [non-consuming] [ProducerBuilder].
    ///
    /// The builder is created from a clone of this connection handle.
    ///
    /// [ProducerBuilder]: ../produce/struct.ProducerBuilder.html
    /// [non-consuming]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html#non-consuming-builders-(preferred):
    pub fn producer_builder(&self) -> crate::ProducerBuilder {
        crate::ProducerBuilder::new(self.clone())
    }

    /// Build a [non-consuming] [ConsumerBuilder].
    ///
    /// The builder is created from a clone of this connection handle.
    ///
    /// [ConsumerBuilder]: ../consume/struct.ConsumerBuilder.html
    /// [non-consuming]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html#non-consuming-builders-(preferred):
    pub fn consumer_builder(&self) -> crate::ConsumerBuilder {
        crate::ConsumerBuilder::new(self.clone())
    }

    /// Create a new channel over this [`Connection`].
    ///
    /// # Errors
    ///
    /// Any error reported by `create_channel` is converted with
    /// `crate::Error::from` and returned.
    pub async fn channel(&self) -> crate::Result<lapin::Channel> {
        self.0.create_channel().await.map_err(crate::Error::from)
    }

    /// Create a channel over this [`Connection`], declare `queue` on it and,
    /// unless `ex` is the default exchange, declare `ex` and bind the queue
    /// to it.
    ///
    /// Steps, in order:
    ///
    /// 1. A new channel is created.
    /// 2. `queue` is declared with `opts.queue_opts` / `opts.queue_field`.
    /// 3. If `ex` equals `crate::DEFAULT_EXCHANGE` the function returns here;
    ///    the exchange and bind options are not used.
    /// 4. Otherwise `ex` is declared with `opts.kind`, `opts.ex_opts` and
    ///    `opts.ex_field`.
    /// 5. The queue is bound to `ex` with `opts.bind_opts` /
    ///    `opts.bind_field`. The routing key is `queue`, except when `queue`
    ///    equals `crate::EPHEMERAL_QUEUE`, in which case the name reported by
    ///    the declared queue is used.
    ///
    /// Returns the channel together with the declared queue.
    ///
    /// # Errors
    ///
    /// The first error from channel creation, queue declaration, exchange
    /// declaration or queue binding is converted with `crate::Error::from`
    /// and returned; later steps are not attempted.
    pub async fn queue(
        &self,
        ex: &str,
        queue: &str,
        opts: QueueOptions,
    ) -> crate::Result<(lapin::Channel, lapin::Queue)> {
        let ch = self.channel().await?;
        let q = ch
            .queue_declare(queue, opts.queue_opts, opts.queue_field)
            .await
            .map_err(crate::Error::from)?;
        if Self::is_default_exchange(ex) {
            // We don't need to bind to the exchange in case of the default
            // exchange.
            return Ok((ch, q));
        }
        ch.exchange_declare(ex, opts.kind, opts.ex_opts, opts.ex_field)
            .await
            .map_err(crate::Error::from)?;
        // For an ephemeral queue the caller-supplied name is only a
        // placeholder, so route on the name carried by the declared queue.
        let routing_key = if Self::is_ephemeral_queue(queue) {
            q.name().as_str()
        } else {
            queue
        };
        // `opts` is owned and not used after this call, so the bind options
        // and argument table are moved rather than cloned.
        ch.queue_bind(queue, ex, routing_key, opts.bind_opts, opts.bind_field)
            .await
            .map_err(crate::Error::from)?;
        Ok((ch, q))
    }

    /// Whether `name` equals `crate::DEFAULT_EXCHANGE`.
    fn is_default_exchange(name: &str) -> bool {
        name == crate::DEFAULT_EXCHANGE
    }

    /// Whether `name` equals `crate::EPHEMERAL_QUEUE`.
    fn is_ephemeral_queue(name: &str) -> bool {
        name == crate::EPHEMERAL_QUEUE
    }
}