Skip to content

Commit

Permalink
Implement an API to request a graceful shutdown.
Browse files Browse the repository at this point in the history
Additionally listen for Ctrl-C as a shutdown signal by default.
  • Loading branch information
jhpratt authored and jebrosen committed Sep 21, 2019
1 parent 4a5a0b5 commit 165b742
Show file tree
Hide file tree
Showing 38 changed files with 202 additions and 58 deletions.
3 changes: 2 additions & 1 deletion core/lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ edition = "2018"
all-features = true

[features]
default = ["private-cookies"]
default = ["private-cookies", "ctrl_c_shutdown"]
tls = ["rocket_http/tls"]
private-cookies = ["rocket_http/private-cookies"]
ctrl_c_shutdown = ["tokio/signal"]

[dependencies]
rocket_codegen = { version = "0.5.0-dev", path = "../codegen" }
Expand Down
22 changes: 17 additions & 5 deletions core/lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ use yansi::Paint;
use crate::http::hyper;
use crate::router::Route;

// TODO.async docs
#[derive(Debug)]
pub enum Error {
Launch(LaunchError),
Run(hyper::Error),
}

/// The kind of launch error that occurred.
///
/// In almost every instance, a launch error occurs because of an I/O error;
Expand Down Expand Up @@ -44,7 +51,9 @@ pub enum LaunchErrorKind {
/// as inspected; a subsequent `drop` of the value will _not_ result in a panic.
/// The following snippet illustrates this:
///
/// ```rust
// TODO.async This isn't true any more, as `.launch()` now returns a
// `Result<(), crate::error::Error>`, which could also be a runtime error.
/// ```rust,ignore
/// # if false {
/// let error = rocket::ignite().launch();
///
Expand Down Expand Up @@ -106,11 +115,14 @@ impl LaunchError {
/// # Example
///
/// ```rust
/// use rocket::error::Error;
/// # if false {
/// let error = rocket::ignite().launch();
///
/// // This line is only reached if launch failed.
/// let error_kind = error.kind();
/// if let Err(error) = rocket::ignite().launch() {
/// match error {
/// Error::Launch(err) => println!("Found a launch error: {}", err.kind()),
/// Error::Run(err) => println!("Error at runtime"),
/// }
/// }
/// # }
/// ```
#[inline]
Expand Down
1 change: 1 addition & 0 deletions core/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub mod data;
pub mod handler;
pub mod fairing;
pub mod error;
pub mod shutdown;

// Reexport of HTTP everything.
pub mod http {
Expand Down
117 changes: 98 additions & 19 deletions core/lib/src/rocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use std::net::ToSocketAddrs;
use std::sync::Arc;
use std::time::Duration;

use futures::future::{Future, FutureExt, TryFutureExt, BoxFuture};
use futures::future::{Future, FutureExt, BoxFuture};
use futures::channel::{mpsc, oneshot};
use futures::stream::StreamExt;
use futures::task::SpawnExt;
use futures_tokio_compat::Compat as TokioCompat;
Expand All @@ -28,6 +29,7 @@ use crate::outcome::Outcome;
use crate::error::{LaunchError, LaunchErrorKind};
use crate::fairing::{Fairing, Fairings};
use crate::ext::AsyncReadExt;
use crate::shutdown::{ShutdownHandle, ShutdownHandleManaged};

use crate::http::{Method, Status, Header};
use crate::http::hyper::{self, header};
Expand All @@ -42,6 +44,8 @@ pub struct Rocket {
catchers: HashMap<u16, Catcher>,
pub(crate) state: Container,
fairings: Fairings,
shutdown_handle: ShutdownHandle,
shutdown_receiver: Option<mpsc::Receiver<()>>,
}

// This function tries to hide all of the Hyper-ness from Rocket. It
Expand Down Expand Up @@ -442,14 +446,22 @@ impl Rocket {
Paint::default(LoggedValue(value)).bold());
}

Rocket {
let (shutdown_sender, shutdown_receiver) = mpsc::channel(1);

let rocket = Rocket {
config,
router: Router::new(),
default_catchers: catcher::defaults::get(),
catchers: catcher::defaults::get(),
state: Container::new(),
fairings: Fairings::new(),
}
shutdown_handle: ShutdownHandle(shutdown_sender),
shutdown_receiver: Some(shutdown_receiver),
};

rocket.state.set(ShutdownHandleManaged(rocket.shutdown_handle.clone()));

rocket
}

/// Mounts all of the routes in the supplied vector at the given `base`
Expand Down Expand Up @@ -699,11 +711,10 @@ impl Rocket {
/// });
/// # }
/// ```
// TODO.async Decide on an return type, possibly creating a discriminated union.
pub fn spawn_on(
mut self,
runtime: &tokio::runtime::Runtime,
) -> Result<impl Future<Output = Result<(), Box<dyn std::error::Error>>>, LaunchError> {
) -> Result<impl Future<Output = Result<(), hyper::Error>>, LaunchError> {
#[cfg(feature = "tls")] use crate::http::tls;

self = self.prelaunch_check()?;
Expand Down Expand Up @@ -749,6 +760,11 @@ impl Rocket {
// Restore the log level back to what it originally was.
logger::pop_max_level();

// We need to get these values before moving `self` into an `Arc`.
let mut shutdown_receiver = self.shutdown_receiver
.take().expect("shutdown receiver has already been used");
let shutdown_handle = self.get_shutdown_handle();

let rocket = Arc::new(self);
let spawn = Box::new(TokioCompat::new(runtime.executor()));
let service = hyper::make_service_fn(move |socket: &hyper::AddrStream| {
Expand All @@ -762,19 +778,54 @@ impl Rocket {
}
});

#[cfg(feature = "ctrl_c_shutdown")]
let (cancel_ctrl_c_listener_sender, cancel_ctrl_c_listener_receiver) = oneshot::channel();

// NB: executor must be passed manually here, see hyperium/hyper#1537
let server = hyper::Server::builder(incoming)
let (future, handle) = hyper::Server::builder(incoming)
.executor(runtime.executor())
.serve(service);
.serve(service)
.with_graceful_shutdown(async move { shutdown_receiver.next().await; })
.inspect(|_| {
#[cfg(feature = "ctrl_c_shutdown")]
let _ = cancel_ctrl_c_listener_sender.send(());
})
.remote_handle();

let (future, handle) = server.remote_handle();
runtime.spawn(future);
Ok(handle.err_into())

#[cfg(feature = "ctrl_c_shutdown")]
match tokio::net::signal::ctrl_c() {
Ok(mut ctrl_c) => {
runtime.spawn(async move {
// Stop listening for `ctrl_c` if the server shuts down
// a different way to avoid waiting forever.
futures::future::select(
ctrl_c.next(),
cancel_ctrl_c_listener_receiver,
).await;

// Request the server shutdown.
shutdown_handle.shutdown();
});
},
Err(err) => {
// Signal handling isn't strictly necessary, so we can skip it
// if necessary. It's a good idea to let the user know we're
// doing so in case they are expecting certain behavior.
let message = "Not listening for shutdown keybinding.";
warn!("{}", Paint::yellow(message));
info_!("Error: {}", err);
},
}

Ok(handle)
}

/// Starts the application server and begins listening for and dispatching
/// requests to mounted routes and catchers. Unless there is an error, this
/// function does not return and blocks until program termination.
/// requests to mounted routes and catchers. This function does not return
/// unless a shutdown is requested via a [`ShutdownHandle`] or there is an
/// error.
///
/// # Error
///
Expand All @@ -790,25 +841,53 @@ impl Rocket {
/// rocket::ignite().launch();
/// # }
/// ```
// TODO.async Decide on an return type, possibly creating a discriminated union.
pub fn launch(self) -> Box<dyn std::error::Error> {
pub fn launch(self) -> Result<(), crate::error::Error> {
use crate::error::Error;

// TODO.async What meaning should config.workers have now?
// Initialize the tokio runtime
let runtime = tokio::runtime::Builder::new()
.core_threads(self.config.workers as usize)
.build()
.expect("Cannot build runtime!");

// TODO.async: Use with_graceful_shutdown, and let launch() return a Result<(), Error>
match self.spawn_on(&runtime) {
Ok(fut) => match runtime.block_on(fut) {
Ok(_) => unreachable!("the call to `block_on` should block on success"),
Err(err) => err,
}
Err(err) => Box::new(err),
Ok(fut) => runtime.block_on(fut).map_err(Error::Run),
Err(err) => Err(Error::Launch(err)),
}
}

/// Returns a [`ShutdownHandle`], which can be used to gracefully terminate
/// the instance of Rocket. In routes, you should use the [`ShutdownHandle`]
/// request guard.
///
/// # Example
///
/// ```rust
/// # #![feature(proc_macro_hygiene)]
/// # use std::{thread, time::Duration};
/// #
/// let rocket = rocket::ignite();
/// let handle = rocket.get_shutdown_handle();
/// # let real_handle = rocket.get_shutdown_handle();
///
/// # if false {
/// thread::spawn(move || {
/// thread::sleep(Duration::from_secs(10));
/// handle.shutdown();
/// });
/// # }
/// # real_handle.shutdown();
///
/// // Shuts down after 10 seconds
/// let shutdown_result = rocket.launch();
/// assert!(shutdown_result.is_ok());
/// ```
#[inline(always)]
pub fn get_shutdown_handle(&self) -> ShutdownHandle {
self.shutdown_handle.clone()
}

/// Returns an iterator over all of the routes mounted on this instance of
/// Rocket.
///
Expand Down
50 changes: 50 additions & 0 deletions core/lib/src/shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use crate::request::{FromRequest, Outcome, Request};
use futures::channel::mpsc;

/// # Example
///
/// ```rust
/// # #![feature(proc_macro_hygiene)]
/// # #[macro_use] extern crate rocket;
/// #
/// use rocket::shutdown::ShutdownHandle;
///
/// #[get("/shutdown")]
/// fn shutdown(handle: ShutdownHandle) -> &'static str {
/// handle.shutdown();
/// "Shutting down..."
/// }
///
/// fn main() {
/// # if false {
/// rocket::ignite()
/// .mount("/", routes![shutdown])
/// .launch()
/// .expect("server failed unexpectedly");
/// # }
/// }
/// ```
#[derive(Debug, Clone)]
pub struct ShutdownHandle(pub(crate) mpsc::Sender<()>);

impl ShutdownHandle {
/// Notify Rocket to shut down gracefully.
#[inline]
pub fn shutdown(mut self) {
// Intentionally ignore any error, as the only scenarios this can happen
// is sending too many shutdown requests or we're already shut down.
let _ = self.0.try_send(());
}
}

impl FromRequest<'_, '_> for ShutdownHandle {
type Error = std::convert::Infallible;

#[inline]
fn from_request(request: &Request<'_>) -> Outcome<Self, Self::Error> {
Outcome::Success(request.state.managed.get::<ShutdownHandleManaged>().0.clone())
}
}

// Use this type in managed state to avoid placing `ShutdownHandle` in it.
pub(crate) struct ShutdownHandleManaged(pub ShutdownHandle);
2 changes: 1 addition & 1 deletion examples/config/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// This example's illustration is the Rocket.toml file.
fn main() {
rocket::ignite().launch();
let _ = rocket::ignite().launch();
}
2 changes: 1 addition & 1 deletion examples/content_types/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ fn not_found(request: &Request<'_>) -> Html<String> {
}

fn main() {
rocket::ignite()
let _ = rocket::ignite()
.mount("/hello", routes![get_hello, post_hello])
.register(catchers![not_found])
.launch();
Expand Down
2 changes: 1 addition & 1 deletion examples/cookies/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ fn rocket() -> rocket::Rocket {
}

fn main() {
rocket().launch();
let _ = rocket().launch();
}
3 changes: 2 additions & 1 deletion examples/errors/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ fn main() {
.launch();

println!("Whoops! Rocket didn't launch!");
println!("This went wrong: {}", e);
// TODO.async Uncomment the following line once `.launch()`'s error type is determined.
// println!("This went wrong: {}", e);
}
2 changes: 1 addition & 1 deletion examples/fairings/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,5 @@ fn rocket() -> rocket::Rocket {
}

fn main() {
rocket().launch();
let _ = rocket().launch();
}
2 changes: 1 addition & 1 deletion examples/form_kitchen_sink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,5 @@ fn rocket() -> rocket::Rocket {
}

fn main() {
rocket().launch();
let _ = rocket().launch();
}
2 changes: 1 addition & 1 deletion examples/form_validation/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,5 @@ fn rocket() -> rocket::Rocket {
}

fn main() {
rocket().launch();
let _ = rocket().launch();
}
2 changes: 1 addition & 1 deletion examples/handlebars_templates/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,5 @@ fn rocket() -> rocket::Rocket {
}

fn main() {
rocket().launch();
let _ = rocket().launch();
}
2 changes: 1 addition & 1 deletion examples/hello_2018/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ fn hello() -> &'static str {
}

fn main() {
rocket::ignite().mount("/", routes![hello]).launch();
let _ = rocket::ignite().mount("/", routes![hello]).launch();
}
2 changes: 1 addition & 1 deletion examples/hello_person/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ fn hi(name: String) -> String {
}

fn main() {
rocket::ignite().mount("/", routes![hello, hi]).launch();
let _ = rocket::ignite().mount("/", routes![hello, hi]).launch();
}
2 changes: 1 addition & 1 deletion examples/hello_world/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ fn hello() -> &'static str {
}

fn main() {
rocket::ignite().mount("/", routes![hello]).launch();
let _ = rocket::ignite().mount("/", routes![hello]).launch();
}
Loading

0 comments on commit 165b742

Please sign in to comment.