Reactive types support for @Cacheable methods [SPR-14235] #17920

Closed
spring-projects-issues opened this issue Apr 29, 2016 · 36 comments
Labels: in: core (Issues in core modules: aop, beans, core, context, expression) · type: enhancement (A general enhancement)

@spring-projects-issues
Collaborator

spring-projects-issues commented Apr 29, 2016

Pablo Díaz-López opened SPR-14235 and commented

Currently, when using cache annotations on beans, Observables are cached just like any other return type, so the emitted value itself is not cached.

I tried to use the following pattern to handle it:

@Cacheable("items")
public Observable<Item> getItem(Long id) {
    return Observable.just(id)
        .map(myrepo::getById)
        .cache();
}

On the happy path this works pretty well, since we cache the Observable's values, but if getById throws an exception, the Observable is cached together with the exception, which isn't how it should work.

It would also be very nice to have support for Single.

If you give me some advice, I can try to submit a PR to solve this.
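
As an illustration of the exception problem, a hedged workaround sketch (assuming a CacheManager is injected as cacheManager; this is not an endorsed pattern):

@Cacheable("items")
public Observable<Item> getItem(Long id) {
    return Observable.just(id)
        .map(myrepo::getById)
        // evict the cached Observable when the source errors, so the
        // next call repopulates the cache instead of replaying the error
        .doOnError(e -> cacheManager.getCache("items").evict(id))
        .cache();
}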


Affects: 4.2.5

Sub-tasks:

Referenced from: pull request #1066

1 vote, 8 watchers

@spring-projects-issues spring-projects-issues added type: enhancement A general enhancement in: core Issues in core modules (aop, beans, core, context, expression) labels Jan 11, 2019
@spring-projects-issues spring-projects-issues added this to the 5.x Backlog milestone Jan 11, 2019
@cbornet

cbornet commented Jul 1, 2019

Hi, what is the status of this? If I understand correctly, there was discussion about a ReactiveCache since JSR-107 is a blocking API. Has anything evolved since this issue was opened?

@snicoll
Member

snicoll commented Jul 9, 2019

Nothing has evolved in that area, I'm afraid, and given the lack of proprietary implementations, I think cache vendors themselves aren't keen to explore that route either. This leaves us with the blocking model, which isn't a great fit for what @Cacheable has to offer.

There is a cache operator in Reactor though, and I remember some initial work trying to harmonize this here. Perhaps @simonbasle can refresh my memory?
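
For context, a minimal illustration of the Reactor cache operator mentioned here (loadItem is a hypothetical Mono-returning lookup):

// The result is memoized inside this particular Mono instance, so to act as a
// cache the Mono itself has to be stored per key (e.g. in a Map) and reused.
Mono<Item> cachedItem = loadItem(id)
        .cache(Duration.ofMinutes(5));  // replay the cached result to later subscribers for up to 5 minutes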

@simonbasle
Contributor

I made an attempt at an opinionated API towards caching in reactor-extra (see reactor-addons repo), but that has not seen much focus nor feedback since then, so I wouldn't claim it is perfect.
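
For readers unfamiliar with it, the reactor-extra helpers look roughly like this (a sketch of the CacheMono API from io.projectreactor.addons:reactor-extra, recalled from memory, so treat the exact signatures as an assumption; loadItem is a hypothetical Mono-returning lookup):

Map<String, Signal<? extends Item>> store = new ConcurrentHashMap<>();

Mono<Item> cachedItem(String id) {
    return CacheMono.lookup(store, id)               // look the key up in the map of cached Signals
            .onCacheMissResume(() -> loadItem(id));  // invoke the source only on a cache miss
}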

@tomasulo

tomasulo commented May 8, 2020

Are there any updates on this?

@snicoll
Member

snicoll commented May 10, 2020

No, the latest update on this is here. Most cache libraries (including JCache) are still on a blocking model, so my comment above still stands.

@ankurpathak

Redis has a reactive driver, so it could be implemented for Redis.

@jhoeller jhoeller modified the milestones: 5.x, 6.0.x Nov 1, 2021
@Bryksin

Bryksin commented Nov 5, 2021

I made a new project for reactive caching with proper annotation usage, Spring Boot auto-configuration, and tests. It looks like it's working well and will soon be deployed to our production.

@bezrukavyy

Folks, is there a technical reason @Cacheable is not implemented for Reactor? I've implemented caching with Caffeine's AsyncCache as an AOP MethodInterceptor, but one of my colleagues remarked that we have to be careful with the interceptor approach: since this is how the normal @Cacheable is implemented, if it were so trivial, why didn't Spring add this support for Reactor? Is there a performance penalty somehow?

We could do something similar with a custom operator in the flow, but I really like the AOP interceptor approach: very clean, and it gives me access to the method's attributes in bulk.
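
For what it's worth, a minimal non-AOP sketch of the AsyncCache idea (hypothetical names, not the poster's interceptor; Caffeine's AsyncCache.get shares one in-flight future per key):

private final AsyncCache<String, Item> itemCache = Caffeine.newBuilder()
        .maximumSize(10_000)
        .buildAsync();

public Mono<Item> getItemCached(String id) {
    // concurrent callers for the same key share a single in-flight load
    return Mono.fromFuture(itemCache.get(id,
            (key, executor) -> findItem(key).toFuture()));  // findItem returns Mono<Item>
}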

@howardem

Developers, myself included, have been waiting for Spring to support reactive types in the cache annotation implementation, also known as the Cache Abstraction. What is interesting in this story is that Micronaut, a relatively new framework compared to Spring, has supported reactive types in its cache annotation implementation since its 1.0.0 GA release back in October 2018. You can check it yourself:

[Screenshot: Micronaut cache documentation showing reactive return types, captured Dec 23, 2021]

Not related to this topic, but another area where Micronaut has been more successful than Spring is supporting reactive types through its declarative HTTP client. Spring Cloud OpenFeign doesn't support reactive types, and the documentation explicitly says it won't until OpenFeign supports Project Reactor. Interestingly, Micronaut's declarative HTTP client supports any type that implements the org.reactivestreams.Publisher interface.

BTW, I'm a huge Spring Framework fan: I've been coding Spring-based applications since 2005 and have spent the last 5 years doing Spring Cloud microservices. But my dear Spring folks, in my honest opinion it is time to catch up!

@simonbasle
Contributor

@howardem yeah, Micronaut does have an AsyncCache API, which might in some cases use the backing cache provider's own async methods, or put some async lipstick on a blocking-only API (by running things on dedicated threads)...

One thing to consider is that Micronaut's Cacheable interceptor needs to work with the common denominator of cache providers it claims to support. As a result, caching the result of a reactive-returning method boils down to these steps (a rough Reactor sketch follows the list):

  1. get from cache as Mono<Optional<T>>
  2. flatMap on the optional
  3. if valued, just unwrap the Optional to a T. DONE
  4. if empty, convert the original method return Publisher to a Mono
  5. flatMap the result of that Publisher and call asyncCache.put with it
  6. (if said publisher instead completes empty, continue the sequence with asyncCache.invalidate instead)
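
In Reactor terms, the flow above looks roughly like this (a pseudocode sketch against a hypothetical Mono-based cache facade, not Micronaut's actual interceptor code):

Mono<T> cacheMiss = Mono.from(invokeOriginalMethod())                 // step 4
        .flatMap(value -> cache.put(key, value).thenReturn(value))    // step 5
        .switchIfEmpty(cache.invalidate(key).then(Mono.empty()));     // step 6

return cache.get(key)                                                 // step 1: Mono<Optional<T>>
        .flatMap(optional -> optional                                 // step 2
                .map(Mono::just)                                      // step 3: hit, unwrap the Optional
                .orElse(cacheMiss));                                  // steps 4-6: miss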

One glaring limitation is that it is not "atomic". It is comparable to calling a ConcurrentMap's containsKey(k) ? get(k) : put(k, v) instead of the atomic putIfAbsent(k, v).

The thing is, caching atomically is more important in the case of an asynchronous method:

  • The fact that the original method is asynchronous brings the possibility of additional latency when "creating" the value to be put in the cache in case of Cache Miss.
  • This opens up a larger window for problems like Cache Stampede to occur: if multiple clients call the endpoint at roughly the same time, it is likely they will all see a Cache Miss and the interceptor will trigger the original method multiple times.

Micronaut's AsyncCache interface might have tried to introduce some remediation in the form of get(K, Supplier<V>), but that Supplier is still a blocking thing. Again, this seems to be for lowest common denominator reasons, as only Caffeine has a truly async API including a get(K, Function<K, CompletableFuture<V>>) (it is actually a BiFunction, but you get the idea).

Note that Micronaut's own Cacheable interceptor doesn't even use that get(K, Supplier<V>) method from AsyncCache... For caching reactive types, I don't think it actually even tries to support the atomic parameter to Cacheable at all.

So yeah, it has an async cache abstraction. It is best effort, and might hide subtle issues like cache stampeding, so I would take it with a grain of salt.

@ben-manes
Contributor

fwiw, I believe Quarkus implements its caches by requiring that they are asynchronous. Their interceptor checks if the return type is reactive, and if not, simply blocks indefinitely (unless a timeout is specified). Internally they use a Caffeine AsyncCache, saving a thread by performing the work on the caller and manually inserting and completing the future value. I'm not sure if they support any provider other than Caffeine; that makes it much easier, as they can require asynchronous implementations and emulate a synchronous API by blocking. Maybe Spring Cache could iterate towards something similar (assuming some caveats for other providers can be made acceptable)?
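
The "perform the work on the caller" trick reads roughly like this against Caffeine's AsyncCache (a hedged sketch, not Quarkus code; asyncCache.asMap() exposes the per-key futures as a ConcurrentMap):

CompletableFuture<V> fresh = new CompletableFuture<>();
CompletableFuture<V> existing = asyncCache.asMap().putIfAbsent(key, fresh);
if (existing != null) {
    return existing;                    // another caller is already loading this key
}
try {
    fresh.complete(computeValue(key));  // the work runs on the calling thread, no extra thread needed
} catch (Throwable t) {
    fresh.completeExceptionally(t);     // Caffeine discards entries whose future completes exceptionally
}
return fresh;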

@simonbasle
Contributor

yeah, that's interesting. they initially (very early) thought about supporting multiple caching providers but the reduction in scope (annotations only at first) and the focus on the 80% case led them to only use Caffeine as the underlying implementation 👍 they don't seem to use your get(K key, BiFunction<K, ExecutorService, CompletableFuture> loader) variant @ben-manes, but instead rely on putIfAbsent and computeIfPresent.

but yeah, they focused on an async API returning Uni reactive types (which can very easily be presented as CompletableFutures to Caffeine).

@ben-manes
Contributor

I think that I suggested the putIfAbsent approach so that they could reuse the caller thread. There were many PRs as they iterated and I advised from a point of ignorance (as I use Guice at work, no annotated caching, and it’s all a weekend hobby). I think going async was an evolution of their internal apis, but my knowledge is very fuzzy. I hope that other caching providers become async friendly to enable similar integrations, but I don’t know the state of that support.

@62mkv

62mkv commented Mar 15, 2022

hey @Bryksin, how has it been working for you? We also currently use a Redis-backed cache (via Redisson), but we have to implement it at the code level, along the lines of

getFromCache(key)
    .switchIfEmpty(getFromUpstream().delayUntil(value -> putToCache(key, value)))

which I don't like too much, as caching should be treated as a cross-cutting concern that can easily be disabled if needed.

I like the API of your library (https://github.com/Bryksin/redis-reactive-cache) but my coworkers are hesitant given apparent lack of community adoption :(

it would also be interesting to learn @simonbasle's take on this approach 🙏

@Bryksin

Bryksin commented Mar 15, 2022

Hi @62mkv, unfortunately usage of the lib is very low, and I have no time to focus on it properly.
It was just an MVP version and definitely has room for improvement, at least:

  • Config property to enable/disable
  • Exception and error handling (for example, if the connection to Redis fails, it should continue with the method execution)

So yes, there is definitely room for improvement, but there's no time and the community didn't pick it up. I would prefer an official Spring Boot solution rather than custom stuff.

Time goes on, but such an important aspect as caching continues to be ignored by Spring for reactive code for some reason...

@snicoll
Member

snicoll commented Mar 15, 2022

@Bryksin with regards to "ignoring for some reason", the comment above yours should hopefully help nuance that.

@jaesuk-kim0808

jaesuk-kim0808 commented Oct 11, 2022

Hi, I'm sharing the code I use.
Although it has not yet been used in a real product, performance and functionality will be verified through load tests covering various scenarios in the near future.
Hope this helps. Please let me know if there is a problem.

@RequiredArgsConstructor
@Aspect
@Component
public class AsyncCacheAspect {

    private final AsyncCacheManager asyncCacheManager;

    @Pointcut("@annotation(AsyncCacheable)")
    public void pointcut() {
    }

    @Around("pointcut()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        Type genericReturnType = method.getGenericReturnType();
        if (!(genericReturnType instanceof ParameterizedType)) {
            throw new IllegalArgumentException("The return type is not Mono/Flux. Use Mono/Flux for return type. method: " + method.getName());
        }
        Type rawType = ((ParameterizedType) genericReturnType).getRawType();

        if (!rawType.equals(Mono.class) && !rawType.equals(Flux.class)) {
            throw new IllegalArgumentException("The return type is not Mono/Flux. Use Mono/Flux for return type. method: " + method.getName());
        }

        AsyncCacheable asyncCacheable = method.getAnnotation(AsyncCacheable.class);
        String cacheName = asyncCacheable.name();
        Object[] args = joinPoint.getArgs();

        AsyncCache asyncCache = asyncCacheManager.get(cacheName);
        if (Objects.isNull(asyncCache)) {
            return joinPoint.proceed();
        }

        //Return type : Mono
        if (rawType.equals(Mono.class)) {
            Mono retVal = Mono.defer(() -> {
                try {
                    return (Mono) joinPoint.proceed();
                } catch (Throwable th) {
                    throw new BusinessException(ResultCode.UNKNOWN_ERROR, th.getMessage());
                }
            });

            CompletableFuture completableFuture = asyncCache.get(generateKey(args), (key, exec) -> (retVal).toFuture());
            return  Mono.fromFuture(completableFuture);
        }

        //Return type : Flux
        Mono retVal = Mono.from(Flux.defer(() -> {
            try {
                return  ((Flux) joinPoint.proceed()).collectList();
            } catch (Throwable th) {
                throw new BusinessException(ResultCode.UNKNOWN_ERROR, th.getMessage());
            }
        }));

        CompletableFuture<List> completableFuture = asyncCache.get(generateKey(args), (key, exec) -> (retVal).toFuture());
        return Flux.from(Mono.fromFuture(completableFuture)).flatMap(x -> Flux.fromIterable(x));
    }

    private String generateKey(Object... objects) {
        return Arrays.stream(objects)
            .map(obj -> obj == null ? "" : obj.toString())
            .collect(Collectors.joining("#"));
    }
}

@RequiredArgsConstructor
@Component
public class AsyncCacheableMethodProcessor implements BeanPostProcessor {

    private final AsyncCacheManager asyncCacheManager;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {

        Arrays.stream(bean.getClass().getDeclaredMethods())
            .filter(m -> m.isAnnotationPresent(AsyncCacheable.class))
            .forEach(m -> {
                AsyncCacheable asyncCacheable = m.getAnnotation(AsyncCacheable.class);
                String cacheName = asyncCacheable.name();
                CacheType cacheType = CacheType.nameOf(cacheName);
                if (Objects.nonNull(cacheType)) {
                    asyncCacheManager.computeIfAbsent(cacheName, (key) ->  {
                        return Caffeine.newBuilder()
                            .maximumSize(cacheType.getMaximumSize())
                            .expireAfterWrite(cacheType.getExpiredAfterWrite(), TimeUnit.SECONDS)
                            .buildAsync();
                    });
                }
            });

        return bean;
    }

}

//This code is in Data access layer.
@AsyncCacheable(name = "getBySvcId")
@Override
public Mono<Domain> getBySvcId(String svcId) {}

AsyncCacheManager is a Spring bean wrapping a ConcurrentHashMap<String, AsyncCache>.
CacheType is an enum that defines the key stored in AsyncCacheManager and the maximumSize and expireAfterWrite values required to create Caffeine's AsyncCache.
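
For completeness, the AsyncCacheManager referenced above presumably has roughly this shape (an assumed sketch, not the poster's actual class):

@Component
public class AsyncCacheManager {

    // named Caffeine AsyncCache instances, keyed by cache name
    private final ConcurrentHashMap<String, AsyncCache<String, Object>> caches = new ConcurrentHashMap<>();

    public AsyncCache<String, Object> get(String cacheName) {
        return caches.get(cacheName);
    }

    public AsyncCache<String, Object> computeIfAbsent(String cacheName,
            Function<String, AsyncCache<String, Object>> factory) {
        return caches.computeIfAbsent(cacheName, factory);
    }
}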

@smilejh

smilejh commented Oct 17, 2022

Thank you. This helped me a lot.

@kidhack83

kidhack83 commented Oct 26, 2022

Hi @jaesuk-kim0808, I'm integrating your code but the AOP advice is not invoked...

//This code is in Data access layer.
@AsyncCacheable(name = "getBySvcId")
@Override
public Mono<Domain> getBySvcId(String svcId) {}

Can I put the method to be cached in a service implementation as a private method? For example:

@AsyncCacheable(name = "getCurrencies")
private Mono<List<RateDto>> getCurrencies(String source, String target) {
  return currencyRatesApi.getCurrencyRates()
      .collectList();
}

The parameters aren't used in the code because I need to cache all elements. I need to change this code, but the pointcut is not triggered.
I tried changing the method to public, but the AOP advice is still not invoked...

Thanks for your great work

@vgaborabs

Hi @kidhack83
As far as I know, Spring AOP advice relies on proxies, which means it does not fire when methods are called from within the same instance. Try to adjust your code accordingly:

@Component
public class ServiceA {
  ...
  @AsyncCacheable(name = "getCurrencies")
  public Mono<List<RateDto>> getCurrencies(String source, String target) {
    return currencyRatesApi.getCurrencyRates()
        .collectList();
  }
  ...
}

@Component
@RequiredArgsConstructor
public class ServiceB {
  private final ServiceA serviceA;
  ...
  private Mono<List<RateDto>> someMethod(String source, String target) {
     return serviceA.getCurrencies(source, target);
  }
  ...
}

I hope this helps.

@shaikezr

I took the solutions in this thread and made a more fleshed out version that includes the annotation, the imports, and the dependencies: https://github.com/shaikezr/async-cacheable

@jhoeller jhoeller modified the milestones: 6.x Backlog, 6.1.x, 6.1.0-M4 Jul 13, 2023
@jhoeller jhoeller assigned snicoll and jhoeller and unassigned snicoll Jul 18, 2023
@jhoeller
Contributor

jhoeller commented Jul 21, 2023

While we still do not see @Cacheable and co as a perfect companion for reactive service methods, we come back to this now from a pragmatic perspective: It is quite straightforward to support @Cacheable for CompletableFuture-returning methods, and with reasonable adaptation effort, the same approach can work for single-value reactive types such as Mono. With Flux, annotation-driven caching is never going to be an ideal fit, but when asked for it through a corresponding declaration, the best we can do is to collect the Flux outcome into a List and cache that, and then restore a Flux from there in case of a cache hit.

On the provider side, we have Caffeine's CompletableFuture-based AsyncCache, which is a fine fit for the approach above. This allows for individual cache annotation processing as well as an @Cacheable(sync=true) style synchronized approach, with very similar semantics as with cache annotation usage on imperative methods. All it takes SPI-wise for that to be possible is two new methods on Spring's Cache interface: CompletableFuture retrieve(key) for plain retrieval and CompletableFuture retrieve(key, Supplier<CompletableFuture>) for sync-style retrieval. Those are designed for an efficient binding in CaffeineCache and can also be implemented in ConcurrentMapCache with some CompletableFuture.supplyAsync usage.
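
In code, the two SPI additions described here have roughly this shape on org.springframework.cache.Cache (reproduced from memory as a sketch, so treat the details as an approximation; they are default methods so existing Cache implementations keep compiling):

public interface Cache {

    // ... existing imperative operations (get, put, evict, clear, ...) ...

    // plain async retrieval of a possibly cached value
    default CompletableFuture<?> retrieve(Object key) {
        throw new UnsupportedOperationException(
                getClass().getName() + " does not support CompletableFuture-based retrieval");
    }

    // sync-style retrieval: compute-if-absent against the supplied asynchronous loader
    default <T> CompletableFuture<T> retrieve(Object key, Supplier<CompletableFuture<T>> valueLoader) {
        throw new UnsupportedOperationException(
                getClass().getName() + " does not support CompletableFuture-based retrieval");
    }
}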

There are a few caveats: In order to address the risk of cache stampede for expensive operations, @Cacheable(sync=true) needs to be declared (just like with our imperative caching arrangement), with only one such caching annotation allowed on a given method then. Also, we expect Cache.evict, Cache.clear and also Cache.put to be effectively non-blocking for individual usage underneath @CacheEvict and @CachePut. This is the case with our common cache providers unless configured for immediate write-through, which none of our retrieve-supporting providers have as a feature anyway. So in summary, for providers supporting our retrieve operations, evict/clear/put need to be effectively non-blocking in their implementation. This matches the existing semantic definition of those methods, which already allows for asynchronous or deferred backend execution, e.g. in transactional scenarios, with callers not expecting to see an immediate effect when querying the cache right afterwards. The javadoc of those methods now hints at the non-blocking requirement for reactive interactions as well.
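
A minimal usage sketch under those constraints (made-up service, WebClient and Book types; only the annotation arrangement itself reflects the feature described above):

@Service
public class BookService {

    private final WebClient webClient;

    public BookService(WebClient webClient) {
        this.webClient = webClient;
    }

    // sync=true opts into the single-lookup path that guards against cache stampedes
    @Cacheable(cacheNames = "books", sync = true)
    public Mono<Book> findBook(String isbn) {
        return webClient.get()
                .uri("/books/{isbn}", isbn)
                .retrieve()
                .bodyToMono(Book.class);
    }
}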

Thanks everyone for your comments in this thread! This served as very valuable inspiration.

@ben-manes
Contributor

With Flux, annotation-driven caching is never going to be an ideal fit, but when asked for it through a corresponding declaration, the best we can do is to collect the Flux outcome into a List and cache that, and then restore a Flux from there in case of a cache hit.

One somewhat related area where the Flux reactive type is very useful is to support coalescing bulk loads. This allows for collecting multiple independent loads using Reactor's bufferTimeout(maxSize, maxTime) to perform a batch request. I don't believe the annotations support bulk operations yet, but it is a very nice mash-up.

Here is a short (~30 LOC) example demonstrating this using Caffeine directly.

CoalescingBulkLoader
public final class CoalescingBulkLoader<K, V> implements AsyncCacheLoader<K, V> {
  private final Function<Set<K>, Map<K, V>> mappingFunction;
  private final Sinks.Many<Request<K, V>> sink;

  public CoalescingBulkLoader(int maxSize, Duration maxTime,
      int parallelism, Function<Set<K>, Map<K, V>> mappingFunction) {
    this.mappingFunction = requireNonNull(mappingFunction);
    sink = Sinks.many().unicast().onBackpressureBuffer();
    sink.asFlux()
        .bufferTimeout(maxSize, maxTime)
        .parallel(parallelism)
        .runOn(Schedulers.boundedElastic())
        .subscribe(this::handle);
  }

  @Override public CompletableFuture<V> asyncLoad(K key, Executor executor) {
    var result = new CompletableFuture<V>();
    sink.tryEmitNext(new Request<>(key, result)).orThrow();
    return result;
  }

  private void handle(List<Request<K, V>> requests) {
    try {
      var results = mappingFunction.apply(requests.stream().map(Request::key).collect(toSet()));
      requests.forEach(request -> request.result.complete(results.get(request.key())));
    } catch (Throwable t) {
      requests.forEach(request -> request.result.completeExceptionally(t));
    }
  }

  private record Request<K, V>(K key, CompletableFuture<V> result) {}
}
Sample test
@Test
public void coalesce() {
  AsyncLoadingCache<Integer, Integer> cache = Caffeine.newBuilder()
      .buildAsync(new CoalescingBulkLoader<>(
          /* maxSize */ 5, /* maxTime */ Duration.ofMillis(50), /* parallelism */ 5,
          keys -> keys.stream().collect(toMap(key -> key, key -> -key))));

  var results = new HashMap<Integer, CompletableFuture<Integer>>();
  for (int i = 0; i < 82; i++) {
    results.put(i, cache.get(i));
  }
  for (var entry : results.entrySet()) {
    assertThat(entry.getValue().join()).isEqualTo(-entry.getKey());
  }
}

@dalegaspi

dalegaspi commented Jul 21, 2023

just to throw things into the mix... there are a couple of additional capabilities that we sorta need and that would still keep the annotation approach:

  1. refreshAfterWrite support (a rough sketch follows after this list). This probably doesn't need much explanation: most of the time we are OK with returning (old/stale) cached data while the key is being refreshed asynchronously. We have an implementation that's in the process of being tested as of this writing.
  2. Conditional caching (aka the unless or condition attribute). This is a bit trickier and we are running into thread-safety issues.
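
As a rough illustration of point 1 (a hedged, Caffeine-specific sketch with hypothetical names, not the implementation mentioned above): stale entries keep being served while Caffeine refreshes the value in the background.

AsyncLoadingCache<String, Quote> quotes = Caffeine.newBuilder()
        .maximumSize(10_000)
        .refreshAfterWrite(Duration.ofMinutes(5))
        .buildAsync((key, executor) -> fetchQuote(key).toFuture());  // fetchQuote returns Mono<Quote>

Mono<Quote> cachedQuote(String symbol) {
    return Mono.fromFuture(quotes.get(symbol));
}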

we are going to share code snippets when we're a bit more confident that it actually works 🙃

Oh, and one more thing: I see the implementations here casually using .toFuture() to convert a reactive stream to a CompletableFuture... which is fine for the most part, but just be mindful of operations that can potentially stall the reactive threads because you converted them into a CompletableFuture (we learned this the hard way). This can be mitigated by switching thread pools using publishOn or subscribeOn where applicable.

@jhoeller
Contributor

The initial drop is in main now, feel free to check it out... The first milestone to include this will be 6.1 M4 in mid August.

@mschaeferjelli

I may be running into thread issues on unless. With 6.1.5, I'm getting:

org.springframework.expression.spel.SpelEvaluationException: EL1011E: Method call: Attempted to call method concat(java.lang.String) on null context object
    at org.springframework.expression.spel.ast.MethodReference.throwIfNotNullSafe(MethodReference.java:166)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:117)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:107)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:67)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:97)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:114)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:273)
    at org.springframework.cache.interceptor.CacheOperationExpressionEvaluator.key(CacheOperationExpressionEvaluator.java:106)
    at org.springframework.cache.interceptor.CacheAspectSupport$CacheOperationContext.generateKey(CacheAspectSupport.java:913)
    at org.springframework.cache.interceptor.CacheAspectSupport.generateKey(CacheAspectSupport.java:703)
    at org.springframework.cache.interceptor.CacheAspectSupport.findCachedValue(CacheAspectSupport.java:480)
    at org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:431)
    at org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:395)
    at org.springframework.cache.interceptor.CacheInterceptor.invoke(CacheInterceptor.java:74)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:717)

This is the function header and annotations:
@Cacheable(value = "broadcaster_id_cache", key = "#siteExternalId.concat('-').concat(#adNetworkIdentity)", unless = "#result == null")
public Optional<Long> getBroadcasterId(final String siteExternalId, final String adNetworkIdentity) {
I confirm that the values are not null in the function.
I've downgraded to 6.0 and it's working correctly.

@sbrannen
Member

sbrannen commented Apr 3, 2024

@mschaeferjelli, you have to compile your code with the -parameters flag.

This is documented in the 6.1 upgrade notes: https://github.com/spring-projects/spring-framework/wiki/Upgrading-to-Spring-Framework-6.x#parameter-name-retention

@mschaeferjelli

That worked, thanks! As a note, I had to upgrade the maven-compiler-plugin from 3.8.1 to 3.13.1, because it was not honoring the XML
<configuration> <parameters>true</parameters> </configuration>
