WIP Upload partial segments
petuhovskiy committed Mar 18, 2024
1 parent 2bc2fd9 commit 68281b3
Showing 11 changed files with 500 additions and 18 deletions.
1 change: 1 addition & 0 deletions safekeeper/Cargo.toml
@@ -33,6 +33,7 @@ once_cell.workspace = true
parking_lot.workspace = true
postgres.workspace = true
postgres-protocol.workspace = true
rand.workspace = true
regex.workspace = true
scopeguard.workspace = true
reqwest = { workspace = true, features = ["json"] }
13 changes: 12 additions & 1 deletion safekeeper/src/bin/safekeeper.rs
@@ -28,7 +28,7 @@ use utils::pid_file;
use metrics::set_build_info_metric;
use safekeeper::defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
};
use safekeeper::wal_service;
use safekeeper::GlobalTimelines;
@@ -170,6 +170,13 @@ struct Args {
/// still needed for existing replication connection.
#[arg(long)]
walsenders_keep_horizon: bool,
/// Enable partial backup. If disabled, safekeeper will not upload partial
/// segments to remote storage.
#[arg(long)]
partial_backup_enabled: bool,
/// Controls how long backup will wait before uploading the partial segment.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_PARTIAL_BACKUP_TIMEOUT, verbatim_doc_comment)]
partial_backup_timeout: Duration,
}

// Like PathBufValueParser, but allows empty string.
@@ -300,6 +307,8 @@ async fn main() -> anyhow::Result<()> {
http_auth,
current_thread_runtime: args.current_thread_runtime,
walsenders_keep_horizon: args.walsenders_keep_horizon,
partial_backup_enabled: args.partial_backup_enabled,
partial_backup_timeout: args.partial_backup_timeout,
};

// initialize sentry if SENTRY_DSN is provided
@@ -365,6 +374,8 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {

let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);

wal_backup::init_remote_storage(&conf);

// Keep handles to main tasks to die if any of them disappears.
let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
FuturesUnordered::new();
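The two new flags above are carried into SafeKeeperConf further down in this commit. The timeout's default comes from DEFAULT_PARTIAL_BACKUP_TIMEOUT ("15m", added in lib.rs below) and is parsed by humantime::parse_duration, the same function named in the value_parser attribute. A minimal sketch of how that string becomes a Duration, assuming only the humantime crate that the attribute already relies on:

use std::time::Duration;

fn main() {
    // clap invokes humantime::parse_duration for us via
    // `value_parser = humantime::parse_duration`; "15m" means 15 minutes.
    let timeout: Duration = humantime::parse_duration("15m").unwrap();
    assert_eq!(timeout, Duration::from_secs(15 * 60));

    // With clap's derive defaults, the boolean field becomes the
    // `--partial-backup-enabled` switch and stays false when omitted.
    println!("partial_backup_timeout = {timeout:?}");
}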
72 changes: 72 additions & 0 deletions safekeeper/src/control_file_upgrade.rs
@@ -2,6 +2,7 @@
use crate::{
safekeeper::{AcceptorState, PgUuid, ServerInfo, Term, TermHistory, TermLsn},
state::{PersistedPeers, TimelinePersistentState},
wal_backup_partial,
};
use anyhow::{bail, Result};
use pq_proto::SystemId;
@@ -138,6 +139,50 @@ pub struct SafeKeeperStateV4 {
pub peers: PersistedPeers,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SafeKeeperStateV7 {
#[serde(with = "hex")]
pub tenant_id: TenantId,
#[serde(with = "hex")]
pub timeline_id: TimelineId,
/// persistent acceptor state
pub acceptor_state: AcceptorState,
/// information about server
pub server: ServerInfo,
/// Unique id of the last *elected* proposer we dealt with. Not needed
/// for correctness, exists for monitoring purposes.
#[serde(with = "hex")]
pub proposer_uuid: PgUuid,
/// Since which LSN this timeline generally starts. Safekeeper might have
/// joined later.
pub timeline_start_lsn: Lsn,
/// Since which LSN safekeeper has (had) WAL for this timeline.
/// All WAL segments next to one containing local_start_lsn are
/// filled with data from the beginning.
pub local_start_lsn: Lsn,
/// Part of WAL acknowledged by quorum *and available locally*. Always points
/// to record boundary.
pub commit_lsn: Lsn,
/// LSN that points to the end of the last backed up segment. Useful to
/// persist to avoid finding out offloading progress on boot.
pub backup_lsn: Lsn,
/// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
/// of the last record streamed to everyone). Persisting it helps skip
/// recovery in walproposer; generally we compute it from peers. In the
/// walproposer protocol it is called 'truncate_lsn'. Updates are currently
/// driven only by walproposer.
pub peer_horizon_lsn: Lsn,
/// LSN of the oldest known checkpoint made by pageserver and successfully
/// pushed to s3. We don't remove WAL beyond it. Persisted only for
/// informational purposes, we receive it from pageserver (or broker).
pub remote_consistent_lsn: Lsn,
// Peers and their state as we remember it. Knowing peers themselves is
// fundamental; but state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's provision
// place to have less file version upgrades).
pub peers: PersistedPeers,
}

pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersistentState> {
// migrate to storing full term history
if version == 1 {
@@ -167,6 +212,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersistentState> {
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(vec![]),
partial_backup: wal_backup_partial::State::default(),
});
// migrate to hexing some ids
} else if version == 2 {
@@ -190,6 +236,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersistentState> {
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(vec![]),
partial_backup: wal_backup_partial::State::default(),
});
// migrate to moving tenant_id/timeline_id to the top and adding some lsns
} else if version == 3 {
@@ -213,6 +260,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersistentState> {
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(vec![]),
partial_backup: wal_backup_partial::State::default(),
});
// migrate to having timeline_start_lsn
} else if version == 4 {
@@ -236,6 +284,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersistentState> {
peer_horizon_lsn: oldstate.peer_horizon_lsn,
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(vec![]),
partial_backup: wal_backup_partial::State::default(),
});
} else if version == 5 {
info!("reading safekeeper control file version {}", version);
@@ -263,6 +312,29 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersistentState> {

return Ok(oldstate);
}
if version == 7 {
info!("reading safekeeper control file version {}", version);
let oldstate = SafeKeeperStateV7::des(&buf[..buf.len()])?;

// TODO: persist the file back to the disk

return Ok(TimelinePersistentState {
tenant_id: oldstate.tenant_id,
timeline_id: oldstate.timeline_id,
acceptor_state: oldstate.acceptor_state,
server: oldstate.server,
proposer_uuid: oldstate.proposer_uuid,
timeline_start_lsn: oldstate.timeline_start_lsn,
local_start_lsn: oldstate.local_start_lsn,
commit_lsn: oldstate.commit_lsn,
backup_lsn: oldstate.backup_lsn,
peer_horizon_lsn: oldstate.peer_horizon_lsn,
remote_consistent_lsn: oldstate.remote_consistent_lsn,
peers: oldstate.peers,
partial_backup: wal_backup_partial::State::default(),
});
}

bail!("unsupported safekeeper control file version {}", version)
}

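Each historical control-file version gains the new partial_backup field by filling it with wal_backup_partial::State::default() during upgrade, and the frozen SafeKeeperStateV7 above preserves the previous layout so version-7 files remain readable. The pattern is: deserialize the old struct exactly as it was, copy its fields into the current struct, and default anything the old version did not have. A minimal sketch of that idea, using plain serde/bincode and invented field names for illustration (the real code uses the repo's own des helper and full state types):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct StateV7 {
    commit_lsn: u64,
    backup_lsn: u64,
}

#[derive(Serialize, Deserialize, Default)]
struct PartialBackupState {
    // Hypothetical field, standing in for wal_backup_partial::State.
    uploaded_segments: Vec<String>,
}

#[derive(Serialize, Deserialize)]
struct StateV8 {
    commit_lsn: u64,
    backup_lsn: u64,
    partial_backup: PartialBackupState, // new field in the current version
}

fn upgrade(buf: &[u8], version: u32) -> anyhow::Result<StateV8> {
    if version == 7 {
        // Read the old layout, then fill the new field with its default.
        let old: StateV7 = bincode::deserialize(buf)?;
        return Ok(StateV8 {
            commit_lsn: old.commit_lsn,
            backup_lsn: old.backup_lsn,
            partial_backup: PartialBackupState::default(),
        });
    }
    anyhow::bail!("unsupported control file version {version}")
}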
6 changes: 6 additions & 0 deletions safekeeper/src/lib.rs
@@ -32,6 +32,7 @@ pub mod send_wal;
pub mod state;
pub mod timeline;
pub mod wal_backup;
pub mod wal_backup_partial;
pub mod wal_service;
pub mod wal_storage;

@@ -48,6 +49,7 @@ pub mod defaults {

pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
}

#[derive(Debug, Clone)]
@@ -79,6 +81,8 @@ pub struct SafeKeeperConf {
pub http_auth: Option<Arc<SwappableJwtAuth>>,
pub current_thread_runtime: bool,
pub walsenders_keep_horizon: bool,
pub partial_backup_enabled: bool,
pub partial_backup_timeout: Duration,
}

impl SafeKeeperConf {
@@ -123,6 +127,8 @@ impl SafeKeeperConf {
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
current_thread_runtime: false,
walsenders_keep_horizon: false,
partial_backup_enabled: false,
partial_backup_timeout: Duration::from_secs(0),
}
}
}
3 changes: 3 additions & 0 deletions safekeeper/src/safekeeper.rs
@@ -1221,6 +1221,7 @@ mod tests {
commit_lsn: Lsn(1234567600),
},
)]),
partial_backup: crate::wal_backup_partial::State::default(),
};

let ser = state.ser().unwrap();
@@ -1266,6 +1267,8 @@
0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
// partial_backup
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
];

assert_eq!(Hex(&ser), Hex(&expected));
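The eight zero bytes appended to the expected serialization are consistent with partial_backup defaulting to an empty collection: with a fixed-int bincode-style encoding, an empty Vec serializes as nothing but its length, a u64 of zero. This is an inference from the test bytes rather than something the diff states; a small sketch of the same effect with stock bincode:

fn main() {
    // An empty Vec serializes to just its 8-byte length prefix (all zeros)
    // under bincode's default fixed-int encoding.
    let empty: Vec<String> = Vec::new();
    let bytes = bincode::serialize(&empty).unwrap();
    assert_eq!(bytes, vec![0u8; 8]);
}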
13 changes: 9 additions & 4 deletions safekeeper/src/state.rs
@@ -13,6 +13,7 @@ use utils::{
use crate::{
control_file,
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
wal_backup_partial::{self},
};

/// Persistent information stored on safekeeper node about timeline.
@@ -54,11 +55,14 @@ pub struct TimelinePersistentState {
/// pushed to s3. We don't remove WAL beyond it. Persisted only for
/// informational purposes, we receive it from pageserver (or broker).
pub remote_consistent_lsn: Lsn,
// Peers and their state as we remember it. Knowing peers themselves is
// fundamental; but state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's provision
// place to have less file version upgrades).
/// Peers and their state as we remember it. Knowing peers themselves is
/// fundamental; but state is saved here only for informational purposes and
/// obviously can be stale. (Currently not saved at all, but let's provision
/// place to have less file version upgrades).
pub peers: PersistedPeers,
/// Holds names of partial segments uploaded to remote storage. Used to
/// clean up old objects without leaving garbage in remote storage.
pub partial_backup: wal_backup_partial::State,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -93,6 +97,7 @@ impl TimelinePersistentState {
.map(|p| (*p, PersistedPeerInfo::new()))
.collect(),
),
partial_backup: wal_backup_partial::State::default(),
}
}

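The definition of wal_backup_partial::State itself lives in the new safekeeper/src/wal_backup_partial.rs, which is not expanded in this view. Going only by the doc comment above (it holds names of partial segments uploaded to remote storage) and the all-zero default seen in the serialization test, a plausible, purely illustrative shape would be a serializable list of uploaded object names:

use serde::{Deserialize, Serialize};

/// Hypothetical sketch of wal_backup_partial::State; the real definition is
/// in wal_backup_partial.rs and may differ.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct State {
    /// Names of partial segment objects currently present in remote storage,
    /// kept so superseded uploads can be deleted instead of leaking.
    pub uploaded_segments: Vec<String>,
}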
7 changes: 5 additions & 2 deletions safekeeper/src/timeline.rs
@@ -38,7 +38,7 @@ use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};

use crate::metrics::FullTimelineInfo;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::{debug_dump, wal_storage};
use crate::{debug_dump, wal_backup_partial, wal_storage};
use crate::{GlobalTimelines, SafeKeeperConf};

/// Things safekeeper should know about timeline state on peers.
@@ -503,6 +503,9 @@ impl Timeline {
if conf.peer_recovery_enabled {
tokio::spawn(recovery_main(self.clone(), conf.clone()));
}
if conf.is_wal_backup_enabled() && conf.partial_backup_enabled {
tokio::spawn(wal_backup_partial::main_task(self.clone(), conf.clone()));
}
}

/// Delete timeline from disk completely, by removing timeline directory.
@@ -667,8 +670,8 @@ impl Timeline {
term_flush_lsn =
TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
}
self.commit_lsn_watch_tx.send(commit_lsn)?;
self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
self.commit_lsn_watch_tx.send(commit_lsn)?;
Ok(rmsg)
}

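wal_backup_partial::main_task is spawned per timeline only when both remote WAL backup and the new flag are on. Its body is in the file that is not expanded here; judging from the partial_backup_timeout knob and the state added above, its likely shape is a loop that waits out the timeout, uploads the current partial-segment prefix if flushed WAL has advanced, and remembers what it uploaded so the previous object can be removed. A heavily hedged skeleton of that idea, with every name invented for illustration:

use std::time::Duration;

async fn partial_backup_loop(timeout: Duration) {
    let mut last_uploaded = 0u64;
    loop {
        // Wait for the configured quiet period before considering an upload.
        tokio::time::sleep(timeout).await;

        let flushed = current_flush_lsn();
        if flushed > last_uploaded {
            // Upload the prefix of the open segment and remember it so the
            // previously uploaded partial object can be deleted later.
            upload_partial_segment(flushed).await;
            last_uploaded = flushed;
        }
    }
}

// Stubs standing in for real safekeeper internals.
fn current_flush_lsn() -> u64 {
    0
}

async fn upload_partial_segment(_up_to_lsn: u64) {}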
32 changes: 22 additions & 10 deletions safekeeper/src/wal_backup.rs
@@ -180,6 +180,16 @@ fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
.unwrap()
}

pub fn init_remote_storage(conf: &SafeKeeperConf) {
// TODO: refactor REMOTE_STORAGE to avoid using global variables, and provide
// dependencies to all tasks instead.
REMOTE_STORAGE.get_or_init(|| {
conf.remote_storage
.as_ref()
.map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
});
}

const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;

/// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
@@ -194,14 +204,6 @@ pub async fn wal_backup_launcher_task_main(
conf.remote_storage
);

let conf_ = conf.clone();
REMOTE_STORAGE.get_or_init(|| {
conf_
.remote_storage
.as_ref()
.map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
});

// Presence in this map means launcher is aware s3 offloading is needed for
// the timeline, but the task is started only if it makes sense to offload
// from this safekeeper.
@@ -498,7 +500,7 @@ fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
res
}

async fn backup_object(
pub async fn backup_object(
source_file: &Utf8Path,
target_file: &RemotePath,
size: usize,
@@ -509,7 +511,10 @@ async fn backup_object(
.await
.with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;

let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE);
// Limit the read to the first `size` bytes of the file.
let limited_file = tokio::io::AsyncReadExt::take(file, size as u64);

let file = tokio_util::io::ReaderStream::with_capacity(limited_file, BUFFER_SIZE);

let cancel = CancellationToken::new();

@@ -604,6 +609,13 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
Ok(())
}

/// Used by wal_backup_partial.
pub async fn delete_objects(paths: &[RemotePath]) -> Result<()> {
let cancel = CancellationToken::new(); // not really used
let storage = get_configured_remote_storage();
storage.delete_objects(paths, &cancel).await
}

/// Copy segments from one timeline to another. Used in copy_timeline.
pub async fn copy_s3_segments(
wal_seg_size: usize,
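Two of these wal_backup.rs changes matter directly for partial uploads: backup_object now wraps the file in AsyncReadExt::take(size), so only the first size bytes are streamed even when the on-disk file is a preallocated full-size segment, and the new delete_objects helper gives wal_backup_partial a way to remove superseded uploads. A minimal sketch of the take-plus-ReaderStream composition, assuming only tokio, tokio-util, and futures, which the surrounding code already pulls in:

use tokio::io::AsyncReadExt;
use tokio_util::io::ReaderStream;

const BUFFER_SIZE: usize = 32 * 1024; // illustrative value, not the repo's constant

async fn stream_prefix(path: &str, size: u64) -> std::io::Result<u64> {
    use futures::StreamExt;

    let file = tokio::fs::File::open(path).await?;

    // Reading stops after `size` bytes even if the file is longer, which is
    // what a still-growing, preallocated partial segment needs.
    let limited = file.take(size);
    let mut stream = ReaderStream::with_capacity(limited, BUFFER_SIZE);

    let mut total = 0u64;
    while let Some(chunk) = stream.next().await {
        // In the real code this stream is handed to remote storage's upload;
        // here we just count bytes to keep the sketch self-contained.
        total += chunk?.len() as u64;
    }
    Ok(total)
}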