Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactions gossiping #649

Merged
merged 16 commits into from Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 32 additions & 1 deletion core/mempool/src/lib.rs
Expand Up @@ -15,8 +15,13 @@ use storage::{GenericStorage, ShardChainStorage, Trie, TrieUpdate};
const POISONED_LOCK_ERR: &str = "The lock was poisoned.";

pub mod pool_task;
pub mod tx_gossip;

use crate::pool_task::MemPoolControl;
use primitives::signature::SecretKey;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

secret keys shouldn't be sent around. We have {Block/Transaction}Signer interface instead.

See #653 for the propagation of that.


#[macro_use]
extern crate serde_derive;

/// mempool that stores transactions and receipts for a chain
pub struct Pool {
Expand All @@ -30,6 +35,7 @@ pub struct Pool {
pub authority_id: RwLock<Option<AuthorityId>>,
/// Number of authorities currently.
num_authorities: RwLock<Option<usize>>,
owner_secret_key: RwLock<Option<SecretKey>>,
/// Map from hash of tx/receipt to hashset of authorities it is known.
known_to: RwLock<HashMap<CryptoHash, HashSet<AuthorityId>>>,
/// List of requested snapshots that can't be fetched yet.
Expand All @@ -49,6 +55,7 @@ impl Pool {
snapshots: Default::default(),
authority_id: Default::default(),
num_authorities: Default::default(),
owner_secret_key: Default::default(),
known_to: Default::default(),
pending_snapshots: Default::default(),
ready_snapshots: Default::default(),
Expand All @@ -58,15 +65,17 @@ impl Pool {
/// Reset MemPool: clear snapshots, switch to new authorities and own authority id.
pub fn reset(&self, control: MemPoolControl) {
match control {
MemPoolControl::Reset { authority_id, num_authorities, .. } => {
MemPoolControl::Reset { authority_id, num_authorities, owner_secret_key, .. } => {
info!(target: "mempool", "MemPool reset for {}", authority_id);
*self.authority_id.write().expect(POISONED_LOCK_ERR) = Some(authority_id);
*self.num_authorities.write().expect(POISONED_LOCK_ERR) = Some(num_authorities);
*self.owner_secret_key.write().expect(POISONED_LOCK_ERR) = Some(owner_secret_key);
}
MemPoolControl::Stop => {
info!(target: "mempool", "MemPool stopped");
*self.authority_id.write().expect(POISONED_LOCK_ERR) = None;
*self.num_authorities.write().expect(POISONED_LOCK_ERR) = None;
*self.owner_secret_key.write().expect(POISONED_LOCK_ERR) = None;
}
}
self.snapshots.write().expect(POISONED_LOCK_ERR).clear();
Expand Down Expand Up @@ -105,6 +114,7 @@ impl Pool {
originator
));
}
self.known_to.write().expect(POISONED_LOCK_ERR).insert(transaction.get_hash(), HashSet::new());
self.transactions.write().expect(POISONED_LOCK_ERR).insert(transaction);
Ok(())
}
Expand Down Expand Up @@ -137,9 +147,27 @@ impl Pool {
Ok(())
}

pub fn add_payload_with_author(&self, payload: ChainPayload, author: AuthorityId) -> Result<(), String> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function has exactly the same signature (and almost the same logic) as function below - add_payload_snapshot

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

their the expectation is that it's final snapshot of proposal though, but seems reasonable to do the same things for keeping track of tx/receipts/known

for transaction in payload.transactions {
let hash = transaction.get_hash();
self.add_transaction(transaction)?;
self.known_to
.write()
.expect(POISONED_LOCK_ERR)
.entry(hash)
.or_insert(HashSet::new())
.insert(author);
}
for receipt in payload.receipts {
self.add_receipt(receipt)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not tracking receipts as well?

}
Ok(())
}

pub fn snapshot_payload(&self) -> CryptoHash {
let transactions: Vec<_> =
self.transactions.write().expect(POISONED_LOCK_ERR).drain().collect();
self.known_to.write().expect(POISONED_LOCK_ERR).drain();
let receipts: Vec<_> = self.receipts.write().expect(POISONED_LOCK_ERR).drain().collect();
let snapshot = ChainPayload { transactions, receipts };
if snapshot.is_empty() {
Expand Down Expand Up @@ -188,6 +216,7 @@ impl Pool {

pub fn import_block(&self, block: &SignedShardBlock) {
for transaction in block.body.transactions.iter() {
self.known_to.write().expect(POISONED_LOCK_ERR).remove(&transaction.get_hash());
self.transactions.write().expect(POISONED_LOCK_ERR).remove(transaction);
}
for receipt in block.body.receipts.iter() {
Expand Down Expand Up @@ -263,6 +292,7 @@ mod tests {
let transaction = SignedTransaction::new(signature, tx_body);
pool.add_transaction(transaction.clone()).unwrap();
assert_eq!(pool.transactions.read().expect(POISONED_LOCK_ERR).len(), 1);
assert_eq!(pool.known_to.read().expect(POISONED_LOCK_ERR).len(), 1);
let block = SignedShardBlock::new(
0,
0,
Expand All @@ -274,5 +304,6 @@ mod tests {
);
pool.import_block(&block);
assert_eq!(pool.transactions.read().expect(POISONED_LOCK_ERR).len(), 0);
assert_eq!(pool.known_to.read().expect(POISONED_LOCK_ERR).len(), 0);
}
}
76 changes: 76 additions & 0 deletions core/mempool/src/pool_task.rs
Expand Up @@ -18,6 +18,10 @@ use primitives::types::AuthorityId;
use nightshade::nightshade_task::Control;

use crate::Pool;
use crate::tx_gossip::TxGossip;
use std::collections::HashSet;

const POISONED_LOCK_ERR: &str = "The lock was poisoned.";

#[derive(Clone, Debug)]
pub enum MemPoolControl {
Expand All @@ -43,7 +47,10 @@ pub fn spawn_pool(
payload_announce_tx: Sender<(AuthorityId, ChainPayload)>,
payload_request_tx: Sender<PayloadRequest>,
payload_response_rx: Receiver<PayloadResponse>,
inc_tx_gossip_rx: Receiver<TxGossip>,
out_tx_gossip_tx: Sender<TxGossip>,
payload_announce_period: Duration,
gossip_tx_period: Duration,
) {
// Handle request from NightshadeTask for confirmation on a payload.
// If the payload can't be built from the mempool task to fetch necessary data is spawned and the
Expand Down Expand Up @@ -176,4 +183,73 @@ pub fn spawn_pool(
future::ok(())
});
tokio::spawn(task);

// Receive transaction gossips
let pool5 = pool.clone();
let task = inc_tx_gossip_rx.for_each(move |tx_gossip| {
// TODO: verify signature
if let Err(e) = pool5.add_payload_with_author(tx_gossip.payload, tx_gossip.sender_id) {
warn!(target: "pool", "Failed to add payload from tx gossip: {}", e);
}

future::ok(())
});
tokio::spawn(task);

let pool6 = pool.clone();
let task = Interval::new_interval(gossip_tx_period)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the code above:

    let pool2 = pool.clone();
    let task = Interval::new_interval(payload_announce_period)
        .for_each(move |_| {
            if let Some((authority_id, payload)) = pool2.prepare_payload_announce() {

is suppose to do exactly the same thing.
the idea ws to put logic of deciding who to announce/gossip what payloads in prepare_payload_announce

.for_each(move |_| {
let my_authority_id = match *pool6.authority_id.read().expect(POISONED_LOCK_ERR) {
Some(x) => { x },
None => { return future::ok(()) },
};
for their_authority_id in 0..pool6.num_authorities.read().expect(POISONED_LOCK_ERR).unwrap_or(0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you probably should not send to all authorities all the time? the point of gossip is to send to subset from time to time IMO.

if their_authority_id == my_authority_id {
continue;
}
let sk = match pool6.owner_secret_key.write().expect(POISONED_LOCK_ERR).clone() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be outside the for loop.

Some(sk) => sk,
None => continue
};

let mut to_send = vec![];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all this logic should go into prepare_payload_announce.

for tx in pool6.transactions.read().expect(POISONED_LOCK_ERR).iter() {
let mut locked_known_to = pool6.known_to.write().expect(POISONED_LOCK_ERR);
match locked_known_to.get_mut(&tx.get_hash()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some(known_to) => {
if !known_to.contains(&their_authority_id) {
to_send.push(tx.clone());
known_to.insert(their_authority_id);
}
},
None => {
to_send.push(tx.clone());
let mut known_to = HashSet::new();
known_to.insert(their_authority_id);
locked_known_to.insert(tx.get_hash(), known_to);
},
}
}
if to_send.len() == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if to_send.len() == 0 {
if to_send.is_empty() {

continue;
}
let payload = ChainPayload{ transactions: to_send, receipts: vec![] };
let gossip = TxGossip::new(
my_authority_id,
their_authority_id,
payload,
sk,
);
tokio::spawn(
out_tx_gossip_tx
.clone()
.send(gossip)
.map(|_| ())
.map_err(|e| warn!(target: "pool", "Error sending message: {}", e)),
);
}
future::ok(())
})
.map_err(|e| error!(target: "pool", "Timer error: {}", e));
tokio::spawn(task);
}
26 changes: 26 additions & 0 deletions core/mempool/src/tx_gossip.rs
@@ -0,0 +1,26 @@
use primitives::types::AuthorityId;
use primitives::chain::ChainPayload;
use primitives::signature::Signature;
use primitives::hash::hash_struct;
use primitives::signature::SecretKey;
use primitives::signature::sign;

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct TxGossip {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be PayloadGossip?
Also should gossip first send Hashs of tx/receipts and then other client retrieves content if doesn't have it?

pub sender_id: AuthorityId,
pub receiver_id: AuthorityId,
pub payload: ChainPayload,
signature: Signature,
}

impl TxGossip {
pub fn new(sender_id: AuthorityId, receiver_id: AuthorityId, payload: ChainPayload, sk: SecretKey) -> Self {
let hash = hash_struct(&(receiver_id, &payload));
TxGossip {
sender_id,
receiver_id,
payload,
signature: sign(hash.as_ref(), &sk),
}
}
}
7 changes: 7 additions & 0 deletions node/alphanet/src/lib.rs
Expand Up @@ -53,6 +53,8 @@ pub fn start_from_client(
// Create all the consensus channels.
let (inc_gossip_tx, inc_gossip_rx) = channel(1024);
let (out_gossip_tx, out_gossip_rx) = channel(1024);
let (inc_tx_gossip_tx, inc_tx_gossip_rx) = channel(1024);
let (out_tx_gossip_tx, out_tx_gossip_rx) = channel(1024);
let (consensus_tx, consensus_rx) = channel(1024);
let (control_tx, control_rx) = channel(1024);

Expand All @@ -70,7 +72,10 @@ pub fn start_from_client(
payload_announce_tx,
payload_request_tx,
payload_response_rx,
inc_tx_gossip_rx,
out_tx_gossip_tx,
network_cfg.gossip_interval,
network_cfg.tx_gossip_interval,
);

// Launch block syncing / importing.
Expand Down Expand Up @@ -103,6 +108,8 @@ pub fn start_from_client(
network_cfg,
inc_gossip_tx,
out_gossip_rx,
inc_tx_gossip_tx,
out_tx_gossip_rx,
inc_block_tx,
out_block_rx,
payload_announce_rx,
Expand Down
1 change: 1 addition & 0 deletions node/alphanet/src/testing_utils.rs
Expand Up @@ -73,6 +73,7 @@ impl Node {
boot_nodes,
reconnect_delay: Duration::from_millis(50),
gossip_interval: Duration::from_millis(50),
tx_gossip_interval: Duration::from_millis(50),
gossip_sample_size: 10,
};

Expand Down
10 changes: 10 additions & 0 deletions node/configs/src/network.rs
Expand Up @@ -21,6 +21,7 @@ pub struct NetworkConfig {
pub boot_nodes: Vec<PeerInfo>,
pub reconnect_delay: Duration,
pub gossip_interval: Duration,
pub tx_gossip_interval: Duration,
pub gossip_sample_size: usize,
}

Expand Down Expand Up @@ -65,6 +66,12 @@ pub fn get_args<'a, 'b>() -> Vec<Arg<'a, 'b>> {
.help("Delay in ms between gossiping peers info with known peers.")
.default_value(DEFAULT_GOSSIP_INTERVAL_MS)
.takes_value(true),
Arg::with_name("tx_gossip_interval_ms")
.long("tx-gossip-interval-ms")
.value_name("TRANSACTIONS GOSSIP_INTERVAL_MS")
.help("Delay in ms between gossiping transactions to peers.")
.default_value(DEFAULT_GOSSIP_INTERVAL_MS)
.takes_value(true),
Arg::with_name("gossip_sample_size")
.long("gossip-sample-size")
.value_name("GOSSIP_SAMPLE_SIZE")
Expand Down Expand Up @@ -112,6 +119,8 @@ pub fn from_matches(client_config: &ClientConfig, matches: &ArgMatches) -> Netwo
matches.value_of("reconnect_delay_ms").map(|x| x.parse::<u64>().unwrap()).unwrap();
let gossip_interval_ms =
matches.value_of("gossip_interval_ms").map(|x| x.parse::<u64>().unwrap()).unwrap();
let tx_gossip_interval_ms =
matches.value_of("tx_gossip_interval_ms").map(|x| x.parse::<u64>().unwrap()).unwrap();
let gossip_sample_size =
matches.value_of("gossip_sample_size").map(|x| x.parse::<usize>().unwrap()).unwrap();

Expand All @@ -129,6 +138,7 @@ pub fn from_matches(client_config: &ClientConfig, matches: &ArgMatches) -> Netwo
boot_nodes,
reconnect_delay: Duration::from_millis(reconnect_delay_ms),
gossip_interval: Duration::from_millis(gossip_interval_ms),
tx_gossip_interval: Duration::from_millis(tx_gossip_interval_ms),
gossip_sample_size,
}
}
1 change: 1 addition & 0 deletions node/network/Cargo.toml
Expand Up @@ -29,6 +29,7 @@ near-protos = { path = "../../core/protos" }
shard = { path = "../shard" }
storage = { path = "../../core/storage", features=["test-utils"] }
nightshade = { path = "../../core/nightshade" }
mempool = { path = "../../core/mempool" }

[features]
test-utils = []
3 changes: 3 additions & 0 deletions node/network/src/message.rs
@@ -1,6 +1,7 @@
use serde_derive::{Deserialize, Serialize};

use nightshade::nightshade_task::Gossip;
use mempool::tx_gossip::TxGossip;
use primitives::beacon::SignedBeaconBlock;
use primitives::chain::{ChainPayload, ReceiptBlock, SignedShardBlock};
use primitives::hash::CryptoHash;
Expand Down Expand Up @@ -45,6 +46,8 @@ pub enum Message {

/// Nightshade gossip.
Gossip(Box<Gossip>),
/// Transaction gossip
TxGossip(Box<TxGossip>),
/// Announce of tx/receipts between authorities.
PayloadAnnounce(ChainPayload),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should remove payload announce then - that was suppose to be to send each other payloads.

/// Request specific tx/receipts.
Expand Down