Skip to content

Commit

Permalink
Merge pull request maidsafe#336 from brian-js/new_routing
Browse files Browse the repository at this point in the history
Updates incorporating routing changes.
  • Loading branch information
Fraser Hutchison committed Feb 23, 2016
2 parents bdbfc64 + f3b9ba4 commit 6d4045f
Show file tree
Hide file tree
Showing 29 changed files with 2,054 additions and 894 deletions.
17 changes: 8 additions & 9 deletions Cargo.toml
Expand Up @@ -11,23 +11,22 @@ homepage = "http://maidsafe.net"

[dependencies]
chunk_store = "~0.2.0"
clippy = {version = "~0.0.37", optional = true}
crust = "~0.8.0"
ctrlc = "~1.0.1"
itertools = "~0.4.5"
clippy = {version = "~0.0.42", optional = true}
config_file_handler = "~0.0.1"
ctrlc = "~1.1.0"
log = "~0.3.5"
lru_time_cache = "~0.2.6"
maidsafe_utilities = "~0.1.5"
routing = "~0.6.3"
mpid_messaging = "~0.1.0"
routing = "~0.7.0"
rustc-serialize = "~0.3.18"
sodiumoxide = "~0.0.9"
time = "~0.1.34"
xor_name = "~0.0.2"
mpid_messaging = "~0.1.0"
xor_name = "~0.0.3"

[dev-dependencies]
kademlia_routing_table = "~0.0.5"
rand = "~0.3.13"
kademlia_routing_table = "~0.3.0"
rand = "~0.3.14"

[features]
use-mock-routing = []
Expand Down
10 changes: 0 additions & 10 deletions rustfmt.toml

This file was deleted.

1 change: 1 addition & 0 deletions src/error.rs
Expand Up @@ -38,6 +38,7 @@ pub enum InternalError {
UnknownMessageType(RoutingMessage),
UnknownRefreshType(Authority, Authority, Refresh),
InvalidResponse,
NotInCloseGroup,
ChunkStore(chunk_store::Error),
MpidMessaging(mpid_messaging::Error),
Serialisation(SerialisationError),
Expand Down
3 changes: 1 addition & 2 deletions src/main.rs
Expand Up @@ -47,11 +47,10 @@ extern crate log;
extern crate maidsafe_utilities;
extern crate mpid_messaging;
extern crate chunk_store;
extern crate crust;
extern crate config_file_handler;
extern crate ctrlc;
#[cfg(all(test, feature = "use-mock-routing"))]
extern crate kademlia_routing_table;
extern crate itertools;
extern crate lru_time_cache;
#[cfg(all(test, feature = "use-mock-routing"))]
extern crate rand;
Expand Down
63 changes: 38 additions & 25 deletions src/mock_routing/mock_routing_impl.rs
Expand Up @@ -15,21 +15,31 @@
// Please review the Licences for the specific language governing permissions and limitations
// relating to use of the SAFE Network Software.

use kademlia_routing_table::{group_size, optimal_table_size};
use kademlia_routing_table::{GROUP_SIZE, ContactInfo, RoutingTable};
use maidsafe_utilities::thread::RaiiThreadJoiner;
use rand::random;
use routing::{Authority, Data, DataRequest, Event, InterfaceError, MessageId, RequestContent, RequestMessage,
ResponseContent, ResponseMessage};
use routing::{Authority, Data, DataRequest, Event, InterfaceError, MessageId, RequestContent,
RequestMessage, ResponseContent, ResponseMessage};
use sodiumoxide::crypto::hash::sha512;
use std::cmp::{Ordering, min};
use std::sync::mpsc;
use std::thread::sleep;
use std::time::Duration;
use xor_name::{XorName, closer_to_target};

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct NodeInfo(XorName);

impl ContactInfo for NodeInfo {
fn name(&self) -> &XorName {
&self.0
}
}

pub struct MockRoutingNodeImpl {
name: XorName,
peers: Vec<XorName>,
// TODO: Use RT crate instead of this Vec<XorName> (provides realistic result for `close_nodes`)
routing_table: RoutingTable<NodeInfo>,
sender: mpsc::Sender<Event>,
client_sender: mpsc::Sender<Event>,
simulated_latency: Duration,
Expand All @@ -53,20 +63,14 @@ impl MockRoutingNodeImpl {
pub fn new(sender: mpsc::Sender<Event>) -> MockRoutingNodeImpl {
let (client_sender, _) = mpsc::channel();
let name: XorName = random();
let mut peers = Vec::with_capacity(optimal_table_size());
for _ in 0..optimal_table_size() {
peers.push(random());
let mut routing_table = RoutingTable::new(NodeInfo(name));
for _ in 0..1000 {
let _ = routing_table.add(NodeInfo(random()));
}
peers.sort_by(|a, b| {
match closer_to_target(&a, &b, &name) {
true => Ordering::Less,
false => Ordering::Greater,
}
});

MockRoutingNodeImpl {
name: name,
peers: peers,
routing_table: routing_table,
sender: sender,
client_sender: client_sender,
simulated_latency: Duration::from_millis(200),
Expand Down Expand Up @@ -122,13 +126,17 @@ impl MockRoutingNodeImpl {
"Mock Client Delete Request");
}

pub fn churn_event(&mut self, event_id: MessageId, lost_close_node: Option<XorName>) {
pub fn node_added_event(&mut self, node_added: XorName) {
let cloned_sender = self.sender.clone();
self.thread_joiners.push(RaiiThreadJoiner::new(thread!("Mock NodeAdded Event", move || {
let _ = cloned_sender.send(Event::NodeAdded(node_added));
})));
}

pub fn node_lost_event(&mut self, node_lost: XorName) {
let cloned_sender = self.sender.clone();
self.thread_joiners.push(RaiiThreadJoiner::new(thread!("Mock Churn Event", move || {
let _ = cloned_sender.send(Event::Churn {
id: event_id,
lost_close_node: lost_close_node,
});
self.thread_joiners.push(RaiiThreadJoiner::new(thread!("Mock NodeLost Event", move || {
let _ = cloned_sender.send(Event::NodeLost(node_lost));
})));
}

Expand Down Expand Up @@ -337,18 +345,23 @@ impl MockRoutingNodeImpl {
Ok(self.delete_failures_given.push(message))
}

pub fn send_refresh_request(&mut self, src: Authority, content: Vec<u8>) -> Result<(), InterfaceError> {
pub fn send_refresh_request(&mut self,
src: Authority,
content: Vec<u8>)
-> Result<(), InterfaceError> {
let content = RequestContent::Refresh(content);
let message = self.send_request(src.clone(), src, content, "Mock Refresh Request");
Ok(self.refresh_requests_given.push(message))
}

pub fn name(&self) -> Result<XorName, InterfaceError> {
Ok(self.name.clone())
pub fn close_group(&self, name: XorName) -> Result<Option<Vec<XorName>>, InterfaceError> {
Ok(self.routing_table
.close_nodes(&name)
.map(|infos| infos.iter().map(|info| &info.0).cloned().collect()))
}

pub fn close_group(&self) -> Result<Vec<XorName>, InterfaceError> {
Ok(self.peers.iter().take(group_size()).cloned().collect())
pub fn name(&self) -> Result<XorName, InterfaceError> {
Ok(self.name.clone())
}

fn send_request(&mut self,
Expand Down
59 changes: 39 additions & 20 deletions src/mock_routing/mod.rs
Expand Up @@ -22,8 +22,9 @@
mod mock_routing_impl;

use self::mock_routing_impl::MockRoutingNodeImpl;
use routing::{Authority, Data, DataRequest, Event, ImmutableData, ImmutableDataType, InterfaceError, MessageId,
RequestContent, RequestMessage, ResponseContent, ResponseMessage, RoutingError};
use routing::{Authority, Data, DataRequest, Event, ImmutableData, ImmutableDataType,
InterfaceError, MessageId, RequestContent, RequestMessage, ResponseContent,
ResponseMessage, RoutingError};
use sodiumoxide::crypto::hash::sha512;
use sodiumoxide::crypto::sign::PublicKey;
use std::sync::{Arc, Mutex, mpsc};
Expand All @@ -38,30 +39,41 @@ impl MockRoutingNode {
Ok(MockRoutingNode { pimpl: Arc::new(Mutex::new(MockRoutingNodeImpl::new(event_sender))) })
}

pub fn get_client_receiver(&mut self) -> mpsc::Receiver<Event> {
pub fn get_client_receiver(&self) -> mpsc::Receiver<Event> {
unwrap_result!(self.pimpl.lock()).get_client_receiver()
}

// ----------- the following methods are for testing purpose only ------------- //
pub fn client_get(&self, client_address: XorName, client_pub_key: PublicKey, data_request: DataRequest) {
unwrap_result!(self.pimpl.lock()).client_get(Self::client_authority(client_address, client_pub_key),
data_request)
pub fn client_get(&self,
client_address: XorName,
client_pub_key: PublicKey,
data_request: DataRequest) {
unwrap_result!(self.pimpl.lock())
.client_get(Self::client_authority(client_address, client_pub_key),
data_request)
}

pub fn client_put(&self, client_address: XorName, client_pub_key: PublicKey, data: Data) {
unwrap_result!(self.pimpl.lock()).client_put(Self::client_authority(client_address, client_pub_key), data)
unwrap_result!(self.pimpl.lock())
.client_put(Self::client_authority(client_address, client_pub_key), data)
}

pub fn client_post(&self, client_address: XorName, client_pub_key: PublicKey, data: Data) {
unwrap_result!(self.pimpl.lock()).client_post(Self::client_authority(client_address, client_pub_key), data)
unwrap_result!(self.pimpl.lock())
.client_post(Self::client_authority(client_address, client_pub_key), data)
}

pub fn client_delete(&self, client_address: XorName, client_pub_key: PublicKey, data: Data) {
unwrap_result!(self.pimpl.lock()).client_delete(Self::client_authority(client_address, client_pub_key), data)
unwrap_result!(self.pimpl.lock())
.client_delete(Self::client_authority(client_address, client_pub_key), data)
}

pub fn churn_event(&self, event_id: MessageId, lost_close_node: Option<XorName>) {
unwrap_result!(self.pimpl.lock()).churn_event(event_id, lost_close_node)
pub fn node_added_event(&self, node_added: XorName) {
unwrap_result!(self.pimpl.lock()).node_added_event(node_added)
}

pub fn node_lost_event(&self, node_lost: XorName) {
unwrap_result!(self.pimpl.lock()).node_lost_event(node_lost)
}

pub fn get_requests_given(&self) -> Vec<RequestMessage> {
Expand Down Expand Up @@ -171,7 +183,8 @@ impl MockRoutingNode {
external_error_indicator: Vec<u8>,
id: MessageId)
-> Result<(), InterfaceError> {
unwrap_result!(self.pimpl.lock()).send_get_failure(src, dst, request, external_error_indicator, id)
unwrap_result!(self.pimpl.lock())
.send_get_failure(src, dst, request, external_error_indicator, id)
}

pub fn send_put_success(&self,
Expand All @@ -190,7 +203,8 @@ impl MockRoutingNode {
external_error_indicator: Vec<u8>,
id: MessageId)
-> Result<(), InterfaceError> {
unwrap_result!(self.pimpl.lock()).send_put_failure(src, dst, request, external_error_indicator, id)
unwrap_result!(self.pimpl.lock())
.send_put_failure(src, dst, request, external_error_indicator, id)
}

pub fn send_post_success(&self,
Expand All @@ -209,7 +223,8 @@ impl MockRoutingNode {
external_error_indicator: Vec<u8>,
id: MessageId)
-> Result<(), InterfaceError> {
unwrap_result!(self.pimpl.lock()).send_post_failure(src, dst, request, external_error_indicator, id)
unwrap_result!(self.pimpl.lock())
.send_post_failure(src, dst, request, external_error_indicator, id)
}

pub fn send_delete_success(&self,
Expand All @@ -228,19 +243,23 @@ impl MockRoutingNode {
external_error_indicator: Vec<u8>,
id: MessageId)
-> Result<(), InterfaceError> {
unwrap_result!(self.pimpl.lock()).send_delete_failure(src, dst, request, external_error_indicator, id)
unwrap_result!(self.pimpl.lock())
.send_delete_failure(src, dst, request, external_error_indicator, id)
}

pub fn send_refresh_request(&self, src: Authority, content: Vec<u8>) -> Result<(), InterfaceError> {
pub fn send_refresh_request(&self,
src: Authority,
content: Vec<u8>)
-> Result<(), InterfaceError> {
unwrap_result!(self.pimpl.lock()).send_refresh_request(src, content)
}

pub fn name(&self) -> Result<XorName, InterfaceError> {
unwrap_result!(self.pimpl.lock()).name()
pub fn close_group(&self, name: XorName) -> Result<Option<Vec<XorName>>, InterfaceError> {
unwrap_result!(self.pimpl.lock()).close_group(name)
}

pub fn close_group(&self) -> Result<Vec<XorName>, InterfaceError> {
unwrap_result!(self.pimpl.lock()).close_group()
pub fn name(&self) -> Result<XorName, InterfaceError> {
unwrap_result!(self.pimpl.lock()).name()
}

fn client_authority(client_address: XorName, client_pub_key: PublicKey) -> Authority {
Expand Down

0 comments on commit 6d4045f

Please sign in to comment.