Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Secondary locations heatmaps, downloads & tests #5745

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9fd901b
neon_local: remote storage by default & enable multiple pageservers in
jcsp Nov 1, 2023
c7e6668
control_plane: add /inspect to attachment service
jcsp Nov 1, 2023
c27c7d8
control_plane: implement `cargo neon migrate`
jcsp Nov 1, 2023
28f752f
neon_local: don't use default remote storage if user specified it on CLI
jcsp Nov 1, 2023
11e9916
Use the same local_fs path as tests
jcsp Nov 1, 2023
b698bc9
tests: explicitly disable remote storage if it is None
jcsp Nov 1, 2023
497b09b
control_plane: use `pageserver/` sub path for local_fs
jcsp Nov 2, 2023
6f0a225
pageserver: add secondary-mode metrics
jcsp Nov 7, 2023
fc3c7aa
pageserver: add TenantConf::enable_heatmap (default false)
jcsp Nov 2, 2023
c60ec31
pageserver: add secondary downloader & heatmaps
jcsp Oct 12, 2023
06e0370
pageserver: TenantManager support for SecondaryTenant
jcsp Oct 11, 2023
c7dd9d9
Add Timeline::generate_heatmap, remote client heatmap upload
jcsp Oct 12, 2023
80e18ac
pageserver: launch tasks for secondary mode
jcsp Oct 26, 2023
56fe8ce
Refactor Workload into shared location
jcsp Oct 12, 2023
30d9830
tests: refactor a path helper
jcsp Sep 8, 2023
ce29d15
tests: add helpers for location_conf
jcsp Oct 13, 2023
33a66b9
tests: add tests for secondary mode & multi attach
jcsp Sep 7, 2023
afc8315
pageserver: implement flush on location conf update
jcsp Nov 6, 2023
8135cd7
pageserver/http: add testing routes for secondary mode
jcsp Oct 24, 2023
8506008
pageserver: create timelines/ dir when configuring secondary location
jcsp Oct 24, 2023
d9e2807
pageserver: add Layer::for_secondary
jcsp Oct 26, 2023
b55e295
pageserver: pass TenantManager into disk usage eviction task
jcsp Oct 26, 2023
028b57a
pageserver: include secondary tenants in disk usage eviction
jcsp Oct 26, 2023
c5f7c6b
pageserver: add debug_assert_current_span_has_tenant_id
jcsp Nov 7, 2023
c4bd100
tests: set a backoff_factor for test HTTP retries
jcsp Nov 8, 2023
9c52a30
Merge branch 'jcsp/neon-local-migration' into jcsp/secondary-location…
jcsp Nov 9, 2023
6901bf4
Use secondary mode bits in neon_local migration
jcsp Nov 10, 2023
ebf151f
f http routes
jcsp Nov 10, 2023
3107010
pageserver: error type for collect_keyspace
jcsp Nov 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions control_plane/src/attachment_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ pub struct AttachHookResponse {
pub gen: Option<u32>,
}

#[derive(Serialize, Deserialize)]
pub struct InspectRequest {
pub tenant_id: TenantId,
}

#[derive(Serialize, Deserialize)]
pub struct InspectResponse {
pub attachment: Option<(u32, NodeId)>,
}

impl AttachmentService {
pub fn from_env(env: &LocalEnv) -> Self {
let path = env.base_data_dir.join("attachments.json");
Expand Down Expand Up @@ -101,4 +111,29 @@ impl AttachmentService {
let response = response.json::<AttachHookResponse>()?;
Ok(response.gen)
}

pub fn inspect(&self, tenant_id: TenantId) -> anyhow::Result<Option<(u32, NodeId)>> {
use hyper::StatusCode;

let url = self
.env
.control_plane_api
.clone()
.unwrap()
.join("inspect")
.unwrap();
let client = reqwest::blocking::ClientBuilder::new()
.build()
.expect("Failed to construct http client");

let request = InspectRequest { tenant_id };

let response = client.post(url).json(&request).send()?;
if response.status() != StatusCode::OK {
return Err(anyhow!("Unexpected status {}", response.status()));
}

let response = response.json::<InspectResponse>()?;
Ok(response.attachment)
}
}
20 changes: 19 additions & 1 deletion control_plane/src/bin/attachment_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use pageserver_api::control_api::{
ValidateResponseTenant,
};

use control_plane::attachment_service::{AttachHookRequest, AttachHookResponse};
use control_plane::attachment_service::{
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
};

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -255,12 +257,28 @@ async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, Ap
)
}

async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
let inspect_req = json_request::<InspectRequest>(&mut req).await?;

let state = get_state(&req).inner.clone();
let locked = state.write().await;
let tenant_state = locked.tenants.get(&inspect_req.tenant_id);

json_response(
StatusCode::OK,
InspectResponse {
attachment: tenant_state.and_then(|s| s.pageserver.map(|ps| (s.generation, ps))),
},
)
}

fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body, ApiError> {
endpoint::make_router()
.data(Arc::new(State::new(persistent_state)))
.post("/re-attach", |r| request_span(r, handle_re_attach))
.post("/validate", |r| request_span(r, handle_validate))
.post("/attach-hook", |r| request_span(r, handle_attach_hook))
.post("/inspect", |r| request_span(r, handle_inspect))
}

#[tokio::main]
Expand Down
108 changes: 81 additions & 27 deletions control_plane/src/bin/neon_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use compute_api::spec::ComputeMode;
use control_plane::attachment_service::AttachmentService;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR};
use control_plane::safekeeper::SafekeeperNode;
use control_plane::tenant_migration::migrate_tenant;
use control_plane::{broker, local_env};
use pageserver_api::models::TimelineInfo;
use pageserver_api::{
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
};
use postgres_backend::AuthType;
use safekeeper_api::{
Expand Down Expand Up @@ -46,30 +47,42 @@ const DEFAULT_PG_VERSION: &str = "15";

const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/";

fn default_conf() -> String {
format!(
fn default_conf(num_pageservers: u16) -> String {
let mut template = format!(
r#"
# Default built-in configuration, defined in main.rs
control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}'

[broker]
listen_addr = '{DEFAULT_BROKER_ADDR}'

[[pageservers]]
id = {DEFAULT_PAGESERVER_ID}
listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}'
listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}'
pg_auth_type = '{trust_auth}'
http_auth_type = '{trust_auth}'

[[safekeepers]]
id = {DEFAULT_SAFEKEEPER_ID}
pg_port = {DEFAULT_SAFEKEEPER_PG_PORT}
http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT}

"#,
trust_auth = AuthType::Trust,
)
);

for i in 0..num_pageservers {
let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;

template += &format!(
r#"
[[pageservers]]
id = {pageserver_id}
listen_pg_addr = '127.0.0.1:{pg_port}'
listen_http_addr = '127.0.0.1:{http_port}'
pg_auth_type = '{trust_auth}'
http_auth_type = '{trust_auth}'
"#,
trust_auth = AuthType::Trust,
)
}

template
}

///
Expand Down Expand Up @@ -295,6 +308,9 @@ fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TimelineId
}

fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
let num_pageservers = init_match
.get_one::<u16>("num-pageservers")
.expect("num-pageservers arg has a default");
// Create config file
let toml_file: String = if let Some(config_path) = init_match.get_one::<PathBuf>("config") {
// load and parse the file
Expand All @@ -306,7 +322,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
})?
} else {
// Built-in default config
default_conf()
default_conf(*num_pageservers)
};

let pg_version = init_match
Expand All @@ -320,6 +336,9 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
env.init(pg_version, force)
.context("Failed to initialize neon repository")?;

// Create remote storage location for default LocalFs remote storage
std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;

// Initialize pageserver, create initial tenant and timeline.
for ps_conf in &env.pageservers {
PageServerNode::from_env(&env, ps_conf)
Expand Down Expand Up @@ -433,6 +452,15 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
.with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
println!("tenant {tenant_id} successfully configured on the pageserver");
}
Some(("migrate", matches)) => {
let tenant_id = get_tenant_id(matches, env)?;
let new_pageserver = get_pageserver(env, matches)?;
let new_pageserver_id = new_pageserver.conf.id;

migrate_tenant(env, tenant_id, new_pageserver)?;
println!("tenant {tenant_id} migrated to {}", new_pageserver_id);
}

Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
None => bail!("no tenant subcommand provided"),
}
Expand Down Expand Up @@ -867,20 +895,20 @@ fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Res
}
}

fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
NodeId(id_str.parse().context("while parsing pageserver id")?)
} else {
DEFAULT_PAGESERVER_ID
};
fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
NodeId(id_str.parse().context("while parsing pageserver id")?)
} else {
DEFAULT_PAGESERVER_ID
};

Ok(PageServerNode::from_env(
env,
env.get_pageserver_conf(node_id)?,
))
}
Ok(PageServerNode::from_env(
env,
env.get_pageserver_conf(node_id)?,
))
}

fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
match sub_match.subcommand() {
Some(("start", subcommand_args)) => {
if let Err(e) = get_pageserver(env, subcommand_args)?
Expand Down Expand Up @@ -917,6 +945,20 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
}

Some(("migrate", subcommand_args)) => {
let pageserver = get_pageserver(env, subcommand_args)?;
//TODO what shutdown strategy should we use here?
if let Err(e) = pageserver.stop(false) {
eprintln!("pageserver stop failed: {}", e);
exit(1);
}

if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) {
eprintln!("pageserver start failed: {e}");
exit(1);
}
}

Some(("status", subcommand_args)) => {
match get_pageserver(env, subcommand_args)?.check_status() {
Ok(_) => println!("Page server is up and running"),
Expand Down Expand Up @@ -1224,13 +1266,21 @@ fn cli() -> Command {
.help("Force initialization even if the repository is not empty")
.required(false);

let num_pageservers_arg = Arg::new("num-pageservers")
.value_parser(value_parser!(u16))
.long("num-pageservers")
.help("How many pageservers to create (default 1)")
.required(false)
.default_value("1");

Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
.subcommand(
Command::new("init")
.about("Initialize a new Neon repository, preparing configs for services to start with")
.arg(pageserver_config_args.clone())
.arg(num_pageservers_arg.clone())
.arg(
Arg::new("config")
.long("config")
Expand Down Expand Up @@ -1301,6 +1351,10 @@ fn cli() -> Command {
.subcommand(Command::new("config")
.arg(tenant_id_arg.clone())
.arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false)))
.subcommand(Command::new("migrate")
.about("Migrate a tenant from one pageserver to another")
.arg(tenant_id_arg.clone())
.arg(pageserver_id_arg.clone()))
)
.subcommand(
Command::new("pageserver")
Expand Down
1 change: 1 addition & 0 deletions control_plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ pub mod local_env;
pub mod pageserver;
pub mod postgresql_conf;
pub mod safekeeper;
pub mod tenant_migration;
Loading