Skip to content

Commit

Permalink
📝 use actor.with for communication (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xudong-Huang committed May 10, 2018
1 parent b4ee77e commit 5eb669e
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ fern = "0.5"
config = "0.8"
failure = "0.1"
num_cpus = "1"
may_actor = "0.1"
may_actor = "0.2"
lazy_static = "1"
tungstenite = "0.5"
native-tls = "0.1.5"
Expand Down
6 changes: 3 additions & 3 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ fn test_ws() -> Result<()> {

let client = hub::create_outbound_conn(("127.0.0.1", config::WS_PORT))?;

client.send_message(json!("hehehe"));
client.send_version()?;

let g = hub::INBOUND_CONN.read().unwrap();
let server = &g[0];
server.send_message(json!("hahaha"));
server.send_version()?;

Ok(())
}
Expand Down Expand Up @@ -223,7 +223,7 @@ fn network_clean() {

fn main() {
use std::io::{self, Read};
may::config().set_stack_size(0x1000 - 1);
// may::config().set_stack_size(0x1000 - 1);
log_init();
show_config().unwrap();
test_json().unwrap();
Expand Down
2 changes: 2 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ pub const WS_PORT: u16 = 8080;
pub const COUNT_WITNESSES: usize = 12;
pub const MAJORITY_OF_WITNESSES: usize = 7;
pub const GENESIS_UNIT: &str = "rg1RzwKwnfRHjBojGol3gZaC5w7kR++rOR6O61JRsrQ=";
pub const VERSION: &str = "1.0";
pub const ALT: &str = "1";

lazy_static! {
pub static ref CONFIG: RwLock<Config> = RwLock::new({
Expand Down
69 changes: 60 additions & 9 deletions src/network/hub.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::io::{Read, Write};
use std::net::ToSocketAddrs;

use config;
use error::Result;
use may::net::TcpStream;
use may_actor::Actor;
Expand All @@ -25,17 +26,38 @@ lazy_static! {
pub struct HubConn(pub Actor<HubConnImpl<TcpStream>>);

impl HubConn {
// just a simple example interface
pub fn send_message(&self, msg: Value) {
self.0
.call(move |me| me.send_json(&json!(["justsaying", msg])).unwrap());
#[inline]
pub fn with<R, F>(&self, f: F) -> R
where
F: FnOnce(&mut HubConnImpl<TcpStream>) -> R + Send,
R: Send,
{
self.0.with(f)
}

pub fn send_version(&self) -> Result<()> {
// TODO: read these things from config
self.0.with(|me| {
me.send_just_saying(
"version",
json!({
"protocol_version": config::VERSION,
"alt": config::ALT,
"library": "rust-trustnote",
"library_version": "0.1.0",
"program": "rust-trustnote-hub",
"program_version": "0.1.0"
}),
)
})
}
}

pub struct HubConnImpl<T: Read + Write> {
// this half is only used for send message
// the other receive half is within the actor driver
conn: WebSocket<T>,
peer: String,
}

impl<T: Read + Write> Drop for HubConnImpl<T> {
Expand All @@ -46,11 +68,14 @@ impl<T: Read + Write> Drop for HubConnImpl<T> {

impl<T: Read + Write> Connection<T> for HubConnImpl<T> {
fn new(s: WebSocket<T>) -> Self {
HubConnImpl { conn: s }
// TODO: need to add peer init
let peer = "peer".to_owned();
HubConnImpl { conn: s, peer }
}

fn send_json(&mut self, value: &Value) -> Result<()> {
let msg = serde_json::to_string(value)?;
fn send_json(&mut self, value: Value) -> Result<()> {
let msg = serde_json::to_string(&value)?;
info!("SENDING to {}: {}", self.peer, msg);
self.conn.write_message(Message::Text(msg))?;
Ok(())
}
Expand All @@ -71,6 +96,32 @@ impl<T: Read + Write> Connection<T> for HubConnImpl<T> {
}
}

impl<T: Read + Write> HubConnImpl<T> {
pub fn send_message(&mut self, kind: &str, content: Value) -> Result<()> {
self.send_json(json!([kind, &content]))
}

pub fn send_just_saying(&mut self, subject: &str, body: Value) -> Result<()> {
self.send_message("justsaying", json!({"subject": subject, "body": body}))
}

pub fn send_error(&mut self, error: Value) -> Result<()> {
self.send_just_saying("error", error)
}

pub fn send_info(&mut self, info: Value) -> Result<()> {
self.send_just_saying("info", info)
}

pub fn send_result(&mut self, result: Value) -> Result<()> {
self.send_just_saying("result", result)
}

pub fn send_error_result(&mut self, unit: &str, error: &str) -> Result<()> {
self.send_result(json!({"unit": unit, "result": "error", "error": error}))
}
}

pub fn create_outbound_conn<A: ToSocketAddrs>(address: A) -> Result<HubConn> {
let stream = TcpStream::connect(address)?;
let r_stream = stream.try_clone()?;
Expand All @@ -80,7 +131,7 @@ pub fn create_outbound_conn<A: ToSocketAddrs>(address: A) -> Result<HubConn> {

let (conn, _) = client(req, stream)?;
let r_ws = WebSocket::from_raw_socket(r_stream, Role::Client);
let actor = Actor::drive_new(HubConnImpl { conn }, move |actor| {
let actor = Actor::drive_new(HubConnImpl::new(conn), move |actor| {
super::network::connection_receiver(r_ws, actor)
});

Expand All @@ -101,5 +152,5 @@ pub fn new_wss(host: &str) -> Result<HubConnImpl<TlsStream<TcpStream>>> {
let req = Request::from(url);

let (conn, _) = client(req, stream)?;
Ok(HubConnImpl { conn })
Ok(HubConnImpl::new(conn))
}
2 changes: 1 addition & 1 deletion src/network/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ macro_rules! t_c {

pub trait Connection<S> {
fn new(s: WebSocket<S>) -> Self;
fn send_json(&mut self, value: &Value) -> Result<()>;
fn send_json(&mut self, value: Value) -> Result<()>;
fn on_message(&mut self, msg: Value) -> Result<()>;
fn on_request(&mut self, msg: Value) -> Result<()>;
fn on_response(&mut self, msg: Value) -> Result<()>;
Expand Down

0 comments on commit 5eb669e

Please sign in to comment.