Skip to content

Commit

Permalink
chore(cluster): cache datacenter-get and datacenter-location-get (#908)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
NathanFlurry committed Jun 16, 2024
1 parent c36d150 commit 8863a8b
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 17 deletions.
38 changes: 32 additions & 6 deletions svc/pkg/cluster/ops/datacenter-get/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,32 @@ pub async fn handle(
.map(common::Uuid::as_uuid)
.collect::<Vec<_>>();

let datacenters = ctx
.cache()
.fetch_all_proto("cluster.datacenters", datacenter_ids, {
let ctx = ctx.base();
move |mut cache, datacenter_ids| {
let ctx = ctx.clone();
async move {
let dcs = get_dcs(ctx, datacenter_ids).await?;
for dc in dcs {
let dc_id = unwrap!(dc.datacenter_id).as_uuid();
cache.resolve(&dc_id, dc);
}

Ok(cache)
}
}
})
.await?;

Ok(cluster::datacenter_get::Response { datacenters })
}

async fn get_dcs(
ctx: OperationContext<()>,
datacenter_ids: Vec<Uuid>,
) -> GlobalResult<Vec<backend::cluster::Datacenter>> {
let configs = sql_fetch_all!(
[ctx, Datacenter]
"
Expand All @@ -69,10 +95,10 @@ pub async fn handle(
)
.await?;

Ok(cluster::datacenter_get::Response {
datacenters: configs
.into_iter()
.map(TryInto::try_into)
.collect::<GlobalResult<Vec<_>>>()?,
})
let datacenters = configs
.into_iter()
.map(TryInto::try_into)
.collect::<GlobalResult<Vec<_>>>()?;

Ok(datacenters)
}
46 changes: 35 additions & 11 deletions svc/pkg/cluster/ops/datacenter-location-get/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,32 @@ pub async fn handle(
.map(common::Uuid::as_uuid)
.collect::<Vec<_>>();

let datacenters = ctx
.cache()
.fetch_all_proto("cluster.datacenters.location", datacenter_ids, {
let ctx = ctx.base();
move |mut cache, datacenter_ids| {
let ctx = ctx.clone();
async move {
let dcs = query_dcs(ctx, datacenter_ids).await?;
for dc in dcs {
let dc_id = unwrap!(dc.datacenter_id).as_uuid();
cache.resolve(&dc_id, dc);
}

Ok(cache)
}
}
})
.await?;

Ok(cluster::datacenter_location_get::Response { datacenters })
}

async fn query_dcs(
ctx: OperationContext<()>,
datacenter_ids: Vec<Uuid>,
) -> GlobalResult<Vec<cluster::datacenter_location_get::response::Datacenter>> {
// NOTE: if there is no active GG node in a datacenter, we cannot retrieve its location
// Fetch the gg node public ip for each datacenter (there may be more than one, hence `DISTINCT`)
let server_rows = sql_fetch_all!(
Expand Down Expand Up @@ -62,15 +88,13 @@ pub async fn handle(
.try_collect::<Vec<_>>()
.await?;

Ok(cluster::datacenter_location_get::Response {
datacenters: coords_res
.into_iter()
.map(
|(datacenter_id, coords)| cluster::datacenter_location_get::response::Datacenter {
datacenter_id: Some(datacenter_id.into()),
coords,
},
)
.collect::<Vec<_>>(),
})
Ok(coords_res
.into_iter()
.map(
|(datacenter_id, coords)| cluster::datacenter_location_get::response::Datacenter {
datacenter_id: Some(datacenter_id.into()),
coords,
},
)
.collect::<Vec<_>>())
}
2 changes: 2 additions & 0 deletions svc/pkg/cluster/worker/src/workers/datacenter_scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ async fn scale_servers(
.filter(|server| matches!(server.drain_state, DrainState::None));
let active_count = active_servers_in_pool.clone().count();

tracing::info!(desired=%pctx.desired_count, active=%active_count, "comparing {:?}", pctx.pool_type);

match pctx.desired_count.cmp(&active_count) {
Ordering::Less => match pctx.pool_type {
backend::cluster::PoolType::Job => {
Expand Down
5 changes: 5 additions & 0 deletions svc/pkg/cluster/worker/src/workers/datacenter_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ async fn worker(
)
.await?;

// Purge cache
ctx.cache()
.purge("cluster.datacenters", [datacenter_id])
.await?;

msg!([ctx] cluster::msg::datacenter_scale(datacenter_id) {
datacenter_id: ctx.datacenter_id,
})
Expand Down

0 comments on commit 8863a8b

Please sign in to comment.