safekeeper: remove .partial suffix on the last WAL file. #7882

Open · wants to merge 1 commit into base: main
1 change: 0 additions & 1 deletion libs/utils/scripts/restore_from_wal.sh
@@ -17,6 +17,5 @@ declare -i WAL_SIZE=$REDO_POS+114
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
rm -f 000000010000000000000001
1 change: 0 additions & 1 deletion libs/utils/scripts/restore_from_wal_initdb.sh
@@ -16,6 +16,5 @@ declare -i WAL_SIZE=$REDO_POS+114
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
rm -f 000000010000000000000001
27 changes: 22 additions & 5 deletions safekeeper/src/wal_backup.rs
@@ -469,16 +469,33 @@ async fn backup_object(
.await
}

/// Source file should point to the segment path without the .partial suffix;
/// if that file doesn't exist, we'll try appending .partial and opening that.
pub(crate) async fn backup_partial_segment(
source_file: &Utf8Path,
target_file: &RemotePath,
size: usize,
) -> Result<()> {
let storage = get_configured_remote_storage();

let file = File::open(&source_file)
.await
.with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
let storage: &GenericRemoteStorage = get_configured_remote_storage();
let mut partial_path = source_file.to_owned();
partial_path.set_extension("partial");

// First try opening the file without the .partial suffix; if that fails, open the legacy .partial one.
let file = match File::open(&source_file).await {
Ok(file) => file,
Err(full_e) => match File::open(&partial_path).await {
Ok(file) => file,
Err(partial_e) => {
anyhow::bail!(
"failed to open file for partial backup, {} error: '{}', {} error: '{}'",
source_file,
full_e,
partial_path,
partial_e
);
}
},
};

// limiting the file to read only the first `size` bytes
let limited_file = tokio::io::AsyncReadExt::take(file, size as u64);
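For context, here is a minimal sketch (not part of this PR) of the fallback behaviour the hunk above adds to backup_partial_segment: prefer the plain segment path and fall back to the legacy .partial name only if the plain one is missing. The helper name open_segment_or_partial and the error wording are hypothetical; camino and tokio are assumed, as in the safekeeper crate.

```rust
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use tokio::fs::File;

/// Try the plain segment file first; if it can't be opened, try the legacy
/// `.partial` name written by safekeepers from before this change.
async fn open_segment_or_partial(source: &Utf8Path) -> anyhow::Result<(File, Utf8PathBuf)> {
    if let Ok(file) = File::open(source).await {
        return Ok((file, source.to_owned()));
    }
    let mut partial = source.to_owned();
    partial.set_extension("partial");
    let file = File::open(&partial)
        .await
        .with_context(|| format!("neither {source} nor {partial} could be opened for wal backup"))?;
    Ok((file, partial))
}
```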
6 changes: 1 addition & 5 deletions safekeeper/src/wal_backup_partial.rs
@@ -117,10 +117,6 @@ impl PartialBackup {
self.conf.my_id.0,
)
}

fn local_segment_name(&self, segno: u64) -> String {
format!("{}.partial", self.segment_name(segno))
}
}

impl PartialBackup {
@@ -152,7 +148,7 @@ impl PartialBackup {
// We're going to backup bytes from the start of the segment up to flush_lsn.
let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);

let local_path = self.local_prefix.join(self.local_segment_name(segno));
let local_path = self.local_prefix.join(self.segment_name(segno));
let remote_path = RemotePath::new(self.remote_prefix.join(&prepared.name).as_ref())?;

// Upload first `backup_bytes` bytes of the segment to the remote storage.
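A quick illustration (not from this PR) of the arithmetic behind backup_bytes above: flush_lsn.segment_offset(wal_seg_size) is assumed to reduce the LSN modulo the segment size, i.e. it gives how many bytes of the last segment have been written and therefore need uploading. The function name below is hypothetical.

```rust
/// Which segment an LSN falls into and how many of its bytes are filled,
/// assuming segment number = lsn / seg_size and segment offset = lsn % seg_size.
fn partial_backup_extent(flush_lsn: u64, wal_seg_size: u64) -> (u64, u64) {
    let segno = flush_lsn / wal_seg_size; // segment containing flush_lsn
    let backup_bytes = flush_lsn % wal_seg_size; // bytes of that segment to upload
    (segno, backup_bytes)
}

#[test]
fn partial_backup_extent_example() {
    // With 16 MiB segments, LSN 0/1800000 (24 MiB) sits 8 MiB into segment 1,
    // so a partial backup would upload the first 8 MiB of that segment.
    let (segno, bytes) = partial_backup_extent(24 * 1024 * 1024, 16 * 1024 * 1024);
    assert_eq!((segno, bytes), (1, 8 * 1024 * 1024));
}
```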
56 changes: 25 additions & 31 deletions safekeeper/src/wal_storage.rs
@@ -3,9 +3,9 @@
//! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
//! PG timeline is always 1, so WAL segments usually have names like this:
//! - 000000010000000000000001
//! - 000000010000000000000002.partial
//! - 000000010000000000000002
//!
//! Note that last file has `.partial` suffix, that's different from postgres.
//! In the past, the last file had a `.partial` suffix; the code can still read such files.

use anyhow::{bail, Context, Result};
use bytes::Bytes;
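As a reference for the segment names listed in the module comment above, a hedged sketch (not part of this PR) of how a name like 000000010000000000000001 is formed, mirroring Postgres' XLogFileName; the helper name is hypothetical and a 16 MiB segment size is assumed in the test.

```rust
/// Segment file name: 8 hex digits each for the timeline id, the "xlog id" and
/// the segment within that xlog id, where one xlog id spans 0x1_0000_0000 bytes of WAL.
fn segment_file_name(tli: u32, segno: u64, wal_seg_size: u64) -> String {
    let segs_per_xlogid = 0x1_0000_0000u64 / wal_seg_size;
    format!(
        "{:08X}{:08X}{:08X}",
        tli,
        segno / segs_per_xlogid,
        segno % segs_per_xlogid
    )
}

#[test]
fn first_segment_name() {
    // Timeline is always 1 on safekeepers, so segment #1 with 16 MiB segments is:
    assert_eq!(
        segment_file_name(1, 1, 16 * 1024 * 1024),
        "000000010000000000000001"
    );
}
```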
@@ -102,11 +102,13 @@ pub struct PhysicalStorage {

/// Cached open file for the last segment.
///
/// If Some(file) is open, then it always:
/// - has ".partial" suffix
/// If Some((file, is_partial)) is open, then the file always:
/// - points to write_lsn, so no seek is needed for writing
/// - doesn't point to the end of the segment
file: Option<File>,
///
/// If the file name has a `.partial` suffix (created before the suffix was
/// removed), the bool is true.
file: Option<(File, bool)>,

/// When false, we have just initialized storage using the LSN from find_end_of_wal().
/// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
@@ -243,29 +245,26 @@

// Note: this doesn't get into observe_flush_seconds metric. But
// segment init should be separate metric, if any.
if let Err(e) =
durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
{
if let Err(e) = durable_rename(&tmp_path, &wal_file_path, !self.conf.no_sync).await {
// Probably rename succeeded, but fsync of it failed. Remove
// the file then to avoid using it.
remove_file(wal_file_partial_path)
remove_file(wal_file_path)
.await
.or_else(utils::fs_ext::ignore_not_found)?;
return Err(e.into());
}
Ok((file, true))
Ok((file, false))
}
}

/// Write WAL bytes, which are known to be located in a single WAL segment.
async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
let mut file = if let Some(file) = self.file.take() {
file
let (mut file, is_partial) = if let Some((file, is_partial)) = self.file.take() {
(file, is_partial)
} else {
let (mut file, is_partial) = self.open_or_create(segno).await?;
assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
file
(file, is_partial)
};

file.write_all(buf).await?;
@@ -278,13 +277,15 @@
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&file).await?;

// Rename partial file to completed file
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_partial_path, wal_file_path).await?;
// Rename partial file to completed file in case it was a legacy .partial file.
if is_partial {
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_partial_path, wal_file_path).await?;
}
} else {
// otherwise, file can be reused later
self.file = Some(file);
self.file = Some((file, is_partial));
}

Ok(())
@@ -298,7 +299,7 @@
async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
if self.write_lsn != pos {
// need to flush the file before discarding it
if let Some(file) = self.file.take() {
if let Some((file, _)) = self.file.take() {
self.fdatasync_file(&file).await?;
}

@@ -402,9 +403,9 @@ impl Storage for PhysicalStorage {
return Ok(());
}

if let Some(unflushed_file) = self.file.take() {
if let Some((unflushed_file, is_partial)) = self.file.take() {
self.fdatasync_file(&unflushed_file).await?;
self.file = Some(unflushed_file);
self.file = Some((unflushed_file, is_partial));
} else {
// We have unflushed data (write_lsn != flush_lsn), but no file.
// This should only happen if last file was fully written and flushed,
@@ -445,7 +446,7 @@
}

// Close previously opened file, if any
if let Some(unflushed_file) = self.file.take() {
if let Some((unflushed_file, _)) = self.file.take() {
self.fdatasync_file(&unflushed_file).await?;
}

@@ -455,20 +456,13 @@
// Remove all segments after the given LSN.
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;

let (mut file, is_partial) = self.open_or_create(segno).await?;
let (mut file, _) = self.open_or_create(segno).await?;

// Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
self.fdatasync_file(&file).await?;

if !is_partial {
// Make segment partial once again
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_path, wal_file_partial_path).await?;
}

// Update LSNs
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
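For reference, a simplified sketch (not from this PR) of the pair of paths wal_file_paths is assumed to provide, which the hunks above use when a completed legacy segment has to be renamed. The signature is simplified: the real helper derives the segment name from the segment number and segment size.

```rust
use camino::{Utf8Path, Utf8PathBuf};

/// Plain segment path plus its legacy `.partial` sibling.
fn wal_file_paths_sketch(timeline_dir: &Utf8Path, segment_name: &str) -> (Utf8PathBuf, Utf8PathBuf) {
    let wal_file_path = timeline_dir.join(segment_name);
    let partial_name = format!("{segment_name}.partial");
    let wal_file_partial_path = timeline_dir.join(partial_name.as_str());
    (wal_file_path, wal_file_partial_path)
}
```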
12 changes: 5 additions & 7 deletions test_runner/regress/test_pg_waldump.py
@@ -1,5 +1,4 @@
import os
import shutil

from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.utils import subprocess_capture
@@ -48,14 +47,13 @@ def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
endpoint.stop()

assert endpoint.pgdata_dir
wal_path = os.path.join(endpoint.pgdata_dir, "pg_wal/000000010000000000000001")
seg_path = os.path.join(endpoint.pgdata_dir, "pg_wal/000000010000000000000001")
pg_waldump_path = os.path.join(pg_bin.pg_bin_path, "pg_waldump")
# check segment on compute
check_wal_segment(pg_waldump_path, wal_path, test_output_dir)
check_wal_segment(pg_waldump_path, seg_path, test_output_dir)

# Check file on safekeepers as well. pg_waldump is strict about file naming, so remove .partial suffix.
# Check file on safekeepers as well.
sk = env.safekeepers[0]
sk_tli_dir = sk.timeline_dir(tenant_id, timeline_id)
non_partial_path = os.path.join(sk_tli_dir, "000000010000000000000001")
shutil.copyfile(os.path.join(sk_tli_dir, "000000010000000000000001.partial"), non_partial_path)
check_wal_segment(pg_waldump_path, non_partial_path, test_output_dir)
seg_path = os.path.join(sk_tli_dir, "000000010000000000000001")
check_wal_segment(pg_waldump_path, seg_path, test_output_dir)
6 changes: 3 additions & 3 deletions test_runner/regress/test_wal_acceptor.py
Original file line number Diff line number Diff line change
@@ -590,10 +590,10 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):

# save the last (partial) file to put it back after recreation; others will be fetched from s3
sk = env.safekeepers[0]
tli_dir = Path(sk.data_dir) / str(tenant_id) / str(timeline_id)
f_partial = Path([f for f in os.listdir(tli_dir) if f.endswith(".partial")][0])
tli_dir = sk.data_dir / str(tenant_id) / str(timeline_id)
f_partial = sk.list_segments(tenant_id, timeline_id)[-1]
f_partial_path = tli_dir / f_partial
f_partial_saved = Path(sk.data_dir) / f_partial.name
f_partial_saved = sk.data_dir / f_partial
f_partial_path.rename(f_partial_saved)

pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version