Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logger): rate limit based on peer address #1351

Merged
merged 21 commits into from Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a838d5a
feat(logger): rate limit based on peer address
oddgrd Oct 27, 2023
a8a9732
feat(proto): reduce log batch size
oddgrd Oct 27, 2023
0f2d550
feat(logger): refactor out axum layer and dependency
oddgrd Oct 28, 2023
2d42218
feat(deployer): handle rate limiting error for get_logs
oddgrd Nov 1, 2023
f179eae
refactor: increase batch & burst size
oddgrd Nov 1, 2023
34a67e0
feat(logger): send warning to logger when rate limited
oddgrd Nov 1, 2023
ce32001
feat(logger): increase refresh rate of rate limiter to 2 RPS
oddgrd Nov 1, 2023
5841342
fix(proto): clippy
oddgrd Nov 2, 2023
0668981
tests(logger): refactor rate limiter to 2 RPS
oddgrd Nov 2, 2023
36253fe
Merge branch 'main' into feature/eng-1609-rate-limits-the-amount-of-l…
oddgrd Nov 2, 2023
6311872
refactor(deployer): match more common x-ratelimit-limit header
oddgrd Nov 2, 2023
2404f5a
misc(logger): cleanups and comments
oddgrd Nov 2, 2023
f4bc9b6
Merge branch 'main' into feature/eng-1609-rate-limits-the-amount-of-l…
oddgrd Nov 3, 2023
684fbc2
Merge remote-tracking branch 'upstream/main' into feature/eng-1609-ra…
oddgrd Nov 10, 2023
58ec21d
chore: update lockfile
oddgrd Nov 10, 2023
c1df918
misc: rephrase errors, comments, constants
oddgrd Nov 14, 2023
eaa5e23
refactor: send deserializable error for log stream rate limit
oddgrd Nov 20, 2023
6968f8c
feat: add ratelimited apierror variant
oddgrd Nov 20, 2023
b3c640e
Merge branch 'main' into feature/eng-1609-rate-limits-the-amount-of-l…
oddgrd Nov 20, 2023
b750e6e
refactor(logger): use downcast_ref in if clause
oddgrd Nov 21, 2023
b5705a9
docs(logger): remove todo
oddgrd Nov 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
123 changes: 123 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 16 additions & 3 deletions cargo-shuttle/src/lib.rs
Expand Up @@ -698,9 +698,22 @@ impl Shuttle {

while let Some(Ok(msg)) = stream.next().await {
if let tokio_tungstenite::tungstenite::Message::Text(line) = msg {
let log_item: shuttle_common::LogItem = serde_json::from_str(&line)
.context("Failed parsing logs. Is your cargo-shuttle outdated?")?;
println!("{log_item}")
match serde_json::from_str::<shuttle_common::LogItem>(&line) {
Ok(log_item) => {
println!("{log_item}")
}
Err(err) => {
debug!(error = %err, "failed to parse message into log item");

let message = if let Ok(err) = serde_json::from_str::<ApiError>(&line) {
err.to_string()
} else {
"failed to parse logs, is your cargo-shuttle outdated?".to_string()
};

bail!(message);
}
}
}
}
} else {
Expand Down
7 changes: 7 additions & 0 deletions common/src/models/error.rs
Expand Up @@ -59,6 +59,7 @@ pub enum ErrorKind {
NotReady,
ServiceUnavailable,
DeleteProjectFailed,
RateLimited(String),
}

impl From<ErrorKind> for ApiError {
Expand Down Expand Up @@ -121,6 +122,12 @@ impl From<ErrorKind> for ApiError {
ErrorKind::Forbidden => (StatusCode::FORBIDDEN, "forbidden"),
ErrorKind::NotReady => (StatusCode::INTERNAL_SERVER_ERROR, "service not ready"),
ErrorKind::DeleteProjectFailed => (StatusCode::INTERNAL_SERVER_ERROR, "deleting project failed"),
ErrorKind::RateLimited(message) => {
return Self {
message,
status_code: StatusCode::TOO_MANY_REQUESTS.as_u16(),
}
},
};
Self {
message: error_message.to_string(),
Expand Down
3 changes: 3 additions & 0 deletions deployer/src/handlers/error.rs
Expand Up @@ -26,6 +26,8 @@ pub enum Error {
Internal(#[from] anyhow::Error),
#[error("Missing header: {0}")]
MissingHeader(String),
#[error("{0}. Retry the request in a few minutes")]
RateLimited(String),
}

impl Serialize for Error {
Expand All @@ -47,6 +49,7 @@ impl IntoResponse for Error {

let code = match self {
Error::NotFound(_) => StatusCode::NOT_FOUND,
Error::RateLimited(_) => StatusCode::TOO_MANY_REQUESTS,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};

Expand Down
44 changes: 35 additions & 9 deletions deployer/src/handlers/mod.rs
Expand Up @@ -33,7 +33,7 @@ use shuttle_common::{
claims::{Claim, Scope},
models::{
deployment::{DeploymentRequest, CREATE_SERVICE_BODY_LIMIT, GIT_STRINGS_MAX_LENGTH},
error::axum::CustomErrorPath,
error::{axum::CustomErrorPath, ApiError, ErrorKind},
project::ProjectName,
},
request_span, LogItem,
Expand Down Expand Up @@ -653,16 +653,27 @@ pub async fn get_logs(
logs_request.extensions_mut().insert(claim);

let mut client = deployment_manager.logs_fetcher().clone();
if let Ok(logs) = client.get_logs(logs_request).await {
Ok(Json(

match client.get_logs(logs_request).await {
Ok(logs) => Ok(Json(
logs.into_inner()
.log_items
.into_iter()
.map(|l| l.to_log_item_with_id(deployment_id))
.collect(),
))
} else {
Err(Error::NotFound("deployment not found".to_string()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the not found case no longer relevant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I don't think it ever was. The https://github.com/shuttle-hq/shuttle/blob/main/logger/src/lib.rs#L89-L100 call can fail in the following ways:

  • can't extract the claim from extensions, this shouldn't happen since we check the claim in the deployer scopedlayers. This would return internal error
  • claim is missing the logs scope, shouldn't happen because we check it in deployer scopedlayer. This would return permission denied, I can add a match for this to the deployer handler, even though it is currently impossible, if we make changes in the deployer that may change.
  • the db query fails, this would return internal error.

Copy link
Contributor Author

@oddgrd oddgrd Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, after thinking about this some more, if the caller has access to this endpoint, but the logger client doesn't have access to get_logs, I think it makes sense to keep this as is, returning an internal error to the user from the deployer.

)),
Err(error) => {
if error.code() == tonic::Code::Unavailable
&& error.metadata().get("x-ratelimit-limit").is_some()
{
Err(Error::RateLimited(
"your application is producing too many logs. Interactions with the shuttle logger service will be rate limited"
.to_string(),
))
} else {
Err(anyhow!("failed to retrieve logs for deployment").into())
}
}
}
}

Expand Down Expand Up @@ -712,9 +723,24 @@ async fn logs_websocket_handler(
"failed to get backlog of logs"
);

let _ = s
.send(ws::Message::Text("failed to get logs".to_string()))
.await;
if error.code() == tonic::Code::Unavailable
&& error.metadata().get("x-ratelimit-limit").is_some()
{
let message = serde_json::to_string(
&ApiError::from(
ErrorKind::RateLimited(
"your application is producing too many logs. Interactions with the shuttle logger service will be rate limited"
.to_string()
)
))
.expect("to convert error to json");

let _ = s.send(ws::Message::Text(message)).await;
} else {
let _ = s
.send(ws::Message::Text("failed to get logs".to_string()))
.await;
}
let _ = s.close().await;
return;
}
Expand Down
5 changes: 5 additions & 0 deletions logger/Cargo.toml
Expand Up @@ -12,6 +12,7 @@ shuttle-proto = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
http = { workspace = true }
prost-types = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true, features = [
Expand All @@ -24,6 +25,8 @@ thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tower = { workspace = true }
tower_governor = { version= "0.1.0", features = ["tracing"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["default"] }

Expand All @@ -35,3 +38,5 @@ serde_json = { workspace = true }
shuttle-common-tests = { workspace = true }
uuid = { workspace = true }
ctor = { workspace = true }
futures = { workspace = true }
tower = { workspace = true, features = ["util"] }
1 change: 1 addition & 0 deletions logger/src/lib.rs
Expand Up @@ -15,6 +15,7 @@ use tracing::{debug, error, field, Span};

pub mod args;
mod dal;
pub mod rate_limiting;

pub use dal::Postgres;

Expand Down