148 changes: 148 additions & 0 deletions src/context.rs
@@ -0,0 +1,148 @@
use crate::event::Event;
use crate::wire_msg::WireMsg;
use std::cell::RefCell;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::net::SocketAddr;
use std::sync::mpsc::Sender;

thread_local! {
pub static CTX: RefCell<Option<Context>> = RefCell::new(None);
}

/// Initialise `Context`. This will panic if the context has already been initialised for the
/// current thread context.
pub fn initialise_ctx(context: Context) {
CTX.with(|ctx_refcell| {
let mut ctx = ctx_refcell.borrow_mut();
if ctx.is_some() {
panic!("Context already initialised !");
} else {
*ctx = Some(context);
}
})
}

/// Check if the `Context` is already initialised for the current thread context.
#[allow(unused)]
pub fn is_ctx_initialised() -> bool {
CTX.with(|ctx_refcell| {
let ctx = ctx_refcell.borrow();
ctx.is_some()
})
}

/// Obtain a referece to the `Context`. This will panic if the `Context` has not be set in the
/// current thread context.
pub fn ctx<F, R>(f: F) -> R
where
F: FnOnce(&Context) -> R,
{
CTX.with(|ctx_refcell| {
let ctx = ctx_refcell.borrow();
if let Some(ctx) = ctx.as_ref() {
f(ctx)
} else {
panic!("Context not initialised !");
}
})
}

/// Obtain a mutable referece to the `Context`. This will panic if the `Context` has not be set in
/// the current thread context.
pub fn ctx_mut<F, R>(f: F) -> R
where
F: FnOnce(&mut Context) -> R,
{
CTX.with(|ctx_refcell| {
let mut ctx = ctx_refcell.borrow_mut();
if let Some(ctx) = ctx.as_mut() {
f(ctx)
} else {
panic!("Context not initialised !");
}
})
}

pub struct Context {
pub event_tx: Sender<Event>,
pub connections: HashMap<SocketAddr, Connection>,
pub our_ext_addr_tx: Option<Sender<SocketAddr>>,
quic_ep: quinn::Endpoint,
}

impl Context {
pub fn new(event_tx: Sender<Event>, quic_ep: quinn::Endpoint) -> Self {
Self {
event_tx,
connections: Default::default(),
our_ext_addr_tx: Default::default(),
quic_ep,
}
}

pub fn quic_ep(&self) -> &quinn::Endpoint {
&self.quic_ep
}
}

#[derive(Debug, Default)]
pub struct Connection {
pub to_peer: ToPeer,
pub from_peer: ConnectionStatus,
}

#[derive(Debug, Default)]
pub struct ToPeer {
pub status: ConnectionStatus,
pub pending_sends: VecDeque<WireMsg>,
}

pub enum ConnectionStatus {
NoConnection,
Initiated,
Established(quinn::Connection),
}

impl ConnectionStatus {
pub fn is_no_connection(&self) -> bool {
if let ConnectionStatus::NoConnection = *self {
true
} else {
false
}
}

pub fn is_initiated(&self) -> bool {
if let ConnectionStatus::Initiated = *self {
true
} else {
false
}
}

#[allow(unused)]
pub fn is_established(&self) -> bool {
if let ConnectionStatus::Established(_) = *self {
true
} else {
false
}
}
}

impl Default for ConnectionStatus {
fn default() -> Self {
ConnectionStatus::NoConnection
}
}

impl fmt::Debug for ConnectionStatus {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ConnectionStatus::NoConnection => write!(f, "ConnectionStatus::NoConnection"),
ConnectionStatus::Initiated => write!(f, "ConnectionStatus::Initiated"),
ConnectionStatus::Established(_) => write!(f, "ConnectionStatus::Established"),
}
}
}
18 changes: 18 additions & 0 deletions src/error.rs
@@ -0,0 +1,18 @@
use std::net::SocketAddr;

quick_error! {
#[derive(Debug)]
pub enum Error {
ConnectError(e: quinn::ConnectError) {
display("Connection Error: {}", e)
from()
}
ConnectionError(e: quinn::ConnectionError) {
display("Connection Error: {}", e)
from()
}
DuplicateConnectionToPeer(peer_addr: SocketAddr) {
display("Duplicate connection attempted to peer {}", peer_addr)
}
}
}
5 changes: 3 additions & 2 deletions src/event.rs
Expand Up @@ -3,6 +3,7 @@ use std::net::SocketAddr;
/// Crust Events to the user
#[derive(Serialize, Deserialize, Debug)]
pub enum Event {
ConnectionFailure { peer: SocketAddr },
NewMessage { peer: SocketAddr, msg: Vec<u8> },
ConnectionFailure { peer_addr: SocketAddr },
ConnectedTo { peer_addr: SocketAddr },
NewMessage { peer_addr: SocketAddr, msg: Vec<u8> },
}
79 changes: 24 additions & 55 deletions src/event_loop.rs
@@ -1,26 +1,33 @@
use crate::event::Event;
use futures::{Future, Stream};
use std::cell::RefCell;
use futures::Stream;
use std::fmt;
use std::rc::Rc;
use std::sync::mpsc::Sender;
use std::thread::{self, JoinHandle};
use tokio::runtime::current_thread;
use tokio::sync::mpsc::{self, UnboundedSender};

/// Post messages to event loop
pub fn post<F>(tx: &mut UnboundedSender<EventLoopMsg>, f: F)
where
F: FnOnce() + Send + 'static,
{
let msg = EventLoopMsg::new(f);
if let Err(e) = tx.try_send(msg) {
println!("Error posting messages to event loop: {:?}", e);
}
}

/// Message that event loop can accept in order to be requested to do something
pub struct EventLoopMsg(Option<Box<FnMut(&EventLoopState) + Send>>);
pub struct EventLoopMsg(Option<Box<FnMut() + Send>>);

impl EventLoopMsg {
/// Create a new message to be posted to the event loop
pub fn new<F>(f: F) -> Self
where
F: FnOnce(&EventLoopState) + Send + 'static,
F: FnOnce() + Send + 'static,
{
let mut f = Some(f);
EventLoopMsg(Some(Box::new(move |el_state| {
EventLoopMsg(Some(Box::new(move || {
let f = unwrap!(f.take());
f(el_state)
f()
})))
}

Expand All @@ -41,85 +48,47 @@ impl fmt::Debug for EventLoopMsg {
}
}

#[derive(Clone)]
pub struct EventLoopState {
inner: Rc<RefCell<Inner>>,
}

struct Inner {
event_tx: Sender<Event>,
quic_ep: Option<quinn::Endpoint>,
}

impl EventLoopState {
/// Insert a QUIC Endpoint if not already. If already done previously this will return false.
pub fn insert_quic_endpoint(&self, quic_ep: quinn::Endpoint) -> bool {
let mut inner = self.inner.borrow_mut();
if inner.quic_ep.is_none() {
inner.quic_ep = Some(quic_ep);
true
} else {
false
}
}

/// Crust event sender to users
pub fn tx(&self) -> Sender<Event> {
self.inner.borrow_mut().event_tx.clone()
}

fn new(event_tx: Sender<Event>) -> Self {
Self {
inner: Rc::new(RefCell::new(Inner {
event_tx,
quic_ep: None,
})),
}
}
}

pub struct EventLoop {
tx: UnboundedSender<EventLoopMsg>,
j: Option<JoinHandle<()>>,
}

impl EventLoop {
pub fn spawn(event_tx: Sender<Event>) -> Self {
pub fn spawn() -> Self {
let (tx, rx) = mpsc::unbounded_channel::<EventLoopMsg>();

let j = unwrap!(thread::Builder::new()
.name("Crust-Event-Loop".into())
.spawn(move || {
let el_state = EventLoopState::new(event_tx);
let event_loop_future = rx.map_err(|_| ()).for_each(move |ev_loop_msg| {
if let Some(mut f) = ev_loop_msg.0 {
f(&el_state);
f();
Ok(())
} else {
Err(())
}
});

// let mut rt = unwrap!(tokio::runtime::current_thread::Runtime::new());
// let _ = rt.block_on(event_loop_future);
current_thread::run(event_loop_future);
println!("Exiting Crust Event Loop");
}));

Self { tx, j: Some(j) }
}

#[allow(unused)]
pub fn tx(&mut self) -> &mut UnboundedSender<EventLoopMsg> {
&mut self.tx
}

/// Post messages to event loop
pub fn post<F>(tx: &mut UnboundedSender<EventLoopMsg>, f: F)
pub fn post<F>(&mut self, f: F)
where
F: FnOnce(&EventLoopState) + Send + 'static,
F: FnOnce() + Send + 'static,
{
let msg = EventLoopMsg::new(f);
if let Err(e) = tx.try_send(msg) {
println!("Error posting messages to event loop: {:?}", e);
}
post(&mut self.tx, f)
}
}

Expand Down
81 changes: 64 additions & 17 deletions src/lib.rs
@@ -1,63 +1,109 @@
#![allow(unused, dead_code)]

#[macro_use]
extern crate quick_error;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate unwrap;

pub use event::Event;

use crate::wire_msg::WireMsg;
use config::Config;
use event_loop::{EventLoop, EventLoopMsg, EventLoopState};
use context::{ctx_mut, initialise_ctx, Context};
use error::Error;
use event_loop::EventLoop;
use futures::Future;
use std::net::SocketAddr;
use std::sync::mpsc::Sender;
use std::sync::mpsc::{self, Sender};
use tokio::runtime::current_thread;

mod communicate;
mod config;
mod connect;
mod context;
mod error;
mod event;
mod event_loop;
mod listener;
mod wire_msg;

pub type R<T> = Result<T, Error>;

/// Main Crust instance to communicate with Crust
pub struct Crust {
event_tx: Sender<Event>,
cfg: Config,
our_ext_addr: Option<SocketAddr>,
el: EventLoop,
}

impl Crust {
/// Create a new Crust instance
pub fn new(event_tx: Sender<Event>) -> Self {
let el = EventLoop::spawn(event_tx.clone());
let el = EventLoop::spawn();
let cfg = Default::default();

Self { event_tx, cfg, el }
Self {
event_tx,
cfg,
our_ext_addr: None,
el,
}
}

/// Start listener
///
/// It is necessary to call this to initialise Crust context within the event loop. Otherwise
/// very limited functionaity will be available.
pub fn start_listening(&mut self) {
let port = self.cfg.port.unwrap_or(0);
self.post(move |el_state| {
let tx = self.event_tx.clone();
self.el.post(move || {
let (ep, dr, incoming_connections) =
unwrap!(quinn::Endpoint::new().bind(&format!("0.0.0.0:{}", port)));

let ctx = Context::new(tx, ep);
initialise_ctx(ctx);

current_thread::spawn(dr.map_err(|e| println!("Error in quinn Driver: {:?}", e)));
assert!(el_state.insert_quic_endpoint(ep), "QUIC EP already created");
listener::listen(el_state.clone(), incoming_connections);
listener::listen(incoming_connections);
});
}

/// Connect to the given peer. This will error out if the peer is already in the process of
/// being connected to OR for any other connection failure reasons.
pub fn connect_to(&mut self, peer_addr: SocketAddr) {
self.el.post(move || {
let _r = connect::connect_to(peer_addr);
});
}

/// Send message to peer. If the peer is not connected, it will attempt to connect to it first
/// and then send the message
pub fn send(&mut self, peer_addr: SocketAddr, msg: Vec<u8>) {
self.el
.post(move || communicate::try_write_to_peer(peer_addr, WireMsg::UserMsg(msg)));
}

/// Get our connection info to give to others for them to connect to us
pub fn our_connection_info(&mut self) -> SocketAddr {
self.query_ip_echo_service()
}

fn query_ip_echo_service(&mut self) -> SocketAddr {
if let Some(addr) = self.our_ext_addr {
return addr;
}

let (tx, rx) = mpsc::channel();
let ip_echo_server = self.cfg.hard_coded_contacts[0];
unimplemented!()
}

fn post<F>(&mut self, f: F)
where
F: FnOnce(&EventLoopState) + Send + 'static,
{
EventLoop::post(self.el.tx(), f);
self.el.post(move || {
ctx_mut(|c| c.our_ext_addr_tx = Some(tx));
communicate::try_write_to_peer(ip_echo_server, WireMsg::EndpointEchoReq)
});

unwrap!(rx.recv())
}
}

Expand All @@ -69,6 +115,7 @@ mod tests {
#[test]
fn dropping_crust_handle_gracefully_shutsdown_event_loop() {
let (tx, _rx) = mpsc::channel();
let _crust = Crust::new(tx.clone());
let mut crust = Crust::new(tx.clone());
crust.start_listening();
}
}
79 changes: 43 additions & 36 deletions src/listener.rs
@@ -1,55 +1,62 @@
use crate::event::Event;
use crate::event_loop::EventLoopState;
use crate::wire_msg::WireMsg;
use futures::{Future, Stream};
use std::net::SocketAddr;
use std::sync::mpsc::Sender;
use crate::communicate::read_from_peer;
use crate::connect::connect_to;
use crate::context::{ctx_mut, Connection, ConnectionStatus};
use futures::Stream;
use std::collections::hash_map::Entry;
use tokio::runtime::current_thread;

/// Start listening
pub fn listen(el_state: EventLoopState, incoming_connections: quinn::Incoming) {
pub fn listen(incoming_connections: quinn::Incoming) {
let leaf = incoming_connections.for_each(move |new_conn| {
let connection = new_conn.connection;
let incoming_streams = new_conn.incoming;

let el_state = el_state.clone();
let peer_addr = connection.remote_address();

let mut establish_reverse_connection = false;
let allow = ctx_mut(|c| match c.connections.entry(peer_addr) {
Entry::Occupied(mut oe) => {
let conn = oe.get_mut();
if conn.from_peer.is_no_connection() {
conn.from_peer = ConnectionStatus::Established(connection);
establish_reverse_connection = conn.to_peer.status.is_no_connection();
true
} else {
false
}
}
Entry::Vacant(ve) => {
let mut conn: Connection = Default::default();
conn.from_peer = ConnectionStatus::Established(connection);
ve.insert(conn);
establish_reverse_connection = true;
true
}
});

if !allow {
println!("Not allowing duplicate connection from peer: {}", peer_addr);
return Ok(());
}

if establish_reverse_connection {
if let Err(e) = connect_to(peer_addr) {
println!(
"Dropping incoming connection as we could not reverse connect to peer {}: {:?}",
peer_addr, e
);
return Ok(());
}
}

let leaf = incoming_streams
.map_err(|e| println!("Connection closed due to: {:?}", e))
.for_each(move |quic_stream| {
handle_peer_req(el_state.clone(), quic_stream, peer_addr);
read_from_peer(peer_addr, quic_stream);
Ok(())
});
current_thread::spawn(leaf);
Ok(())
});
current_thread::spawn(leaf);
}

fn handle_peer_req(el_state: EventLoopState, quic_stream: quinn::NewStream, peer: SocketAddr) {
let i_stream = match quic_stream {
quinn::NewStream::Bi(_bi) => {
print!("No code handling for bi-directional stream");
return;
}
quinn::NewStream::Uni(uni) => uni,
};

let leaf = quinn::read_to_end(i_stream, 64 * 1024)
.map_err(|e| println!("Error reading stream: {:?}", e))
.and_then(move |(_i_stream, raw)| {
let event_tx = el_state.tx();
let wire_msg = unwrap!(bincode::deserialize(&*raw));
match wire_msg {
WireMsg::UserMsg(msg) => {
let new_msg = Event::NewMessage { peer, msg };
unwrap!(event_tx.send(new_msg));
}
x => panic!("No handler for message: {:?}", x),
}
Ok(())
});

current_thread::spawn(leaf);
}