Skip to content

Commit

Permalink
WIP: SigmaAggr integration testing.
Browse files Browse the repository at this point in the history
  • Loading branch information
oskin1 committed Jun 23, 2023
1 parent 90ecc43 commit 7ffa538
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 34 deletions.
77 changes: 76 additions & 1 deletion spectrum-network/src/protocol_handler/sigma_aggregation.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -318,13 +319,56 @@ struct AggregationTask<'a, H, PP> {
channel: Sender<Result<Aggregated<H>, ()>>,
}

/// Discriminates the four stages of the sigma-aggregation protocol.
/// `#[repr(usize)]` pins the discriminant values so a tag can be used
/// directly as an array index (see `MessageStash`, which holds one
/// bucket per stage).
#[repr(usize)]
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
enum StageTag {
    PreCommit = 0,
    Commit = 1,
    Broadcast = 2,
    Response = 3,
}

impl From<&SigmaAggrMessageV1> for StageTag {
fn from(m: &SigmaAggrMessageV1) -> Self {
match m {
SigmaAggrMessageV1::PreCommitments(_) => StageTag::PreCommit,
SigmaAggrMessageV1::Commitments(_) => StageTag::Commit,
SigmaAggrMessageV1::Broadcast(_) => StageTag::Broadcast,
SigmaAggrMessageV1::Responses(_) => StageTag::Response,
}
}
}

/// Stash of messages received during improper stage. Messages are grouped by stage:
/// one `PeerId -> message` map per protocol stage, indexed by `StageTag as usize`.
/// A later message from the same peer for the same stage overwrites the earlier one.
struct MessageStash([HashMap<PeerId, SigmaAggrMessageV1>; 4]);

impl MessageStash {
    /// Create an empty stash with one (empty) bucket per protocol stage.
    fn new() -> Self {
        // Arrays of `Default` elements are themselves `Default`, so this
        // replaces the hand-written `[HashMap::new(), ...]` literal.
        Self(Default::default())
    }

    /// Store a message that arrived ahead of its proper stage, keyed by the
    /// sending peer, so it can be replayed once that stage becomes active.
    fn stash(&mut self, peer: PeerId, m: SigmaAggrMessageV1) {
        let stage = StageTag::from(&m);
        self.0[stage as usize].insert(peer, m);
    }

    /// Remove and return all messages stashed for the given stage,
    /// leaving that stage's bucket empty.
    fn unstash(&mut self, stage: StageTag) -> HashMap<PeerId, SigmaAggrMessageV1> {
        // `mem::take` is the idiomatic form of
        // `mem::replace(_, HashMap::new())` (clippy: mem_replace_with_default).
        mem::take(&mut self.0[stage as usize])
    }

    /// Discard all stashed messages for every stage.
    fn flush(&mut self) {
        // Clear each bucket in place to reuse its allocation instead of
        // dropping and reallocating four fresh maps.
        for bucket in self.0.iter_mut() {
            bucket.clear();
        }
    }
}

pub struct SigmaAggregation<'a, H, MPP, OB>
where
MPP: MakePeerPartitions,
{
host_sk: SecretKey,
handel_conf: HandelConfig,
task: Option<AggregationTask<'a, H, MPP::PP>>,
stash: MessageStash,
partitioner: MPP,
mcast_overlay_builder: OB,
inbox: Receiver<AggregationAction<H>>,
Expand Down Expand Up @@ -356,12 +400,25 @@ where
host_sk,
handel_conf,
task: None,
stash: MessageStash::new(),
partitioner,
mcast_overlay_builder,
inbox,
outbox: VecDeque::new(),
}
}

fn unstash_stage(&mut self, stage: StageTag)
where
H: Debug,
MPP: MakePeerPartitions + Clone + Send,
MPP::PP: Send + 'a,
OB: MakeDagOverlay + Clone,
{
for (p, m) in self.stash.unstash(stage) {
self.inject_message(p, SigmaAggrMessage::SigmaAggrMessageV1(m))
}
}
}

impl<'a, H, MPP, OB> ProtocolBehaviour for SigmaAggregation<'a, H, MPP, OB>
Expand Down Expand Up @@ -389,6 +446,9 @@ where
pre_commitment.host_ix,
);
pre_commitment.handel.inject_message(peer_id, pre_commits);
} else {
trace!("SigmaAggrMessageV1 expected PreCommitments, got {:?}", msg);
self.stash.stash(peer_id, msg);
}
}
Some(AggregationTask {
Expand All @@ -400,12 +460,21 @@ where
commitment.handel.inject_message(peer_id, commits);
} else {
trace!("SigmaAggrMessageV1 expected Commitments, got {:?}", msg);
self.stash.stash(peer_id, msg);
}
}
Some(AggregationTask {
state: AggregationState::BroadcastCommitments(ref mut bcast),
..
}) => {}
}) => {
if let SigmaAggrMessageV1::Broadcast(commits) = msg {
trace!("SigmaAggrMessageV1::Broadcast: {:?}", commits);
bcast.mcast.inject_message(peer_id, commits);
} else {
trace!("SigmaAggrMessageV1 expected Broadcast, got {:?}", msg);
self.stash.stash(peer_id, msg);
}
}
Some(AggregationTask {
state: AggregationState::AggregateResponses(ref mut response),
..
Expand All @@ -415,6 +484,7 @@ where
response.handel.inject_message(peer_id, resps);
} else {
trace!("SigmaAggrMessageV1 expected Responses, got {:?}", msg);
self.stash.stash(peer_id, msg);
}
}
None => {}
Expand All @@ -437,6 +507,7 @@ where
new_message,
channel,
} => {
self.stash.flush();
self.task = Some(AggregationTask {
state: AggregationState::AggregatePreCommitments(AggregatePreCommitments::init(
self.host_sk.clone(),
Expand Down Expand Up @@ -473,6 +544,7 @@ where
}
Either::Right(pre_commitments) => {
let host_ix = st.host_ix;
self.unstash_stage(StageTag::Commit);
self.task = Some(AggregationTask {
state: AggregationState::AggregateCommitments(
st.complete(pre_commitments, self.handel_conf),
Expand Down Expand Up @@ -508,6 +580,7 @@ where

Either::Right(commitments) => {
trace!("[SA] Got commitments");
self.unstash_stage(StageTag::Broadcast);
self.task = Some(AggregationTask {
state: AggregationState::BroadcastCommitments(st.complete(commitments)),
channel,
Expand Down Expand Up @@ -539,6 +612,7 @@ where
}
Either::Right(commitments) => {
trace!("[SA] Got commitments");
self.unstash_stage(StageTag::Response);
self.task = Some(AggregationTask {
state: AggregationState::AggregateResponses(
st.complete(commitments, self.handel_conf),
Expand Down Expand Up @@ -568,6 +642,7 @@ where
}
Either::Right(responses) => {
self.task = None;
self.stash.flush();
let res = st.complete(responses);
// todo: support error case.
trace!("[SA] Got responses: {:?}", res);
Expand Down
95 changes: 63 additions & 32 deletions spectrum-network/tests/integration_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,12 @@ async fn create_swarm<P>(
}
}

/// Counters shared across spawned result-collector tasks in the
/// integration tests; tallied under an async `Mutex` and printed at the end.
#[derive(Debug)]
struct Stats {
    // Number of peers whose task completed successfully.
    num_succ: usize,
    // Number of peers whose task failed (result channel error).
    num_fail: usize,
}

#[cfg_attr(feature = "test_peer_punish_too_slow", ignore)]
#[tokio::test]
async fn multicasting_normal() {
Expand All @@ -990,8 +996,8 @@ async fn multicasting_normal() {
redundancy_factor: 3,
seed: 64,
};
wasm_timer::Delay::new(Duration::from_millis(1000)).await.unwrap();

wasm_timer::Delay::new(Duration::from_secs(1)).await.unwrap();
println!("Starting testing ..");

let root_pid = peers[0].peer_id;
Expand All @@ -1006,23 +1012,20 @@ async fn multicasting_normal() {
} else {
None
};
async_std::task::block_on(peer.aggr_handler_mailbox.clone().send(SetTask {
initial_statement,
on_response: snd,
overlay,
}))
.unwrap();
peer.aggr_handler_mailbox
.clone()
.send(SetTask {
initial_statement,
on_response: snd,
overlay,
})
.await
.unwrap();
println!("Assigned task to peer {:?}", peer.peer_addr);
}

let started_at = Instant::now();

#[derive(Debug)]
struct Stats {
num_succ: usize,
num_fail: usize,
}

let stats = Arc::new(Mutex::new(Stats {
num_fail: 0,
num_succ: 0,
Expand All @@ -1037,7 +1040,11 @@ async fn multicasting_normal() {
let elapsed = finished_at.sub(started_at);
match res {
Ok(_) => {
println!("[Peer-{}] :: Finished mcast in {} millis", ix, elapsed.as_millis());
println!(
"[Peer-{}] :: Finished mcast in {} millis",
ix,
elapsed.as_millis()
);
stats.lock().await.num_succ += 1;
}
Err(_) => {
Expand All @@ -1057,15 +1064,14 @@ async fn multicasting_normal() {
peer.peer_handle.abort();
}

let stats = async_std::task::block_on(stats.lock());
println!("Finished. {:?}", stats);
println!("Finished. {:?}", stats.lock().await);
}

#[cfg_attr(feature = "test_peer_punish_too_slow", ignore)]
#[tokio::test]
async fn sigma_aggregation_normal() {
//init_logging_once_for(vec![], LevelFilter::Debug, None);
let mut peers = aggregation::setup_nodes(8);
let mut peers = aggregation::setup_nodes(16);
let md = blake2b256_hash(b"foo");
let committee: HashMap<PublicKey, Option<Multiaddr>> = peers
.iter()
Expand All @@ -1076,38 +1082,63 @@ async fn sigma_aggregation_normal() {
)
.collect();

wasm_timer::Delay::new(Duration::from_millis(100)).await.unwrap();
wasm_timer::Delay::new(Duration::from_secs(1)).await.unwrap();
println!("Starting testing ..");

let mut result_futures = Vec::new();
for peer in peers.iter_mut() {
let (snd, recv) = oneshot::channel();
result_futures.push(recv);
async_std::task::block_on(peer.aggr_handler_mailbox.clone().send(AggregationAction::Reset {
new_committee: committee.clone(),
new_message: md,
channel: snd,
}))
.unwrap();
peer.aggr_handler_mailbox
.clone()
.send(AggregationAction::Reset {
new_committee: committee.clone(),
new_message: md,
channel: snd,
})
.await
.unwrap();
println!("Assigned task to peer {:?}", peer.peer_addr);
}

let started_at = Instant::now();

let stats = Arc::new(Mutex::new(Stats {
num_fail: 0,
num_succ: 0,
}));

for (ix, fut) in result_futures.into_iter().enumerate() {
async_std::task::spawn(async move {
let res = fut.await;
let finished_at = Instant::now();
let elapsed = finished_at.sub(started_at);
match res {
Ok(_) => println!("PEER:{} :: Finished aggr in {} millis", ix, elapsed.as_millis()),
Err(_) => println!("PEER:{} :: Failed aggr in {} millis", ix, elapsed.as_millis()),
async_std::task::spawn({
let stats = Arc::clone(&stats);
async move {
let res = fut.await;
let finished_at = Instant::now();
let elapsed = finished_at.sub(started_at);
match res {
Ok(_) => {
println!("[Peer-{}] :: Finished aggr in {} millis", ix, elapsed.as_millis());
stats.lock().await.num_succ += 1;
}
Err(_) => {
println!("[Peer-{}] :: Failed aggr in {} millis", ix, elapsed.as_millis());
stats.lock().await.num_fail += 1;
}
}
}
});
}

wasm_timer::Delay::new(Duration::from_secs(2)).await.unwrap();
wasm_timer::Delay::new(Duration::from_secs(5)).await.unwrap();

println!("Timeout");

for peer in &peers {
peer.peer_handle.abort();
}

wasm_timer::Delay::new(Duration::from_secs(1)).await.unwrap();
println!("Finished. {:?}", stats.lock().await);
}

fn make_swarm_components<P, F>(
Expand Down
2 changes: 1 addition & 1 deletion spectrum-network/tests/integration_tests/multicasting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ where
..
}) => {
println!("[Peer-{}] :: Got message {:?}", self.host_ix, content);
proc.inject_message(peer_id, content)
proc.inject_message(peer_id, content);
}
}
}
Expand Down

0 comments on commit 7ffa538

Please sign in to comment.