Skip to content

Commit

Permalink
safekeeper: remove .partial suffix on the last WAL file.
Browse files Browse the repository at this point in the history
Reasons:
- it makes pg_waldump usage slightly more cumbersome, forcing to rename file.
- it makes pull_timeline slightly more cumbersome because at any
  moment source file can be renamed from partial to full.

Leave ability to read .partial files for backward compatibility.
  • Loading branch information
arssher committed May 25, 2024
1 parent 43f9a16 commit d07fd5d
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 52 deletions.
1 change: 0 additions & 1 deletion libs/utils/scripts/restore_from_wal.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,5 @@ declare -i WAL_SIZE=$REDO_POS+114
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
rm -f 000000010000000000000001
1 change: 0 additions & 1 deletion libs/utils/scripts/restore_from_wal_initdb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@ declare -i WAL_SIZE=$REDO_POS+114
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
rm -f 000000010000000000000001
27 changes: 22 additions & 5 deletions safekeeper/src/wal_backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,16 +469,33 @@ async fn backup_object(
.await
}

/// Source file should point to path with segment without .partial suffix; we'll
/// try to append .partial if file without it doesn't exist.
pub(crate) async fn backup_partial_segment(
source_file: &Utf8Path,
target_file: &RemotePath,
size: usize,
) -> Result<()> {
let storage = get_configured_remote_storage();

let file = File::open(&source_file)
.await
.with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
let storage: &GenericRemoteStorage = get_configured_remote_storage();
let mut partial_path = source_file.to_owned();
partial_path.set_extension("partial");

// First try opening without .partial prefix, if that fails, open legacy .partial one.
let file = match File::open(&source_file).await {
Ok(file) => file,
Err(full_e) => match File::open(&partial_path).await {
Ok(file) => file,
Err(partial_e) => {
anyhow::bail!(
"failed to open file for partial backup, {} error: '{}', {} error: '{}'",
source_file,
full_e,
partial_path,
partial_e
);
}
},
};

// limiting the file to read only the first `size` bytes
let limited_file = tokio::io::AsyncReadExt::take(file, size as u64);
Expand Down
6 changes: 1 addition & 5 deletions safekeeper/src/wal_backup_partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@ impl PartialBackup {
self.conf.my_id.0,
)
}

fn local_segment_name(&self, segno: u64) -> String {
format!("{}.partial", self.segment_name(segno))
}
}

impl PartialBackup {
Expand Down Expand Up @@ -152,7 +148,7 @@ impl PartialBackup {
// We're going to backup bytes from the start of the segment up to flush_lsn.
let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);

let local_path = self.local_prefix.join(self.local_segment_name(segno));
let local_path = self.local_prefix.join(self.segment_name(segno));
let remote_path = RemotePath::new(self.remote_prefix.join(&prepared.name).as_ref())?;

// Upload first `backup_bytes` bytes of the segment to the remote storage.
Expand Down
56 changes: 25 additions & 31 deletions safekeeper/src/wal_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
//! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
//! PG timeline is always 1, so WAL segments are usually have names like this:
//! - 000000010000000000000001
//! - 000000010000000000000002.partial
//! - 000000010000000000000002
//!
//! Note that last file has `.partial` suffix, that's different from postgres.
//! In the past last file had `.partial` suffix, so code still can read it.

use anyhow::{bail, Context, Result};
use bytes::Bytes;
Expand Down Expand Up @@ -102,11 +102,13 @@ pub struct PhysicalStorage {

/// Cached open file for the last segment.
///
/// If Some(file) is open, then it always:
/// - has ".partial" suffix
/// If Some(file, is_partial) is open, then it always:
/// - points to write_lsn, so no seek is needed for writing
/// - doesn't point to the end of the segment
file: Option<File>,
///
/// If the file name has .partial suffix (created before suffix was
/// removed), the bool is True.
file: Option<(File, bool)>,

/// When false, we have just initialized storage using the LSN from find_end_of_wal().
/// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
Expand Down Expand Up @@ -243,29 +245,26 @@ impl PhysicalStorage {

// Note: this doesn't get into observe_flush_seconds metric. But
// segment init should be separate metric, if any.
if let Err(e) =
durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
{
if let Err(e) = durable_rename(&tmp_path, &wal_file_path, !self.conf.no_sync).await {
// Probably rename succeeded, but fsync of it failed. Remove
// the file then to avoid using it.
remove_file(wal_file_partial_path)
remove_file(wal_file_path)
.await
.or_else(utils::fs_ext::ignore_not_found)?;
return Err(e.into());
}
Ok((file, true))
Ok((file, false))
}
}

/// Write WAL bytes, which are known to be located in a single WAL segment.
async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
let mut file = if let Some(file) = self.file.take() {
file
let (mut file, is_partial) = if let Some((file, is_partial)) = self.file.take() {
(file, is_partial)
} else {
let (mut file, is_partial) = self.open_or_create(segno).await?;
assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
file
(file, is_partial)
};

file.write_all(buf).await?;
Expand All @@ -278,13 +277,15 @@ impl PhysicalStorage {
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&file).await?;

// Rename partial file to completed file
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_partial_path, wal_file_path).await?;
// Rename partial file to completed file in case it was legacy .partial file.
if is_partial {
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_partial_path, wal_file_path).await?;
}
} else {
// otherwise, file can be reused later
self.file = Some(file);
self.file = Some((file, is_partial));
}

Ok(())
Expand All @@ -298,7 +299,7 @@ impl PhysicalStorage {
async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
if self.write_lsn != pos {
// need to flush the file before discarding it
if let Some(file) = self.file.take() {
if let Some((file, _)) = self.file.take() {
self.fdatasync_file(&file).await?;
}

Expand Down Expand Up @@ -402,9 +403,9 @@ impl Storage for PhysicalStorage {
return Ok(());
}

if let Some(unflushed_file) = self.file.take() {
if let Some((unflushed_file, is_partial)) = self.file.take() {
self.fdatasync_file(&unflushed_file).await?;
self.file = Some(unflushed_file);
self.file = Some((unflushed_file, is_partial));
} else {
// We have unflushed data (write_lsn != flush_lsn), but no file.
// This should only happen if last file was fully written and flushed,
Expand Down Expand Up @@ -445,7 +446,7 @@ impl Storage for PhysicalStorage {
}

// Close previously opened file, if any
if let Some(unflushed_file) = self.file.take() {
if let Some((unflushed_file, _)) = self.file.take() {
self.fdatasync_file(&unflushed_file).await?;
}

Expand All @@ -455,20 +456,13 @@ impl Storage for PhysicalStorage {
// Remove all segments after the given LSN.
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;

let (mut file, is_partial) = self.open_or_create(segno).await?;
let (mut file, _) = self.open_or_create(segno).await?;

// Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
self.fdatasync_file(&file).await?;

if !is_partial {
// Make segment partial once again
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_path, wal_file_partial_path).await?;
}

// Update LSNs
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
Expand Down
12 changes: 5 additions & 7 deletions test_runner/regress/test_pg_waldump.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
import shutil

from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.utils import subprocess_capture
Expand Down Expand Up @@ -48,14 +47,13 @@ def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
endpoint.stop()

assert endpoint.pgdata_dir
wal_path = os.path.join(endpoint.pgdata_dir, "pg_wal/000000010000000000000001")
seg_path = os.path.join(endpoint.pgdata_dir, "pg_wal/000000010000000000000001")
pg_waldump_path = os.path.join(pg_bin.pg_bin_path, "pg_waldump")
# check segment on compute
check_wal_segment(pg_waldump_path, wal_path, test_output_dir)
check_wal_segment(pg_waldump_path, seg_path, test_output_dir)

# Check file on safekeepers as well. pg_waldump is strict about file naming, so remove .partial suffix.
# Check file on safekeepers as well.
sk = env.safekeepers[0]
sk_tli_dir = sk.timeline_dir(tenant_id, timeline_id)
non_partial_path = os.path.join(sk_tli_dir, "000000010000000000000001")
shutil.copyfile(os.path.join(sk_tli_dir, "000000010000000000000001.partial"), non_partial_path)
check_wal_segment(pg_waldump_path, non_partial_path, test_output_dir)
seg_path = os.path.join(sk_tli_dir, "000000010000000000000001")
check_wal_segment(pg_waldump_path, seg_path, test_output_dir)
4 changes: 2 additions & 2 deletions test_runner/regress/test_wal_acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,9 +592,9 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
# save the last (partial) file to put it back after recreation; others will be fetched from s3
sk = env.safekeepers[0]
tli_dir = Path(sk.data_dir()) / str(tenant_id) / str(timeline_id)
f_partial = Path([f for f in os.listdir(tli_dir) if f.endswith(".partial")][0])
f_partial = sk.list_segments(tenant_id, timeline_id)[-1]
f_partial_path = tli_dir / f_partial
f_partial_saved = Path(sk.data_dir()) / f_partial.name
f_partial_saved = Path(sk.data_dir()) / f_partial
f_partial_path.rename(f_partial_saved)

pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version
Expand Down

0 comments on commit d07fd5d

Please sign in to comment.