Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/api-helper/build/src/macro_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ pub async fn __with_ctx<A: auth::ApiAuth + Send>(
ts,
ts,
(),
Vec::new(),
);

// Create auth
Expand Down
15 changes: 4 additions & 11 deletions lib/chirp/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,6 @@ impl SharedClient {
ts,
)
}

pub fn wrap_with(
self: Arc<Self>,
parent_req_id: Uuid,
ray_id: Uuid,
ts: i64,
trace: Vec<chirp::TraceEntry>,
perf_ctx: chirp_perf::PerfCtxInner,
) -> Client {
Client::new(self, parent_req_id, ray_id, trace, Arc::new(perf_ctx), ts)
}
}

/// Used to communicate with other Chirp clients.
Expand Down Expand Up @@ -238,6 +227,10 @@ impl Client {
pub fn ts(&self) -> i64 {
self.ts
}

pub fn trace(&self) -> &[chirp::TraceEntry] {
&self.trace
}
}

impl Drop for Client {
Expand Down
29 changes: 12 additions & 17 deletions lib/chirp/worker/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,22 +696,16 @@ where
let worker_req = {
// Build client
let ts = rivet_util::timestamp::now();
let client = self.shared_client.clone().wrap_with(
req_id,
ray_id,
ts,
{
let mut x = trace.clone();
x.push(chirp::TraceEntry {
context_name: worker_name.clone(),
req_id: req_id_proto.clone(),
ts: rivet_util::timestamp::now(),
run_context: chirp::RunContext::Service as i32,
});
x
},
chirp_perf::PerfCtxInner::new(self.redis_cache.clone(), ts, req_id, ray_id),
);
let client = self.shared_client.clone().wrap(req_id, ray_id, {
let mut x = trace.clone();
x.push(chirp::TraceEntry {
context_name: worker_name.clone(),
req_id: req_id_proto.clone(),
ts: rivet_util::timestamp::now(),
run_context: chirp::RunContext::Service as i32,
});
x
});
let conn = Connection::new(client, self.pools.clone(), self.cache.clone());

let ts = req_debug
Expand Down Expand Up @@ -743,7 +737,6 @@ where
ts,
req_ts,
req_body,
trace,
),
dont_log_body,
allow_recursive,
Expand Down Expand Up @@ -838,6 +831,8 @@ where
.op_ctx
.trace()
.iter()
// Don't include the last trace event, which is the current request
.take(req.op_ctx.trace().len() - 1)
.any(|x| x.context_name == req.op_ctx.name());
let handle_res = if is_recursive {
Ok(Err(err_code!(
Expand Down
1 change: 0 additions & 1 deletion lib/chirp/worker/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ impl TestCtx {
rivet_util::timestamp::now(),
rivet_util::timestamp::now(),
(),
Vec::new(),
);

Ok(TestCtx {
Expand Down
19 changes: 10 additions & 9 deletions lib/connection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,24 @@ impl Connection {
&self,
parent_req_id: Uuid,
ray_id: Uuid,
trace: Vec<chirp_client::TraceEntry>,
) -> GlobalResult<Connection> {
trace_entry: chirp_client::TraceEntry,
) -> Connection {
// Not the same as the operation ctx's ts because this cannot be overridden by debug start ts
let ts = rivet_util::timestamp::now();
let redis_cache = self.pools.redis("ephemeral")?;

Ok(Connection::new(
(*self.client).clone().wrap_with(
Connection::new(
(*self.client).clone().wrap(
parent_req_id,
ray_id,
ts,
trace,
chirp_perf::PerfCtxInner::new(redis_cache, ts, parent_req_id, ray_id),
{
let mut x = self.client.trace().to_vec();
x.push(trace_entry);
x
},
),
self.pools.clone(),
self.cache.clone(),
))
)
}

pub fn chirp(&self) -> &chirp_client::Client {
Expand Down
63 changes: 18 additions & 45 deletions lib/operation/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ where
ts: i64,
req_ts: i64,
body: B,
// Trace of all requests not including this request. The client does include
// this request in the trace, though.
trace: Vec<chirp_client::TraceEntry>,
}

impl<B> OperationContext<B>
Expand All @@ -58,7 +55,6 @@ where
ts: i64,
req_ts: i64,
body: B,
trace: Vec<chirp_client::TraceEntry>,
) -> Self {
OperationContext {
name,
Expand All @@ -69,7 +65,6 @@ where
ts,
req_ts,
body,
trace,
}
}

Expand All @@ -90,7 +85,7 @@ where

// TODO: Throw dedicated "timed out" error here
// Process the request
let req_op_ctx = self.wrap::<O>(body)?;
let req_op_ctx = self.wrap::<O>(body);
let timeout_fut = tokio::time::timeout(O::TIMEOUT, O::handle(req_op_ctx).in_current_span());
let res = tokio::task::Builder::new()
.name("operation::handle")
Expand Down Expand Up @@ -134,49 +129,28 @@ where
}

/// Adds trace and correctly wraps `Connection` (and subsequently `chirp_client::Client`).
fn wrap<O: Operation>(&self, body: O::Request) -> GlobalResult<OperationContext<O::Request>> {
let ray_id = Uuid::new_v4();
// Add self to new operation's trace
let trace = {
let mut x = self.trace.clone();
x.push(chirp_client::TraceEntry {
context_name: self.name.clone(),
req_id: Some(self.req_id.into()),
ts: rivet_util::timestamp::now(),
run_context: match rivet_util::env::run_context() {
rivet_util::env::RunContext::Service => chirp_client::RunContext::Service,
rivet_util::env::RunContext::Test => chirp_client::RunContext::Test,
} as i32,
});
x
fn wrap<O: Operation>(&self, body: O::Request) -> OperationContext<O::Request> {
let req_id = Uuid::new_v4();
let trace_entry = chirp_client::TraceEntry {
context_name: O::NAME.to_string(),
req_id: Some(req_id.into()),
ts: rivet_util::timestamp::now(),
run_context: match rivet_util::env::run_context() {
rivet_util::env::RunContext::Service => chirp_client::RunContext::Service,
rivet_util::env::RunContext::Test => chirp_client::RunContext::Test,
} as i32,
};

Ok(OperationContext {
OperationContext {
name: O::NAME.to_string(),
timeout: O::TIMEOUT,
conn: self.conn.wrap(self.req_id, ray_id, {
let mut x = trace.clone();

// Add new operation's trace to its connection (and chirp client)
x.push(chirp_client::TraceEntry {
context_name: O::NAME.to_string(),
req_id: Some(self.req_id.into()),
ts: rivet_util::timestamp::now(),
run_context: match rivet_util::env::run_context() {
rivet_util::env::RunContext::Service => chirp_client::RunContext::Service,
rivet_util::env::RunContext::Test => chirp_client::RunContext::Test,
} as i32,
});

x
})?,
req_id: self.req_id,
ray_id,
conn: self.conn.wrap(req_id, self.ray_id, trace_entry),
req_id,
ray_id: self.ray_id,
ts: util::timestamp::now(),
req_ts: self.req_ts,
body,
trace,
})
}
}

/// Clones everything but the body. This should always be used over `.clone()` unless you need to
Expand All @@ -191,7 +165,6 @@ where
ts: self.ts,
req_ts: self.req_ts,
body: (),
trace: self.trace.clone(),
}
}

Expand Down Expand Up @@ -231,11 +204,11 @@ where
}

pub fn trace(&self) -> &[chirp_client::TraceEntry] {
&self.trace
self.conn.trace()
}

pub fn test(&self) -> bool {
self.trace
self.trace()
.iter()
.any(|x| x.run_context == chirp_client::RunContext::Test as i32)
}
Expand Down
1 change: 0 additions & 1 deletion svc/api/admin/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ impl Ctx {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

Ctx { op_ctx }
Expand Down
1 change: 0 additions & 1 deletion svc/api/auth/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// let (user_id, user_token) = Self::issue_user_token(&op_ctx).await;
Expand Down
1 change: 0 additions & 1 deletion svc/api/cf-verification/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// let http_client = rivet_cf_verification::Config::builder()
Expand Down
1 change: 0 additions & 1 deletion svc/api/cloud/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// // Create temp team
Expand Down
1 change: 0 additions & 1 deletion svc/api/group/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ impl Ctx {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let (_user_id, user_token) = Self::issue_user_token(&op_ctx).await;
Expand Down
1 change: 0 additions & 1 deletion svc/api/identity/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// let (primary_region_id, _) = Self::setup_region(&op_ctx).await;
Expand Down
1 change: 0 additions & 1 deletion svc/api/job/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// let nomad_config = nomad_util::config_from_env().unwrap();
Expand Down
1 change: 0 additions & 1 deletion svc/api/kv/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// let (primary_region_id, _) = Self::setup_region(&op_ctx).await;
Expand Down
1 change: 0 additions & 1 deletion svc/api/matchmaker/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ impl Ctx {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let (primary_region_id, primary_region_name_id) = Self::setup_region(&op_ctx).await;
Expand Down
1 change: 0 additions & 1 deletion svc/api/portal/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ impl Ctx {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let (_user_id, user_token) = Self::issue_user_token(&op_ctx).await;
Expand Down
1 change: 0 additions & 1 deletion svc/api/status/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// let http_client = rivet_status::Config::builder()
Expand Down
1 change: 0 additions & 1 deletion svc/api/traefik-provider/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl Ctx {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

Ctx { op_ctx }
Expand Down
1 change: 0 additions & 1 deletion svc/pkg/build/standalone/default-create/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ pub async fn run_from_env() -> GlobalResult<()> {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

for build in DEFAULT_BUILDS {
Expand Down
1 change: 0 additions & 1 deletion svc/pkg/cluster/standalone/datacenter-tls-renew/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let updated_datacenter_ids = rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| {
Expand Down
1 change: 0 additions & 1 deletion svc/pkg/cluster/standalone/default-update/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ pub async fn run_from_env(use_autoscaler: bool) -> GlobalResult<()> {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

// Read config from env
Expand Down
1 change: 0 additions & 1 deletion svc/pkg/cluster/standalone/fix-tls/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub async fn run_from_env(ts: i64) -> GlobalResult<()> {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let datacenter_ids = vec!["5767a802-5c7c-4563-a266-33c014f7e244"]
Expand Down
1 change: 0 additions & 1 deletion svc/pkg/cluster/standalone/gc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let datacenter_ids = rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| {
Expand Down
1 change: 0 additions & 1 deletion svc/pkg/cluster/standalone/metrics-publish/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ pub async fn run_from_env(_ts: i64, pools: rivet_pools::Pools) -> GlobalResult<(
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let servers = select_servers(&ctx).await?;
Expand Down
Loading