Skip to content

Commit

Permalink
cycylon, vicinity, ringcast, poldercast
Browse files Browse the repository at this point in the history
  • Loading branch information
tg-x committed Oct 27, 2019
0 parents commit 6f54891
Show file tree
Hide file tree
Showing 67 changed files with 4,925 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -0,0 +1,5 @@
*~
.*
\#*
_build
*.install
660 changes: 660 additions & 0 deletions LICENSE.md

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions Makefile
@@ -0,0 +1,15 @@
.PHONY: all build doc test clean

all: build doc test

build:
dune build

doc:
dune build @doc

test:
dune runtest -f --no-buffer -j 1

clean:
dune clean
56 changes: 56 additions & 0 deletions README.md
@@ -0,0 +1,56 @@
[![Build Status](https://travis-ci.org/p2pcollab/ocaml-p2p.svg?branch=master)](https://travis-ci.org/p2pcollab/ocaml-p2p)

# Gossip-based protocols for P2P collaboration

P2Pcollab is a collacection of composable libraries
implementing gossip-based protocols for P2P collaboration.

These libraries are distributed under the AGPL-3.0-only license.

## Modules

- PolderCast: P2P topic-based pub/sub
- RingCast: P2P hybrid dissemination
- VICINITY: P2P clustering & topology management
- CYCLON: Random Peer Sampling

## Installation

The libraries can be installed via `opam`:

opam install p2p
opam install p2p-cyclon
opam install p2p-cyclon-lwt
opam install p2p-vicinity
opam install p2p-vicinity-lwt
opam install p2p-ringcast
opam install p2p-ringcast-lwt
opam install p2p-poldercast
opam install p2p-poldercast-lwt

## Building

To build from source, generate documentation, and run tests, use `dune`:

dune build
dune build @doc
dune runtest -f -j1 --no-buffer

In addition, the following `Makefile` targets are available
as a shorthand for the above:

make
make build
make doc
make test

## Documentation

The documentation and API reference is generated from the source interfaces.
It can be consulted [online][doc] or via `odig`, e.g.:

odig doc p2p
odig doc p2p-cyclon
...

[doc]: https://p2pcollab.net/doc/ocaml/
2 changes: 2 additions & 0 deletions dune-project
@@ -0,0 +1,2 @@
(lang dune 1.3)
(name gossip)
5 changes: 5 additions & 0 deletions lib/p2p-cyclon-lwt/dune
@@ -0,0 +1,5 @@
(library
(name p2p_cyclon_lwt)
(public_name p2p-cyclon-lwt)
(libraries p2p p2p-cyclon lwt lwt.unix)
(preprocess (pps lwt_ppx)))
141 changes: 141 additions & 0 deletions lib/p2p-cyclon-lwt/p2p_cyclon_lwt.ml
@@ -0,0 +1,141 @@
(*
Copyright (C) 2019 TG x Thoth
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*)

(**
{1 CYCLON with Lwt}
High-level library implementing the CYCLON protocol using Lwt.
*)

(** Functor building an implementation of Cyclon with Lwt
given a [Node_id], [Node], gossip [View], [Cyclon] implementation,
and an [Io] event handler module. *)
module Make
(Node_id: P2p.S.NODE_ID)
(Node : P2p.S.NODE with type nid := Node_id.t)
(View : P2p.S.VIEW with type nid := Node_id.t
and type node := Node.t)
(Cyclon : P2p.S.GOSSIP with type node := Node.t
and type view := View.t)
(Io : S.CYCLON_IO with type nid := Node_id.t
and type node := Node.t
and type view := View.t)
: S.CYCLON with type nid := Node_id.t
and type node := Node.t
and type view := View.t
and type io := Io.t = struct

module Gossip = Cyclon

type t = {
me : Node.t; (** this node *)
mutable view : View.t; (** partial view of gossip protocol *)
view_len : int; (** max view length *)
xchg_len : int; (** max exchange length *)
period : float; (** gossip period in seconds *)
io : Io.t;
mutable stop : unit Lwt.t option;
mutable stopper : unit Lwt.u option;
}

let init ~me ~view ~view_len ~xchg_len ~period ~io =
{ me; view; view_len; xchg_len; period; io;
stop = None; stopper = None }

let view t = t.view

(** wait for [delay] seconds,
then return the result of thread [t],
or cancel it if not finished yet **)
let timeout delay stop t =
let%lwt _ = Lwt.choose [ Lwt_unix.sleep delay; stop ] in
match Lwt.state t with
| Lwt.Sleep -> Lwt.cancel t; Lwt.return None
| Lwt.Return v -> Lwt.return (Some v)
| Lwt.Fail ex -> Lwt.fail ex

(** initiate exchange with a node from [t.view],
wait for response, and return merged view *)
let initiate t dst sent view =
match dst with
| (Some dst) ->
let%lwt recvd = Io.initiate_gossip t.io dst sent in
let%lwt recvd = Io.gossip_recvd t.io t.me recvd t.view in
t.view <- Gossip.merge ~view ~view_len:t.view_len ~me:t.me
~sent ~recvd ~xchg_len:t.xchg_len;
let%lwt _ = Io.view_updated t.io t.me t.view in
Lwt.return t.view
| _ ->
Lwt.return t.view

(** run initiator:
pick a random node from [t.view] to gossip with every [t.period] seconds *)
let run ?stop t =
match t.stop with
| Some stop -> stop
| None ->
let stop =
match stop with
| None ->
let (stop, stopper) = Lwt.wait () in
t.stopper <- Some stopper;
t.stop <- Some stop;
stop
| Some stop ->
t.stop <- Some stop;
stop in

let rec loop () =
let (dst, sent, view_before) =
Gossip.initiate ~me:t.me ~view:t.view ~xview:View.empty
~xchg_len:t.xchg_len in
let%lwt view_after = timeout t.period stop
(initiate t dst sent view_before) in
let _ = t.view <- match view_after with
| Some v -> v
| _ -> view_before in
match Lwt.state stop with
| Lwt.Sleep -> loop ()
| _ -> Lwt.return_unit
in loop ()

let shutdown t =
match t.stopper with
| Some stopper ->
Lwt.wakeup_later stopper ();
t.stop <- None;
t.stopper <- None
| None ->
match t.stop with
| Some stop ->
Lwt.cancel stop;
t.stop <- None
| None -> ()

(** merge received entries from a node and send response *)
let respond t src recvd =
let sent = Gossip.respond ~view:t.view ~xview:View.empty
~recvd ~src ~me:t.me ~xchg_len:t.xchg_len in
let%lwt _ = Io.respond_gossip t.io src sent in
let%lwt recvd = Io.gossip_recvd t.io src recvd t.view in
t.view <- Gossip.merge ~view:t.view ~view_len:t.view_len
~sent ~recvd ~xchg_len:t.xchg_len ~me:t.me;
Lwt.return t.view

end

(** Signatures *)
module S = S
86 changes: 86 additions & 0 deletions lib/p2p-cyclon-lwt/s.ml
@@ -0,0 +1,86 @@
(*
Copyright (C) 2019 TG x Thoth
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*)

module type CYCLON = sig
type t
type nid
type node
type view
type io

val init : me:node -> view:view -> view_len:int -> xchg_len:int
-> period:float -> io:io -> t
(** [init node view view_len xchg_len period io]
initializes a CYCLON instance with the following configuration:
- [me] - this node
- [view] - initial view
- [view_len] - max view length
- [xchg_len] - number of entries to exchange at each period
- [period] - gossip period, in seconds
- [io]
*)

val run : ?stop:unit Lwt.t -> t -> unit Lwt.t
(** [run t] runs initiator thread:
picks a random node from [view] to gossip with
every [period] seconds.
If [?stop] is provided, this initiator thread returns
as soon as the [stop] thread is fulfilled. *)

val shutdown : t -> unit
(** [shutdown t] stops initiator thread.
In case [run] was called with a [stop] argument,
the [stop] thread is cancelled, otherwise it is fulfilled. *)

val respond : t -> node -> view -> view Lwt.t
(** [respond t src recvd]
merges received entries from a node and sends response *)

val view : t -> view
(** [view t] returns current view *)
end


module type CYCLON_IO = sig
type t
type nid
type node
type view

val initiate_gossip : t -> node -> view -> view Lwt.t
(** [initiate_gossip dst xchg]
sends [xchg] entries to [node]
and returns response *)

val respond_gossip : t -> node -> view -> unit Lwt.t
(** [respond_gossip t src xchg]
sends [xchg] entries in response to [node] *)

val gossip_recvd : t -> node -> view -> view -> view Lwt.t
(** [gossip_recvd t src recvd view]
is called after entries are received during an exchange;
allows rewriting [recvd] entries with the returned value,
thus allows using a stream sampler such as URPS
to provide uniformly random nodes
instead of the possibly biased exchanged nodes *)

val view_updated : t -> node -> view -> unit Lwt.t
(** [view_updated node view]
is called when [view] has been updated
after a gossip exchange with [node] *)
end
4 changes: 4 additions & 0 deletions lib/p2p-cyclon/dune
@@ -0,0 +1,4 @@
(library
(name p2p_cyclon)
(public_name p2p-cyclon)
(libraries p2p))
75 changes: 75 additions & 0 deletions lib/p2p-cyclon/p2p_cyclon.ml
@@ -0,0 +1,75 @@
(*
Copyright (C) 2019 TG x Thoth
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*)

(** Functor building an implementation of Cyclon
given a [Node_id], [Node], and gossip [View]. *)
module Make
(Node_id : P2p.S.NODE_ID)
(Node : P2p.S.NODE with type nid := Node_id.t)
(View : P2p.S.VIEW with type nid := Node_id.t
and type node := Node.t)
: P2p.S.GOSSIP with type node := Node.t
and type view := View.t = struct

let initiate ~view ~xview ~me ~xchg_len =
let dst = View.oldest view in
match dst with
| Some dst ->
let view = View.remove (Node.id dst) view in
let view = View.incr_age view in
let uview = View.union view xview in
let xchg = View.random_subset (xchg_len - 1) uview in
let xchg = View.add me xchg in
(Some dst, xchg, view)
| None -> (* view empty *)
(None, View.empty, view)

let respond ~view ~xview ~recvd ~src ~me ~xchg_len =
let _recvd = recvd and _src = src and _me = me in
let uview = View.union view xview in
View.random_subset xchg_len uview

let merge ~view ~view_len ~sent ~recvd ~xchg_len ~me =
let sent = View.remove (Node.id me) sent in
let recvd = View.remove (Node.id me) recvd in
let recvd = View.random_subset xchg_len recvd in
let recvd = View.zero_age recvd in
let rec merge view sent recvd =
if 0 < View.length recvd then
match View.random recvd with
| (Some rnode) ->
if View.length view < view_len then
(* fill an empty slot in view *)
let view = View.add rnode view in
let recvd = View.remove (Node.id rnode) recvd in
merge view sent recvd
else (* replace a sent entry in view with a received one *)
(match View.random sent with
| Some snode ->
let view = View.add rnode view in
let view =
if view_len < View.length view
then View.remove (Node.id snode) view
else view in
let sent = View.remove (Node.id snode) sent in
let recvd = View.remove (Node.id rnode) recvd in
merge view sent recvd
| _ -> view)
| _ -> view
else view in
merge view sent recvd

end

0 comments on commit 6f54891

Please sign in to comment.