Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dummy lsn lease http and page service APIs #7815

Merged
merged 8 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ impl std::fmt::Debug for TenantState {
}
}

/// A lease on an LSN, as returned by the LSN lease HTTP and page service APIs.
///
/// On the wire the struct has a single field, `valid_until_millis_since_epoch`,
/// holding the expiry as a timestamp in milliseconds.
#[serde_as]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LsnLease {
/// Point in time until which the lease is valid. Renamed and converted to a
/// millisecond timestamp by the serde attributes below.
#[serde(rename = "valid_until_millis_since_epoch")]
#[serde_as(as = "serde_with::TimestampMilliSeconds")]
pub valid_until: SystemTime,
koivunej marked this conversation as resolved.
Show resolved Hide resolved
}
koivunej marked this conversation as resolved.
Show resolved Hide resolved

/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ActivatingFrom {
Expand Down
8 changes: 8 additions & 0 deletions libs/pageserver_api/src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,14 @@ impl ShardIdentity {
}
}

/// Builds the [`ShardIndex`] for this shard from its shard number and shard count.
pub fn shard_index(&self) -> ShardIndex {
    let shard_number = self.number;
    let shard_count = self.count;
    ShardIndex {
        shard_number,
        shard_count,
    }
}

pub fn shard_slug(&self) -> String {
if self.count > ShardCount(0) {
format!("-{:02x}{:02x}", self.number.0, self.count.0)
Expand Down
40 changes: 40 additions & 0 deletions pageserver/src/http/openapi_spec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,37 @@ paths:
schema:
$ref: "#/components/schemas/LsnByTimestampResponse"

/v1/tenant/{tenant_id}/timeline/{timeline_id}/lsn_lease:
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
put:
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
description: Obtain lease for the given LSN
parameters:
- name: lsn
in: query
required: true
schema:
type: string
format: hex
description: An LSN to obtain the lease for
responses:
"200":
description: OK
content:
application/json:
schema:
$ref: "#/components/schemas/LsnLease"

/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc:
parameters:
- name: tenant_id
Expand Down Expand Up @@ -980,6 +1011,15 @@ components:
type: string
enum: [past, present, future, nodata]

LsnLease:
type: object
required:
- valid_until_millis_since_epoch
properties:
valid_until_millis_since_epoch:
type: integer
format: int64

PageserverUtilization:
type: object
required:
Expand Down
30 changes: 30 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1701,6 +1701,32 @@ async fn handle_tenant_break(
json_response(StatusCode::OK, ())
}

// Obtains a lsn lease on the given timeline.
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
async fn lsn_lease_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;

let lsn: Lsn = parse_query_param(&request, "lsn")?
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;

let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);

let state = get_state(&request);

let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let result = timeline
.make_lsn_lease(lsn, &ctx)
.map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?;

json_response(StatusCode::OK, result)
}

// Run GC immediately on given timeline.
async fn timeline_gc_handler(
mut request: Request<Body>,
Expand Down Expand Up @@ -2701,6 +2727,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_timestamp_of_lsn",
|r| api_handler(r, get_timestamp_of_lsn_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/lsn_lease",
|r| api_handler(r, lsn_lease_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
|r| api_handler(r, timeline_gc_handler),
Expand Down
73 changes: 73 additions & 0 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use pageserver_api::models::{
};
use pageserver_api::shard::ShardIndex;
use pageserver_api::shard::ShardNumber;
use pageserver_api::shard::TenantShardId;
use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
use pq_proto::framed::ConnectionError;
use pq_proto::FeStartupPacket;
Expand All @@ -33,6 +34,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
Expand Down Expand Up @@ -905,6 +907,39 @@ impl PageServerHandler {
}
}

/// Handles the `lease lsn` page service command for a single timeline.
///
/// Resolves the active timeline of the given tenant shard, obtains a lease for
/// `lsn`, and queues (without flushing) a one-row result whose single text
/// column `valid_until` holds the lease expiry as decimal milliseconds since
/// the Unix epoch.
///
/// This function does not write `CommandComplete`: the caller does so once it
/// sees `Ok(())`, and emitting it here as well would produce two completion
/// messages for one command.
///
/// # Errors
///
/// Returns a [`QueryError`] if the timeline cannot be resolved, the lease
/// cannot be obtained, the expiry lies before the Unix epoch, or a message
/// cannot be written to `pgb`.
#[instrument(skip_all, fields(shard_id, %lsn))]
async fn handle_make_lsn_lease<IO>(
    &self,
    pgb: &mut PostgresBackend<IO>,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    lsn: Lsn,
    ctx: &RequestContext,
) -> Result<(), QueryError>
where
    IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
    let shard_selector = ShardSelector::Known(tenant_shard_id.to_index());
    let timeline = self
        .get_active_tenant_timeline(tenant_shard_id.tenant_id, timeline_id, shard_selector)
        .await?;
    let lease = timeline.make_lsn_lease(lsn, ctx)?;
    let valid_until = lease
        .valid_until
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_err(|e| QueryError::Other(e.into()))?;

    // The column is declared as text, so send the decimal representation
    // rather than the raw big-endian bytes of the integer.
    let valid_until_millis = valid_until.as_millis().to_string();

    pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
        b"valid_until",
    )]))?
    .write_message_noflush(&BeMessage::DataRow(&[Some(valid_until_millis.as_bytes())]))?;

    Ok(())
}

#[instrument(skip_all, fields(shard_id))]
async fn handle_get_rel_exists_request(
&mut self,
Expand Down Expand Up @@ -1802,6 +1837,44 @@ where
// important because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("lease lsn ") {
let (_, params_raw) = query_string.split_at("lease lsn ".len());
koivunej marked this conversation as resolved.
Show resolved Hide resolved
let params = params_raw.split_whitespace().collect::<Vec<_>>();
if params.len() != 3 {
return Err(QueryError::Other(anyhow::anyhow!(
"invalid param number {} for lease lsn command",
params.len()
)));
}

let tenant_shard_id = TenantShardId::from_str(params[0])
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
let timeline_id = TimelineId::from_str(params[1])
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;

tracing::Span::current()
.record("tenant_id", field::display(tenant_shard_id))
.record("timeline_id", field::display(timeline_id));

self.check_permission(Some(tenant_shard_id.tenant_id))?;

// The caller is responsible for providing correct lsn.
let lsn = Lsn::from_str(params[2])
.with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;

match self
.handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
.await
{
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
Err(e) => {
error!("error obtaining lsn lease for {lsn}: {e:?}");
pgb.write_message_noflush(&BeMessage::ErrorResponse(
&e.to_string(),
Some(e.pg_error_code()),
))?
}
};
} else if query_string.starts_with("show ") {
// show <tenant_id>
let (_, params_raw) = query_string.split_at("show ".len());
Expand Down
7 changes: 6 additions & 1 deletion pageserver/src/tenant/mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::models::LocationConfigMode;
use pageserver_api::shard::{
ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
ShardCount, ShardIdentity, ShardIndex, ShardNumber, ShardStripeSize, TenantShardId
};
use pageserver_api::upcall_api::ReAttachResponseTenant;
use rand::{distributions::Alphanumeric, Rng};
Expand Down Expand Up @@ -127,6 +127,8 @@ pub(crate) enum ShardSelector {
First,
/// Pick the shard that holds this key
Page(Key),
/// The shard ID is known: pick the given shard
Known(ShardIndex),
}

/// A convenience for use with the re_attach ControlPlaneClient function: rather
Expand Down Expand Up @@ -2067,6 +2069,9 @@ impl TenantManager {
return ShardResolveResult::Found(tenant.clone());
}
}
ShardSelector::Known(shard) if tenant.shard_identity.shard_index() == shard => {
return ShardResolveResult::Found(tenant.clone());
}
_ => continue,
}
}
Expand Down
12 changes: 11 additions & 1 deletion pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use pageserver_api::{
models::{
AtomicAuxFilePolicy, AuxFilePolicy, CompactionAlgorithm, DownloadRemoteLayersTaskInfo,
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
TimelineState,
LsnLease, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, ShardNumber, TenantShardId},
Expand Down Expand Up @@ -1505,6 +1505,16 @@ impl Timeline {
Ok(())
}

/// Obtains a temporary lease blocking garbage collection for the given LSN
pub(crate) fn make_lsn_lease(&self, _lsn: Lsn, _ctx: &RequestContext) -> anyhow::Result<LsnLease> {
koivunej marked this conversation as resolved.
Show resolved Hide resolved
const LEASE_LENGTH: Duration = Duration::from_secs(5 * 60);
let lease = LsnLease {
valid_until: SystemTime::now() + LEASE_LENGTH,
};
// TODO: dummy implementation
Ok(lease)
}

/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
Expand Down
Loading