Skip to content

Commit

Permalink
Merge branch 'main' into skyzh/pagebench-aux-files
Browse files Browse the repository at this point in the history
  • Loading branch information
skyzh committed May 17, 2024
2 parents 2316b4c + aaf6081 commit f106bdd
Show file tree
Hide file tree
Showing 12 changed files with 648 additions and 11 deletions.
134 changes: 134 additions & 0 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
io::{BufRead, Read},
num::{NonZeroU64, NonZeroUsize},
str::FromStr,
sync::atomic::AtomicUsize,
time::{Duration, SystemTime},
};

Expand Down Expand Up @@ -308,13 +309,88 @@ pub struct TenantConfig {
pub switch_aux_file_policy: Option<AuxFilePolicy>,
}

/// The policy for the aux file storage. It can be switched through `switch_aux_file_policy`
/// tenant config. When the first aux file written, the policy will be persisted in the
/// `index_part.json` file and has a limited migration path.
///
/// Currently, we only allow the following migration path:
///
/// Unset -> V1
/// -> V2
/// -> CrossValidation -> V2
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AuxFilePolicy {
/// V1 aux file policy: store everything in AUX_FILE_KEY
V1,
/// V2 aux file policy: store in the AUX_FILE keyspace
V2,
/// Cross validation runs both formats on the write path and does validation
/// on the read path.
CrossValidation,
}

impl AuxFilePolicy {
pub fn is_valid_migration_path(from: Option<Self>, to: Self) -> bool {
matches!(
(from, to),
(None, _) | (Some(AuxFilePolicy::CrossValidation), AuxFilePolicy::V2)
)
}

/// If a tenant writes aux files without setting `switch_aux_policy`, this value will be used.
pub fn default_tenant_config() -> Self {
Self::V1
}
}

/// The aux file policy memory flag. Users can store `Option<AuxFilePolicy>` into this atomic flag. 0 == unspecified.
pub struct AtomicAuxFilePolicy(AtomicUsize);

impl AtomicAuxFilePolicy {
pub fn new(policy: Option<AuxFilePolicy>) -> Self {
Self(AtomicUsize::new(
policy.map(AuxFilePolicy::to_usize).unwrap_or_default(),
))
}

pub fn load(&self) -> Option<AuxFilePolicy> {
match self.0.load(std::sync::atomic::Ordering::Acquire) {
0 => None,
other => Some(AuxFilePolicy::from_usize(other)),
}
}

pub fn store(&self, policy: Option<AuxFilePolicy>) {
self.0.store(
policy.map(AuxFilePolicy::to_usize).unwrap_or_default(),
std::sync::atomic::Ordering::Release,
);
}
}

impl AuxFilePolicy {
pub fn to_usize(self) -> usize {
match self {
Self::V1 => 1,
Self::CrossValidation => 2,
Self::V2 => 3,
}
}

pub fn try_from_usize(this: usize) -> Option<Self> {
match this {
1 => Some(Self::V1),
2 => Some(Self::CrossValidation),
3 => Some(Self::V2),
_ => None,
}
}

pub fn from_usize(this: usize) -> Self {
Self::try_from_usize(this).unwrap()
}
}

impl FromStr for AuxFilePolicy {
type Err = anyhow::Error;

Expand Down Expand Up @@ -604,6 +680,9 @@ pub struct TimelineInfo {
pub state: TimelineState,

pub walreceiver_status: String,

/// The last aux file policy being used on this timeline
pub last_aux_file_policy: Option<AuxFilePolicy>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -1515,4 +1594,59 @@ mod tests {
assert_eq!(actual, expected, "example on {line}");
}
}

#[test]
fn test_aux_file_migration_path() {
assert!(AuxFilePolicy::is_valid_migration_path(
None,
AuxFilePolicy::V1
));
assert!(AuxFilePolicy::is_valid_migration_path(
None,
AuxFilePolicy::V2
));
assert!(AuxFilePolicy::is_valid_migration_path(
None,
AuxFilePolicy::CrossValidation
));
// Self-migration is not a valid migration path, and the caller should handle it by itself.
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::V1),
AuxFilePolicy::V1
));
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::V2),
AuxFilePolicy::V2
));
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::CrossValidation),
AuxFilePolicy::CrossValidation
));
// Migrations not allowed
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::CrossValidation),
AuxFilePolicy::V1
));
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::V1),
AuxFilePolicy::V2
));
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::V2),
AuxFilePolicy::V1
));
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::V2),
AuxFilePolicy::CrossValidation
));
assert!(!AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::V1),
AuxFilePolicy::CrossValidation
));
// Migrations allowed
assert!(AuxFilePolicy::is_valid_migration_path(
Some(AuxFilePolicy::CrossValidation),
AuxFilePolicy::V2
));
}
}
1 change: 1 addition & 0 deletions pageserver/ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ fn handle_metadata(
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
println!("Current metadata:\n{meta:?}");
let mut update_meta = false;
// TODO: simplify this part
if let Some(disk_consistent_lsn) = disk_consistent_lsn {
meta = TimelineMetadata::new(
*disk_consistent_lsn,
Expand Down
2 changes: 2 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ async fn build_timeline_info_common(
state,

walreceiver_status,

last_aux_file_policy: timeline.last_aux_file_policy.load(),
};
Ok(info)
}
Expand Down
33 changes: 27 additions & 6 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use tracing::{debug, info, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::vec_map::{VecMap, VecMapOrdering};
use utils::{bin_ser::BeSer, lsn::Lsn};
Expand Down Expand Up @@ -718,10 +718,11 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
match self.get_switch_aux_file_policy() {
AuxFilePolicy::V1 => self.list_aux_files_v1(lsn, ctx).await,
AuxFilePolicy::V2 => self.list_aux_files_v2(lsn, ctx).await,
AuxFilePolicy::CrossValidation => {
let current_policy = self.last_aux_file_policy.load();
match current_policy {
Some(AuxFilePolicy::V1) | None => self.list_aux_files_v1(lsn, ctx).await,
Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
Some(AuxFilePolicy::CrossValidation) => {
let v1_result = self.list_aux_files_v1(lsn, ctx).await;
let v2_result = self.list_aux_files_v2(lsn, ctx).await;
match (v1_result, v2_result) {
Expand Down Expand Up @@ -1469,7 +1470,27 @@ impl<'a> DatadirModification<'a> {
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let policy = self.tline.get_switch_aux_file_policy();
let switch_policy = self.tline.get_switch_aux_file_policy();

let policy = {
let current_policy = self.tline.last_aux_file_policy.load();
// Allowed switch path:
// * no aux files -> v1/v2/cross-validation
// * cross-validation->v2
if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
self.tline.last_aux_file_policy.store(Some(switch_policy));
self.tline
.remote_client
.schedule_index_upload_for_aux_file_policy_update(Some(switch_policy))?;
info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
switch_policy
} else {
// This branch handles non-valid migration path, and the case that switch_policy == current_policy.
// And actually, because the migration path always allow unspecified -> *, this unwrap_or will never be hit.
current_policy.unwrap_or(AuxFilePolicy::default_tenant_config())
}
};

if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
Expand Down
Loading

0 comments on commit f106bdd

Please sign in to comment.