Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 58 additions & 46 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ path = "engine/packages/test-deps-docker"
[workspace.dependencies.rivet-tracing-reconfigure]
path = "engine/packages/tracing-reconfigure"

[workspace.dependencies.rivet-tracing-utils]
path = "engine/packages/tracing-utils"

[workspace.dependencies.rivet-types]
path = "engine/packages/types"

Expand Down
17 changes: 14 additions & 3 deletions engine/packages/guard-core/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
use rivet_error::*;
use rivet_util::Id;
use serde::{Deserialize, Serialize};

#[derive(RivetError)]
#[error("guard", "rate_limit", "Too many requests. Try again later.")]
pub struct RateLimit;
#[derive(RivetError, Serialize, Deserialize)]
#[error(
"guard",
"rate_limit",
"Too many requests. Try again later.",
"Too many requests to '{method} {path}' (actor_id: {actor_id:?}) from IP {ip}."
)]
pub struct RateLimit {
pub actor_id: Option<Id>,
pub method: String,
pub path: String,
pub ip: String,
}

#[derive(RivetError, Serialize, Deserialize)]
#[error(
Expand Down
29 changes: 25 additions & 4 deletions engine/packages/guard-core/src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,16 +717,31 @@ impl ProxyService {
None
};

// Extract IP address from remote_addr
let client_ip = self.remote_addr.ip();
// Extract IP address from X-Forwarded-For header or fall back to remote_addr
let client_ip = req
.headers()
.get(X_FORWARDED_FOR)
.and_then(|h| h.to_str().ok())
.and_then(|forwarded| {
// X-Forwarded-For can be a comma-separated list, take the first IP
forwarded.split(',').next().map(|s| s.trim())
})
.and_then(|ip_str| ip_str.parse::<std::net::IpAddr>().ok())
.unwrap_or_else(|| self.remote_addr.ip());

// Apply rate limiting
if !self
.state
.check_rate_limit(client_ip, &actor_id, req.headers())
.await?
{
return Err(errors::RateLimit.build());
return Err(errors::RateLimit {
actor_id,
method: req.method().to_string(),
path: path.clone(),
ip: client_ip.to_string(),
}
.build());
}

// Check in-flight limit
Expand All @@ -735,7 +750,13 @@ impl ProxyService {
.acquire_in_flight(client_ip, &actor_id, req.headers())
.await?
{
return Err(errors::RateLimit.build());
return Err(errors::RateLimit {
actor_id,
method: req.method().to_string(),
path: path.clone(),
ip: client_ip.to_string(),
}
.build());
}

// Increment metrics
Expand Down
12 changes: 12 additions & 0 deletions engine/packages/tracing-utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "rivet-tracing-utils"
version.workspace = true
authors.workspace = true
license.workspace = true
edition.workspace = true

[dependencies]
futures-util.workspace = true
lazy_static.workspace = true
rivet-metrics.workspace = true
tracing.workspace = true
91 changes: 91 additions & 0 deletions engine/packages/tracing-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Instant,
};

use futures_util::future;
use tracing::{Instrument, instrument::Instrumented};

use rivet_metrics::KeyValue;

/// Attempts to create a new future to select over a list of futures.
/// Non-panicking version of [futures_util::future::select_all](https://docs.rs/futures/0.3.15/futures/future/fn.select_all.html).
///
/// If `iter` is empty, a `Pending` future is returned.
pub async fn select_all_or_wait<I>(iter: I) -> <I::Item as Future>::Output
where
I: IntoIterator,
I::Item: Future + Unpin,
{
let futs = iter.into_iter().collect::<Vec<I::Item>>();

if !futs.is_empty() {
future::select_all(futs).await.0
} else {
std::future::pending().await
}
}

pub trait CustomInstrumentExt: Sized {
fn custom_instrument(self, span: tracing::Span) -> CustomInstrumented<Self> {
CustomInstrumented {
inner: self.instrument(span),
start: Instant::now(),
}
}
}

impl<F: Sized> CustomInstrumentExt for F {}

pub struct CustomInstrumented<T> {
inner: Instrumented<T>,
start: Instant,
}

impl<T: Future> Future for CustomInstrumented<T> {
type Output = T::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
let inner = unsafe { Pin::new_unchecked(&mut this.inner) };

let metadata = inner.span().metadata().clone();

match inner.poll(cx) {
Poll::Ready(val) => {
if let Some(metadata) = metadata {
if let (Some(file), Some(line)) = (metadata.file(), metadata.line()) {
metrics::INSTRUMENTED_FUTURE_DURATION.record(
this.start.elapsed().as_secs_f64(),
&[
KeyValue::new("location", format!("{file}:{line}")),
KeyValue::new("name", metadata.name()),
],
);
}
}
Poll::Ready(val)
}
Poll::Pending => Poll::Pending,
}
}
}

mod metrics {
use rivet_metrics::{
MICRO_BUCKETS,
otel::{global::*, metrics::*},
};

lazy_static::lazy_static! {
static ref METER: Meter = meter("rivet-util-core");

/// Expected attributes: "location", "name"
pub static ref INSTRUMENTED_FUTURE_DURATION: Histogram<f64> = METER.f64_histogram("rivet_instrumented_future_duration")
.with_description("Duration of a future.")
.with_boundaries(MICRO_BUCKETS.to_vec())
.build();
}
}
1 change: 1 addition & 0 deletions engine/packages/universaldb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ futures-util.workspace = true
lazy_static.workspace = true
rand.workspace = true
rivet-metrics.workspace = true
rivet-tracing-utils.workspace = true
rocksdb.workspace = true
serde.workspace = true
thiserror.workspace = true
Expand Down
6 changes: 5 additions & 1 deletion engine/packages/universaldb/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::future::Future;

use anyhow::{Context, Result, anyhow};
use futures_util::FutureExt;
use rivet_tracing_utils::CustomInstrumentExt;

use crate::{
driver::{DatabaseDriverHandle, Erased},
Expand All @@ -20,6 +21,7 @@ impl Database {
}

/// Run a closure with automatic retry logic
#[tracing::instrument(skip_all)]
pub async fn run<'a, F, Fut, T>(&'a self, closure: F) -> Result<T>
where
F: Fn(RetryableTransaction) -> Fut + Send + Sync,
Expand All @@ -29,7 +31,9 @@ impl Database {
let closure = &closure;
self.driver
.run(Box::new(|tx| {
async move { closure(tx).await.map(|value| Box::new(value) as Erased) }.boxed()
async move { closure(tx).await.map(|value| Box::new(value) as Erased) }
.custom_instrument(tracing::info_span!("run_attempt"))
.boxed()
}))
.await
.and_then(|res| {
Expand Down
1 change: 1 addition & 0 deletions engine/packages/util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ regex.workspace = true
reqwest.workspace = true
rivet-config.workspace = true
rivet-metrics.workspace = true
rivet-tracing-utils.workspace = true
rivet-util-id.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
92 changes: 1 addition & 91 deletions engine/packages/util/src/future.rs
Original file line number Diff line number Diff line change
@@ -1,91 +1 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Instant,
};

use futures_util::future;
use tracing::{Instrument, instrument::Instrumented};

use rivet_metrics::KeyValue;

/// Attempts to create a new future to select over a list of futures.
/// Non-panicking version of [futures_util::future::select_all](https://docs.rs/futures/0.3.15/futures/future/fn.select_all.html).
///
/// If `iter` is empty, a `Pending` future is returned.
pub async fn select_all_or_wait<I>(iter: I) -> <I::Item as Future>::Output
where
I: IntoIterator,
I::Item: Future + Unpin,
{
let futs = iter.into_iter().collect::<Vec<I::Item>>();

if !futs.is_empty() {
future::select_all(futs).await.0
} else {
std::future::pending().await
}
}

pub trait CustomInstrumentExt: Sized {
fn custom_instrument(self, span: tracing::Span) -> CustomInstrumented<Self> {
CustomInstrumented {
inner: self.instrument(span),
start: Instant::now(),
}
}
}

impl<F: Sized> CustomInstrumentExt for F {}

pub struct CustomInstrumented<T> {
inner: Instrumented<T>,
start: Instant,
}

impl<T: Future> Future for CustomInstrumented<T> {
type Output = T::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
let inner = unsafe { Pin::new_unchecked(&mut this.inner) };

let metadata = inner.span().metadata().clone();

match inner.poll(cx) {
Poll::Ready(val) => {
if let Some(metadata) = metadata {
if let (Some(file), Some(line)) = (metadata.file(), metadata.line()) {
metrics::INSTRUMENTED_FUTURE_DURATION.record(
this.start.elapsed().as_secs_f64(),
&[
KeyValue::new("location", format!("{file}:{line}")),
KeyValue::new("name", metadata.name()),
],
);
}
}
Poll::Ready(val)
}
Poll::Pending => Poll::Pending,
}
}
}

mod metrics {
use rivet_metrics::{
MICRO_BUCKETS,
otel::{global::*, metrics::*},
};

lazy_static::lazy_static! {
static ref METER: Meter = meter("rivet-util-core");

/// Expected attributes: "location", "name"
pub static ref INSTRUMENTED_FUTURE_DURATION: Histogram<f64> = METER.f64_histogram("rivet_instrumented_future_duration")
.with_description("Duration of a future.")
.with_boundaries(MICRO_BUCKETS.to_vec())
.build();
}
}
pub use rivet_tracing_utils::*;
Loading