Skip to content

Commit

Permalink
Merge pull request #47 from spectrum-finance/multicast-dev-1131
Browse files Browse the repository at this point in the history
Multicasting trees
  • Loading branch information
oskin1 committed Jun 27, 2023
2 parents 57488a1 + d2db11e commit 73a663c
Show file tree
Hide file tree
Showing 18 changed files with 1,298 additions and 159 deletions.
65 changes: 65 additions & 0 deletions algebra-core/src/combinators.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::iso::Iso;

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum EitherOrBoth<A, B> {
Left(A),
Right(B),
Both(A, B),
}

impl<A, B> EitherOrBoth<A, B> {
pub fn swap(self) -> EitherOrBoth<B, A> {
match self {
EitherOrBoth::Left(o1) => EitherOrBoth::Right(o1),
EitherOrBoth::Right(o2) => EitherOrBoth::Left(o2),
EitherOrBoth::Both(o1, o2) => EitherOrBoth::Both(o2, o1),
}
}

/// If `Left`, or `Both`, return `Some` with the left value, otherwise, return `None`.
pub fn left(self) -> Option<A> {
match self {
EitherOrBoth::Left(left) | EitherOrBoth::Both(left, _) => Some(left),
_ => None,
}
}

/// If `Right`, or `Both`, return `Some` with the right value, otherwise, return `None`.
pub fn right(self) -> Option<B> {
match self {
EitherOrBoth::Right(right) | EitherOrBoth::Both(_, right) => Some(right),
_ => None,
}
}

/// If Both, return `Some` tuple containing left and right.
pub fn both(self) -> Option<(A, B)> {
match self {
EitherOrBoth::Both(a, b) => Some((a, b)),
_ => None,
}
}

pub fn collect(self) -> Vec<B>
where
A: Iso<B>,
{
match self {
EitherOrBoth::Left(a) => vec![a.iso_into()],
EitherOrBoth::Right(b) => vec![b],
EitherOrBoth::Both(a, b) => vec![a.iso_into(), b],
}
}
}

impl<A, B> TryFrom<(Option<A>, Option<B>)> for EitherOrBoth<A, B> {
type Error = ();
fn try_from(pair: (Option<A>, Option<B>)) -> Result<Self, Self::Error> {
match pair {
(Some(l), Some(r)) => Ok(Self::Both(l, r)),
(Some(l), None) => Ok(Self::Left(l)),
(None, Some(r)) => Ok(Self::Right(r)),
_ => Err(()),
}
}
}
13 changes: 13 additions & 0 deletions algebra-core/src/iso.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
pub trait Iso<T> {
fn iso_into(self) -> T;
fn iso_from(that: T) -> Self;
}

impl<A> Iso<A> for A {
fn iso_into(self) -> A {
self
}
fn iso_from(that: A) -> Self {
that
}
}
3 changes: 3 additions & 0 deletions algebra-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod combinators;
pub mod iso;

/// https://ncatlab.org/nlab/show/commutative+semigroup
pub trait CommutativeSemigroup {
fn combine(&self, that: &Self) -> Self;
Expand Down
3 changes: 2 additions & 1 deletion spectrum-crypto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ serde_with = "3.0.0"
elliptic-curve = "0.13.*"
k256 = { version = "0.13.*", features = ["serde"] }
libp2p-identity = { version = "0.1.*", features = ["peerid", "secp256k1"] }
libsecp256k1 = "0.7.1"
libsecp256k1 = "0.7.1"
async-trait = "0.1.68"
13 changes: 13 additions & 0 deletions spectrum-crypto/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use async_trait::async_trait;

pub mod digest;
mod hash;
pub mod pubkey;
Expand All @@ -6,3 +8,14 @@ pub mod pubkey;
pub trait VerifiableAgainst<P> {
fn verify(&self, public_data: &P) -> bool;
}

/// Proof that given statement `S` is verified.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Debug)]
pub struct Verified<S>(pub S);

/// Some statement which can be verified against public data `P`.
#[async_trait]
pub trait AsyncVerifiable<P>: Send + Sync + Sized {
type Err: Send;
async fn verify(self, public_data: &P) -> Result<Verified<Self>, Self::Err>;
}
3 changes: 2 additions & 1 deletion spectrum-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ libsecp256k1 = "0.7.1"
group = "0.13.*"
nonempty = "0.8.1"
higher = "0.2.0"
tokio = {version = "1", features = ["time", "rt", "macros"] }
tokio = {version = "1.28.*", features = ["time", "rt", "macros", "rt-multi-thread"] }
async-trait = "0.1.68"
rocksdb = "0.21.0"

Expand All @@ -48,3 +48,4 @@ libp2p = { version = "0.51.*", features = ["noise", "yamux", "async-std", "tcp"]
log4rs_test_utils = {version = "0.2.3", featuers = ["test_logging"]}
base16 = "0.2.1"
serde_yaml = "0.9.21"
itertools = "0.10.5"
1 change: 1 addition & 0 deletions spectrum-network/src/protocol_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod pool;
pub mod sigma_aggregation;
pub mod versioning;
pub mod void;
pub mod multicasting;

#[derive(Debug)]
pub enum NetworkAction<THandshake, TMessage> {
Expand Down
4 changes: 2 additions & 2 deletions spectrum-network/src/protocol_handler/cosi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ pub struct AnnouncementState<S> {
}

/// It basically waits for an announcement and then forwards it to it's subtrees.
impl<'d, S, R> TemporalProtocolStage<CoSiHandshake, CoSiMessage<S, R>, S> for CoSiAnnouncementStage<S, R>
impl<S, R> TemporalProtocolStage<CoSiHandshake, CoSiMessage<S, R>, S> for CoSiAnnouncementStage<S, R>
where
S: Eq + Clone + Send + Serialize + Deserialize<'d> + Debug,
S: Eq + Clone + Send + Serialize + Debug,
{
fn inject_message(&mut self, peer_id: PeerId, msg: CoSiMessage<S, R>) {
match msg {
Expand Down
139 changes: 69 additions & 70 deletions spectrum-network/src/protocol_handler/handel.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::cmp::{max, Ordering};
use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::future::Future;
use std::ops::{Add, Mul};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use async_std::task::sleep;
use either::{Either, Left, Right};
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use libp2p::PeerId;
use log::trace;

use algebra_core::CommutativePartialSemigroup;
use log::trace;
use spectrum_crypto::VerifiableAgainst;

use crate::protocol_handler::handel::message::HandelMessage;
Expand Down Expand Up @@ -86,9 +87,9 @@ pub struct HandelConfig {
pub window_shrinking_factor: usize,
pub initial_scoring_window: usize,
pub fast_path_window: usize,
pub dissemination_interval: Duration,
pub dissemination_delay: Duration,
pub level_activation_delay: Duration,
pub poll_fn_delay: Duration,
pub throttle_factor: u32,
}

/// A round of Handel protocol that drives aggregation of contribution `C`.
Expand All @@ -104,13 +105,13 @@ pub struct Handel<C, P, PP> {
/// Keeps track of the peers to whom we've sent our own contribution already.
own_contribution_recvs: HashSet<PeerIx>,
outbox: VecDeque<ProtocolBehaviourOut<VoidMessage, HandelMessage<C>>>,
next_dissemination_at: Instant,
level_activation_schedule: Vec<Option<Instant>>,
own_peer_ix: PeerIx,
/// Tracks peers who have indicated that they have completed particular contribution levels.
peers_completed_levels: HashMap<PeerIx, HashSet<u32>>,
/// We use a delay in the `poll` fn to prevent spinning.
delay: Option<Pin<Box<tokio::time::Sleep>>>,
next_processing: Option<Pin<Box<tokio::time::Sleep>>>,
next_dissemination: Pin<Box<tokio::time::Sleep>>,
next_activation: Pin<Box<tokio::time::Sleep>>,
}

impl<C, P, PP> Handel<C, P, PP>
Expand All @@ -133,7 +134,6 @@ where
});
levels[0] = Some(ActiveLevel::unit(own_contribution_scored.clone()));
levels[1] = Some(ActiveLevel::new(own_contribution_scored, 1));
let now = Instant::now();
Handel {
conf,
public_data,
Expand All @@ -144,44 +144,11 @@ where
byzantine_nodes: HashSet::new(),
own_contribution_recvs: HashSet::new(),
outbox: VecDeque::new(),
next_dissemination_at: now,
level_activation_schedule: vec![(); num_levels]
.into_iter()
.enumerate()
.map(|(i, _)| {
if i > 1 {
Some(now.add(conf.level_activation_delay.mul(i as u32)))
} else {
// 0'th and 1'st levels are already activated.
None
}
})
.collect(),
own_peer_ix,
peers_completed_levels: HashMap::default(),
delay: None,
}
}

fn try_disseminatate(&mut self) {
let now = Instant::now();
if now >= self.next_dissemination_at {
self.run_dissemination();
self.next_dissemination_at = now.add(self.conf.dissemination_interval);
}
}

fn try_activate_levels(&mut self) {
let now = Instant::now();
for (lvl, schedule) in self.level_activation_schedule.clone().into_iter().enumerate() {
if let Some(ts) = schedule {
if ts <= now {
self.try_activate_level(lvl);
self.level_activation_schedule[lvl] = None;
} else {
break;
}
}
next_processing: None,
next_dissemination: Box::pin(tokio::time::sleep(conf.dissemination_delay)),
next_activation: Box::pin(tokio::time::sleep(conf.level_activation_delay)),
}
}

Expand Down Expand Up @@ -575,6 +542,22 @@ where
self.levels.get(level).map(|l| l.is_some()).unwrap_or(false)
}

fn next_non_active_level(&self) -> Option<usize> {
self.levels
.iter()
.enumerate()
.take_while(|(_, l)| l.is_none())
.map(|(i, _)| i)
.max()
.and_then(|lvl| {
if lvl < self.peer_partitions.num_levels() {
Some(lvl)
} else {
None
}
})
}

fn get_own_contribution(&self) -> C {
self.levels[0]
.as_ref()
Expand Down Expand Up @@ -634,6 +617,8 @@ impl<C: Eq> Ord for ScoredContributionTraced<C> {
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Debug)]
struct Verified<C>(C);

const BASE_THROTTLE_DURATION: Duration = Duration::from_millis(1);

impl<C, P, PP> TemporalProtocolStage<VoidMessage, HandelMessage<C>, C> for Handel<C, P, PP>
where
C: CommutativePartialSemigroup + Weighted + VerifiableAgainst<P> + Clone + Eq + Debug,
Expand Down Expand Up @@ -662,37 +647,51 @@ where

fn poll(
&mut self,
cx: &mut Context<'_>,
cx: &mut Context,
) -> Poll<Either<ProtocolBehaviourOut<VoidMessage, HandelMessage<C>>, C>> {
if let Some(mut delay) = self.delay.take() {
match delay.poll_unpin(cx) {
Poll::Ready(_) => {
self.try_disseminatate();
self.try_activate_levels();
match self.next_dissemination.poll_unpin(cx) {
Poll::Ready(_) => {
self.run_dissemination();
self.next_dissemination = Box::pin(tokio::time::sleep(self.conf.dissemination_delay));
}
Poll::Pending => {}
}

if let Some(out) = self.outbox.pop_front() {
trace!(
"[Handel] {:?}: outbox.pop(), # outbox items left: {}",
self.own_peer_ix,
self.outbox.len()
);
return Poll::Ready(Left(out));
}
if let Some(ca) = self.get_complete_aggregate() {
Poll::Ready(Right(ca))
} else {
self.delay = Some(Box::pin(tokio::time::sleep(self.conf.poll_fn_delay)));
cx.waker().wake_by_ref();
Poll::Pending
}
match self.next_activation.poll_unpin(cx) {
Poll::Ready(_) => {
if let Some(lvl) = self.next_non_active_level() {
self.try_activate_level(lvl);
self.next_activation = Box::pin(tokio::time::sleep(self.conf.level_activation_delay));
}
}
Poll::Pending => {}
}

if let Some(mut delay) = self.next_processing.take() {
match delay.poll_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => {
self.delay = Some(delay);
Poll::Pending
self.next_processing = Some(delay);
return Poll::Pending;
}
}
}

if let Some(out) = self.outbox.pop_front() {
trace!(
"[Handel] {:?}: outbox.pop(), # outbox items left: {}",
self.own_peer_ix,
self.outbox.len()
);
self.next_processing = Some(Box::pin(tokio::time::sleep(BASE_THROTTLE_DURATION)));
return Poll::Ready(Left(out));
}
if let Some(ca) = self.get_complete_aggregate() {
Poll::Ready(Right(ca))
} else {
self.delay = Some(Box::pin(tokio::time::sleep(self.conf.poll_fn_delay)));
self.next_processing = Some(Box::pin(tokio::time::sleep(
BASE_THROTTLE_DURATION * self.conf.throttle_factor,
)));
cx.waker().wake_by_ref();
Poll::Pending
}
Expand Down Expand Up @@ -765,9 +764,9 @@ mod tests {
window_shrinking_factor: 2,
initial_scoring_window: 4,
fast_path_window: 4,
dissemination_interval: Duration::from_millis(2000),
dissemination_delay: Duration::from_millis(2000),
level_activation_delay: Duration::from_millis(400),
poll_fn_delay: Duration::from_millis(5),
throttle_factor: 5,
};

fn make_handel(
Expand Down

0 comments on commit 73a663c

Please sign in to comment.