Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio-epoll-uring: use it for on-demand downloads #6992

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
104 commits
Select commit Hold shift + click to select a range
a3c88a8
tokio-epoll-uring integration: support more operations
problame Jan 17, 2024
c2d8251
WIP: statx
problame Jan 19, 2024
a0032f4
impl
problame Feb 5, 2024
0548374
[DO NOT MERGE} CI: test with both io engines
problame Feb 5, 2024
e824c84
clippy
problame Feb 5, 2024
29b9628
fix clippy in tokio-epoll-uring
problame Feb 5, 2024
c2e9cf4
for real
problame Feb 5, 2024
4fa0181
fix macos build
problame Feb 6, 2024
aaccb7a
Merge branch 'main' into problame/integrate-tokio-epoll-uring/more-ops
problame Feb 7, 2024
9a4880e
WIP
problame Feb 7, 2024
b5a00b0
refactor(disk_btree): make BlockWriter::write_blk infallible
problame Feb 7, 2024
7ba1949
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/btree-b…
problame Feb 7, 2024
91d3e25
finish
problame Feb 7, 2024
6f65648
Revert "refactor(disk_btree): make BlockWriter::write_blk infallible"
problame Feb 7, 2024
85d5fc6
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 7, 2024
edabce6
use right branch
problame Feb 7, 2024
e5d15df
WIP
problame Feb 7, 2024
2a39457
WIP
problame Feb 7, 2024
4659794
WIP
problame Feb 7, 2024
14bdd84
it turns out one wants to take BoundedBuf, not Slice<T>
problame Feb 7, 2024
4fe3b49
make tests pass
problame Feb 7, 2024
a6605a1
pull bunch of changes down
problame Feb 7, 2024
c92e8a7
don't pull that in
problame Feb 7, 2024
b0144e2
update lib
problame Feb 7, 2024
54561a8
fixup
problame Feb 7, 2024
6c47083
we can't use impl IoBuf for Array
problame Feb 7, 2024
238296a
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 7, 2024
207764b
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/refacto…
problame Feb 7, 2024
e5da261
work around BoundedBuf.slice(0..x) panicking for x == 0
problame Feb 7, 2024
26e51c7
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 7, 2024
6f4d182
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/refacto…
problame Feb 7, 2024
33f3053
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Feb 7, 2024
33261e4
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/refacto…
problame Feb 7, 2024
720f633
refactor(virtual_file) make write_all_at take owned buffers
problame Feb 7, 2024
368f2ac
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Feb 8, 2024
3d8f5d1
fix copy-pasta
problame Feb 8, 2024
183c86c
clean up Cargo.toml
problame Feb 8, 2024
a6d022a
WIP
problame Feb 12, 2024
dff1b9b
implement write_at IoEngine function & wire it up; only discovered no…
problame Feb 12, 2024
64799e6
Merge remote-tracking branch 'origin/problame/integrate-tokio-epoll-u…
problame Feb 12, 2024
8998178
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 12, 2024
98fe109
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/virtual…
problame Feb 12, 2024
bfdd86c
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/virtual…
problame Feb 12, 2024
87f05ca
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 13, 2024
ebffde1
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 14, 2024
ab51748
formatting
problame Feb 14, 2024
6a55ac7
complete comment; https://github.com/neondatabase/neon/pull/6673#disc…
problame Feb 14, 2024
0316adc
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Mar 1, 2024
0c50ee9
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/virtual…
problame Mar 1, 2024
2c30090
weird formatting fluke during merge
problame Mar 1, 2024
32358cf
layer file creation: remove redundant fsync()s
problame Mar 1, 2024
4d8b9e3
Merge branch 'problame/integrate-tokio-epoll-uring/layer-write-path-f…
problame Mar 1, 2024
ebf2de5
layer file creation: fsync timeline directories using VirtualFile::sy…
problame Mar 1, 2024
ca2ed0b
layer file creation: fatal_err the timeline dir fsync
problame Mar 1, 2024
ce251ac
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Mar 1, 2024
c4f7a19
Merge branch 'problame/integrate-tokio-epoll-uring/layer-write-path-f…
problame Mar 1, 2024
1fe80d7
rebase on fatal_err changes
problame Mar 1, 2024
5528b16
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Mar 1, 2024
c972d17
Merge branch 'problame/integrate-tokio-epoll-uring/layer-write-path-f…
problame Mar 1, 2024
7299a0a
Merge branch 'problame/integrate-tokio-epoll-uring/create-layer-fatal…
problame Mar 1, 2024
348649d
Merge branch 'problame/integrate-tokio-epoll-uring/ioengine-par-fsync…
problame Mar 1, 2024
0b268fc
avoid `spawn_blocking(Handle::block_on())` for create_delta_layer
problame Mar 1, 2024
a432299
WIP: tokio-epoll-uring for on-demand downloads
problame Mar 1, 2024
bd0ac4a
WIP
problame Mar 1, 2024
a934333
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Mar 1, 2024
68a4668
layer file download: final rename: fix durability
problame Mar 1, 2024
0a717da
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Mar 1, 2024
ca616ee
continue to do spawn_blocking if std-fs is configured
problame Mar 1, 2024
732d395
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/ondeman…
problame Mar 1, 2024
8219286
WIP: pagebench for ondemand download churn
problame Mar 1, 2024
92c30db
fixups
problame Mar 1, 2024
f6b34a4
Merge branch 'problame/integrate-tokio-epoll-uring/more-ops' into pro…
problame Mar 4, 2024
3f250e0
WIP
problame Mar 4, 2024
3959ac8
more efficient copy loop
problame Mar 4, 2024
655551f
live stats for churn bench
problame Mar 4, 2024
30058b8
bench: concurrent churning within one timeline
problame Mar 4, 2024
d08595e
bench: live stats timeline restarts
problame Mar 4, 2024
320e5fd
ps_ec2_setup_instance_store: fix output
problame Mar 4, 2024
6862bc2
feat: env-configurable buffer size
problame Mar 4, 2024
9dd9bde
(not effective): avoid self-synchronization due to blocked send
problame Mar 4, 2024
a8a23b7
(not effective) cycle through layers 1 by 1 instead of random selection
problame Mar 4, 2024
98ec0dc
implement the copy loop like tokio::io::copy_buf
problame Mar 4, 2024
c5fb9ee
Revert "(not effective) cycle through layers 1 by 1 instead of random…
problame Mar 4, 2024
7a8f39f
Revert "feat: env-configurable buffer size"
problame Mar 4, 2024
d7878a3
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Mar 5, 2024
de3acfc
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Mar 12, 2024
80e01da
extract the copy loop into its own function
problame Mar 12, 2024
3fb45f9
undo the splitting of the files (required some rearrangements)
problame Mar 12, 2024
33e376e
refactor the copy loop (haven't tested it)
problame Mar 12, 2024
32d0138
be generic over the writer
problame Mar 12, 2024
0b5ecfe
be generic about buffer size and test it
problame Mar 12, 2024
2775618
reorg modules even more
problame Mar 12, 2024
169dc77
run CI
problame Mar 13, 2024
5400905
move logic for downloading objects to its own function
problame Mar 13, 2024
7c0e742
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Mar 15, 2024
d4d33d8
undo build matrix
problame Mar 15, 2024
278e3c0
remove stray `///`
problame Mar 15, 2024
4b39956
fix macos build
problame Mar 15, 2024
433d399
Merge remote-tracking branch 'origin/problame/integrate-tokio-epoll-u…
problame Mar 15, 2024
43d947b
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Mar 15, 2024
cc63f3d
fixup(#7120): the macOS code used an outdated constant name, broke th…
problame Mar 15, 2024
6d9eb93
fixup(#7120): the macOS code used an outdated constant name, broke th…
problame Mar 15, 2024
3e09232
clippy
problame Mar 15, 2024
c56f654
Merge branch 'problame/fixup-7120' into problame/integrate-tokio-epol…
problame Mar 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 33 additions & 6 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod utilization;
pub use utilization::PageserverUtilization;

use std::{
borrow::Cow,
collections::HashMap,
io::{BufRead, Read},
num::{NonZeroU64, NonZeroUsize},
Expand Down Expand Up @@ -577,7 +578,7 @@ pub struct TimelineInfo {
pub walreceiver_status: String,
}

#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerMapInfo {
pub in_memory_layers: Vec<InMemoryLayerInfo>,
pub historic_layers: Vec<HistoricLayerInfo>,
Expand All @@ -595,7 +596,7 @@ pub enum LayerAccessKind {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerAccessStatFullDetails {
pub when_millis_since_epoch: u64,
pub task_kind: &'static str,
pub task_kind: Cow<'static, str>,
pub access_kind: LayerAccessKind,
}

Expand Down Expand Up @@ -654,23 +655,23 @@ impl LayerResidenceEvent {
}
}

#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerAccessStats {
pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
pub task_kind_access_flag: Vec<&'static str>,
pub task_kind_access_flag: Vec<Cow<'static, str>>,
pub first: Option<LayerAccessStatFullDetails>,
pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
}

#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum InMemoryLayerInfo {
Open { lsn_start: Lsn },
Frozen { lsn_start: Lsn, lsn_end: Lsn },
}

#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum HistoricLayerInfo {
Delta {
Expand All @@ -692,6 +693,32 @@ pub enum HistoricLayerInfo {
},
}

impl HistoricLayerInfo {
pub fn layer_file_name(&self) -> &str {
match self {
HistoricLayerInfo::Delta {
layer_file_name, ..
} => layer_file_name,
HistoricLayerInfo::Image {
layer_file_name, ..
} => layer_file_name,
}
}
pub fn is_remote(&self) -> bool {
match self {
HistoricLayerInfo::Delta { remote, .. } => *remote,
HistoricLayerInfo::Image { remote, .. } => *remote,
}
}
pub fn set_remote(&mut self, value: bool) {
let field = match self {
HistoricLayerInfo::Delta { remote, .. } => remote,
HistoricLayerInfo::Image { remote, .. } => remote,
};
*field = value;
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct DownloadRemoteLayersTaskSpawnRequest {
pub max_concurrent_downloads: NonZeroUsize,
Expand Down
39 changes: 37 additions & 2 deletions libs/utils/src/history_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ impl<T, const L: usize> ops::Deref for HistoryBufferWithDropCounter<T, L> {
}
}

#[derive(serde::Serialize)]
#[derive(serde::Serialize, serde::Deserialize)]
struct SerdeRepr<T> {
buffer: Vec<T>,
buffer_size: usize,
drop_count: u64,
}

Expand All @@ -61,6 +62,7 @@ where
let HistoryBufferWithDropCounter { buffer, drop_count } = value;
SerdeRepr {
buffer: buffer.iter().cloned().collect(),
buffer_size: L,
drop_count: *drop_count,
}
}
Expand All @@ -78,19 +80,52 @@ where
}
}

impl<'de, T, const L: usize> serde::de::Deserialize<'de> for HistoryBufferWithDropCounter<T, L>
where
T: Clone + serde::Deserialize<'de>,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let SerdeRepr {
buffer: des_buffer,
drop_count,
buffer_size,
} = SerdeRepr::<T>::deserialize(deserializer)?;
if buffer_size != L {
use serde::de::Error;
return Err(D::Error::custom(format!(
"invalid buffer_size, expecting {L} got {buffer_size}"
)));
}
let mut buffer = HistoryBuffer::new();
buffer.extend(des_buffer);
Ok(HistoryBufferWithDropCounter { buffer, drop_count })
}
}

#[cfg(test)]
mod test {
use super::HistoryBufferWithDropCounter;

#[test]
fn test_basics() {
let mut b = HistoryBufferWithDropCounter::<_, 2>::default();
let mut b = HistoryBufferWithDropCounter::<usize, 2>::default();
b.write(1);
b.write(2);
b.write(3);
assert!(b.iter().any(|e| *e == 2));
assert!(b.iter().any(|e| *e == 3));
assert!(!b.iter().any(|e| *e == 1));

// round-trip serde
let round_tripped: HistoryBufferWithDropCounter<usize, 2> =
serde_json::from_str(&serde_json::to_string(&b).unwrap()).unwrap();
assert_eq!(
round_tripped.iter().cloned().collect::<Vec<_>>(),
b.iter().cloned().collect::<Vec<_>>()
);
}

#[test]
Expand Down
77 changes: 75 additions & 2 deletions pageserver/client/src/mgmt_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl Client {
self.request(Method::GET, uri, ()).await
}

async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
Expand All @@ -181,7 +181,16 @@ impl Client {
} else {
req
};
let res = req.json(&body).send().await.map_err(Error::ReceiveBody)?;
req.json(&body).send().await.map_err(Error::ReceiveBody)
}

async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
let res = self.request_noerror(method, uri, body).await?;
let response = res.error_from_body().await?;
Ok(response)
}
Expand Down Expand Up @@ -425,4 +434,68 @@ impl Client {
.await
.map_err(Error::ReceiveBody)
}

pub async fn layer_map_info(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<LayerMapInfo> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/layer",
self.mgmt_api_endpoint, tenant_shard_id, timeline_id,
);
self.get(&uri)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}

pub async fn layer_evict(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
layer_file_name: &str,
) -> Result<bool> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/layer/{}",
self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
);
let resp = self.request_noerror(Method::DELETE, &uri, ()).await?;
match resp.status() {
StatusCode::OK => Ok(true),
StatusCode::NOT_MODIFIED => Ok(false),
// TODO: dedupe this pattern / introduce separate error variant?
status => Err(match resp.json::<HttpErrorBody>().await {
Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
Err(_) => {
Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
}
}),
}
}

pub async fn layer_ondemand_download(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
layer_file_name: &str,
) -> Result<bool> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/layer/{}",
self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name
);
let resp = self.request_noerror(Method::GET, &uri, ()).await?;
match resp.status() {
StatusCode::OK => Ok(true),
StatusCode::NOT_MODIFIED => Ok(false),
// TODO: dedupe this pattern / introduce separate error variant?
status => Err(match resp.json::<HttpErrorBody>().await {
Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
Err(_) => {
Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
}
}),
}
}
}