Skip to content

Commit

Permalink
Added separate rpc module, cleanup handling
Browse files Browse the repository at this point in the history
  • Loading branch information
saschagrunert committed Sep 3, 2017
1 parent eb59fb5 commit 9ecb639
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 96 deletions.
43 changes: 23 additions & 20 deletions examples/main.rs
Expand Up @@ -51,18 +51,18 @@ fn run() -> Result<()> {
}

// Check the CLI parameters
let address = matches.value_of("address").ok_or_else(
|| "No CLI 'address' provided",
)?;
let server_cert_file = matches.value_of("servercertfile").ok_or_else(
|| "No server certificate provided",
)?;
let client_cert_file = matches.value_of("clientcertfile").ok_or_else(
|| "No client certificate provided",
)?;
let address = matches
.value_of("address")
.ok_or_else(|| "No CLI 'address' provided")?;
let server_cert_file = matches
.value_of("servercertfile")
.ok_or_else(|| "No server certificate provided")?;
let client_cert_file = matches
.value_of("clientcertfile")
.ok_or_else(|| "No client certificate provided")?;

// Create the microservice instance
let microservice = Microservice::new(address, server_cert_file)?;
let microservice = Microservice::new(address)?;

// Check if testing is enabled
if matches.is_present("test") {
Expand All @@ -75,20 +75,23 @@ fn run() -> Result<()> {

// Run the RPC
info!("Running the RPC.");
rpc.run(request.send().promise.and_then(|message| {
// Get the response content
let response = message.get()?.get_response()?;
info!("Got response: {}", response);

// Check the result
assert_eq!(response, "olleH");
Ok(())
}))?;
rpc.run(request
.send()
.promise
.and_then(|message| {
// Get the response content
let response = message.get()?.get_response()?;
info!("Got response: {}", response);

// Check the result
assert_eq!(response, "olleH");
Ok(())
}))?;

info!("Test passed.");
} else {
// Start the server
microservice.serve()?;
microservice.serve(server_cert_file)?;
}

Ok(())
Expand Down
95 changes: 33 additions & 62 deletions src/lib.rs
Expand Up @@ -26,12 +26,12 @@ pub mod microservice_capnp {
#![allow(missing_docs)]
include!(concat!(env!("OUT_DIR"), "/proto/microservice_capnp.rs"));
}
mod rpc;

use std::net::ToSocketAddrs;
use std::fs::File;
use std::io::{self, Read};

use capnp::capability::Promise;
use capnp_rpc::{RpcSystem, twoparty, rpc_twoparty_capnp, Server};
use futures::{Future, Stream};
use native_tls::{Certificate, Pkcs12, TlsAcceptor, TlsConnector};
Expand All @@ -45,29 +45,26 @@ use microservice_capnp::microservice;
/// The main microservice structure
pub struct Microservice {
socket_addr: std::net::SocketAddr,
cert_filename: String,
}

impl Microservice {
/// Create a new microservice instance
pub fn new(address: &str, cert_filename: &str) -> Result<Self> {
pub fn new(address: &str) -> Result<Self> {
// Parse socket address
let parsed_address = address.to_socket_addrs()?.next().ok_or_else(
|| "Could not parse socket address.",
)?;
let parsed_address = address
.to_socket_addrs()?
.next()
.ok_or_else(|| "Could not parse socket address.")?;

// Process TLS settings
info!("Parsed socket address: {:}", parsed_address);

// Return service
Ok(Microservice {
socket_addr: parsed_address,
cert_filename: cert_filename.to_owned(),
})
Ok(Microservice { socket_addr: parsed_address })
}

/// Runs the server
pub fn serve(&self) -> Result<()> {
pub fn serve(&self, cert_filename: &str) -> Result<()> {
info!("Creating server and binding socket.");
let mut core = reactor::Core::new()?;
let handle = core.handle();
Expand All @@ -76,36 +73,35 @@ impl Microservice {
// Prepare the vector
info!("Opening server certificate");
let mut bytes = vec![];
File::open(&self.cert_filename)?.read_to_end(&mut bytes)?;
File::open(cert_filename)?
.read_to_end(&mut bytes)?;

// Create the certificate
let cert = Pkcs12::from_der(&bytes, "")?;

// Create the acceptor
let tls_acceptor = TlsAcceptor::builder(cert)?.build()?;

let server_impl = microservice::ToClient::new(ServerImplementation).from_server::<Server>();
let server_impl = microservice::ToClient::new(rpc::Rpc).from_server::<Server>();
let connections = socket.incoming();

let tls_handshake = connections.map(|(socket, _addr)| {
if let Err(e) = socket.set_nodelay(true) {
error!("Unable to set socket to nodelay: {:}", e);
}
tls_acceptor.accept_async(socket)
});
if let Err(e) = socket.set_nodelay(true) {
error!("Unable to set socket to nodelay: {:}", e);
}
tls_acceptor.accept_async(socket)
});

let server = tls_handshake.map(|acceptor| {
let handle = handle.clone();
let server_impl = server_impl.clone();
acceptor.and_then(move |socket| {
let (reader, writer) = socket.split();

let network = twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Server,
Default::default(),
);
let network = twoparty::VatNetwork::new(reader,
writer,
rpc_twoparty_capnp::Side::Server,
Default::default());

let rpc_system = RpcSystem::new(Box::new(network), Some(server_impl.client));
handle.spawn(rpc_system.map_err(|e| error!("{}", e)));
Expand All @@ -115,9 +111,9 @@ impl Microservice {

info!("Running server");
Ok(core.run(server.for_each(|client| {
handle.spawn(client.map_err(|e| error!("{}", e)));
Ok(())
}))?)
handle.spawn(client.map_err(|e| error!("{}", e)));
Ok(())
}))?)
}

/// Retrieve a client to the microservice instance
Expand All @@ -136,23 +132,20 @@ impl Microservice {
builder.add_root_certificate(ca_cert)?;
let cx = builder.build()?;
let tls_handshake = socket.and_then(|socket| {
if let Err(e) = socket.set_nodelay(true) {
error!("Unable to set socket to nodelay: {:}", e);
}
cx.connect_async("localhost", socket).map_err(|e| {
io::Error::new(io::ErrorKind::Other, e)
})
});
if let Err(e) = socket.set_nodelay(true) {
error!("Unable to set socket to nodelay: {:}", e);
}
cx.connect_async("localhost", socket)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
});

let stream = core.run(tls_handshake)?;
let (reader, writer) = stream.split();

let network = Box::new(twoparty::VatNetwork::new(
reader,
writer,
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let network = Box::new(twoparty::VatNetwork::new(reader,
writer,
rpc_twoparty_capnp::Side::Client,
Default::default()));
let mut rpc_system = RpcSystem::new(network, None);
let client: microservice::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
handle.spawn(rpc_system.map_err(|e| error!("{}", e)));
Expand All @@ -161,25 +154,3 @@ impl Microservice {
Ok((client, core))
}
}

struct ServerImplementation;

impl microservice::Server for ServerImplementation {
fn hello(
&mut self,
params: microservice::HelloParams,
mut results: microservice::HelloResults,
) -> Promise<(), capnp::Error> {
// Get the request
let request = pry!(pry!(params.get()).get_request());
info!("Got request: {}", request);

// Create the response
let response: String = request.chars().rev().collect();
results.get().set_response(&response);
info!("Returned reponse: {}", response);

// Finish the future
Promise::ok(())
}
}
25 changes: 25 additions & 0 deletions src/rpc.rs
@@ -0,0 +1,25 @@
use capnp;
use capnp::capability::Promise;
use microservice_capnp::microservice;

/// The main RPC implementation structure
pub struct Rpc;

impl microservice::Server for Rpc {
fn hello(&mut self,
params: microservice::HelloParams,
mut results: microservice::HelloResults)
-> Promise<(), capnp::Error> {
// Get the request
let request = pry!(pry!(params.get()).get_request());
info!("Got request: {}", request);

// Create the response
let response: String = request.chars().rev().collect();
results.get().set_response(&response);
info!("Returned reponse: {}", response);

// Finish the future
Promise::ok(())
}
}
32 changes: 18 additions & 14 deletions tests/lib.rs
Expand Up @@ -13,18 +13,18 @@ fn hello_success() {

// Run the server in a differenc instance
thread::spawn(move || {
Microservice::new(addr, server_cert)
.unwrap()
.serve()
.unwrap();
});
Microservice::new(addr)
.unwrap()
.serve(server_cert)
.unwrap();
});

// Wait for the server to become ready
let time = time::Duration::from_secs(1);
thread::sleep(time);

// Get a client to the microservice
let (client, mut rpc) = Microservice::new(addr, server_cert)
let (client, mut rpc) = Microservice::new(addr)
.unwrap()
.get_client(client_cert)
.unwrap();
Expand All @@ -34,12 +34,16 @@ fn hello_success() {
request.get().set_request("Hello");

// Run the RPC
rpc.run(request.send().promise.and_then(|message| {
// Get the response content
let response = message.get()?.get_response()?;

// Check the result
assert_eq!(response, "olleH");
Ok(())
})).unwrap();
rpc.run(request
.send()
.promise
.and_then(|message| {
// Get the response content
let response = message.get()?.get_response()?;

// Check the result
assert_eq!(response, "olleH");
Ok(())
}))
.unwrap();
}

0 comments on commit 9ecb639

Please sign in to comment.