Skip to content

Commit

Permalink
feat(feed): add periodic cache refresher
Browse files Browse the repository at this point in the history
  • Loading branch information
ymgyt committed Apr 20, 2024
1 parent a0c2c53 commit d831d3e
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/synd_feed/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ readme = "README.md"
version = "0.2.0"

[dependencies]
synd-o11y = { path = "../synd_o11y", version = "0.1.5" }

anyhow = { workspace = true }
async-graphql = { workspace = true, optional = true }
async-trait = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use moka::future::Cache;
// use moka::future::Cache;

use crate::{
feed::parser::{FetchFeed, FetchFeedResult},
types::{self, FeedUrl},
};

mod periodic_refresher;
pub use periodic_refresher::PeriodicRefresher;

type Cache = moka::future::Cache<FeedUrl, Arc<types::Feed>>;

#[derive(Clone, Copy)]
pub struct CacheConfig {
max_cache_size: u64,
Expand Down Expand Up @@ -57,7 +62,7 @@ pub struct CacheLayer<S> {
service: S,
// Use Arc to avoid expensive clone
// https://github.com/moka-rs/moka?tab=readme-ov-file#avoiding-to-clone-the-value-at-get
cache: Cache<FeedUrl, Arc<types::Feed>>,
cache: Cache,
}
impl<S> CacheLayer<S> {
/// Construct `CacheLayer` with default config
Expand All @@ -72,7 +77,7 @@ impl<S> CacheLayer<S> {
time_to_live,
} = config;

let cache = Cache::builder()
let cache = moka::future::Cache::builder()
.weigher(|_key, value: &Arc<types::Feed>| -> u32 {
value.approximate_size().try_into().unwrap_or(u32::MAX)
})
Expand All @@ -84,6 +89,15 @@ impl<S> CacheLayer<S> {
}
}

impl<S> CacheLayer<S>
where
S: Clone,
{
pub fn periodic_refresher(&self) -> PeriodicRefresher<S> {
PeriodicRefresher::new(self.service.clone(), self.cache.clone())
}
}

#[async_trait]
impl<S> FetchCachedFeed for CacheLayer<S>
where
Expand Down
88 changes: 88 additions & 0 deletions crates/synd_feed/src/feed/cache/periodic_refresher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::{sync::Arc, time::Duration};

use synd_o11y::metric;
use tracing::{error, info, warn};

use crate::feed::parser::FetchFeed;

use super::Cache;

pub struct PeriodicRefresher<S> {
service: S,
cache: Cache,
emit_metrics: bool,
}

impl<S> PeriodicRefresher<S> {
pub fn new(service: S, cache: Cache) -> Self {
Self {
service,
cache,
emit_metrics: false,
}
}

#[must_use]
pub fn with_emit_metrics(self, emit_metrics: bool) -> Self {
Self {
emit_metrics,
..self
}
}

fn emit_metrics(&self) {
// Should call cache.run_pending_tasks() ?
let entry_count = self.cache.entry_count();
let size = self.cache.weighted_size();

metric!(counter_cache.feed.count = entry_count);
metric!(counter_cache.feed.size = size);
}
}

impl<S> PeriodicRefresher<S>
where
S: FetchFeed + Clone + 'static,
{
#[tracing::instrument(skip_all, name = "feed::cache::refresh")]
async fn refresh(&self) -> anyhow::Result<()> {
// It is safe to insert while iterating to cache.
for (feed_url, _) in &self.cache {
let feed_url = Arc::unwrap_or_clone(feed_url);
match self.service.fetch_feed(feed_url.clone()).await {
Ok(new_feed) => {
self.cache.insert(feed_url, Arc::new(new_feed)).await;
}
Err(err) => {
warn!(
url = feed_url.as_str(),
"Failed to refresh feed cache: {err}"
);
}
}
}
Ok(())
}

pub async fn run(self, interval: Duration) {
info!(?interval, "Run periodic feed cache refresher");

let mut interval = tokio::time::interval(interval);

// Consume initial tick which return ready immediately
interval.tick().await;

loop {
interval.tick().await;

if self.emit_metrics {
self.emit_metrics();
}
if let Err(err) = self.refresh().await {
error!("Periodic refresh error: {err}");
} else {
info!("Refreshed feed cache");
}
}
}
}

0 comments on commit d831d3e

Please sign in to comment.