Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

feat/core: split up large and deduplicate group messages #1038

Merged
merged 3 commits into from Jun 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions .travis.yml
Expand Up @@ -14,9 +14,6 @@ matrix:
allow_failures:
- rust: nightly
sudo: false
branches:
only:
- master
cache:
cargo: true
directories:
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -13,7 +13,7 @@ version = "0.21.0"
[dependencies]
accumulator = "~0.4.0"
clippy = {version = "~0.0.73", optional = true}
crust = "~0.14.0"
crust = { git = "https://github.com/maidsafe/crust.git", branch = "async" }
itertools = "~0.4.15"
kademlia_routing_table = "~0.6.0"
log = "~0.3.6"
Expand Down
3 changes: 0 additions & 3 deletions appveyor.yml
Expand Up @@ -4,9 +4,6 @@ environment:
Features: "use-mock-crust"
matrix:
- RUST_VERSION: stable
branches:
only:
- master

clone_depth: 50

Expand Down
2 changes: 1 addition & 1 deletion examples/utils/example_client.rs
Expand Up @@ -47,7 +47,7 @@ impl ExampleClient {
let sign_keys = crypto::sign::gen_keypair();
let encrypt_keys = crypto::box_::gen_keypair();
let full_id = FullId::with_keys(encrypt_keys.clone(), sign_keys.clone());
let routing_client = unwrap_result!(Client::new(sender, Some(full_id), false));
let routing_client = unwrap_result!(Client::new(sender, Some(full_id)));

// Wait indefinitely for a `Connected` event, notifying us that we are now ready to send
// requests to the network.
Expand Down
7 changes: 3 additions & 4 deletions examples/utils/example_node.rs
Expand Up @@ -52,7 +52,7 @@ impl ExampleNode {
/// Creates a new node and attempts to establish a connection to the network.
pub fn new(first: bool) -> ExampleNode {
let (sender, receiver) = ::std::sync::mpsc::channel::<Event>();
let node = unwrap_result!(Node::new(sender.clone(), false, first));
let node = unwrap_result!(Node::new(sender.clone(), first));

ExampleNode {
node: node,
Expand Down Expand Up @@ -91,9 +91,8 @@ impl ExampleNode {
trace!("{} Received disconnected event", self.get_debug_name());
}
Event::GetNodeNameFailed => {
let _ =
mem::replace(&mut self.node,
unwrap_result!(Node::new(self.sender.clone(), false, false)));
let _ = mem::replace(&mut self.node,
unwrap_result!(Node::new(self.sender.clone(), false)));
}
event => {
trace!("{} Received {:?} event", self.get_debug_name(), event);
Expand Down
8 changes: 6 additions & 2 deletions src/action.rs
Expand Up @@ -19,7 +19,7 @@ use std::fmt::{self, Debug, Formatter};
use std::sync::mpsc::Sender;
use authority::Authority;
use error::InterfaceError;
use messages::{Request, RoutingMessage};
use messages::{Request, UserMessage};
use xor_name::XorName;

/// An Action initiates a message flow < A | B > where we are (a part of) A.
Expand All @@ -31,12 +31,16 @@ use xor_name::XorName;
#[derive(Clone)]
pub enum Action {
NodeSendMessage {
content: RoutingMessage,
src: Authority,
dst: Authority,
content: UserMessage,
priority: u8,
result_tx: Sender<Result<(), InterfaceError>>,
},
ClientSendRequest {
content: Request,
dst: Authority,
priority: u8,
result_tx: Sender<Result<(), InterfaceError>>,
},
CloseGroup {
Expand Down
35 changes: 18 additions & 17 deletions src/client.rs
Expand Up @@ -29,7 +29,7 @@ use core::{Core, Role};
use data::{Data, DataIdentifier};
use error::{InterfaceError, RoutingError};
use authority::Authority;
use messages::Request;
use messages::{Request, DEFAULT_PRIORITY, CLIENT_GET_PRIORITY};
use types::MessageId;

type RoutingResult = Result<(), RoutingError>;
Expand Down Expand Up @@ -63,14 +63,11 @@ impl Client {
/// cryptographically secure and uses group consensus. The restriction for the client name
/// exists to ensure that the client cannot choose its `ClientAuthority`.
#[cfg(not(feature = "use-mock-crust"))]
pub fn new(event_sender: Sender<Event>,
keys: Option<FullId>,
use_data_cache: bool)
-> Result<Client, RoutingError> {
pub fn new(event_sender: Sender<Event>, keys: Option<FullId>) -> Result<Client, RoutingError> {
sodiumoxide::init(); // enable shared global (i.e. safe to multithread now)

// start the handler for routing with a restriction to become a full node
let (action_sender, mut core) = Core::new(event_sender, Role::Client, keys, use_data_cache);
let (action_sender, mut core) = Core::new(event_sender, Role::Client, keys);
let (tx, rx) = channel();

let raii_joiner = RaiiThreadJoiner::new(thread!("Client thread", move || {
Expand All @@ -87,14 +84,11 @@ impl Client {

/// Create a new `Client` for unit testing.
#[cfg(feature = "use-mock-crust")]
pub fn new(event_sender: Sender<Event>,
keys: Option<FullId>,
use_data_cache: bool)
-> Result<Client, RoutingError> {
pub fn new(event_sender: Sender<Event>, keys: Option<FullId>) -> Result<Client, RoutingError> {
sodiumoxide::init(); // enable shared global (i.e. safe to multithread now)

// start the handler for routing with a restriction to become a full node
let (action_sender, core) = Core::new(event_sender, Role::Client, keys, use_data_cache);
let (action_sender, core) = Core::new(event_sender, Role::Client, keys);
let (tx, rx) = channel();

Ok(Client {
Expand Down Expand Up @@ -123,7 +117,7 @@ impl Client {
data_id: DataIdentifier,
message_id: MessageId)
-> Result<(), InterfaceError> {
self.send_action(Request::Get(data_id, message_id), dst)
self.send_action(Request::Get(data_id, message_id), dst, CLIENT_GET_PRIORITY)
}

/// Add something to the network
Expand All @@ -132,7 +126,7 @@ impl Client {
data: Data,
message_id: MessageId)
-> Result<(), InterfaceError> {
self.send_action(Request::Put(data, message_id), dst)
self.send_action(Request::Put(data, message_id), dst, DEFAULT_PRIORITY)
}

/// Change something already on the network
Expand All @@ -141,7 +135,7 @@ impl Client {
data: Data,
message_id: MessageId)
-> Result<(), InterfaceError> {
self.send_action(Request::Post(data, message_id), dst)
self.send_action(Request::Post(data, message_id), dst, DEFAULT_PRIORITY)
}

/// Remove something from the network
Expand All @@ -150,21 +144,28 @@ impl Client {
data: Data,
message_id: MessageId)
-> Result<(), InterfaceError> {
self.send_action(Request::Delete(data, message_id), dst)
self.send_action(Request::Delete(data, message_id), dst, DEFAULT_PRIORITY)
}

/// Request account information for the Client calling this function
pub fn send_get_account_info_request(&mut self,
dst: Authority,
message_id: MessageId)
-> Result<(), InterfaceError> {
self.send_action(Request::GetAccountInfo(message_id), dst)
self.send_action(Request::GetAccountInfo(message_id),
dst,
CLIENT_GET_PRIORITY)
}

fn send_action(&self, content: Request, dst: Authority) -> Result<(), InterfaceError> {
fn send_action(&self,
content: Request,
dst: Authority,
priority: u8)
-> Result<(), InterfaceError> {
let action = Action::ClientSendRequest {
content: content,
dst: dst,
priority: priority,
result_tx: self.interface_result_tx.clone(),
};

Expand Down