Skip to content

Commit

Permalink
feat(pagebench): add aux file bench (#7746)
Browse files Browse the repository at this point in the history
part of #7462

## Summary of changes

This pull request adds two APIs to the pageserver management API:
list_aux_files and ingest_aux_files. The aux file pagebench is intended
to be used on an empty timeline because the data do not go through the
safekeeper. LSNs are advanced by 8 for each ingestion, to avoid
invariant checks inside the pageserver.

For now, I only care about space amplification / read amplification, so
the bench is designed in a very simple way: ingest 10000 files, and I
will manually dump the layer map to analyze.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
  • Loading branch information
skyzh authored and a-masterov committed May 20, 2024
1 parent 21ef21b commit 052ca59
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 0 deletions.
10 changes: 10 additions & 0 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,16 @@ pub struct DownloadRemoteLayersTaskSpawnRequest {
pub max_concurrent_downloads: NonZeroUsize,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct IngestAuxFilesRequest {
pub aux_files: HashMap<String, String>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ListAuxFilesRequest {
pub lsn: Lsn,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DownloadRemoteLayersTaskInfo {
pub task_id: String,
Expand Down
57 changes: 57 additions & 0 deletions pageserver/client/src/mgmt_api.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::collections::HashMap;

use bytes::Bytes;
use pageserver_api::{models::*, shard::TenantShardId};
use reqwest::{IntoUrl, Method, StatusCode};
use utils::{
http::error::HttpErrorBody,
id::{TenantId, TimelineId},
lsn::Lsn,
};

pub mod util;
Expand Down Expand Up @@ -561,4 +565,57 @@ impl Client {
}),
}
}

pub async fn ingest_aux_files(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
aux_files: HashMap<String, String>,
) -> Result<bool> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/ingest_aux_files",
self.mgmt_api_endpoint, tenant_shard_id, timeline_id
);
let resp = self
.request_noerror(Method::POST, &uri, IngestAuxFilesRequest { aux_files })
.await?;
match resp.status() {
StatusCode::OK => Ok(true),
status => Err(match resp.json::<HttpErrorBody>().await {
Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
Err(_) => {
Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
}
}),
}
}

pub async fn list_aux_files(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<HashMap<String, Bytes>> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/list_aux_files",
self.mgmt_api_endpoint, tenant_shard_id, timeline_id
);
let resp = self
.request_noerror(Method::POST, &uri, ListAuxFilesRequest { lsn })
.await?;
match resp.status() {
StatusCode::OK => {
let resp: HashMap<String, Bytes> = resp.json().await.map_err(|e| {
Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{e}"))
})?;
Ok(resp)
}
status => Err(match resp.json::<HttpErrorBody>().await {
Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
Err(_) => {
Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
}
}),
}
}
}
98 changes: 98 additions & 0 deletions pageserver/pagebench/src/cmd/aux_files.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use pageserver_api::models::{AuxFilePolicy, TenantConfig, TenantConfigRequest};
use pageserver_api::shard::TenantShardId;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;

use std::collections::HashMap;
use std::sync::Arc;

/// Ingest aux files into the pageserver.
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
page_service_connstring: String,
#[clap(long)]
pageserver_jwt: Option<String>,

targets: Option<Vec<TenantTimelineId>>,
}

pub(crate) fn main(args: Args) -> anyhow::Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let main_task = rt.spawn(main_impl(args));
rt.block_on(main_task).unwrap()
}

async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));

let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
));

// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
&mgmt_api_client,
crate::util::cli::targets::Spec {
limit_to_first_n_targets: None,
targets: {
if let Some(targets) = &args.targets {
if targets.len() != 1 {
anyhow::bail!("must specify exactly one target");
}
Some(targets.clone())
} else {
None
}
},
},
)
.await?;

let timeline = timelines[0];
let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id);
let timeline_id = timeline.timeline_id;

println!("operating on timeline {}", timeline);

mgmt_api_client
.tenant_config(&TenantConfigRequest {
tenant_id: timeline.tenant_id,
config: TenantConfig {
switch_aux_file_policy: Some(AuxFilePolicy::V2),
..Default::default()
},
})
.await?;

for batch in 0..100 {
let items = (0..100)
.map(|id| {
(
format!("pg_logical/mappings/{:03}.{:03}", batch, id),
format!("{:08}", id),
)
})
.collect::<HashMap<_, _>>();
let file_cnt = items.len();
mgmt_api_client
.ingest_aux_files(tenant_shard_id, timeline_id, items)
.await?;
println!("ingested {file_cnt} files");
}

let files = mgmt_api_client
.list_aux_files(tenant_shard_id, timeline_id, Lsn(Lsn::MAX.0 - 1))
.await?;

println!("{} files found", files.len());

anyhow::Ok(())
}
3 changes: 3 additions & 0 deletions pageserver/pagebench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod util {

/// The pagebench CLI sub-commands, dispatched in [`main`] below.
mod cmd {
pub(super) mod aux_files;
pub(super) mod basebackup;
pub(super) mod getpage_latest_lsn;
pub(super) mod ondemand_download_churn;
Expand All @@ -27,6 +28,7 @@ enum Args {
GetPageLatestLsn(cmd::getpage_latest_lsn::Args),
TriggerInitialSizeCalculation(cmd::trigger_initial_size_calculation::Args),
OndemandDownloadChurn(cmd::ondemand_download_churn::Args),
AuxFiles(cmd::aux_files::Args),
}

fn main() {
Expand All @@ -46,6 +48,7 @@ fn main() {
cmd::trigger_initial_size_calculation::main(args)
}
Args::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args),
Args::AuxFiles(args) => cmd::aux_files::main(args),
}
.unwrap()
}
75 changes: 75 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::IngestAuxFilesRequest;
use pageserver_api::models::ListAuxFilesRequest;
use pageserver_api::models::LocationConfig;
use pageserver_api::models::LocationConfigListResponse;
use pageserver_api::models::ShardParameters;
Expand Down Expand Up @@ -2331,6 +2333,71 @@ async fn get_utilization(
.map_err(ApiError::InternalServerError)
}

async fn list_aux_files(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let body: ListAuxFilesRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;

let state = get_state(&request);

let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;

let process = || async move {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let files = timeline.list_aux_files(body.lsn, &ctx).await?;
Ok::<_, anyhow::Error>(files)
};

match process().await {
Ok(st) => json_response(StatusCode::OK, st),
Err(err) => json_response(
StatusCode::INTERNAL_SERVER_ERROR,
ApiError::InternalServerError(err).to_string(),
),
}
}

async fn ingest_aux_files(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let body: IngestAuxFilesRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;

let state = get_state(&request);

let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;

let process = || async move {
let mut modification = timeline.begin_modification(Lsn(
timeline.get_last_record_lsn().0 + 8
) /* advance LSN by 8 */);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
for (fname, content) in body.aux_files {
modification
.put_file(&fname, content.as_bytes(), &ctx)
.await?;
}
modification.commit(&ctx).await?;
Ok::<_, anyhow::Error>(())
};

match process().await {
Ok(st) => json_response(StatusCode::OK, st),
Err(err) => Err(ApiError::InternalServerError(err)),
}
}

/// Report on the largest tenants on this pageserver, for the storage controller to identify
/// candidates for splitting
async fn post_top_tenants(
Expand Down Expand Up @@ -2708,6 +2775,14 @@ pub fn make_router(
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",
|r| testing_api_handler("ingest_aux_files", r, ingest_aux_files),
)
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/list_aux_files",
|r| testing_api_handler("list_aux_files", r, list_aux_files),
)
.post("/v1/top_tenants", |r| api_handler(r, post_top_tenants))
.any(handler_404))
}

0 comments on commit 052ca59

Please sign in to comment.