Skip to content

Commit

Permalink
Synchronize cache update (#3408)
Browse files Browse the repository at this point in the history
Fixes gh-3407
  • Loading branch information
abelsromero committed May 29, 2024
1 parent a54297b commit fbbc8a6
Showing 1 changed file with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,25 @@ public void onApplicationEvent(RefreshRoutesEvent event) {
.onErrorResume(s -> Mono.just(List.of()));

scopedRoutes.subscribe(scopedRoutesList -> {
Flux.concat(Flux.fromIterable(scopedRoutesList), getNonScopedRoutes(event))
.sort(AnnotationAwareOrderComparator.INSTANCE).materialize().collect(Collectors.toList())
.subscribe(this::publishRefreshEvent, this::handleRefreshError);
updateCache(Flux.concat(Flux.fromIterable(scopedRoutesList), getNonScopedRoutes(event))
.sort(AnnotationAwareOrderComparator.INSTANCE));
}, this::handleRefreshError);
}
else {
final Mono<List<Route>> allRoutes = fetch().collect(Collectors.toList());

allRoutes.subscribe(
list -> Flux.fromIterable(list).materialize().collect(Collectors.toList())
.subscribe(this::publishRefreshEvent, this::handleRefreshError),
this::handleRefreshError);
allRoutes.subscribe(list -> updateCache(Flux.fromIterable(list)), this::handleRefreshError);
}
}
catch (Throwable e) {
handleRefreshError(e);
}
}

private synchronized void updateCache(Flux<Route> routes) {
routes.materialize().collect(Collectors.toList()).subscribe(this::publishRefreshEvent,
this::handleRefreshError);
}

private void publishRefreshEvent(List<Signal<Route>> signals) {
cache.put(CACHE_KEY, signals);
applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this));
Expand Down

0 comments on commit fbbc8a6

Please sign in to comment.