Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/cache/build/src/getter_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ where
.collect()
}

/// All keys.
pub(super) fn keys(&self) -> &[GetterCtxKey<K, V>] {
&self.keys[..]
}

/// If all keys have an associated value.
pub(super) fn all_keys_have_value(&self) -> bool {
self.keys.iter().all(|x| x.value.is_some())
Expand Down
38 changes: 21 additions & 17 deletions lib/cache/build/src/req_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,18 @@ impl RequestConfig {
.with_label_values(&[&base_key])
.inc_by(keys.len() as u64);

let redis_keys = keys
// Build context.
//
// Drop `keys` bc this is not the same as the keys list in `ctx`, so it should not be used
// again.
let mut ctx = GetterCtx::new(base_key.clone().into(), keys.to_vec());
drop(keys);

// Build keys to look up values in Redis
let redis_keys = ctx
.keys()
.iter()
.map(|key| self.cache.build_redis_cache_key(&base_key, key))
.map(|key| self.cache.build_redis_cache_key(&base_key, &key.key))
.collect::<Vec<_>>();

// Build Redis command explicitly, since `conn.get` with one value will
Expand Down Expand Up @@ -183,7 +192,6 @@ impl RequestConfig {
);

// Create the getter ctx and resolve the cached values
let mut ctx = GetterCtx::new(base_key.clone().into(), keys.to_vec());
for (i, value) in cached_values.into_iter().enumerate() {
if let Some(value) = value {
let value = decoder(&value)?;
Expand Down Expand Up @@ -266,12 +274,8 @@ impl RequestConfig {

// Fall back to the getter since we can't fetch the value from
// the cache
let ctx = getter(
GetterCtx::new(base_key.into(), keys.to_vec()),
keys.to_vec(),
)
.await
.map_err(Error::Getter)?;
let keys = ctx.unresolved_keys();
let ctx = getter(ctx, keys).await.map_err(Error::Getter)?;

Ok(ctx.into_values())
}
Expand Down Expand Up @@ -656,11 +660,11 @@ impl RequestConfig {
}
}

#[tracing::instrument(skip(conn))]
async fn unwatch_gracefully(conn: &mut RedisPool) {
tracing::debug!("unwatching");
match redis::cmd("UNWATCH").query_async::<_, ()>(conn).await {
Ok(_) => tracing::debug!("unwatched successfully"),
Err(err) => tracing::error!(?err, "failed to unwatch from cache"),
}
}
// #[tracing::instrument(skip(conn))]
// async fn unwatch_gracefully(conn: &mut RedisPool) {
// tracing::debug!("unwatching");
// match redis::cmd("UNWATCH").query_async::<_, ()>(conn).await {
// Ok(_) => tracing::debug!("unwatched successfully"),
// Err(err) => tracing::error!(?err, "failed to unwatch from cache"),
// }
// }
65 changes: 14 additions & 51 deletions svc/pkg/mm-config/ops/version-get/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,58 +55,21 @@ async fn handle(

// TODO: There's a bug with this that returns the lobby groups for the wrong
// version, can't figure this out
// let versions = ctx
// .cache()
// .immutable()
// .fetch_all_proto("versions", req_version_ids, |mut cache, req_version_ids| {
// let ctx = ctx.base();
//
// async move {
// fetch_versions(&ctx, req_version_ids)
// .await?
// .into_iter()
// .for_each(|(version_id, version)| {
// cache.resolve_with_topic(
// &version_id,
// version,
// ("game_mm_versions", &version_id),
// )
// });
// Ok(cache)
// }
// })
// .await?;
let versions = ctx
.cache()
.immutable()
.fetch_all_proto("versions", req_version_ids, |mut cache, req_version_ids| {
let ctx = ctx.base();

// HACK: Because fetch all doesn't work, we'll use fetch one
// let mut versions = Vec::new();
// for version_id in req_version_ids {
// let version = ctx
// .cache()
// .immutable()
// .fetch_one_proto("versions2", version_id, |mut cache, req_version_id| {
// let ctx = ctx.base();
//
// async move {
// let versions = fetch_versions(&ctx.base(), vec![req_version_id]).await?;
// ensure!(versions.len() <= 1, "too many versions");
// if let Some((_, version)) = versions.into_iter().next() {
// cache.resolve(&version_id, version);
// }
//
// Ok(cache)
// }
// })
// .await?;
// if let Some(version) = version {
// versions.push(version);
// }
// }

let versions = fetch_versions(&ctx.base(), req_version_ids)
.await?
.into_iter()
.map(|x| x.1)
.collect::<Vec<_>>();
async move {
fetch_versions(&ctx, req_version_ids)
.await?
.into_iter()
.for_each(|(version_id, version)| cache.resolve(&version_id, version));
Ok(cache)
}
})
.await?;

Ok(mm_config::version_get::Response { versions })
}
Expand Down