diff --git a/futures/src/channel.rs b/futures/src/channel.rs index b59620cd..98f53d57 100644 --- a/futures/src/channel.rs +++ b/futures/src/channel.rs @@ -146,10 +146,10 @@ pub struct ChannelFlowOptions { impl Channel { /// create a channel - pub fn create(transport: Arc>>) -> Box + Send + 'static> { + pub fn create(transport: Arc>>) -> impl Future + Send + 'static { let channel_transport = transport.clone(); - Box::new(future::poll_fn(move || { + future::poll_fn(move || { let mut transport = lock_transport!(channel_transport); if let Some(id) = transport.conn.create_channel() { return Ok(Async::Ready(Channel { @@ -180,77 +180,77 @@ impl Channel { }).map(move |_| { channel }) - })) + }) } /// request access /// /// returns a future that resolves once the access is granted - pub fn access_request(&self, realm: &str, options: AccessRequestOptions) -> Box + Send + 'static> { + pub fn access_request(&self, realm: &str, options: AccessRequestOptions) -> impl Future + Send + 'static { let channel_id = self.id; let realm = realm.to_string(); - Box::new(self.run_on_locked_transport("access_request", "Could not request access", move |transport| { + self.run_on_locked_transport("access_request", "Could not request access", move |transport| { transport.conn.access_request(channel_id, realm, options.exclusive, options.passive, options.active, options.write, options.read).map(Some) - }).map(|_| ())) + }).map(|_| ()) } /// declares an exchange /// /// returns a future that resolves once the exchange is available - pub fn exchange_declare(&self, name: &str, exchange_type: &str, options: ExchangeDeclareOptions, arguments: FieldTable) -> Box + Send + 'static> { + pub fn exchange_declare(&self, name: &str, exchange_type: &str, options: ExchangeDeclareOptions, arguments: FieldTable) -> impl Future + Send + 'static { let channel_id = self.id; let name = name.to_string(); let exchange_type = exchange_type.to_string(); - Box::new(self.run_on_locked_transport("exchange_declare", "Could not declare exchange", move |transport| { + self.run_on_locked_transport("exchange_declare", "Could not declare exchange", move |transport| { transport.conn.exchange_declare(channel_id, options.ticket, name, exchange_type, options.passive, options.durable, options.auto_delete, options.internal, options.nowait, arguments).map(Some) - }).map(|_| ())) + }).map(|_| ()) } /// deletes an exchange /// /// returns a future that resolves once the exchange is deleted - pub fn exchange_delete(&self, name: &str, options: ExchangeDeleteOptions) -> Box + Send + 'static> { + pub fn exchange_delete(&self, name: &str, options: ExchangeDeleteOptions) -> impl Future + Send + 'static { let channel_id = self.id; let name = name.to_string(); - Box::new(self.run_on_locked_transport("exchange_delete", "Could not delete exchange", move |transport| { + self.run_on_locked_transport("exchange_delete", "Could not delete exchange", move |transport| { transport.conn.exchange_delete(channel_id, options.ticket, name, options.if_unused, options.nowait).map(Some) - }).map(|_| ())) + }).map(|_| ()) } /// binds an exchange to another exchange /// /// returns a future that resolves once the exchanges are bound - pub fn exchange_bind(&self, destination: &str, source: &str, routing_key: &str, options: ExchangeBindOptions, arguments: FieldTable) -> Box + Send + 'static> { + pub fn exchange_bind(&self, destination: &str, source: &str, routing_key: &str, options: ExchangeBindOptions, arguments: FieldTable) -> impl Future + Send + 'static { let channel_id = self.id; let destination = destination.to_string(); let source = source.to_string(); let routing_key = routing_key.to_string(); - Box::new(self.run_on_locked_transport("exchange_bind", "Could not bind exchange", move |transport| { + self.run_on_locked_transport("exchange_bind", "Could not bind exchange", move |transport| { transport.conn.exchange_bind(channel_id, options.ticket, destination, source, routing_key, options.nowait, arguments).map(Some) - }).map(|_| ())) + }).map(|_| ()) } /// unbinds an exchange from another one /// /// returns a future that resolves once the exchanges are unbound - pub fn exchange_unbind(&self, destination: &str, source: &str, routing_key: &str, options: ExchangeUnbindOptions, arguments: FieldTable) -> Box + Send + 'static> { + pub fn exchange_unbind(&self, destination: &str, source: &str, routing_key: &str, options: ExchangeUnbindOptions, arguments: FieldTable) -> impl Future + Send + 'static { let channel_id = self.id; let destination = destination.to_string(); let source = source.to_string(); let routing_key = routing_key.to_string(); - Box::new(self.run_on_locked_transport("exchange_unbind", "Could not unbind exchange", move |transport| { + self.run_on_locked_transport("exchange_unbind", "Could not unbind exchange", move |transport| { transport.conn.exchange_unbind(channel_id, options.ticket, destination, source, routing_key, options.nowait, arguments).map(Some) - }).map(|_| ())) + }).map(|_| ()) } /// declares a queue @@ -259,12 +259,12 @@ impl Channel { /// /// the `mandatory` and `ìmmediate` options can be set to true, /// but the return message will not be handled - pub fn queue_declare(&self, name: &str, options: QueueDeclareOptions, arguments: FieldTable) -> Box + Send + 'static> { + pub fn queue_declare(&self, name: &str, options: QueueDeclareOptions, arguments: FieldTable) -> impl Future + Send + 'static { let channel_id = self.id; let name = name.to_string(); let transport = self.transport.clone(); - Box::new(self.run_on_locked_transport("queue_declare", "Could not declare queue", move |transport| { + self.run_on_locked_transport("queue_declare", "Could not declare queue", move |transport| { transport.conn.queue_declare(channel_id, options.ticket, name, options.passive, options.durable, options.exclusive, options.auto_delete, options.nowait, arguments).map(Some) }).and_then(|request_id| { @@ -277,54 +277,54 @@ impl Channel { return Ok(Async::NotReady) } }) - }).map(Queue::new)) + }).map(Queue::new) } /// binds a queue to an exchange /// /// returns a future that resolves once the queue is bound to the exchange - pub fn queue_bind(&self, name: &str, exchange: &str, routing_key: &str, options: QueueBindOptions, arguments: FieldTable) -> Box + Send + 'static> { + pub fn queue_bind(&self, name: &str, exchange: &str, routing_key: &str, options: QueueBindOptions, arguments: FieldTable) -> impl Future + Send + 'static { let channel_id = self.id; let name = name.to_string(); let exchange = exchange.to_string(); let routing_key = routing_key.to_string(); - Box::new(self.run_on_locked_transport("queue_bind", "Could not bind queue", move |transport| { + self.run_on_locked_transport("queue_bind", "Could not bind queue", move |transport| { transport.conn.queue_bind(channel_id, options.ticket, name, exchange, routing_key, options.nowait, arguments).map(Some) - }).map(|_| ())) + }).map(|_| ()) } /// unbinds a queue from the exchange /// /// returns a future that resolves once the queue is unbound from the exchange - pub fn queue_unbind(&self, name: &str, exchange: &str, routing_key: &str, options: QueueUnbindOptions, arguments: FieldTable) -> Box + Send + 'static> { + pub fn queue_unbind(&self, name: &str, exchange: &str, routing_key: &str, options: QueueUnbindOptions, arguments: FieldTable) -> impl Future + Send + 'static { let channel_id = self.id; let name = name.to_string(); let exchange = exchange.to_string(); let routing_key = routing_key.to_string(); - Box::new(self.run_on_locked_transport("queue_unbind", "Could not unbind queue from the exchange", move |transport| { + self.run_on_locked_transport("queue_unbind", "Could not unbind queue from the exchange", move |transport| { transport.conn.queue_unbind(channel_id, options.ticket, name, exchange, routing_key, arguments).map(Some) - }).map(|_| ())) + }).map(|_| ()) } /// sets up confirm extension for this channel - pub fn confirm_select(&self, options: ConfirmSelectOptions) -> Box + Send + 'static> { + pub fn confirm_select(&self, options: ConfirmSelectOptions) -> impl Future + Send + 'static { let channel_id = self.id; - Box::new(self.run_on_locked_transport("confirm_select", "Could not activate confirm extension", move |transport| { + self.run_on_locked_transport("confirm_select", "Could not activate confirm extension", move |transport| { transport.conn.confirm_select(channel_id, options.nowait).map(Some) - }).map(|_| ())) + }).map(|_| ()) } /// specifies quality of service for a channel - pub fn basic_qos(&self, options: BasicQosOptions) -> Box + Send + 'static> { + pub fn basic_qos(&self, options: BasicQosOptions) -> impl Future + Send + 'static { let channel_id = self.id; - Box::new(self.run_on_locked_transport("basic_qos", "Could not setup qos", move |transport| { + self.run_on_locked_transport("basic_qos", "Could not setup qos", move |transport| { transport.conn.basic_qos(channel_id, options.prefetch_size, options.prefetch_count, options.global).map(|_| None) - }).map(|_| ())) + }).map(|_| ()) } /// publishes a message on a queue @@ -332,12 +332,12 @@ impl Channel { /// the future's result is: /// - `Some(request_id)` if we're on a confirm channel and the message was ack'd /// - `None` if we're not on a confirm channel or the message was nack'd - pub fn basic_publish(&self, exchange: &str, routing_key: &str, payload: &[u8], options: BasicPublishOptions, properties: BasicProperties) -> Box, Error = io::Error> + Send + 'static> { + pub fn basic_publish(&self, exchange: &str, routing_key: &str, payload: &[u8], options: BasicPublishOptions, properties: BasicProperties) -> impl Future, Error = io::Error> + Send + 'static { let channel_id = self.id; let exchange = exchange.to_string(); let routing_key = routing_key.to_string(); - Box::new(self.run_on_locked_transport_full("basic_publish", "Could not publish", move |transport| { + self.run_on_locked_transport_full("basic_publish", "Could not publish", move |transport| { transport.conn.basic_publish(channel_id, options.ticket, exchange, routing_key, options.mandatory, options.immediate).map(Some) }, move |conn, delivery_tag| { @@ -356,7 +356,7 @@ impl Channel { None } }).unwrap_or(Ok(Async::Ready(None))) - }, Some((payload.to_vec(), properties)))) + }, Some((payload.to_vec(), properties))) } /// creates a consumer stream @@ -365,7 +365,7 @@ impl Channel { /// /// `Consumer` implements `futures::Stream`, so it can be used with any of /// the usual combinators - pub fn basic_consume(&self, queue: &Queue, consumer_tag: &str, options: BasicConsumeOptions, arguments: FieldTable) -> Box, Error = io::Error> + Send + 'static> { + pub fn basic_consume(&self, queue: &Queue, consumer_tag: &str, options: BasicConsumeOptions, arguments: FieldTable) -> impl Future, Error = io::Error> + Send + 'static { let channel_id = self.id; let transport = self.transport.clone(); let consumer_tag = consumer_tag.to_string(); @@ -378,7 +378,7 @@ impl Channel { registered: false, }; - Box::new(self.run_on_locked_transport("basic_consume", "Could not start consumer", move |transport| { + self.run_on_locked_transport("basic_consume", "Could not start consumer", move |transport| { transport.conn.basic_consume(channel_id, options.ticket, queue_name, consumer_tag, options.no_local, options.no_ack, options.exclusive, options.no_wait, arguments).map(Some) }).and_then(move |request_id| { @@ -395,38 +395,38 @@ impl Channel { trace!("basic_consume received response, returning consumer"); consumer.consumer_tag = consumer_tag; consumer - })) + }) } /// acks a message - pub fn basic_ack(&self, delivery_tag: u64) -> Box + Send + 'static> { + pub fn basic_ack(&self, delivery_tag: u64) -> impl Future + Send + 'static { let channel_id = self.id; - Box::new(self.run_on_locked_transport("basic_ack", "Could not ack message", move |transport| { + self.run_on_locked_transport("basic_ack", "Could not ack message", move |transport| { transport.conn.basic_ack(channel_id, delivery_tag, false).map(|_| None) - }).map(|_| ())) + }).map(|_| ()) } /// nacks a message - pub fn basic_nack(&self, delivery_tag: u64, requeue: bool) -> Box + Send + 'static> { + pub fn basic_nack(&self, delivery_tag: u64, requeue: bool) -> impl Future + Send + 'static { let channel_id = self.id; - Box::new(self.run_on_locked_transport("basic_nack", "Could not nack message", move |transport| { + self.run_on_locked_transport("basic_nack", "Could not nack message", move |transport| { transport.conn.basic_nack(channel_id, delivery_tag, false, requeue).map(|_| None) - }).map(|_| ())) + }).map(|_| ()) } /// rejects a message - pub fn basic_reject(&self, delivery_tag: u64, requeue: bool) -> Box + Send + 'static> { + pub fn basic_reject(&self, delivery_tag: u64, requeue: bool) -> impl Future + Send + 'static { let channel_id = self.id; - Box::new(self.run_on_locked_transport("basic_reject", "Could not reject message", move |transport| { + self.run_on_locked_transport("basic_reject", "Could not reject message", move |transport| { transport.conn.basic_reject(channel_id, delivery_tag, requeue).map(|_| None) - }).map(|_| ())) + }).map(|_| ()) } /// gets a message - pub fn basic_get(&self, queue: &str, options: BasicGetOptions) -> Box + Send + 'static> { + pub fn basic_get(&self, queue: &str, options: BasicGetOptions) -> impl Future + Send + 'static { let channel_id = self.id; let _queue = queue.to_string(); let queue = queue.to_string(); @@ -440,7 +440,7 @@ impl Channel { Ok(Async::NotReady) }); - Box::new(self.run_on_locked_transport_full("basic_get", "Could not get message", move |transport| { + self.run_on_locked_transport_full("basic_get", "Could not get message", move |transport| { transport.conn.basic_get(channel_id, options.ticket, queue, options.no_ack).map(Some) }, |conn, request_id| { match conn.finished_get_result(request_id) { @@ -454,19 +454,19 @@ impl Channel { Ok(Async::NotReady) } } - }, None).and_then(|_| receive_future)) + }, None).and_then(|_| receive_future) } /// Purge a queue. /// /// This method removes all messages from a queue which are not awaiting acknowledgment. - pub fn queue_purge(&self, queue_name: &str, options: QueuePurgeOptions) -> Box + Send + 'static> { + pub fn queue_purge(&self, queue_name: &str, options: QueuePurgeOptions) -> impl Future + Send + 'static { let channel_id = self.id; let queue_name = queue_name.to_string(); - Box::new(self.run_on_locked_transport("queue_purge", "Could not purge queue", move |transport| { + self.run_on_locked_transport("queue_purge", "Could not purge queue", move |transport| { transport.conn.queue_purge(channel_id, options.ticket, queue_name, options.nowait).map(Some) - }).map(|_| ())) + }).map(|_| ()) } /// Delete a queue. @@ -478,53 +478,53 @@ impl Channel { /// If the queue has consumers the server does not delete it but raises a channel exception instead. /// /// If `if_empty` is set, the server will only delete the queue if it has no messages. - pub fn queue_delete(&self, queue_name: &str, options: QueueDeleteOptions) -> Box + Send + 'static> { + pub fn queue_delete(&self, queue_name: &str, options: QueueDeleteOptions) -> impl Future + Send + 'static { let channel_id = self.id; let queue_name = queue_name.to_string(); - Box::new(self.run_on_locked_transport("queue_purge", "Could not purge queue", move |transport| { + self.run_on_locked_transport("queue_purge", "Could not purge queue", move |transport| { transport.conn.queue_delete(channel_id, options.ticket, queue_name, options.if_unused, options.if_empty, options.no_wait).map(Some) - }).map(|_| ())) + }).map(|_| ()) } /// closes the channel - pub fn close(&self, code: u16, message: &str) -> Box + Send + 'static> { + pub fn close(&self, code: u16, message: &str) -> impl Future + Send + 'static { let channel_id = self.id; let message = message.to_string(); - Box::new(self.run_on_locked_transport("close", "Could not close channel", move |transport| { + self.run_on_locked_transport("close", "Could not close channel", move |transport| { transport.conn.channel_close(channel_id, code, message, 0, 0).map(|_| None) - }).map(|_| ())) + }).map(|_| ()) } /// ack a channel close - pub fn close_ok(&self) -> Box + Send + 'static> { + pub fn close_ok(&self) -> impl Future + Send + 'static { let channel_id = self.id; - Box::new(self.run_on_locked_transport("close_ok", "Could not ack closed channel", move |transport| { + self.run_on_locked_transport("close_ok", "Could not ack closed channel", move |transport| { transport.conn.channel_close_ok(channel_id).map(|_| None) - }).map(|_| ())) + }).map(|_| ()) } /// update a channel flow - pub fn channel_flow(&self, options: ChannelFlowOptions) -> Box + Send + 'static> { + pub fn channel_flow(&self, options: ChannelFlowOptions) -> impl Future + Send + 'static { let channel_id = self.id; - Box::new(self.run_on_locked_transport("channel_flow", "Could not update channel flow", move |transport| { + self.run_on_locked_transport("channel_flow", "Could not update channel flow", move |transport| { transport.conn.channel_flow(channel_id, options.active).map(|_| None) - }).map(|_| ())) + }).map(|_| ()) } /// ack an update to a channel flow - pub fn channel_flow_ok(&self, options: ChannelFlowOptions) -> Box + Send + 'static> { + pub fn channel_flow_ok(&self, options: ChannelFlowOptions) -> impl Future + Send + 'static { let channel_id = self.id; - Box::new(self.run_on_locked_transport("channel_flow_ok", "Could not ack update to channel flow", move |transport| { + self.run_on_locked_transport("channel_flow_ok", "Could not ack update to channel flow", move |transport| { transport.conn.channel_flow_ok(channel_id, options.active).map(|_| None) - }).map(|_| ())) + }).map(|_| ()) } - fn run_on_locked_transport_full(&self, method: &str, error: &str, action: Action, finished: Finished, payload: Option<(Vec, BasicProperties)>) -> Box, Error = io::Error> + Send + 'static> + fn run_on_locked_transport_full(&self, method: &str, error: &str, action: Action, finished: Finished, payload: Option<(Vec, BasicProperties)>) -> impl Future, Error = io::Error> + Send + 'static where Action: 'static + Send + FnOnce(&mut AMQPTransport) -> Result, lapin_async::error::Error>, Finished: 'static + Send + Fn(&mut Connection, RequestId) -> Poll, io::Error> { trace!("run on locked transport; method={:?}", method); @@ -538,7 +538,7 @@ impl Channel { let mut action = Some(action); let mut payload = Some(payload); - Box::new(future::poll_fn(move || { + future::poll_fn(move || { let mut transport = lock_transport!(transport); // The poll_fn here is only there for the lock_transport call above. // Once the lock_transport yields a Async::Ready transport, the rest of the function is @@ -564,7 +564,7 @@ impl Channel { trace!("{} returning closure", method); } - Box::new(future::poll_fn(move || { + future::poll_fn(move || { let mut transport = lock_transport!(_transport); if let Some(request_id) = request_id { @@ -572,8 +572,8 @@ impl Channel { } else { transport.poll().map(|r| r.map(|_| None)) } - })) - })) + }) + }) } fn run_on_lock_transport_basic_finished(conn: &mut Connection, request_id: RequestId) -> Poll, io::Error> { @@ -586,7 +586,7 @@ impl Channel { } } - fn run_on_locked_transport(&self, method: &str, error: &str, action: Action) -> Box, Error = io::Error> + Send + 'static> + fn run_on_locked_transport(&self, method: &str, error: &str, action: Action) -> impl Future, Error = io::Error> + Send + 'static where Action: 'static + Send + FnOnce(&mut AMQPTransport) -> Result, lapin_async::error::Error> { self.run_on_locked_transport_full(method, error, action, Self::run_on_lock_transport_basic_finished, None) } diff --git a/futures/src/client.rs b/futures/src/client.rs index abd0cea0..5d9c3b5e 100644 --- a/futures/src/client.rs +++ b/futures/src/client.rs @@ -207,35 +207,35 @@ impl Client { /// ); /// # } /// ``` - pub fn connect(stream: T, options: ConnectionOptions) -> Box + Send> { - Box::new(AMQPTransport::connect(stream, options).and_then(|transport| { + pub fn connect(stream: T, options: ConnectionOptions) -> impl Future + Send { + AMQPTransport::connect(stream, options).and_then(|transport| { debug!("got client service"); let configuration = transport.conn.configuration.clone(); let transport = Arc::new(Mutex::new(transport)); let heartbeat = Heartbeat::new(transport.clone(), configuration.heartbeat); let client = Client { configuration, transport }; Ok((client, heartbeat)) - })) + }) } /// creates a new channel /// /// returns a future that resolves to a `Channel` once the method succeeds - pub fn create_channel(&self) -> Box, Error = io::Error> + Send> { + pub fn create_channel(&self) -> impl Future, Error = io::Error> + Send { Channel::create(self.transport.clone()) } /// returns a future that resolves to a `Channel` once the method succeeds /// the channel will support RabbitMQ's confirm extension - pub fn create_confirm_channel(&self, options: ConfirmSelectOptions) -> Box, Error = io::Error> + Send> { + pub fn create_confirm_channel(&self, options: ConfirmSelectOptions) -> impl Future, Error = io::Error> + Send { //FIXME: maybe the confirm channel should be a separate type //especially, if we implement transactions, the methods should be available on the original channel //but not on the confirm channel. And the basic publish method should have different results - Box::new(self.create_channel().and_then(move |channel| { + self.create_channel().and_then(move |channel| { let ch = channel.clone(); channel.confirm_select(options).map(|_| ch) - })) + }) } } diff --git a/futures/src/transport.rs b/futures/src/transport.rs index e9f09a00..ce65eb1e 100644 --- a/futures/src/transport.rs +++ b/futures/src/transport.rs @@ -116,14 +116,14 @@ impl AMQPTransport /// starts the connection process /// /// returns a future of a `AMQPTransport` that is connected - pub fn connect(stream: T, options: ConnectionOptions) -> Box, Error = io::Error> + Send> { + pub fn connect(stream: T, options: ConnectionOptions) -> impl Future, Error = io::Error> + Send { let mut conn = Connection::new(); conn.set_credentials(&options.username, &options.password); conn.set_vhost(&options.vhost); conn.set_frame_max(options.frame_max); conn.set_heartbeat(options.heartbeat); - Box::new(future::result(conn.connect()).map_err(|e| { + future::result(conn.connect()).map_err(|e| { let err = format!("Failed to connect: {:?}", e); Error::new(ErrorKind::ConnectionAborted, err) }).and_then(|_| { @@ -139,7 +139,7 @@ impl AMQPTransport AMQPTransportConnector { transport: Some(t), } - })) + }) } /// Send a frame to the broker.