Skip to content

Commit

Permalink
Add retries to cloud_admin client.
Browse files Browse the repository at this point in the history
  • Loading branch information
arssher committed Apr 30, 2024
1 parent 9f792f9 commit 4ac4b21
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 84 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions s3_scrubber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ postgres-native-tls.workspace = true
postgres_ffi.workspace = true
tokio-stream.workspace = true
tokio-postgres.workspace = true
tokio-util = { workspace = true }
futures-util.workspace = true
itertools.workspace = true
camino.workspace = true
Expand Down
191 changes: 107 additions & 84 deletions s3_scrubber/src/cloud_admin_api.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::time::Duration;

use chrono::{DateTime, Utc};
use futures::Future;
use hex::FromHex;

use reqwest::{header, Client, StatusCode, Url};
use serde::Deserialize;
use tokio::sync::Semaphore;

use tokio_util::sync::CancellationToken;
use utils::backoff;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;

Expand Down Expand Up @@ -210,30 +212,39 @@ impl CloudAdminApiClient {
.await
.expect("Semaphore is not closed");

let response = self
.http_client
.get(self.append_url("/projects"))
.query(&[
("tenant_id", tenant_id.to_string()),
("show_deleted", "true".to_string()),
])
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| {
Error::new(
"Find project for tenant".to_string(),
ErrorKind::RequestSend(e),
)
})?;
let response = CloudAdminApiClient::with_retries(
|| async {
let response = self
.http_client
.get(self.append_url("/projects"))
.query(&[
("tenant_id", tenant_id.to_string()),
("show_deleted", "true".to_string()),
])
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| {
Error::new(
"Find project for tenant".to_string(),
ErrorKind::RequestSend(e),
)
})?;

let response: AdminApiResponse<Vec<ProjectData>> =
response.json().await.map_err(|e| {
Error::new(
"Find project for tenant".to_string(),
ErrorKind::BodyRead(e),
)
})?;
Ok(response)
},
"find_tenant_project",
)
.await?;

let response: AdminApiResponse<Vec<ProjectData>> = response.json().await.map_err(|e| {
Error::new(
"Find project for tenant".to_string(),
ErrorKind::BodyRead(e),
)
})?;
match response.data.len() {
0 => Ok(None),
1 => Ok(Some(
Expand Down Expand Up @@ -261,42 +272,34 @@ impl CloudAdminApiClient {
const PAGINATION_LIMIT: usize = 512;
let mut result: Vec<ProjectData> = Vec::with_capacity(PAGINATION_LIMIT);
loop {
let response = self
.http_client
.get(self.append_url("/projects"))
.query(&[
("show_deleted", "false".to_string()),
("limit", format!("{PAGINATION_LIMIT}")),
("offset", format!("{pagination_offset}")),
])
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| {
Error::new(
"List active projects".to_string(),
ErrorKind::RequestSend(e),
)
})?;

match response.status() {
StatusCode::OK => {}
StatusCode::SERVICE_UNAVAILABLE | StatusCode::TOO_MANY_REQUESTS => {
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
_status => {
return Err(Error::new(
"List active projects".to_string(),
ErrorKind::ResponseStatus(response.status()),
))
}
}

let response_bytes = response.bytes().await.map_err(|e| {
Error::new("List active projects".to_string(), ErrorKind::BodyRead(e))
})?;
let response_bytes = CloudAdminApiClient::with_retries(
|| async {
let response = self
.http_client
.get(self.append_url("/projects"))
.query(&[
("show_deleted", "false".to_string()),
("limit", format!("{PAGINATION_LIMIT}")),
("offset", format!("{pagination_offset}")),
])
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| {
Error::new(
"List active projects".to_string(),
ErrorKind::RequestSend(e),
)
})?;

response.bytes().await.map_err(|e| {
Error::new("List active projects".to_string(), ErrorKind::BodyRead(e))
})
},
"list_projects",
)
.await?;

let decode_result =
serde_json::from_slice::<AdminApiResponse<Vec<ProjectData>>>(&response_bytes);
Expand Down Expand Up @@ -336,30 +339,39 @@ impl CloudAdminApiClient {
.await
.expect("Semaphore is not closed");

let response = self
.http_client
.get(self.append_url("/branches"))
.query(&[
("timeline_id", timeline_id.to_string()),
("show_deleted", "true".to_string()),
])
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| {
Error::new(
"Find branch for timeline".to_string(),
ErrorKind::RequestSend(e),
)
})?;
let response = CloudAdminApiClient::with_retries(
|| async {
let response = self
.http_client
.get(self.append_url("/branches"))
.query(&[
("timeline_id", timeline_id.to_string()),
("show_deleted", "true".to_string()),
])
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| {
Error::new(
"Find branch for timeline".to_string(),
ErrorKind::RequestSend(e),
)
})?;

let response: AdminApiResponse<Vec<BranchData>> =
response.json().await.map_err(|e| {
Error::new(
"Find branch for timeline".to_string(),
ErrorKind::BodyRead(e),
)
})?;
Ok(response)
},
"find_timeline_branch",
)
.await?;

let response: AdminApiResponse<Vec<BranchData>> = response.json().await.map_err(|e| {
Error::new(
"Find branch for timeline".to_string(),
ErrorKind::BodyRead(e),
)
})?;
let mut branches: Vec<BranchData> = response.data.into_iter().collect();
// Normally timeline_id is unique. However, we do have at least one case
// of the same timeline_id in two different projects, apparently after
Expand Down Expand Up @@ -542,4 +554,15 @@ impl CloudAdminApiClient {
.parse()
.unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
}

async fn with_retries<T, O, F>(op: O, description: &str) -> Result<T, Error>
where
O: FnMut() -> F,
F: Future<Output = Result<T, Error>>,
{
let cancel = CancellationToken::new(); // not really used
backoff::retry(op, |_| false, 1, 20, description, &cancel)
.await
.expect("cancellations are disabled")
}
}

0 comments on commit 4ac4b21

Please sign in to comment.