Skip to content

Commit

Permalink
feat(streaming): first iteration of streaming protocol
Browse files Browse the repository at this point in the history
- Separates out RequestResponseProtocol and StreamingProtocol, both of
  which rely on libp2p::request_response::RequestResponseProtocol
- RequestResponseProtocol has RequestEvent::Ping and ResponseEvent::Pong
- StreamingProtocol has:
  - DataRequest
  - Header
  - Packet
  - StreamError
  - Ack
- The streaming protocol uses the data request and header response as a
  normal request response cycle, coordinating an ID over which two
"stream" the data file
- The source location sends Packet requests to the original requester,
  which sends Ack responses in response
- We currently have no way to end streams early or handle it properly if
  packets get lost in transfer, or if there is some disruption on the
stream. In all cases, we error as soon as there is a problem, and remove
the stream id from the `active_streams` map
  • Loading branch information
ramfox committed May 11, 2022
1 parent 81673e1 commit b55e00e
Show file tree
Hide file tree
Showing 12 changed files with 1,341 additions and 535 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/target
/iroh_gateway/test_files/*
/iroh-rpc/test_data/*
.env
Cargo.lock
Cargo.lock
5 changes: 5 additions & 0 deletions iroh-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,8 @@ tokio = { version = "1.17.0", features = ["full"] }
futures = "0.3.21"
async-trait = "0.1.53"
bytecheck = "0.6.7"
tempfile = "3.3.0"
smallvec = "1.8.0"
rand = "0.8.5"
log = "0.4.16"
pretty_env_logger = "0.4.0"
Binary file added iroh-rpc/src/.DS_Store
Binary file not shown.
46 changes: 46 additions & 0 deletions iroh-rpc/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use libp2p::NetworkBehaviour;

use crate::request_response::{
new_request_response_behaviour, RequestResponse, RequestResponseEvent,
};
use crate::streaming::{new_streaming_behaviour, Streaming, StreamingEvent};

#[derive(NetworkBehaviour)]
#[behaviour(out_event = "CoreEvent")]
pub struct CoreBehaviour {
pub request_response: RequestResponse,
pub streaming: Streaming,
}

impl CoreBehaviour {
pub fn new() -> Self {
CoreBehaviour {
request_response: new_request_response_behaviour(),
streaming: new_streaming_behaviour(),
}
}
}

impl Default for CoreBehaviour {
fn default() -> Self {
Self::new()
}
}

#[derive(Debug)]
pub enum CoreEvent {
RequestResponse(RequestResponseEvent),
Streaming(StreamingEvent),
}

impl From<RequestResponseEvent> for CoreEvent {
fn from(event: RequestResponseEvent) -> Self {
CoreEvent::RequestResponse(event)
}
}

impl From<StreamingEvent> for CoreEvent {
fn from(event: StreamingEvent) -> Self {
CoreEvent::Streaming(event)
}
}
85 changes: 85 additions & 0 deletions iroh-rpc/src/commands.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use futures::channel::{mpsc, oneshot};
use libp2p::core::connection::ListenerId;
use libp2p::request_response::RequestId;
use libp2p::Multiaddr;
use libp2p::PeerId;
use std::collections::HashMap;
use std::error::Error;

use crate::stream::{Header, Packet, StreamType};
use crate::streaming::StreamingResponseChannel;

// OutCommands are commands from the Client going out to the server or network
// They should include a sender on which the server will send the response from
// the network to the client
pub enum OutCommand {
StartListening {
addr: Multiaddr,
sender: OneshotSender,
},
Dial {
peer_id: PeerId,
peer_addr: Multiaddr,
sender: OneshotSender,
},
Ping {
peer_id: PeerId,
sender: OneshotSender,
},
DataRequest {
id: u64,
path: String,
peer_id: PeerId,
sender: OneshotSender,
},
HeaderResponse {
header: Header,
channel: StreamingResponseChannel,
},
SendPacket {
peer_id: PeerId,
packet: Packet,
sender: OneshotSender,
},
CloseStream {
id: u64,
},
PeerId {
sender: OneshotSender,
},
}

// InCommands are commands from the Network to the client, passed by the Server
pub enum InCommand {
DataRequest {
id: u64,
peer_id: PeerId,
path: String,
},
}

pub type OneshotSender = oneshot::Sender<SenderType>;

#[derive(Debug)]
pub enum SenderType {
Ack,
File(Vec<u8>),
PeerId(PeerId),
Multiaddr(Multiaddr),
Stream {
header: Header,
stream: mpsc::Receiver<StreamType>,
},
Error(Box<dyn Error + Send + Sync>),
}

pub type PendingMap = HashMap<PendingId, OneshotSender>;

#[derive(PartialEq, Eq, Hash)]
pub enum PendingId {
PeerId(PeerId),
RequestId(RequestId),
ListenerId(ListenerId),
}

pub type ActiveStreams = HashMap<u64, mpsc::Sender<StreamType>>;

0 comments on commit b55e00e

Please sign in to comment.