Skip to content

Commit

Permalink
feat(p2p): generic resquest/response type
Browse files Browse the repository at this point in the history
  • Loading branch information
elenaf9 committed Nov 27, 2020
1 parent e8d2a0b commit 5e24e64
Show file tree
Hide file tree
Showing 12 changed files with 289 additions and 345 deletions.
4 changes: 2 additions & 2 deletions engine/communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ authors = ["Elena Frank <elena.frank@iota.org>"]
edition = "2018"
license = "Apache-2.0 OR MIT"
readme = "README.md"
build = "build.rs"

[dependencies]
async-std = "1.6.2"
Expand All @@ -17,7 +16,8 @@ libp2p = {version = "0.28.1", default-features = false, features = ["dns", "iden
prost = {version = "0.6.1", default-features = false, features = ["prost-derive"] }
regex = "1.3.9"
thiserror = "1.0.21"
serde = {version = "1.0.117", features = ["derive"]}
serde = { version = "1.0.117", default-features = false, features = ["alloc", "derive"] }
serde_json = { version = "1.0.59", default-features = false, features = ["alloc"] }
riker = "0.4.1"

[features]
Expand Down
9 changes: 0 additions & 9 deletions engine/communication/build.rs

This file was deleted.

74 changes: 57 additions & 17 deletions engine/communication/examples/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,81 @@

use communication::{
actor::CommunicationActor,
message::{CommunicationEvent, ReqResEvent, Request, Response},
behaviour::{
message::{CommunicationEvent, ReqResEvent},
MessageEvent,
},
};
use core::time::Duration;
use libp2p::core::identity::Keypair;
use riker::actors::*;
use serde::{Deserialize, Serialize};

pub type Key = String;
pub type Value = String;

/// Indicates if a Request was received and / or the associated operation at the remote peer was successful
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RequestOutcome {
Success,
Error,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MailboxRecord {
key: String,
value: String,
}

impl MailboxRecord {
pub fn new(key: Key, value: Key) -> Self {
MailboxRecord { key, value }
}

pub fn key(&self) -> Key {
self.key.clone()
}
pub fn value(&self) -> Value {
self.value.clone()
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Request {
Ping,
PutRecord(MailboxRecord),
GetRecord(String),
}
impl MessageEvent for Request {}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Response {
Pong,
Outcome(RequestOutcome),
Record(MailboxRecord),
}
impl MessageEvent for Response {}

#[actor(CommunicationEvent)]
struct TestActor {
chan: ChannelRef<CommunicationEvent>,
chan: ChannelRef<CommunicationEvent<Request, Response>>,
}

impl ActorFactoryArgs<ChannelRef<CommunicationEvent>> for TestActor {
fn create_args(chan: ChannelRef<CommunicationEvent>) -> Self {
impl ActorFactoryArgs<ChannelRef<CommunicationEvent<Request, Response>>> for TestActor {
fn create_args(chan: ChannelRef<CommunicationEvent<Request, Response>>) -> Self {
TestActor { chan }
}
}

impl Actor for TestActor {
type Msg = TestActorMsg;
type Msg = CommunicationEvent<Request, Response>;

fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
let topic = Topic::from("swarm_inbound");
let sub = Box::new(ctx.myself());
self.chan.tell(Subscribe { actor: sub, topic }, None);
}

fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
self.receive(ctx, msg, sender);
}
}

impl Receive<CommunicationEvent> for TestActor {
type Msg = TestActorMsg;

fn receive(&mut self, ctx: &Context<Self::Msg>, msg: CommunicationEvent, _sender: Sender) {
fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, _sender: Sender) {
println!("{}: -> got msg: {:?}", ctx.myself.name(), msg);
if let CommunicationEvent::RequestResponse {
peer_id,
Expand All @@ -64,8 +104,8 @@ impl Receive<CommunicationEvent> for TestActor {
fn main() {
let local_keys = Keypair::generate_ed25519();
let sys = ActorSystem::new().unwrap();
let chan: ChannelRef<CommunicationEvent> = channel("remote-peer", &sys).unwrap();
sys.actor_of_args::<CommunicationActor, _>("communication-actor", (local_keys, chan.clone()))
let chan: ChannelRef<CommunicationEvent<Request, Response>> = channel("remote-peer", &sys).unwrap();
sys.actor_of_args::<CommunicationActor<Request, Response>, _>("communication-actor", (local_keys, chan.clone()))
.unwrap();
sys.actor_of_args::<TestActor, _>("test-actor", chan).unwrap();
std::thread::sleep(Duration::from_secs(600));
Expand Down
56 changes: 51 additions & 5 deletions engine/communication/examples/local-echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ use async_std::{
io::{stdin, BufReader, Stdin},
task,
};
use communication::{
behaviour::P2PNetworkBehaviour,
use communication::behaviour::{
error::QueryResult,
message::{CommunicationEvent, ReqResEvent, Request, Response},
message::{CommunicationEvent, ReqResEvent},
MessageEvent, P2PNetworkBehaviour,
};
use core::{
str::FromStr,
Expand All @@ -60,6 +60,52 @@ use libp2p::{
swarm::Swarm,
};
use regex::Regex;
use serde::{Deserialize, Serialize};

pub type Key = String;
pub type Value = String;

/// Indicates if a Request was received and / or the associated operation at the remote peer was successful
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RequestOutcome {
Success,
Error,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MailboxRecord {
key: String,
value: String,
}

impl MailboxRecord {
pub fn new(key: Key, value: Key) -> Self {
MailboxRecord { key, value }
}

pub fn key(&self) -> Key {
self.key.clone()
}
pub fn value(&self) -> Value {
self.value.clone()
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Request {
Ping,
PutRecord(MailboxRecord),
GetRecord(String),
}
impl MessageEvent for Request {}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Response {
Pong,
Outcome(RequestOutcome),
Record(MailboxRecord),
}
impl MessageEvent for Response {}

// Poll for user input
fn poll_stdin(stdin: &mut Lines<BufReader<Stdin>>, cx: &mut Context<'_>) -> Result<Option<String>, Box<dyn Error>> {
Expand All @@ -77,7 +123,7 @@ fn poll_stdin(stdin: &mut Lines<BufReader<Stdin>>, cx: &mut Context<'_>) -> Resu
fn listen() -> QueryResult<()> {
let local_keys = Keypair::generate_ed25519();
// Create a Swarm that implementes the Request-Reponse Protocl and mDNS
let mut swarm = P2PNetworkBehaviour::new(local_keys)?;
let mut swarm = P2PNetworkBehaviour::<Request, Response>::new(local_keys)?;
P2PNetworkBehaviour::start_listening(&mut swarm, None)?;
println!("Local PeerId: {:?}", Swarm::local_peer_id(&swarm));
let mut listening = false;
Expand Down Expand Up @@ -163,7 +209,7 @@ fn listen() -> QueryResult<()> {
Ok(())
}

fn handle_input_line(swarm: &mut Swarm<P2PNetworkBehaviour>, line: String) {
fn handle_input_line(swarm: &mut Swarm<P2PNetworkBehaviour<Request, Response>>, line: String) {
if let Some(peer_id) = Regex::new("PING\\s+\"(\\w+)\"")
.ok()
.and_then(|regex| regex.captures(&line))
Expand Down
58 changes: 52 additions & 6 deletions engine/communication/examples/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@

use async_std::task;
use clap::{load_yaml, App, ArgMatches};
use communication::{
behaviour::P2PNetworkBehaviour,
use communication::behaviour::{
error::QueryResult,
message::{CommunicationEvent, MailboxRecord, ReqResEvent, Request, RequestOutcome, Response},
message::{CommunicationEvent, ReqResEvent},
MessageEvent, P2PNetworkBehaviour,
};
use core::{
str::FromStr,
Expand All @@ -83,8 +83,54 @@ use libp2p::{
multiaddr::Multiaddr,
swarm::Swarm,
};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

pub type Key = String;
pub type Value = String;

/// Indicates if a Request was received and / or the associated operation at the remote peer was successful
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RequestOutcome {
Success,
Error,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MailboxRecord {
key: String,
value: String,
}

impl MailboxRecord {
pub fn new(key: Key, value: Key) -> Self {
MailboxRecord { key, value }
}

pub fn key(&self) -> Key {
self.key.clone()
}
pub fn value(&self) -> Value {
self.value.clone()
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Request {
Ping,
PutRecord(MailboxRecord),
GetRecord(String),
}
impl MessageEvent for Request {}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Response {
Pong,
Outcome(RequestOutcome),
Record(MailboxRecord),
}
impl MessageEvent for Response {}

// only used for this CLI
struct Matches {
mail_id: PeerId,
Expand Down Expand Up @@ -123,7 +169,7 @@ fn run_mailbox(matches: &ArgMatches) -> QueryResult<()> {
let local_keys = Keypair::generate_ed25519();

// Create swarm for communication
let mut swarm = P2PNetworkBehaviour::new(local_keys)?;
let mut swarm = P2PNetworkBehaviour::<Request, Response>::new(local_keys)?;
P2PNetworkBehaviour::start_listening(&mut swarm, Some("/ip4/0.0.0.0/tcp/16384".parse().unwrap()))?;
println!("Local PeerId: {:?}", Swarm::local_peer_id(&swarm));
let mut listening = false;
Expand Down Expand Up @@ -200,7 +246,7 @@ fn put_record(matches: &ArgMatches) -> QueryResult<()> {
{
let local_keys = Keypair::generate_ed25519();
// Create swarm for communication
let mut swarm = P2PNetworkBehaviour::new(local_keys)?;
let mut swarm = P2PNetworkBehaviour::<Request, Response>::new(local_keys)?;
println!("Local PeerId: {:?}", Swarm::local_peer_id(&swarm));

// Connect to a remote mailbox on the server.
Expand Down Expand Up @@ -264,7 +310,7 @@ fn get_record(matches: &ArgMatches) -> QueryResult<()> {
let local_keys = Keypair::generate_ed25519();

// Create swarm for communication
let mut swarm = P2PNetworkBehaviour::new(local_keys)?;
let mut swarm = P2PNetworkBehaviour::<Request, Response>::new(local_keys)?;
println!("Local PeerId: {:?}", Swarm::local_peer_id(&swarm));

let mut original_id = None;
Expand Down
36 changes: 15 additions & 21 deletions engine/communication/src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
// Copyright 2020 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use crate::{
behaviour::P2PNetworkBehaviour,
use crate::behaviour::{
message::{CommunicationEvent, ReqResEvent},
MessageEvent, P2PNetworkBehaviour,
};
use async_std::task;
use core::task::{Context as TaskContext, Poll};
use futures::{channel::mpsc, future, prelude::*};
use libp2p::{core::identity::Keypair, swarm::Swarm};
use riker::actors::*;

pub enum CommActorEvent {
Message(CommunicationEvent),
pub enum CommActorEvent<T, U> {
Message(CommunicationEvent<T, U>),
Shutdown,
}

#[actor(CommunicationEvent)]
pub struct CommunicationActor {
chan: ChannelRef<CommunicationEvent>,
pub struct CommunicationActor<T: MessageEvent, U: MessageEvent> {
chan: ChannelRef<CommunicationEvent<T, U>>,
keypair: Keypair,
swarm_tx: Option<mpsc::Sender<CommActorEvent>>,
swarm_tx: Option<mpsc::Sender<CommActorEvent<T, U>>>,
poll_swarm_handle: Option<future::RemoteHandle<()>>,
}

impl ActorFactoryArgs<(Keypair, ChannelRef<CommunicationEvent>)> for CommunicationActor {
fn create_args((keypair, chan): (Keypair, ChannelRef<CommunicationEvent>)) -> Self {
impl<T: MessageEvent, U: MessageEvent> ActorFactoryArgs<(Keypair, ChannelRef<CommunicationEvent<T, U>>)>
for CommunicationActor<T, U>
{
fn create_args((keypair, chan): (Keypair, ChannelRef<CommunicationEvent<T, U>>)) -> Self {
Self {
chan,
keypair,
Expand All @@ -35,8 +36,8 @@ impl ActorFactoryArgs<(Keypair, ChannelRef<CommunicationEvent>)> for Communicati
}
}

impl Actor for CommunicationActor {
type Msg = CommunicationActorMsg;
impl<T: MessageEvent, U: MessageEvent> Actor for CommunicationActor<T, U> {
type Msg = CommunicationEvent<T, U>;

fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
let topic = Topic::from("swarm_outbound");
Expand All @@ -47,7 +48,7 @@ impl Actor for CommunicationActor {
fn post_start(&mut self, ctx: &Context<Self::Msg>) {
let (swarm_tx, mut swarm_rx) = mpsc::channel(16);
self.swarm_tx = Some(swarm_tx);
let mut swarm = P2PNetworkBehaviour::new(self.keypair.clone()).unwrap();
let mut swarm = P2PNetworkBehaviour::<T, U>::new(self.keypair.clone()).unwrap();
P2PNetworkBehaviour::start_listening(&mut swarm, None).unwrap();
let topic = Topic::from("swarm_inbound");
let chan = self.chan.clone();
Expand Down Expand Up @@ -126,14 +127,7 @@ impl Actor for CommunicationActor {
}
}

fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
self.receive(ctx, msg, sender);
}
}

impl Receive<CommunicationEvent> for CommunicationActor {
type Msg = CommunicationActorMsg;
fn receive(&mut self, _ctx: &Context<Self::Msg>, msg: CommunicationEvent, _sender: Sender) {
fn recv(&mut self, _ctx: &Context<Self::Msg>, msg: Self::Msg, _sender: Sender) {
if let Some(tx) = self.swarm_tx.as_mut() {
task::block_on(future::poll_fn(move |tcx: &mut TaskContext<'_>| {
match tx.poll_ready(tcx) {
Expand Down
Loading

0 comments on commit 5e24e64

Please sign in to comment.