Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix: add missing routing to client of relayed client message
Browse files Browse the repository at this point in the history
  • Loading branch information
oetyng committed Feb 23, 2021
1 parent 0f3418b commit fbde5b1
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 8 deletions.
47 changes: 42 additions & 5 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ use bytes::Bytes;
use ed25519_dalek::Verifier;
use itertools::Itertools;
use resource_proof::ResourceProof;
use sn_data_types::PublicKey as EndUserPK;
use sn_messaging::{
client::Message as ClientMessage,
node::NodeMessage,
section_info::{
Error as TargetSectionError, GetSectionResponse, Message as SectionInfoMsg, SectionInfo,
Expand Down Expand Up @@ -112,10 +114,17 @@ impl Approved {
self.end_users.get_enduser_by_addr(sender)
}

pub fn get_socket_addr(&self, id: &SocketId) -> Option<&SocketAddr> {
pub fn get_socket_addr(&self, id: SocketId) -> Option<&SocketAddr> {
self.end_users.get_socket_addr(id)
}

pub fn get_all_socket_addr<'a>(
&'a self,
end_user: &'a EndUserPK,
) -> impl Iterator<Item = &'a SocketAddr> {
self.end_users.get_all_socket_addr(end_user)
}

pub fn node(&self) -> &Node {
&self.node
}
Expand Down Expand Up @@ -638,8 +647,7 @@ impl Approved {
self.handle_join_request(msg.src().to_node_peer(sender)?, *join_request.clone())
}
Variant::UserMessage(content) => {
self.handle_user_message(msg.src().src_location(), *msg.dst(), content.clone());
Ok(vec![])
self.handle_user_message(msg.src().src_location(), *msg.dst(), content.clone())
}
Variant::BouncedUntrustedMessage(message) => {
let sender = sender.ok_or(Error::InvalidSrcLocation)?;
Expand Down Expand Up @@ -951,8 +959,37 @@ impl Approved {
Ok(commands)
}

fn handle_user_message(&self, src: SrcLocation, dst: DstLocation, content: Bytes) {
self.send_event(Event::MessageReceived { content, src, dst })
fn handle_user_message(
&self,
src: SrcLocation,
dst: DstLocation,
content: Bytes,
) -> Result<Vec<Command>> {
if let DstLocation::EndUser(end_user) = &dst {
let recipients = match end_user {
EndUser::AllClients(public_key) => {
self.get_all_socket_addr(public_key).copied().collect()
}
EndUser::Client { socket_id, .. } => {
if let Some(socket_addr) = self.get_socket_addr(*socket_id).copied() {
vec![socket_addr]
} else {
vec![]
}
}
};
if recipients.is_empty() {
return Err(Error::CannotRoute);
};
return Ok(vec![Command::SendMessage {
recipients,
delivery_group_size: 1,
message: MessageType::ClientMessage(ClientMessage::from(content)?),
}]);
}

self.send_event(Event::MessageReceived { content, src, dst });
Ok(vec![])
}

fn handle_sync(&mut self, section: Section, network: Network) -> Result<Vec<Command>> {
Expand Down
14 changes: 12 additions & 2 deletions src/routing/enduser_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,18 @@ impl EndUserRegistry {
self.clients.get(socketaddr)
}

pub fn get_socket_addr(&self, socket_id: &SocketId) -> Option<&SocketAddr> {
self.socket_id_mapping.get(socket_id)
pub fn get_socket_addr(&self, socket_id: SocketId) -> Option<&SocketAddr> {
self.socket_id_mapping.get(&socket_id)
}

pub fn get_all_socket_addr<'a>(
&'a self,
end_user_pk: &'a EndUserPK,
) -> impl Iterator<Item = &'a SocketAddr> {
self.clients
.iter()
.filter(move |(_, end_user)| end_user.id() == end_user_pk)
.map(|(socket_addr, _)| socket_addr)
}

pub fn try_add(
Expand Down
2 changes: 1 addition & 1 deletion src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl Routing {
.state
.lock()
.await
.get_socket_addr(&socket_id)
.get_socket_addr(socket_id)
.copied();

if let Some(socket_addr) = socket_addr {
Expand Down

0 comments on commit fbde5b1

Please sign in to comment.