Skip to content

Commit

Permalink
async cluster: Allow manual command routing.
Browse files Browse the repository at this point in the history
  • Loading branch information
nihohit committed Jul 24, 2023
1 parent d598d19 commit ae89b1d
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 85 deletions.
155 changes: 85 additions & 70 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,87 @@ where
ClusterConnection(tx)
})
}

/// Send a command to the given `routing`. If `routing` is [None], it will be computed from `cmd`.
pub async fn send_packed_command(
&mut self,
cmd: &Cmd,
routing: Option<RoutingInfo>,
) -> RedisResult<Value> {
trace!("send_packed_command");
let (sender, receiver) = oneshot::channel();
self.0
.send(Message {
cmd: CmdArg::Cmd {
cmd: Arc::new(cmd.clone()), // TODO Remove this clone?
func: |mut conn, cmd| {
Box::pin(async move {
conn.req_packed_command(&cmd).await.map(Response::Single)
})
},
routing: routing.or_else(|| RoutingInfo::for_routable(cmd)),
response_policy: RoutingInfo::response_policy(cmd),
},
sender,
})
.await
.map_err(|_| {
RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to send command",
))
})?;
receiver
.await
.unwrap_or_else(|_| {
Err(RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to receive command",
)))
})
.map(|response| match response {
Response::Single(value) => value,
Response::Multiple(_) => unreachable!(),
})
}

/// Send commands in `pipeline` to the given `route`. If `route` is [None], it will be computed from `pipeline`.
pub async fn send_packed_commands<'a>(
&'a mut self,
pipeline: &'a crate::Pipeline,
offset: usize,
count: usize,
route: Option<Route>,
) -> RedisResult<Vec<Value>> {
let (sender, receiver) = oneshot::channel();
self.0
.send(Message {
cmd: CmdArg::Pipeline {
pipeline: Arc::new(pipeline.clone()), // TODO Remove this clone?
offset,
count,
func: |mut conn, pipeline, offset, count| {
Box::pin(async move {
conn.req_packed_commands(&pipeline, offset, count)
.await
.map(Response::Multiple)
})
},
route: route.or_else(|| route_pipeline(pipeline)),
},
sender,
})
.await
.map_err(|_| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))?;

receiver
.await
.unwrap_or_else(|_| Err(RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe))))
.map(|response| match response {
Response::Multiple(values) => values,
Response::Single(_) => unreachable!(),
})
}
}

type ConnectionFuture<C> = future::Shared<BoxFuture<'static, C>>;
Expand Down Expand Up @@ -1075,46 +1156,10 @@ where

impl<C> ConnectionLike for ClusterConnection<C>
where
C: ConnectionLike + Send + 'static,
C: ConnectionLike + Send + Clone + Unpin + Sync + Connect + 'static,
{
fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
trace!("req_packed_command");
let (sender, receiver) = oneshot::channel();
Box::pin(async move {
self.0
.send(Message {
cmd: CmdArg::Cmd {
cmd: Arc::new(cmd.clone()), // TODO Remove this clone?
func: |mut conn, cmd| {
Box::pin(async move {
conn.req_packed_command(&cmd).await.map(Response::Single)
})
},
routing: RoutingInfo::for_routable(cmd),
response_policy: RoutingInfo::response_policy(cmd),
},
sender,
})
.await
.map_err(|_| {
RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to send command",
))
})?;
receiver
.await
.unwrap_or_else(|_| {
Err(RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to receive command",
)))
})
.map(|response| match response {
Response::Single(value) => value,
Response::Multiple(_) => unreachable!(),
})
})
self.send_packed_command(cmd, None).boxed()
}

fn req_packed_commands<'a>(
Expand All @@ -1123,38 +1168,8 @@ where
offset: usize,
count: usize,
) -> RedisFuture<'a, Vec<Value>> {
let (sender, receiver) = oneshot::channel();
Box::pin(async move {
self.0
.send(Message {
cmd: CmdArg::Pipeline {
pipeline: Arc::new(pipeline.clone()), // TODO Remove this clone?
offset,
count,
func: |mut conn, pipeline, offset, count| {
Box::pin(async move {
conn.req_packed_commands(&pipeline, offset, count)
.await
.map(Response::Multiple)
})
},
route: route_pipeline(pipeline),
},
sender,
})
.await
.map_err(|_| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))?;

receiver
.await
.unwrap_or_else(|_| {
Err(RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
})
.map(|response| match response {
Response::Multiple(values) => values,
Response::Single(_) => unreachable!(),
})
})
self.send_packed_commands(pipeline, offset, count, None)
.boxed()
}

fn get_db(&self) -> i64 {
Expand Down
46 changes: 31 additions & 15 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,32 @@ pub(crate) enum ResponsePolicy {
Special,
}

/// Defines whether a request should be routed to a single node, or multiple ones.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum RoutingInfo {
pub enum RoutingInfo {
/// Route to single node
SingleNode(SingleNodeRoutingInfo),
/// Route to multiple nodes
MultiNode(MultipleNodeRoutingInfo),
}

/// Defines which single node should receive a request.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum SingleNodeRoutingInfo {
pub enum SingleNodeRoutingInfo {
/// Route to any node at random
Random,
/// Route to the node that matches the [route]
SpecificNode(Route),
}

/// Defines which collection of nodes should receive a request
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum MultipleNodeRoutingInfo {
pub enum MultipleNodeRoutingInfo {
/// route to all nodes in the clusters
AllNodes,
/// Route to all primaries in the cluster
AllMasters,
// Instructions on how to split a multi-slot command (e.g. MGET, MSET) into sub-commands. Each tuple is the route for each subcommand and the indices of the arguments from the original command that should be copied to the subcommand.
/// Instructions on how to split a multi-slot command (e.g. MGET, MSET) into sub-commands. Each tuple is the route for each subcommand and the indices of the arguments from the original command that should be copied to the subcommand.
MultiSlot(Vec<(Route, Vec<usize>)>),
}

Expand Down Expand Up @@ -182,7 +191,8 @@ pub(crate) fn combine_and_sort_array_results<'a>(
Ok(Value::Bulk(results))
}

fn get_slot(key: &[u8]) -> u16 {
/// Returns the slot that matches `key`.
pub fn get_slot(key: &[u8]) -> u16 {
let key = match get_hashtag(key) {
Some(tag) => tag,
None => key,
Expand Down Expand Up @@ -276,7 +286,8 @@ impl RoutingInfo {
}
}

pub(crate) fn for_routable<R>(r: &R) -> Option<RoutingInfo>
/// Returns the routing info for `r`.
pub fn for_routable<R>(r: &R) -> Option<RoutingInfo>
where
R: Routable + ?Sized,
{
Expand Down Expand Up @@ -345,17 +356,18 @@ impl RoutingInfo {
}
}

pub fn for_key(cmd: &[u8], key: &[u8]) -> RoutingInfo {
fn for_key(cmd: &[u8], key: &[u8]) -> RoutingInfo {
RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(get_route(
is_readonly_cmd(cmd),
key,
)))
}
}

pub(crate) trait Routable {
// Convenience function to return ascii uppercase version of the
// the first argument (i.e., the command).
/// Objects that implement this trait define a request that can be routed by a cluster client to different nodes in the cluster.
pub trait Routable {
/// Convenience function to return ascii uppercase version of the
/// the first argument (i.e., the command).
fn command(&self) -> Option<Vec<u8>> {
let primary_command = self.arg_idx(0).map(|x| x.to_ascii_uppercase())?;
let mut primary_command = match primary_command.as_slice() {
Expand All @@ -379,10 +391,10 @@ pub(crate) trait Routable {
})
}

// Returns a reference to the data for the argument at `idx`.
/// Returns a reference to the data for the argument at `idx`.
fn arg_idx(&self, idx: usize) -> Option<&[u8]>;

// Returns index of argument that matches `candidate`, if it exists
/// Returns index of argument that matches `candidate`, if it exists
fn position(&self, candidate: &[u8]) -> Option<usize>;
}

Expand Down Expand Up @@ -456,9 +468,12 @@ impl Slot {
}
}

/// What type of node should a request be routed to.
#[derive(Eq, PartialEq, Clone, Copy, Debug, Hash)]
pub(crate) enum SlotAddr {
pub enum SlotAddr {
/// Primary node
Master,
/// Replica node
Replica,
}

Expand Down Expand Up @@ -577,10 +592,11 @@ impl SlotMap {
/// Defines the slot and the [`SlotAddr`] to which
/// a command should be sent
#[derive(Eq, PartialEq, Clone, Copy, Debug, Hash)]
pub(crate) struct Route(u16, SlotAddr);
pub struct Route(u16, SlotAddr);

impl Route {
pub(crate) fn new(slot: u16, slot_addr: SlotAddr) -> Self {
/// Returns a new Route.
pub fn new(slot: u16, slot_addr: SlotAddr) -> Self {
Self(slot, slot_addr)
}

Expand Down

0 comments on commit ae89b1d

Please sign in to comment.