Merge remote-tracking branch 'origin/main' into problame/init-logical-size-rework
problame committed Dec 4, 2023
2 parents 4d5bc88 + 7403d55 commit 5b0e7a7
Showing 17 changed files with 245 additions and 119 deletions.
21 changes: 20 additions & 1 deletion compute_tools/src/compute.rs
@@ -728,7 +728,12 @@ impl ComputeNode {

// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
let postgresql_conf_path = pgdata_path.join("postgresql.conf");
config::write_postgres_conf(&postgresql_conf_path, &spec, None)?;
// Temporarily reset max_cluster_size in the config
// to avoid hitting the limit while we are reconfiguring:
// creating new extensions, roles, etc.
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
self.pg_reload_conf()?;

let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
@@ -749,6 +754,10 @@ impl ComputeNode {
// 'Close' connection
drop(client);

// Reset max_cluster_size in the config back to its original value and reload the config
config::compute_ctl_temp_override_remove(pgdata_path)?;
self.pg_reload_conf()?;

let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(
@@ -809,7 +818,17 @@ impl ComputeNode {

let config_time = Utc::now();
if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
let pgdata_path = Path::new(&self.pgdata);
// Temporarily reset max_cluster_size in the config
// to avoid hitting the limit while we are applying the config:
// creating new extensions, roles, etc.
config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
self.pg_reload_conf()?;

self.apply_config(&compute_state)?;

config::compute_ctl_temp_override_remove(pgdata_path)?;
self.pg_reload_conf()?;
}

let startup_end_time = Utc::now();
20 changes: 20 additions & 0 deletions compute_tools/src/config.rs
@@ -93,5 +93,25 @@ pub fn write_postgres_conf(
writeln!(file, "neon.extension_server_port={}", port)?;
}

// It is essential that this line stays at the end of the file,
// because it is intended to override any settings above it.
writeln!(file, "include_if_exists = 'compute_ctl_temp_override.conf'")?;

Ok(())
}

/// Create the file compute_ctl_temp_override.conf in pgdata_dir
/// and write the provided options into it.
pub fn compute_ctl_temp_override_create(pgdata_path: &Path, options: &str) -> Result<()> {
let path = pgdata_path.join("compute_ctl_temp_override.conf");
let mut file = File::create(path)?;
write!(file, "{}", options)?;
Ok(())
}

/// Remove the file compute_ctl_temp_override.conf from pgdata_dir.
pub fn compute_ctl_temp_override_remove(pgdata_path: &Path) -> Result<()> {
let path = pgdata_path.join("compute_ctl_temp_override.conf");
std::fs::remove_file(path)?;
Ok(())
}
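
Taken together with the include_if_exists line at the end of postgresql.conf, the two helpers above enable a bracket pattern: write the override, reload, do the work, remove the override, reload again. A minimal sketch of that pattern follows; with_size_limit_lifted and the pg_reload_conf stub are hypothetical names for this sketch, not code from this commit.

use std::fs::File;
use std::io::Write;
use std::path::Path;
use anyhow::Result;

// Stub standing in for ComputeNode::pg_reload_conf in this sketch.
fn pg_reload_conf() -> Result<()> {
    Ok(())
}

// Hypothetical wrapper: lift neon.max_cluster_size around `body`, then
// restore it. The override wins because include_if_exists is the last
// line of postgresql.conf.
fn with_size_limit_lifted(pgdata_path: &Path, body: impl FnOnce() -> Result<()>) -> Result<()> {
    let path = pgdata_path.join("compute_ctl_temp_override.conf");
    let mut file = File::create(&path)?;
    write!(file, "neon.max_cluster_size=-1")?;
    pg_reload_conf()?;

    let result = body();

    // Remove the override and reload so the original limit applies again.
    std::fs::remove_file(&path)?;
    pg_reload_conf()?;
    result
}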
13 changes: 0 additions & 13 deletions compute_tools/src/spec.rs
@@ -118,19 +118,6 @@ pub fn get_spec_from_control_plane(
spec
}

/// It takes cluster specification and does the following:
/// - Serialize cluster config and put it into `postgresql.conf` completely rewriting the file.
/// - Update `pg_hba.conf` to allow external connections.
pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
// File `postgresql.conf` is no longer included into `basebackup`, so just
// always write all config into it creating new file.
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?;

update_pg_hba(pgdata_path)?;

Ok(())
}

/// Check `pg_hba.conf` and update if needed to allow external connections.
pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of spec.json
2 changes: 1 addition & 1 deletion libs/remote_storage/src/s3_bucket.rs
@@ -378,7 +378,7 @@ impl RemoteStorage for S3Bucket {
let empty = Vec::new();
let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);

tracing::info!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());

for object in keys {
let object_path = object.key().expect("response does not contain a key");
13 changes: 13 additions & 0 deletions libs/utils/src/generation.rs
@@ -152,3 +152,16 @@ impl Debug for Generation {
}
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn generation_gt() {
// It is important that a None generation compares less than any valid one,
// for upgrades from pre-generation systems.
assert!(Generation::none() < Generation::new(0));
assert!(Generation::none() < Generation::new(1));
}
}
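
The ordering under test mirrors how Rust orders Option: a derived Ord sorts None before every Some, so a pre-generation state always compares less than any valid generation. A minimal model of the same property (the real Generation type differs):

// Minimal model: represent a generation as Option<u32>, with None for
// pre-generation systems. The derived ordering puts None below every Some.
fn main() {
    let none: Option<u32> = None;
    assert!(none < Some(0));
    assert!(none < Some(1));
}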
17 changes: 17 additions & 0 deletions pageserver/src/metrics.rs
@@ -1388,6 +1388,8 @@ pub(crate) static WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM: Lazy<Histogram> =
pub(crate) struct WalRedoProcessCounters {
pub(crate) started: IntCounter,
pub(crate) killed_by_cause: enum_map::EnumMap<WalRedoKillCause, IntCounter>,
pub(crate) active_stderr_logger_tasks_started: IntCounter,
pub(crate) active_stderr_logger_tasks_finished: IntCounter,
}

#[derive(Debug, enum_map::Enum, strum_macros::IntoStaticStr)]
@@ -1411,13 +1413,28 @@ impl Default for WalRedoProcessCounters {
&["cause"],
)
.unwrap();

let active_stderr_logger_tasks_started = register_int_counter!(
"pageserver_walredo_stderr_logger_tasks_started_total",
"Number of active walredo stderr logger tasks that have started",
)
.unwrap();

let active_stderr_logger_tasks_finished = register_int_counter!(
"pageserver_walredo_stderr_logger_tasks_finished_total",
"Number of active walredo stderr logger tasks that have finished",
)
.unwrap();

Self {
started,
killed_by_cause: EnumMap::from_array(std::array::from_fn(|i| {
let cause = <WalRedoKillCause as enum_map::Enum>::from_usize(i);
let cause_str: &'static str = cause.into();
killed.with_label_values(&[cause_str])
})),
active_stderr_logger_tasks_started,
active_stderr_logger_tasks_finished,
}
}
}
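
The started/finished pair is a standard counter idiom: both values only ever increase, so the number of currently live logger tasks is their difference (in PromQL, pageserver_walredo_stderr_logger_tasks_started_total - pageserver_walredo_stderr_logger_tasks_finished_total). A small sketch of the same idiom with the prometheus crate, using illustrative metric names:

use prometheus::{register_int_counter, IntCounter};

// Derive the number of live tasks from the monotonic started/finished pair.
fn live_tasks(started: &IntCounter, finished: &IntCounter) -> u64 {
    started.get() - finished.get()
}

fn main() {
    let started = register_int_counter!("tasks_started_total", "tasks started").unwrap();
    let finished = register_int_counter!("tasks_finished_total", "tasks finished").unwrap();
    started.inc();
    started.inc();
    finished.inc();
    assert_eq!(live_tasks(&started, &finished), 1);
}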
6 changes: 6 additions & 0 deletions pageserver/src/tenant.rs
@@ -312,6 +312,9 @@ impl WalRedoManager {
}
}

/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn request_redo(
&self,
key: crate::repository::Key,
@@ -3852,6 +3855,9 @@ pub(crate) mod harness {
pub(crate) struct TestRedoManager;

impl TestRedoManager {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn request_redo(
&self,
key: Key,
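
A cancellation-safety note like the ones added above matters mainly at call sites that race the future against something else, such as a shutdown token inside tokio::select!. A generic sketch of such a call site; run_or_shutdown is a hypothetical helper, with mgr.request_redo(...) as the intended work argument:

use std::future::Future;
use tokio_util::sync::CancellationToken;

// Racing a cancel-safe future against shutdown: if the shutdown branch
// wins, the work future is simply dropped, which is only harmless
// because the future is documented as cancellation-safe.
async fn run_or_shutdown<T>(work: impl Future<Output = T>, cancel: &CancellationToken) -> Option<T> {
    tokio::select! {
        out = work => Some(out),
        _ = cancel.cancelled() => None,
    }
}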
2 changes: 1 addition & 1 deletion pageserver/src/tenant/remote_timeline_client/download.rs
@@ -363,7 +363,7 @@ pub(super) async fn download_index_part(
None => {
// Migration from legacy pre-generation state: we have a generation but no prior
// attached pageservers did. Try to load from a no-generation path.
tracing::info!("No index_part.json* found");
tracing::debug!("No index_part.json* found");
do_download_index_part(
storage,
tenant_shard_id,
12 changes: 6 additions & 6 deletions pageserver/src/tenant/timeline.rs
@@ -806,7 +806,12 @@ impl Timeline {
.access_stats_behavior(AccessStatsBehavior::Skip)
.build();

// 2. Create new image layers for partitions that have been modified
// 2. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size, ctx).await?;
timer.stop_and_record();

// 3. Create new image layers for partitions that have been modified
// "enough".
let layers = self
.create_image_layers(&partitioning, lsn, false, &image_ctx)
@@ -818,11 +823,6 @@
}
}

// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size, ctx).await?;
timer.stop_and_record();

if let Some(remote_client) = &self.remote_client {
// should any new image layer been created, not uploading index_part will
// result in a mismatch between remote_physical_size and layermap calculated
93 changes: 30 additions & 63 deletions pageserver/src/walredo.rs
@@ -34,7 +34,6 @@ use std::process::{Child, ChildStdin, ChildStdout, Command};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::Duration;
use std::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};

@@ -124,7 +123,9 @@ impl PostgresRedoManager {
/// The WAL redo is handled by a separate thread, so this just sends a request
/// to the thread and waits for response.
///
/// CANCEL SAFETY: NOT CANCEL SAFE.
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn request_redo(
&self,
key: Key,
@@ -157,7 +158,6 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
};
img = Some(result?);

@@ -178,7 +178,6 @@
self.conf.wal_redo_timeout,
pg_version,
)
.await
}
}
}
@@ -216,7 +215,7 @@ impl PostgresRedoManager {
/// Process one request for WAL redo using wal-redo postgres
///
#[allow(clippy::too_many_arguments)]
async fn apply_batch_postgres(
fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
@@ -332,12 +331,7 @@ impl PostgresRedoManager {
// than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
// we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
// This probably needs revisiting at some later point.
let mut wait_done = proc.stderr_logger_task_done.clone();
drop(proc);
wait_done
.wait_for(|v| *v)
.await
.expect("we use scopeguard to ensure we always send `true` to the channel before dropping the sender");
} else if n_attempts != 0 {
info!(n_attempts, "retried walredo succeeded");
}
@@ -649,8 +643,6 @@ struct WalRedoProcess {
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
stderr_logger_cancel: CancellationToken,
stderr_logger_task_done: tokio::sync::watch::Receiver<bool>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
@@ -699,6 +691,8 @@ impl WalRedoProcess {
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let stderr = tokio::process::ChildStderr::from_std(stderr)
.context("convert to tokio::ChildStderr")?;
macro_rules! set_nonblock_or_log_err {
($file:ident) => {{
let res = set_nonblock($file.as_raw_fd());
@@ -710,69 +704,45 @@ impl WalRedoProcess {
}
set_nonblock_or_log_err!(stdin)?;
set_nonblock_or_log_err!(stdout)?;
set_nonblock_or_log_err!(stderr)?;

let mut stderr = tokio::io::unix::AsyncFd::new(stderr).context("AsyncFd::with_interest")?;

// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);

let stderr_logger_cancel = CancellationToken::new();
let (stderr_logger_task_done_tx, stderr_logger_task_done_rx) =
tokio::sync::watch::channel(false);
tokio::spawn({
let stderr_logger_cancel = stderr_logger_cancel.clone();
tokio::spawn(
async move {
scopeguard::defer! {
debug!("wal-redo-postgres stderr_logger_task finished");
let _ = stderr_logger_task_done_tx.send(true);
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
}
debug!("wal-redo-postgres stderr_logger_task started");
loop {
// NB: we purposefully don't do a select! for the cancellation here.
// The cancellation would likely cause us to miss stderr messages.
// We can rely on this to return from .await because when we SIGKILL
// the child, the writing end of the stderr pipe gets closed.
match stderr.readable_mut().await {
Ok(mut guard) => {
let mut errbuf = [0; 16384];
let res = guard.try_io(|fd| {
use std::io::Read;
fd.get_mut().read(&mut errbuf)
});
match res {
Ok(Ok(0)) => {
// it closed the stderr pipe
break;
}
Ok(Ok(n)) => {
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
let output = String::from_utf8_lossy(&errbuf[0..n]).to_string();
error!(output, "received output");
},
Ok(Err(e)) => {
error!(error = ?e, "read() error, waiting for cancellation");
stderr_logger_cancel.cancelled().await;
error!(error = ?e, "read() error, cancellation complete");
break;
}
Err(e) => {
let _e: tokio::io::unix::TryIoError = e;
// the read() returned WouldBlock, that's expected
}
}
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();

use tokio::io::AsyncBufReadExt;
let mut stderr_lines = tokio::io::BufReader::new(stderr);
let mut buf = Vec::new();
let res = loop {
buf.clear();
// TODO we don't trust the process to cap its stderr length.
// Currently it can do unbounded Vec allocation.
match stderr_lines.read_until(b'\n', &mut buf).await {
Ok(0) => break Ok(()), // eof
Ok(num_bytes) => {
let output = String::from_utf8_lossy(&buf[..num_bytes]);
error!(%output, "received output");
}
Err(e) => {
error!(error = ?e, "read() error, waiting for cancellation");
stderr_logger_cancel.cancelled().await;
error!(error = ?e, "read() error, cancellation complete");
break;
break Err(e);
}
}
};
match res {
Ok(()) => (),
Err(e) => {
error!(error=?e, "failed to read from walredo stderr");
}
}
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_id, %pg_version))
});
);

Ok(Self {
conf,
@@ -787,8 +757,6 @@ impl WalRedoProcess {
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
stderr_logger_cancel,
stderr_logger_task_done: stderr_logger_task_done_rx,
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
})
@@ -1029,7 +997,6 @@ impl Drop for WalRedoProcess {
.take()
.expect("we only do this once")
.kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
self.stderr_logger_cancel.cancel();
// There is no way to wait for the stderr logger task from Drop, because waiting is async-only.
}
}
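
In isolation, the new stderr-forwarding pattern, a detached tokio task that reads the child's stderr line by line until EOF, looks roughly like the sketch below (standalone; spawn_stderr_logger is a hypothetical name, not the walredo code itself):

use anyhow::Context;
use tokio::io::AsyncBufReadExt;

// Forward each stderr line of a child process to the log. The task exits
// on EOF, which occurs once the child dies and the pipe's write end closes.
fn spawn_stderr_logger(stderr: std::process::ChildStderr) -> anyhow::Result<()> {
    let stderr = tokio::process::ChildStderr::from_std(stderr)
        .context("convert to tokio::ChildStderr")?;
    tokio::spawn(async move {
        let mut lines = tokio::io::BufReader::new(stderr).lines();
        loop {
            match lines.next_line().await {
                Ok(Some(line)) => tracing::error!(%line, "child stderr"),
                Ok(None) => break, // EOF
                Err(e) => {
                    tracing::error!(error = ?e, "failed to read child stderr");
                    break;
                }
            }
        }
    });
    Ok(())
}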