Skip to content
This repository has been archived by the owner on Jan 6, 2020. It is now read-only.

Commit

Permalink
Merge pull request #3 from canndrew/add-in-out-streams
Browse files Browse the repository at this point in the history
Add InStream and OutStream
  • Loading branch information
vinipsmaker committed Apr 28, 2016
2 parents 38dad8a + 24b72ec commit 0f3bbd0
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mod test_utils;
pub use protocol::Protocol;
pub use endpoint::{Endpoint, ToEndpoints};
pub use listen_endpoint::{ListenEndpoint, ToListenEndpoints};
pub use stream::{Stream, StreamInfo};
pub use stream::{Stream, OutStream, InStream, StreamInfo};
pub use listener::Listener;
pub use rendezvous_info::{PubRendezvousInfo, PrivRendezvousInfo};

4 changes: 4 additions & 0 deletions src/rendezvous_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@ use endpoint::Endpoint;

pub const RENDEZVOUS_INFO_EXPIRY_DURATION_SECS: u64 = 300;

#[derive(Debug)]
pub struct PrivTcpInfo {
pub socket: net2::TcpBuilder,
pub info: nat_traversal::PrivRendezvousInfo,
}

#[derive(Debug)]
pub struct PrivUdpInfo {
pub socket: UdpSocket,
pub info: nat_traversal::PrivRendezvousInfo,
}

/// The private half of a rendezvous info pair. Used to perform rendezvous connections.
#[derive(Debug)]
pub struct PrivRendezvousInfo {
#[doc(hidden)]
pub priv_tcp_info: Option<PrivTcpInfo>,
Expand All @@ -32,6 +35,7 @@ pub struct PrivRendezvousInfo {

/// The public half of a rendezvous info pair. Share this object with the remote peer and use their
/// `PubRendezvousInfo` to perform a rendezvous connect.
#[derive(Debug, RustcEncodable, RustcDecodable)]
pub struct PubRendezvousInfo {
#[doc(hidden)]
pub pub_tcp_info: Option<nat_traversal::PubRendezvousInfo>,
Expand Down
121 changes: 109 additions & 12 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use rendezvous_info::{PubRendezvousInfo, PrivRendezvousInfo, PrivTcpInfo, PrivUd
RENDEZVOUS_INFO_EXPIRY_DURATION_SECS};

/// Contains protocol information about a stream. See the `StreamInfo` type for more info.
#[derive(Debug, Clone)]
pub enum StreamProtocolInfo {
/// A TCP stream.
Tcp {
Expand All @@ -45,6 +46,7 @@ pub enum StreamProtocolInfo {
}

/// Contains info about a `Stream`. The result of calling `Stream::info`.
#[derive(Debug, Clone)]
pub struct StreamInfo {
/// Protocol information about the stream.
pub protocol: StreamProtocolInfo,
Expand Down Expand Up @@ -89,7 +91,7 @@ struct Buffer {
}

/// Contains the inner, protocol-specific part of the stream.
enum StreamInner {
enum InStreamInner {
Tcp {
stream: TcpStream,
local_addr: SocketAddr,
Expand All @@ -103,11 +105,28 @@ enum StreamInner {
*/
}

/// A transport-agnostic connection to a remote peer.
pub struct Stream {
protocol_inner: StreamInner,
struct OutStreamInner {
buffer: Arc<Buffer>,
_writer_thread: RaiiThreadJoiner,
}

/// A transport-agnostic connection to a remote peer.
pub struct Stream {
in_stream: InStreamInner,
out_stream: OutStreamInner,
connection_id: u64,
}

/// A transport-agnostic outgoing connection to a remote peer.
pub struct OutStream {
inner: OutStreamInner,
stream_protocol_info: StreamProtocolInfo,
connection_id: u64,
}

/// A transport-agnostic incoming connection to a remote peer.
pub struct InStream {
inner: InStreamInner,
connection_id: u64,
}

Expand Down Expand Up @@ -427,11 +446,40 @@ impl fmt::Display for StreamRendezvousConnectError {
}
}

impl InStream {
/// Retreive information about this stream.
pub fn info(&self) -> StreamInfo {
match self.inner {
InStreamInner::Tcp {
local_addr,
peer_addr,
..
} => StreamInfo {
protocol: StreamProtocolInfo::Tcp {
local_addr: local_addr,
peer_addr: peer_addr,
},
connection_id: self.connection_id,
}
}
}
}

impl OutStream {
/// Retreive information about this stream.
pub fn info(&self) -> StreamInfo {
StreamInfo {
protocol: self.stream_protocol_info.clone(),
connection_id: self.connection_id,
}
}
}

impl Stream {
/// Retreive information about this stream.
pub fn info(&self) -> StreamInfo {
match self.protocol_inner {
StreamInner::Tcp {
match self.in_stream {
InStreamInner::Tcp {
local_addr,
peer_addr,
..
Expand All @@ -445,6 +493,21 @@ impl Stream {
}
}

/// Split the `Stream` into a writer and reader pair.
pub fn split(self) -> (OutStream, InStream) {
let info = self.info();
let in_stream = InStream {
inner: self.in_stream,
connection_id: self.connection_id,
};
let out_stream = OutStream {
inner: self.out_stream,
stream_protocol_info: info.protocol,
connection_id: self.connection_id,
};
(out_stream, in_stream)
}

/// Generate rendezvous info which can be used to perform a rendezvous connection.
pub fn gen_rendezvous_info(mc: &MappingContext, deadline: Instant)
-> (PrivRendezvousInfo, PubRendezvousInfo, StreamGenRendezvousInfoDiagnostics)
Expand Down Expand Up @@ -872,13 +935,15 @@ impl Stream {
};
}));
Ok(Stream {
protocol_inner: StreamInner::Tcp {
in_stream: InStreamInner::Tcp {
stream: stream,
local_addr: SocketAddr(local_addr),
peer_addr: SocketAddr(peer_addr),
},
buffer: buffer,
_writer_thread: writer_thread,
out_stream: OutStreamInner {
buffer: buffer,
_writer_thread: writer_thread,
},
connection_id: connection_id,
})
}
Expand All @@ -889,7 +954,7 @@ impl Stream {
*/
}

impl Drop for Stream {
impl Drop for OutStreamInner {
fn drop(&mut self) {
let mut inner = unwrap_result!(self.buffer.inner.lock());

Expand All @@ -903,15 +968,47 @@ impl Drop for Stream {

impl Read for Stream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.protocol_inner {
StreamInner::Tcp { ref mut stream, .. } => {
self.in_stream.read(buf)
}
}

impl Read for InStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
}

impl Read for InStreamInner {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self {
InStreamInner::Tcp { ref mut stream, .. } => {
stream.read(buf)
},
}
}
}

impl Write for Stream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.out_stream.write(buf)
}

fn flush(&mut self) -> io::Result<()> {
self.out_stream.flush()
}
}

impl Write for OutStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
}

fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}

impl Write for OutStreamInner {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut inner = unwrap_result!(self.buffer.inner.lock());
if let Some(e) = inner.error.take() {
Expand Down

0 comments on commit 0f3bbd0

Please sign in to comment.