Skip to content

Commit

Permalink
Allow running SharedConnection with any other runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
hadashiA committed Aug 15, 2019
1 parent 6fc9632 commit 0a7d6a2
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 17 deletions.
36 changes: 21 additions & 15 deletions src/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ use std::net::ToSocketAddrs;
use tokio_uds::UnixStream;

use tokio_codec::{Decoder, Framed};
use tokio_executor;
use tokio_io::{self, AsyncWrite};
use tokio_tcp::TcpStream;

use futures::future::Either;
use futures::future::{Either, Executor};
use futures::{future, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
use tokio_sync::{mpsc, oneshot};

Expand Down Expand Up @@ -431,18 +430,21 @@ where
T::Error: Send,
T::Error: ::std::fmt::Debug,
{
fn new(sink_stream: T) -> Self {
fn new<E>(sink_stream: T, executor: E) -> Self
where
E: Executor<Box<dyn Future<Item = (), Error = ()> + Send>>,
{
const BUFFER_SIZE: usize = 50;
let (sender, receiver) = mpsc::channel(BUFFER_SIZE);
tokio_executor::spawn(
receiver
.map_err(|_| ())
.forward(PipelineSink {
sink_stream,
in_flight: VecDeque::new(),
})
.map(|_| ()),
);
let f = receiver
.map_err(|_| ())
.forward(PipelineSink {
sink_stream,
in_flight: VecDeque::new(),
})
.map(|_| ());

executor.execute(Box::new(f));
Pipeline(sender)
}

Expand Down Expand Up @@ -498,17 +500,20 @@ pub struct SharedConnection {
}

impl SharedConnection {
pub fn new(con: Connection) -> impl Future<Item = Self, Error = RedisError> {
pub fn new<E>(con: Connection, executor: E) -> impl Future<Item = Self, Error = RedisError>
where
E: Executor<Box<dyn Future<Item = (), Error = ()> + Send>>,
{
future::lazy(|| {
let pipeline = match con.con {
ActualConnection::Tcp(tcp) => {
let codec = ValueCodec::default().framed(tcp.into_inner());
ActualPipeline::Tcp(Pipeline::new(codec))
ActualPipeline::Tcp(Pipeline::new(codec, executor))
}
#[cfg(unix)]
ActualConnection::Unix(unix) => {
let codec = ValueCodec::default().framed(unix.into_inner());
ActualPipeline::Unix(Pipeline::new(codec))
ActualPipeline::Unix(Pipeline::new(codec, executor))
}
};
Ok(SharedConnection {
Expand All @@ -517,6 +522,7 @@ impl SharedConnection {
})
})
}

}

impl ConnectionLike for SharedConnection {
Expand Down
16 changes: 15 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use futures::Future;

use connection::{connect, Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo};
use types::{RedisError, RedisResult, Value};
use futures::future::Executor;

/// The client type.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -53,9 +54,22 @@ impl Client {
pub fn get_shared_async_connection(
&self,
) -> impl Future<Item = ::aio::SharedConnection, Error = RedisError> {
let executor = tokio_executor::DefaultExecutor::current();
self.get_async_connection()
.and_then(move |con| ::aio::SharedConnection::new(con))
.and_then(move |con| ::aio::SharedConnection::new(con, executor))
}

pub fn get_shared_async_connection_with_executor<E>(
&self,
executor: E,
) -> impl Future<Item = ::aio::SharedConnection, Error = RedisError>
where
E: Executor<Box<dyn Future<Item = (), Error = ()> + Send>>,
{
self.get_async_connection()
.and_then(move |con| ::aio::SharedConnection::new(con, executor))
}

}

impl ConnectionLike for Client {
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ extern crate sha1;
extern crate url;
#[macro_use]
extern crate futures;
extern crate tokio_executor;
#[macro_use]
extern crate tokio_io;
extern crate tokio_codec;
Expand Down

0 comments on commit 0a7d6a2

Please sign in to comment.