Skip to content

Commit

Permalink
another attempt at websocket connection and threads
Browse files Browse the repository at this point in the history
  • Loading branch information
ttdonovan committed Sep 22, 2017
1 parent e4ab5c0 commit 7b5e684
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 109 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Expand Up @@ -11,7 +11,7 @@ error-chain = "0.11"
log = "0.3.8"
prost = "0.2"
prost-derive = "0.2"
ws = "0.7"
websocket = "0.20"

[build-dependencies]
prost-build = "0.2"
prost-build = "0.2"
29 changes: 17 additions & 12 deletions examples/client.rs
Expand Up @@ -12,19 +12,24 @@ fn main() {
// println!("StarCraft failed to start");
// }

println!("make connection...");
println!("Connecting...");
let mut conn = sc2_api::Connection::connect().expect("connect failed");

println!("Ready.");
loop {
match conn.recv_response() {
Ok(Response::Observation(r)) => { println!("Observation {:?}", r) },
Ok(Response::Quit(_)) => {
println!("Quit.");
break;
},
Ok(_) => { },
_ => { panic!("something has gone wrong") }
};
}
let _ = conn.send_thread.join();
let _ = conn.recv_thread.join();

println!("Exit.");

// loop {
// match conn.recv_response() {
// Ok(Response::Observation(r)) => { println!("Observation {:?}", r) },
// Ok(Response::Quit(_)) => {
// println!("Quit.");
// break;
// },
// Ok(_) => { },
// _ => { panic!("something has gone wrong") }
// };
// }
}
162 changes: 73 additions & 89 deletions src/connection.rs
Expand Up @@ -2,117 +2,101 @@ use errors::*;
use raw_protobuf_api::{response, Response};

use prost::Message;
use ws::{self, connect, Handler, Handshake, Result as WsResult};
use websocket as ws;
use websocket::client::ClientBuilder;

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

const CONNECTION: &'static str = "ws://127.0.0.1:5000/sc2api";

#[derive(Debug)]
pub struct Connection {
recv_ch: Receiver<response::Response>,
send_ch: Sender<()>,
running: Arc<AtomicBool>,
conn_thread: JoinHandle<()>,
}

pub struct Client {
tx: Sender<response::Response>,
// rx: Receiver<()>,
// out: ws::Sender,
}

impl Handler for Client {
fn on_open(&mut self, _: Handshake) -> WsResult<()> {
println!("on_open");

Ok(())
}

fn on_message(&mut self, msg: ws::Message) -> WsResult<()> {
if let Ok(r) = Response::decode(msg.into_data()) {
self.tx.send(r.response.unwrap()).unwrap();
};

Ok(())
}
// recv_ch: Receiver<response::Response>,
send_ch: Sender<(ws::OwnedMessage)>,
pub recv_thread: JoinHandle<()>,
pub send_thread: JoinHandle<()>,
}

impl Connection {
pub fn connect() -> Result<Connection> {
let (recv_tx, recv_rx) = channel();
let (send_tx, send_rx): (Sender<()>, Receiver<()>) = channel();
let running = Arc::new(AtomicBool::new(true));

let thread = thread::Builder::new()
.name("StarCraft connection".into())
.spawn(move || {
connect("ws://127.0.0.1:5000/sc2api", |_out| {
// // FIXME: compilation error[E0507]
// // error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
// // --> src/connection.rs:55:32
// // |
// // 45 | let (send_tx, send_rx): (Sender<()>, Receiver<()>) = channel();
// // | ------- captured outer variable
// // ...
// // 63 | .spawn(move || {
// // | ^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure
// let _t = thread::Builder::new()
// .name("Client sender".into())
// .spawn(move || {
// // TODO: replace "loop" with "while running <AtomicBool(true)>"
// loop {
// use std::time::Duration;
//
// println!("client sender listening...");
// match send_rx.recv() {
// Ok(_) => println!("got a message"),
// Err(_) => panic!(),
// }
//
// thread::sleep(Duration::from_millis(500));
// }
// })
// .unwrap();

Client {
tx: recv_tx.clone(),
// rx: send_rx,
// out: out,
}
}).unwrap()
})
.unwrap();
let client = ClientBuilder::new(CONNECTION)
.unwrap()
.connect_insecure()
.unwrap();

let (mut recv_tx, mut recv_rx) = client.split().unwrap();
let (send_tx, send_rx) = channel();

let tx = send_tx.clone();
let recv_thread = thread::spawn(move || {
for message in recv_tx.incoming_messages() {
let msg = match message {
Ok(m) => m,
Err(e) => {
debug!("Receive Loop: {:?}", e);
let _ = tx.send(ws::OwnedMessage::Close(None));
return;
},
};

match msg {
ws::OwnedMessage::Close(_) => {
let _ = tx.send(ws::OwnedMessage::Close(None));
return;
},
_ => {
// TODO: need send message to `tx` for `send_rx`?
println!("Receive Loop...");
debug!("{:?}", msg);
},
}
}
});

// TODO: maybe this thread is not needed...
let send_thread = thread::spawn(move || {
loop {
let msg = match send_rx.recv() {
Ok(m) => m,
Err(e) => {
debug!("Send Loop: {:?}", e);
return;
},
};

match recv_rx.send_message(&msg) {
Ok(()) => (),
Err(e) => {
debug!("Send Loop: {:?}", e);
let _ = recv_rx.send_message(&ws::Message::close());
return;
},
}
}
});

let connection = Connection {
recv_ch: recv_rx,
// recv_ch: recv_rx,
send_ch: send_tx,
running: running,
conn_thread: thread,
recv_thread: recv_thread,
send_thread: send_thread,
};

Ok(connection)
}

pub fn recv_response(&mut self) -> Result<response::Response> {
pub fn recv_response(&mut self) {
// The `recv` method picks a message from the channel
// `recv` will block the current thread if there are no messages available
match self.recv_ch.recv() {
Ok(r) => Ok(r),
Err(_) => panic!(),
}
// match self.recv_ch.recv() {
// Ok(r) => Ok(r),
// Err(_) => panic!(),
// }
}

// TODO: accept a request and send to the channel (send_ch)
// FIXME: dummy function that only sends a `OwnedMessage::Close(None)`
pub fn send_request(&self) {
self.send_ch.send(()).expect("failed to send...");
}
}

impl Drop for Connection {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
// TODO: do we need to wait for listening thread with send_ch (rx) to close for a graceful shutdown?
self.send_ch.send(ws::OwnedMessage::Close(None)).expect("failed to send...");
}
}
6 changes: 1 addition & 5 deletions src/errors.rs
@@ -1,7 +1,3 @@
use ws;

error_chain! {
foreign_links {
Ws(ws::Error);
}

}
2 changes: 1 addition & 1 deletion src/lib.rs
Expand Up @@ -6,7 +6,7 @@ extern crate log;
extern crate prost;
#[macro_use]
extern crate prost_derive;
extern crate ws;
extern crate websocket;

mod connection;
mod errors;
Expand Down

0 comments on commit 7b5e684

Please sign in to comment.