Skip to content

Commit

Permalink
fix(cache): mixed values in Cache::fetch_all (#927)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
NathanFlurry committed Jun 19, 2024
1 parent 853cf06 commit d69a072
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 68 deletions.
5 changes: 5 additions & 0 deletions lib/cache/build/src/getter_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ where
.collect()
}

/// All keys.
pub(super) fn keys(&self) -> &[GetterCtxKey<K, V>] {
&self.keys[..]
}

/// If all keys have an associated value.
pub(super) fn all_keys_have_value(&self) -> bool {
self.keys.iter().all(|x| x.value.is_some())
Expand Down
38 changes: 21 additions & 17 deletions lib/cache/build/src/req_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,18 @@ impl RequestConfig {
.with_label_values(&[&base_key])
.inc_by(keys.len() as u64);

let redis_keys = keys
// Build context.
//
// Drop `keys` bc this is not the same as the keys list in `ctx`, so it should not be used
// again.
let mut ctx = GetterCtx::new(base_key.clone().into(), keys.to_vec());
drop(keys);

// Build keys to look up values in Redis
let redis_keys = ctx
.keys()
.iter()
.map(|key| self.cache.build_redis_cache_key(&base_key, key))
.map(|key| self.cache.build_redis_cache_key(&base_key, &key.key))
.collect::<Vec<_>>();

// Build Redis command explicitly, since `conn.get` with one value will
Expand Down Expand Up @@ -183,7 +192,6 @@ impl RequestConfig {
);

// Create the getter ctx and resolve the cached values
let mut ctx = GetterCtx::new(base_key.clone().into(), keys.to_vec());
for (i, value) in cached_values.into_iter().enumerate() {
if let Some(value) = value {
let value = decoder(&value)?;
Expand Down Expand Up @@ -266,12 +274,8 @@ impl RequestConfig {

// Fall back to the getter since we can't fetch the value from
// the cache
let ctx = getter(
GetterCtx::new(base_key.into(), keys.to_vec()),
keys.to_vec(),
)
.await
.map_err(Error::Getter)?;
let keys = ctx.unresolved_keys();
let ctx = getter(ctx, keys).await.map_err(Error::Getter)?;

Ok(ctx.into_values())
}
Expand Down Expand Up @@ -656,11 +660,11 @@ impl RequestConfig {
}
}

#[tracing::instrument(skip(conn))]
async fn unwatch_gracefully(conn: &mut RedisPool) {
tracing::debug!("unwatching");
match redis::cmd("UNWATCH").query_async::<_, ()>(conn).await {
Ok(_) => tracing::debug!("unwatched successfully"),
Err(err) => tracing::error!(?err, "failed to unwatch from cache"),
}
}
// #[tracing::instrument(skip(conn))]
// async fn unwatch_gracefully(conn: &mut RedisPool) {
// tracing::debug!("unwatching");
// match redis::cmd("UNWATCH").query_async::<_, ()>(conn).await {
// Ok(_) => tracing::debug!("unwatched successfully"),
// Err(err) => tracing::error!(?err, "failed to unwatch from cache"),
// }
// }
65 changes: 14 additions & 51 deletions svc/pkg/mm-config/ops/version-get/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,58 +55,21 @@ async fn handle(

// TODO: There's a bug with this that returns the lobby groups for the wrong
// version, can't figure this out
// let versions = ctx
// .cache()
// .immutable()
// .fetch_all_proto("versions", req_version_ids, |mut cache, req_version_ids| {
// let ctx = ctx.base();
//
// async move {
// fetch_versions(&ctx, req_version_ids)
// .await?
// .into_iter()
// .for_each(|(version_id, version)| {
// cache.resolve_with_topic(
// &version_id,
// version,
// ("game_mm_versions", &version_id),
// )
// });
// Ok(cache)
// }
// })
// .await?;
let versions = ctx
.cache()
.immutable()
.fetch_all_proto("versions", req_version_ids, |mut cache, req_version_ids| {
let ctx = ctx.base();

// HACK: Because fetch all doesn't work, we'll use fetch one
// let mut versions = Vec::new();
// for version_id in req_version_ids {
// let version = ctx
// .cache()
// .immutable()
// .fetch_one_proto("versions2", version_id, |mut cache, req_version_id| {
// let ctx = ctx.base();
//
// async move {
// let versions = fetch_versions(&ctx.base(), vec![req_version_id]).await?;
// ensure!(versions.len() <= 1, "too many versions");
// if let Some((_, version)) = versions.into_iter().next() {
// cache.resolve(&version_id, version);
// }
//
// Ok(cache)
// }
// })
// .await?;
// if let Some(version) = version {
// versions.push(version);
// }
// }

let versions = fetch_versions(&ctx.base(), req_version_ids)
.await?
.into_iter()
.map(|x| x.1)
.collect::<Vec<_>>();
async move {
fetch_versions(&ctx, req_version_ids)
.await?
.into_iter()
.for_each(|(version_id, version)| cache.resolve(&version_id, version));
Ok(cache)
}
})
.await?;

Ok(mm_config::version_get::Response { versions })
}
Expand Down

0 comments on commit d69a072

Please sign in to comment.