Skip to content

Commit

Permalink
Allow passing routing information to cluster. (#899)
Browse files Browse the repository at this point in the history
* Expose routing functions and types.

This will allow users to implement their own Routables, and find their
own routing.

* async cluster: Allow manual command routing.

* Require passing routing info.

* Apply suggestions from code review

Co-authored-by: James Lucas <jaymell@users.noreply.github.com>

* rename functions

* add tests.

* fix test

* fix lint

* remov bind check, it's not included in redis 6.2

---------

Co-authored-by: James Lucas <jaymell@users.noreply.github.com>
  • Loading branch information
nihohit and jaymell committed Sep 18, 2023
1 parent aa9f15d commit 5e977e9
Show file tree
Hide file tree
Showing 4 changed files with 325 additions and 106 deletions.
190 changes: 105 additions & 85 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,89 @@ where
ClusterConnection(tx)
})
}

/// Send a command to the given `routing`, and aggregate the response according to `response_policy`.
/// If `routing` is [None], the request will be sent to a random node.
pub async fn route_command(
&mut self,
cmd: &Cmd,
routing: RoutingInfo,
response_policy: Option<ResponsePolicy>,
) -> RedisResult<Value> {
trace!("send_packed_command");
let (sender, receiver) = oneshot::channel();
self.0
.send(Message {
cmd: CmdArg::Cmd {
cmd: Arc::new(cmd.clone()), // TODO Remove this clone?
func: |mut conn, cmd| {
Box::pin(async move {
conn.req_packed_command(&cmd).await.map(Response::Single)
})
},
routing,
response_policy,
},
sender,
})
.await
.map_err(|_| {
RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to send command",
))
})?;
receiver
.await
.unwrap_or_else(|_| {
Err(RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to receive command",
)))
})
.map(|response| match response {
Response::Single(value) => value,
Response::Multiple(_) => unreachable!(),
})
}

/// Send commands in `pipeline` to the given `route`. If `route` is [None], it will be sent to a random node.
pub async fn route_pipeline<'a>(
&'a mut self,
pipeline: &'a crate::Pipeline,
offset: usize,
count: usize,
route: SingleNodeRoutingInfo,
) -> RedisResult<Vec<Value>> {
let (sender, receiver) = oneshot::channel();
self.0
.send(Message {
cmd: CmdArg::Pipeline {
pipeline: Arc::new(pipeline.clone()), // TODO Remove this clone?
offset,
count,
func: |mut conn, pipeline, offset, count| {
Box::pin(async move {
conn.req_packed_commands(&pipeline, offset, count)
.await
.map(Response::Multiple)
})
},
route,
},
sender,
})
.await
.map_err(|_| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))?;

receiver
.await
.unwrap_or_else(|_| Err(RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe))))
.map(|response| match response {
Response::Multiple(values) => values,
Response::Single(_) => unreachable!(),
})
}
}

type ConnectionFuture<C> = future::Shared<BoxFuture<'static, C>>;
Expand Down Expand Up @@ -120,15 +203,15 @@ enum CmdArg<C> {
Cmd {
cmd: Arc<Cmd>,
func: fn(C, Arc<Cmd>) -> RedisFuture<'static, Response>,
routing: Option<RoutingInfo>,
routing: RoutingInfo,
response_policy: Option<ResponsePolicy>,
},
Pipeline {
pipeline: Arc<crate::Pipeline>,
offset: usize,
count: usize,
func: fn(C, Arc<crate::Pipeline>, usize, usize) -> RedisFuture<'static, Response>,
route: Option<Route>,
route: SingleNodeRoutingInfo,
},
}

Expand Down Expand Up @@ -657,20 +740,17 @@ where
cmd: Arc<Cmd>,
func: fn(C, Arc<Cmd>) -> RedisFuture<'static, Response>,
redirect: Option<Redirect>,
routing: Option<RoutingInfo>,
routing: RoutingInfo,
response_policy: Option<ResponsePolicy>,
core: Core<C>,
asking: bool,
) -> (OperationTarget, RedisResult<Response>) {
let route_option = match routing
.as_ref()
.unwrap_or(&RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
{
let route_option = match routing {
RoutingInfo::MultiNode(multi_node_routing) => {
return Self::execute_on_multiple_nodes(
func,
&cmd,
multi_node_routing,
&multi_node_routing,
core,
response_policy,
)
Expand All @@ -680,7 +760,7 @@ where
RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(route)) => Some(route),
};

let (addr, conn) = Self::get_connection(redirect, route_option, core, asking).await;
let (addr, conn) = Self::get_connection(redirect, route_option.into(), core, asking).await;
let result = func(conn, cmd).await;
(addr.into(), result)
}
Expand Down Expand Up @@ -733,7 +813,7 @@ where
offset,
count,
func,
Self::get_connection(info.redirect, route.as_ref(), core, asking),
Self::get_connection(info.redirect, route, core, asking),
)
.await
}
Expand All @@ -742,7 +822,7 @@ where

async fn get_connection(
mut redirect: Option<Redirect>,
route: Option<&Route>,
route: SingleNodeRoutingInfo,
core: Core<C>,
asking: bool,
) -> (String, C) {
Expand All @@ -751,10 +831,13 @@ where
let conn = match redirect.take() {
Some(Redirect::Moved(moved_addr)) => Some(moved_addr),
Some(Redirect::Ask(ask_addr)) => Some(ask_addr),
None => route
.as_ref()
.and_then(|route| read_guard.1.slot_addr_for_route(route))
.map(|addr| addr.to_string()),
None => match route {
SingleNodeRoutingInfo::Random => None,
SingleNodeRoutingInfo::SpecificNode(route) => read_guard
.1
.slot_addr_for_route(&route)
.map(|addr| addr.to_string()),
},
}
.map(|addr| {
let conn = read_guard.0.get(&addr).cloned();
Expand Down Expand Up @@ -1076,46 +1159,13 @@ where

impl<C> ConnectionLike for ClusterConnection<C>
where
C: ConnectionLike + Send + 'static,
C: ConnectionLike + Send + Clone + Unpin + Sync + Connect + 'static,
{
fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
trace!("req_packed_command");
let (sender, receiver) = oneshot::channel();
Box::pin(async move {
self.0
.send(Message {
cmd: CmdArg::Cmd {
cmd: Arc::new(cmd.clone()), // TODO Remove this clone?
func: |mut conn, cmd| {
Box::pin(async move {
conn.req_packed_command(&cmd).await.map(Response::Single)
})
},
routing: RoutingInfo::for_routable(cmd),
response_policy: RoutingInfo::response_policy(cmd),
},
sender,
})
.await
.map_err(|_| {
RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to send command",
))
})?;
receiver
.await
.unwrap_or_else(|_| {
Err(RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to receive command",
)))
})
.map(|response| match response {
Response::Single(value) => value,
Response::Multiple(_) => unreachable!(),
})
})
let routing = RoutingInfo::for_routable(cmd)
.unwrap_or(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random));
let response_policy = ResponsePolicy::for_routable(cmd);
self.route_command(cmd, routing, response_policy).boxed()
}

fn req_packed_commands<'a>(
Expand All @@ -1124,38 +1174,8 @@ where
offset: usize,
count: usize,
) -> RedisFuture<'a, Vec<Value>> {
let (sender, receiver) = oneshot::channel();
Box::pin(async move {
self.0
.send(Message {
cmd: CmdArg::Pipeline {
pipeline: Arc::new(pipeline.clone()), // TODO Remove this clone?
offset,
count,
func: |mut conn, pipeline, offset, count| {
Box::pin(async move {
conn.req_packed_commands(&pipeline, offset, count)
.await
.map(Response::Multiple)
})
},
route: route_pipeline(pipeline),
},
sender,
})
.await
.map_err(|_| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))?;

receiver
.await
.unwrap_or_else(|_| {
Err(RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
})
.map(|response| match response {
Response::Multiple(values) => values,
Response::Single(_) => unreachable!(),
})
})
let route = route_pipeline(pipeline).into();
self.route_pipeline(pipeline, offset, count, route).boxed()
}

fn get_db(&self) -> i64 {
Expand Down
Loading

0 comments on commit 5e977e9

Please sign in to comment.