Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multicasting trees #47

Merged
merged 14 commits into from
Jun 27, 2023
65 changes: 65 additions & 0 deletions algebra-core/src/combinators.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::iso::Iso;

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum EitherOrBoth<A, B> {
Left(A),
Right(B),
Both(A, B),
}

impl<A, B> EitherOrBoth<A, B> {
pub fn swap(self) -> EitherOrBoth<B, A> {
match self {
EitherOrBoth::Left(o1) => EitherOrBoth::Right(o1),
EitherOrBoth::Right(o2) => EitherOrBoth::Left(o2),
EitherOrBoth::Both(o1, o2) => EitherOrBoth::Both(o2, o1),
}
}

/// If `Left`, or `Both`, return `Some` with the left value, otherwise, return `None`.
pub fn left(self) -> Option<A> {
match self {
EitherOrBoth::Left(left) | EitherOrBoth::Both(left, _) => Some(left),
_ => None,
}
}

/// If `Right`, or `Both`, return `Some` with the right value, otherwise, return `None`.
pub fn right(self) -> Option<B> {
match self {
EitherOrBoth::Right(right) | EitherOrBoth::Both(_, right) => Some(right),
_ => None,
}
}

/// If Both, return `Some` tuple containing left and right.
pub fn both(self) -> Option<(A, B)> {
match self {
EitherOrBoth::Both(a, b) => Some((a, b)),
_ => None,
}
}

pub fn collect(self) -> Vec<B>
where
A: Iso<B>,
{
match self {
EitherOrBoth::Left(a) => vec![a.iso_into()],
EitherOrBoth::Right(b) => vec![b],
EitherOrBoth::Both(a, b) => vec![a.iso_into(), b],
}
}
}

impl<A, B> TryFrom<(Option<A>, Option<B>)> for EitherOrBoth<A, B> {
type Error = ();
fn try_from(pair: (Option<A>, Option<B>)) -> Result<Self, Self::Error> {
match pair {
(Some(l), Some(r)) => Ok(Self::Both(l, r)),
(Some(l), None) => Ok(Self::Left(l)),
(None, Some(r)) => Ok(Self::Right(r)),
_ => Err(()),
}
}
}
13 changes: 13 additions & 0 deletions algebra-core/src/iso.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
pub trait Iso<T> {
fn iso_into(self) -> T;
fn iso_from(that: T) -> Self;
}

impl<A> Iso<A> for A {
fn iso_into(self) -> A {
self
}
fn iso_from(that: A) -> Self {
that
}
}
3 changes: 3 additions & 0 deletions algebra-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod combinators;
pub mod iso;

/// https://ncatlab.org/nlab/show/commutative+semigroup
pub trait CommutativeSemigroup {
fn combine(&self, that: &Self) -> Self;
Expand Down
3 changes: 2 additions & 1 deletion spectrum-crypto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ serde_with = "3.0.0"
elliptic-curve = "0.13.*"
k256 = { version = "0.13.*", features = ["serde"] }
libp2p-identity = { version = "0.1.*", features = ["peerid", "secp256k1"] }
libsecp256k1 = "0.7.1"
libsecp256k1 = "0.7.1"
async-trait = "0.1.68"
13 changes: 13 additions & 0 deletions spectrum-crypto/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use async_trait::async_trait;

pub mod digest;
mod hash;
pub mod pubkey;
Expand All @@ -6,3 +8,14 @@ pub mod pubkey;
pub trait VerifiableAgainst<P> {
fn verify(&self, public_data: &P) -> bool;
}

/// Proof that given statement `S` is verified.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Debug)]
pub struct Verified<S>(pub S);

/// Some statement which can be verified against public data `P`.
#[async_trait]
pub trait AsyncVerifiable<P>: Send + Sync + Sized {
type Err: Send;
async fn verify(self, public_data: &P) -> Result<Verified<Self>, Self::Err>;
}
1 change: 1 addition & 0 deletions spectrum-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ libp2p = { version = "0.51.*", features = ["noise", "yamux", "async-std", "tcp"]
log4rs_test_utils = {version = "0.2.3", featuers = ["test_logging"]}
base16 = "0.2.1"
serde_yaml = "0.9.21"
itertools = "0.10.5"
1 change: 1 addition & 0 deletions spectrum-network/src/protocol_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod pool;
pub mod sigma_aggregation;
pub mod versioning;
pub mod void;
pub mod multicasting;

#[derive(Debug)]
pub enum NetworkAction<THandshake, TMessage> {
Expand Down
4 changes: 2 additions & 2 deletions spectrum-network/src/protocol_handler/cosi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ pub struct AnnouncementState<S> {
}

/// It basically waits for an announcement and then forwards it to it's subtrees.
impl<'d, S, R> TemporalProtocolStage<CoSiHandshake, CoSiMessage<S, R>, S> for CoSiAnnouncementStage<S, R>
impl<S, R> TemporalProtocolStage<CoSiHandshake, CoSiMessage<S, R>, S> for CoSiAnnouncementStage<S, R>
where
S: Eq + Clone + Send + Serialize + Deserialize<'d> + Debug,
S: Eq + Clone + Send + Serialize + Debug,
{
fn inject_message(&mut self, peer_id: PeerId, msg: CoSiMessage<S, R>) {
match msg {
Expand Down
139 changes: 69 additions & 70 deletions spectrum-network/src/protocol_handler/handel.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::cmp::{max, Ordering};
use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::future::Future;
use std::ops::{Add, Mul};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use async_std::task::sleep;
use either::{Either, Left, Right};
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use libp2p::PeerId;
use log::trace;

use algebra_core::CommutativePartialSemigroup;
use log::trace;
use spectrum_crypto::VerifiableAgainst;

use crate::protocol_handler::handel::message::HandelMessage;
Expand Down Expand Up @@ -86,9 +87,9 @@ pub struct HandelConfig {
pub window_shrinking_factor: usize,
pub initial_scoring_window: usize,
pub fast_path_window: usize,
pub dissemination_interval: Duration,
pub dissemination_delay: Duration,
pub level_activation_delay: Duration,
pub poll_fn_delay: Duration,
pub throttle_factor: u32,
}

/// A round of Handel protocol that drives aggregation of contribution `C`.
Expand All @@ -104,13 +105,13 @@ pub struct Handel<C, P, PP> {
/// Keeps track of the peers to whom we've sent our own contribution already.
own_contribution_recvs: HashSet<PeerIx>,
outbox: VecDeque<ProtocolBehaviourOut<VoidMessage, HandelMessage<C>>>,
next_dissemination_at: Instant,
level_activation_schedule: Vec<Option<Instant>>,
own_peer_ix: PeerIx,
/// Tracks peers who have indicated that they have completed particular contribution levels.
peers_completed_levels: HashMap<PeerIx, HashSet<u32>>,
/// We use a delay in the `poll` fn to prevent spinning.
delay: Option<Pin<Box<tokio::time::Sleep>>>,
next_processing: Option<Pin<Box<tokio::time::Sleep>>>,
next_dissemination: Pin<Box<tokio::time::Sleep>>,
next_activation: Pin<Box<tokio::time::Sleep>>,
}

impl<C, P, PP> Handel<C, P, PP>
Expand All @@ -133,7 +134,6 @@ where
});
levels[0] = Some(ActiveLevel::unit(own_contribution_scored.clone()));
levels[1] = Some(ActiveLevel::new(own_contribution_scored, 1));
let now = Instant::now();
Handel {
conf,
public_data,
Expand All @@ -144,44 +144,11 @@ where
byzantine_nodes: HashSet::new(),
own_contribution_recvs: HashSet::new(),
outbox: VecDeque::new(),
next_dissemination_at: now,
level_activation_schedule: vec![(); num_levels]
.into_iter()
.enumerate()
.map(|(i, _)| {
if i > 1 {
Some(now.add(conf.level_activation_delay.mul(i as u32)))
} else {
// 0'th and 1'st levels are already activated.
None
}
})
.collect(),
own_peer_ix,
peers_completed_levels: HashMap::default(),
delay: None,
}
}

fn try_disseminatate(&mut self) {
let now = Instant::now();
if now >= self.next_dissemination_at {
self.run_dissemination();
self.next_dissemination_at = now.add(self.conf.dissemination_interval);
}
}

fn try_activate_levels(&mut self) {
let now = Instant::now();
for (lvl, schedule) in self.level_activation_schedule.clone().into_iter().enumerate() {
if let Some(ts) = schedule {
if ts <= now {
self.try_activate_level(lvl);
self.level_activation_schedule[lvl] = None;
} else {
break;
}
}
next_processing: None,
next_dissemination: Box::pin(tokio::time::sleep(conf.dissemination_delay)),
next_activation: Box::pin(tokio::time::sleep(conf.level_activation_delay)),
}
}

Expand Down Expand Up @@ -575,6 +542,22 @@ where
self.levels.get(level).map(|l| l.is_some()).unwrap_or(false)
}

fn next_non_active_level(&self) -> Option<usize> {
self.levels
.iter()
.enumerate()
.take_while(|(_, l)| l.is_none())
.map(|(i, _)| i)
.max()
.and_then(|lvl| {
if lvl < self.peer_partitions.num_levels() {
Some(lvl)
} else {
None
}
})
}

fn get_own_contribution(&self) -> C {
self.levels[0]
.as_ref()
Expand Down Expand Up @@ -634,6 +617,8 @@ impl<C: Eq> Ord for ScoredContributionTraced<C> {
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Debug)]
struct Verified<C>(C);

const BASE_THROTTLE_DURATION: Duration = Duration::from_millis(1);

impl<C, P, PP> TemporalProtocolStage<VoidMessage, HandelMessage<C>, C> for Handel<C, P, PP>
where
C: CommutativePartialSemigroup + Weighted + VerifiableAgainst<P> + Clone + Eq + Debug,
Expand Down Expand Up @@ -662,37 +647,51 @@ where

fn poll(
&mut self,
cx: &mut Context<'_>,
cx: &mut Context,
) -> Poll<Either<ProtocolBehaviourOut<VoidMessage, HandelMessage<C>>, C>> {
if let Some(mut delay) = self.delay.take() {
match delay.poll_unpin(cx) {
Poll::Ready(_) => {
self.try_disseminatate();
self.try_activate_levels();
match self.next_dissemination.poll_unpin(cx) {
Poll::Ready(_) => {
self.run_dissemination();
self.next_dissemination = Box::pin(tokio::time::sleep(self.conf.dissemination_delay));
}
Poll::Pending => {}
}

if let Some(out) = self.outbox.pop_front() {
trace!(
"[Handel] {:?}: outbox.pop(), # outbox items left: {}",
self.own_peer_ix,
self.outbox.len()
);
return Poll::Ready(Left(out));
}
if let Some(ca) = self.get_complete_aggregate() {
Poll::Ready(Right(ca))
} else {
self.delay = Some(Box::pin(tokio::time::sleep(self.conf.poll_fn_delay)));
cx.waker().wake_by_ref();
Poll::Pending
}
match self.next_activation.poll_unpin(cx) {
Poll::Ready(_) => {
if let Some(lvl) = self.next_non_active_level() {
self.try_activate_level(lvl);
self.next_activation = Box::pin(tokio::time::sleep(self.conf.level_activation_delay));
}
}
Poll::Pending => {}
}

if let Some(mut delay) = self.next_processing.take() {
match delay.poll_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => {
self.delay = Some(delay);
Poll::Pending
self.next_processing = Some(delay);
return Poll::Pending;
}
}
}

if let Some(out) = self.outbox.pop_front() {
trace!(
"[Handel] {:?}: outbox.pop(), # outbox items left: {}",
self.own_peer_ix,
self.outbox.len()
);
self.next_processing = Some(Box::pin(tokio::time::sleep(BASE_THROTTLE_DURATION)));
return Poll::Ready(Left(out));
}
if let Some(ca) = self.get_complete_aggregate() {
Poll::Ready(Right(ca))
} else {
self.delay = Some(Box::pin(tokio::time::sleep(self.conf.poll_fn_delay)));
self.next_processing = Some(Box::pin(tokio::time::sleep(
BASE_THROTTLE_DURATION * self.conf.throttle_factor,
)));
cx.waker().wake_by_ref();
Poll::Pending
}
Expand Down Expand Up @@ -765,9 +764,9 @@ mod tests {
window_shrinking_factor: 2,
initial_scoring_window: 4,
fast_path_window: 4,
dissemination_interval: Duration::from_millis(2000),
dissemination_delay: Duration::from_millis(2000),
level_activation_delay: Duration::from_millis(400),
poll_fn_delay: Duration::from_millis(5),
throttle_factor: 5,
};

fn make_handel(
Expand Down