Skip to content


Switch safekeepers to async.
Browse files Browse the repository at this point in the history
This is a full switch: fs IO operations are also tokio ones, working through
the thread pool. Similar to pageserver, we have multiple runtimes for easier
`top` usage and isolation.

Notable points:
- Now that the guts of the safekeeper are full of `.await`s, we need to be very
  careful not to drop a task at a random point, leaving the timeline in an
  unclear state. Currently the only writer is walreceiver and we don't have
  top-level cancellation there, so we are good. But to be safe we should
  probably add a fuse that panics if a task is dropped while an operation on a
  timeline is in progress.
- Timeline lock is Tokio one now, as we do disk IO under it.
- Collecting metrics got a crutch: since the Prometheus Collector is
  synchronous, it spawns a thread with a current-thread runtime to collect data.
- Anything involving closures becomes significantly more complicated, as
  async fns are already kinda closures + 'async closures are unstable'.
- Main thread now tracks other main tasks, which got much easier.
- The only sync place left is initial data loading, as otherwise clippy
  complains about the timeline map lock being held across await points -- which
  is not bad here, as it happens only in the single-threaded runtime of the
  main thread. But having it sync doesn't hurt either.

I'm concerned about performance of thread pool io offloading, async traits and
many await points; but we can try and see how it goes.

fixes #3036
fixes #3966
  • Loading branch information
arssher committed Jun 11, 2023
1 parent fbf0367 commit 227271c
Show file tree
Hide file tree
Showing 21 changed files with 672 additions and 592 deletions.
41 changes: 3 additions & 38 deletions libs/utils/src/http/
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use crate::auth::{Claims, JwtAuth};
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
use anyhow::{anyhow, Context};
use anyhow::Context;
use hyper::header::{HeaderName, AUTHORIZATION};
use hyper::http::HeaderValue;
use hyper::Method;
use hyper::{header::CONTENT_TYPE, Body, Request, Response, Server};
use hyper::{header::CONTENT_TYPE, Body, Request, Response};
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy;
use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder, RouterService};
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use tokio::task::JoinError;
use tracing::{self, debug, info, info_span, warn, Instrument};

use std::future::Future;
use std::net::TcpListener;
use std::str::FromStr;

static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
Expand Down Expand Up @@ -348,40 +347,6 @@ pub fn check_permission_with(

/// Start listening for HTTP requests on given socket.
/// 'shutdown_future' can be used to stop. If the Future becomes
/// ready, we stop listening for new requests, and the function returns.
pub fn serve_thread_main<S>(
router_builder: RouterBuilder<hyper::Body, ApiError>,
listener: TcpListener,
shutdown_future: S,
) -> anyhow::Result<()>
S: Future<Output = ()> + Send + Sync,
info!("Starting an HTTP endpoint at {}", listener.local_addr()?);

// Create a Service from the router above to handle incoming requests.
let service = RouterService::new(|err| anyhow!(err))?).unwrap();

// Enter a single-threaded tokio runtime bound to the current thread
let runtime = tokio::runtime::Builder::new_current_thread()

let _guard = runtime.enter();

let server = Server::from_tcp(listener)?


mod tests {
use super::*;
Expand Down
173 changes: 101 additions & 72 deletions safekeeper/src/bin/
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
use anyhow::{bail, Context, Result};
use clap::Parser;
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use remote_storage::RemoteStorageConfig;
use tokio::runtime::Handle;
use tokio::signal::unix::{signal, SignalKind};
use tokio::task::JoinError;
use toml_edit::Document;
use utils::signals::ShutdownSignals;

use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use storage_broker::Uri;
use tokio::sync::mpsc;
Expand All @@ -20,22 +24,21 @@ use tracing::*;
use utils::pid_file;

use metrics::set_build_info_metric;
use safekeeper::broker;
use safekeeper::control_file;
use safekeeper::defaults::{
use safekeeper::http;
use safekeeper::remove_wal;
use safekeeper::wal_backup;
use safekeeper::wal_service;
use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
use safekeeper::{control_file, BROKER_RUNTIME};
use safekeeper::{http, WAL_REMOVER_RUNTIME};
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
use safekeeper::{wal_backup, HTTP_RUNTIME};
use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::JwtAuth;
use utils::{
logging::{self, LogFormat},
Expand Down Expand Up @@ -104,10 +107,6 @@ struct Args {
/// Safekeeper won't be elected for WAL offloading if it is lagging for more than this value in bytes
#[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)]
max_offloader_lag: u64,
/// Number of threads for wal backup runtime, by default number of cores
/// available to the system.
wal_backup_threads: Option<usize>,
/// Number of max parallel WAL segments to be offloaded to remote storage.
#[arg(long, default_value = "5")]
wal_backup_parallel_jobs: usize,
Expand All @@ -121,9 +120,14 @@ struct Args {
/// Format for logging, either 'plain' or 'json'.
#[arg(long, default_value = "plain")]
log_format: String,
/// Run everything in single threaded current thread runtime, might be
/// useful for debugging.
current_thread_runtime: bool,

fn main() -> anyhow::Result<()> {
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();

if let Some(addr) = args.dump_control_file {
Expand Down Expand Up @@ -183,21 +187,25 @@ fn main() -> anyhow::Result<()> {
heartbeat_timeout: args.heartbeat_timeout,
remote_storage: args.remote_storage,
max_offloader_lag_bytes: args.max_offloader_lag,
backup_runtime_threads: args.wal_backup_threads,
wal_backup_enabled: !args.disable_wal_backup,
backup_parallel_jobs: args.wal_backup_parallel_jobs,
current_thread_runtime: args.current_thread_runtime,

// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(
&[("node_id", &conf.my_id.to_string())],

fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
/// Result of joining any of main tasks: upper error means task failed to
/// complete, e.g. panicked, inner is error produced by task itself.
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;

async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
// Prevent running multiple safekeepers on the same directory
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
let lock_file =
Expand All @@ -208,14 +216,18 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
// we need to release the lock file only when the current process is gone

let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
info!("starting safekeeper WAL service on {}", conf.listen_pg_addr);
let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);

info!("starting safekeeper on {}", conf.listen_pg_addr);
let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
"starting safekeeper HTTP service on {}",
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);

Expand All @@ -224,71 +236,88 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let timeline_collector = safekeeper::metrics::TimelineCollector::new();

let mut threads = vec![];
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);

// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;

let conf_ = conf.clone();
.spawn(|| {
let router = http::make_router(conf_);
std::future::pending(), // never shut down
// Keep handles to main tasks to die if any of them disappears.
let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =

let conf_cloned = conf.clone();
let safekeeper_thread = thread::Builder::new()
.name("WAL service thread".into())
.spawn(|| wal_service::thread_main(conf_cloned, pg_listener))
let conf_ = conf.clone();
// Run everything in current thread rt, if asked.
if conf.current_thread_runtime {
info!("running in current thread runtime");
let current_thread_rt = conf
.then(|| Handle::try_current().expect("no runtime in main"));
let wal_service_handle = current_thread_rt
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
.spawn(wal_service::task_main(conf_, pg_listener))
// wrap with task name for error reporting
.map(|res| ("WAL service main".to_owned(), res));

let conf_ = conf.clone();
let http_handle = current_thread_rt
.unwrap_or_else(|| HTTP_RUNTIME.handle())
.spawn(http::task_main(conf_, http_listener))
.map(|res| ("HTTP service main".to_owned(), res));

let conf_ = conf.clone();
.name("broker thread".into())
.spawn(|| {
let broker_task_handle = current_thread_rt
.unwrap_or_else(|| BROKER_RUNTIME.handle())
.map(|res| ("broker main".to_owned(), res));

let conf_ = conf.clone();
.name("WAL removal thread".into())
.spawn(|| {
let wal_remover_handle = current_thread_rt
.unwrap_or_else(|| WAL_REMOVER_RUNTIME.handle())
.map(|res| ("WAL remover".to_owned(), res));

.name("WAL backup launcher thread".into())
.spawn(move || {
wal_backup::wal_backup_launcher_thread_main(conf, wal_backup_launcher_rx);
let conf_ = conf.clone();
let wal_backup_handle = current_thread_rt
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
.map(|res| ("WAL backup launcher".to_owned(), res));

// TODO: put more thoughts into handling of failed threads
// We should catch & die if they are in trouble.

// On any shutdown signal, log receival and exit. Additionally, handling
// SIGQUIT prevents coredump.
ShutdownSignals::handle(|signal| {
info!("received {}, terminating",;

// TODO: update tokio-stream, convert to real async Stream with
// SignalStream, map it to obtain missing signal name, combine streams into
// single stream we can easily sit on.
let mut sigquit_stream = signal(SignalKind::quit())?;
let mut sigint_stream = signal(SignalKind::interrupt())?;
let mut sigterm_stream = signal(SignalKind::terminate())?;

tokio::select! {
Some((task_name, res)) => {
error!("{} task failed: {:?}, exiting", task_name, res);
// On any shutdown signal, log receival and exit. Additionally, handling
// SIGQUIT prevents coredump.
_ = sigquit_stream.recv() => info!("received SIGQUIT, terminating"),
_ = sigint_stream.recv() => info!("received SIGINT, terminating"),
_ = sigterm_stream.recv() => info!("received SIGTERM, terminating")


/// Determine safekeeper id.
Expand Down

0 comments on commit 227271c

Please sign in to comment.