
Cannot receive message sometimes with SimpleBrokerMessageHandler [SPR-15282] #19848

Closed
spring-projects-issues opened this issue Feb 24, 2017 · 10 comments
Labels
in: messaging Issues in messaging modules (jms, messaging)

Comments

@spring-projects-issues
Collaborator

spring-projects-issues commented Feb 24, 2017

glodon opened SPR-15282 and commented

When a large number of clients reconnect, some clients cannot receive data.

This problem seems to be caused by the class DefaultSubscriptionRegistry.

If the number of connections is greater than the cacheLimit, then when all of them disconnect, updateCache is cleared but some destinations remain in accessCache.

When the clients reconnect, if the destination still exists in accessCache, the destination info is not updated,

so the message is not delivered successfully.

// Code related to this problem

// Adding a destination to the cache
	public LinkedMultiValueMap<String, String> getSubscriptions(String destination, Message<?> message) {
		LinkedMultiValueMap<String, String> result = this.accessCache.get(destination);
		// Entries are only put when the destination does not yet exist in accessCache
		if (result == null) {
			synchronized (this.updateCache) {
				...
				if (!result.isEmpty()) {
					this.updateCache.put(destination, result.deepCopy());
					this.accessCache.put(destination, result);
				}
			}
		}
		return result;
	}
// Called when a client disconnects
	public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
		synchronized (this.updateCache) {
			Set<String> destinationsToRemove = new HashSet<>();
			for (Map.Entry<String, LinkedMultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
				// set value for destinationsToRemove
			}
			for (String destination : destinationsToRemove) {
				this.updateCache.remove(destination);
				this.accessCache.remove(destination);
			}
		}
	}
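The mechanism described above can be reduced to a small standalone model. This is a sketch under stated assumptions, not Spring's code. The class `DivergingCaches` and its methods are hypothetical, and each destination maps to a single session ID. As in the report, eviction from the bounded `updateCache` is not propagated to `accessCache`, and cleanup on disconnect only walks `updateCache`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model (not Spring's code): "updateCache" is an access-ordered LinkedHashMap
// bounded by removeEldestEntry; "accessCache" is an unbounded ConcurrentHashMap.
// Eviction from updateCache is NOT propagated to accessCache.
class DivergingCaches {
    static final int CACHE_LIMIT = 1024;

    final Map<String, String> accessCache = new ConcurrentHashMap<>();
    final Map<String, String> updateCache = new LinkedHashMap<String, String>(16, 0.75f, true) {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
            return size() > CACHE_LIMIT; // silently drops the eldest entry from updateCache only
        }
    };

    // Cache the subscriber session for a destination in both maps.
    void put(String destination, String sessionId) {
        updateCache.put(destination, sessionId);
        accessCache.put(destination, sessionId);
    }

    // Disconnect every session: only destinations still present in updateCache are cleaned up.
    void removeAllSessions() {
        for (String destination : updateCache.keySet()) {
            accessCache.remove(destination);
        }
        updateCache.clear();
    }

    public static void main(String[] args) {
        DivergingCaches caches = new DivergingCaches();
        for (int i = 0; i < 5000; i++) {
            caches.put("/queue/pc/" + i, "session-" + i);
        }
        System.out.println(caches.updateCache.size() + " " + caches.accessCache.size()); // 1024 5000
        caches.removeAllSessions();
        System.out.println(caches.updateCache.size() + " " + caches.accessCache.size()); // 0 3976
    }
}
```

In this model the 3976 leftover entries are exactly the 5000 - 1024 destinations that were evicted from `updateCache` but never removed from `accessCache`.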

Affects: 4.2.9, 4.3.6, 5.0 M5

Issue Links:

@spring-projects-issues
Collaborator Author

Rossen Stoyanchev commented

Any idea why the accessCache has destinations while the updateCache is cleared? Looking at DefaultSubscriptionRegistry$DestinationCache they are always updated together and I cannot see why or how they get out of sync.

@spring-projects-issues
Collaborator Author

Rossen Stoyanchev commented

I'm guessing this comment is related to the above since it was made on the same day and seems very similar?

The cacheLimit default value is 1024, and we have a problem.
First, we connected 5000 clients and sent 5000 messages for each client. All 5000 clients received messages.
Second, we disconnected the 5000 clients and reconnected 5000 clients again.
Finally, we again sent 5000 messages for each client, but only 1024 clients received messages.
We want all 5000 clients to be able to receive messages.

How many unique destinations are these 5000 clients subscribing to -- a small number of shared "topic" destinations or a larger number of unique "user" destinations? For the 5000 sent messages, are those 1 "user" destination message per user or 5000 messages to a shared topic that all users are subscribed to?

Not that it should matter, just trying to get a better sense of the scenario and reason about it. I still can't see why the accessCache would not clear together with the updateCache. Note also that the cache size is exposed for configuration and can be optimized through the MessageBrokerRegistry in the Java config.
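The following hypothetical simulation can help reason about the quoted scenario. It replays the sequence end to end: connect, send, disconnect, reconnect, send. Several pieces are assumptions for illustration: the class `ReconnectScenario`, the one-session-per-destination simplification, and the `evictFromAccessCache` flag. Setting the flag to `false` models the behavior described in the report, where eviction affects `updateCache` only. Setting it to `true` models also evicting the same key from `accessCache`. A message counts as delivered only when it is routed to the currently connected session.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simulation (not Spring's code). Each client has its own destination,
// each cache maps destination -> sessionId, and the registry holds live subscriptions.
class ReconnectScenario {
    final int cacheLimit;
    final boolean evictFromAccessCache; // false models the behavior described in the report
    final Map<String, String> registry = new HashMap<>();
    final Map<String, String> accessCache = new HashMap<>();
    final Map<String, String> updateCache;

    ReconnectScenario(int cacheLimit, boolean evictFromAccessCache) {
        this.cacheLimit = cacheLimit;
        this.evictFromAccessCache = evictFromAccessCache;
        this.updateCache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                if (size() <= ReconnectScenario.this.cacheLimit) {
                    return false;
                }
                if (ReconnectScenario.this.evictFromAccessCache) {
                    accessCache.remove(eldest.getKey());
                }
                return true;
            }
        };
    }

    // Subscribe: caches are only refreshed for destinations still present in updateCache.
    void subscribe(String destination, String sessionId) {
        registry.put(destination, sessionId);
        if (updateCache.containsKey(destination)) {
            updateCache.put(destination, sessionId);
            accessCache.put(destination, sessionId);
        }
    }

    // Disconnect: only destinations found in updateCache are removed from both caches.
    void disconnect(String destination) {
        registry.remove(destination);
        if (updateCache.remove(destination) != null) {
            accessCache.remove(destination);
        }
    }

    // Send one message; true if it is routed to the currently connected session.
    boolean send(String destination) {
        String sessionId = accessCache.get(destination);
        if (sessionId == null) {
            sessionId = registry.get(destination);
            if (sessionId == null) {
                return false;
            }
            updateCache.put(destination, sessionId);
            accessCache.put(destination, sessionId);
        }
        return sessionId.equals(registry.get(destination));
    }

    static int run(int clients, int cacheLimit, boolean evictFromAccessCache) {
        ReconnectScenario s = new ReconnectScenario(cacheLimit, evictFromAccessCache);
        for (int i = 0; i < clients; i++) s.subscribe("/queue/pc/" + i, "first-" + i);
        for (int i = 0; i < clients; i++) s.send("/queue/pc/" + i);
        for (int i = 0; i < clients; i++) s.disconnect("/queue/pc/" + i);
        for (int i = 0; i < clients; i++) s.subscribe("/queue/pc/" + i, "second-" + i);
        int delivered = 0;
        for (int i = 0; i < clients; i++) {
            if (s.send("/queue/pc/" + i)) delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(5000, 1024, false));  // 1024
        System.out.println(run(5000, 1024, true));   // 5000
        System.out.println(run(5000, 10000, false)); // 5000
    }
}
```

Under these assumptions the model reproduces the reported 1024 deliveries. Two changes each bring delivery back to all 5000 clients: keeping the two caches in sync, or raising the cache limit above the number of distinct destinations.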

@spring-projects-issues
Collaborator Author

flyingsu commented

Thank you.
First, I'm sorry, but after reading the Spring docs I still don't know how to configure the cache size in the Java config through MessageBrokerRegistry. Can you tell me how to do it? Thank you very much!
Second, I'll explain the issue in detail.
1. The default size limit of updateCache is 1024, enforced by overriding removeEldestEntry. That means the maximum size of updateCache is 1024. But there is no limit on the size of accessCache; it can be 2000, 5000, 10000, etc.

/** Map from destination -> <sessionId, subscriptionId> for fast look-ups */
		private final Map<String, MultiValueMap<String, String>> accessCache =
				new ConcurrentHashMap<String, MultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT);

		/** Map from destination -> <sessionId, subscriptionId> with locking */
		@SuppressWarnings("serial")
		private final Map<String, MultiValueMap<String, String>> updateCache =
				new LinkedHashMap<String, MultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT, 0.75f, true) {
					@Override
					protected boolean removeEldestEntry(Map.Entry<String, MultiValueMap<String, String>> eldest) {
						return size() > getCacheLimit();
					}
				};
/** Default maximum number of entries for the destination cache: 1024 */
	public static final int DEFAULT_CACHE_LIMIT = 1024;
2. updateAfterRemovedSession is called when a client disconnects. This method iterates over updateCache and removes some elements from both updateCache and accessCache, which means it can remove at most 1024 elements. If the size of accessCache is 5000, 3976 elements still remain.
public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
			synchronized (this.updateCache) {
				Set<String> destinationsToRemove = new HashSet<String>();
				for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
					String destination = entry.getKey();
					MultiValueMap<String, String> sessionMap = entry.getValue();
					if (sessionMap.remove(info.getSessionId()) != null) {
						if (sessionMap.isEmpty()) {
							destinationsToRemove.add(destination);
						}
						else {
							this.accessCache.put(destination, new LinkedMultiValueMap<String, String>(sessionMap));
						}
					}
				}
				for (String destination : destinationsToRemove) {
					this.updateCache.remove(destination);
					this.accessCache.remove(destination);
				}
			}
		}
3. Nothing is added to updateCache or accessCache when clients connect to the server.
4. Nothing is added to updateCache or accessCache when clients subscribe either, because we disconnected all clients and updateCache was cleared.
public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
			synchronized (this.updateCache) {
				for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
					String cachedDestination = entry.getKey();
					if (getPathMatcher().match(destination, cachedDestination)) {
						MultiValueMap<String, String> subs = entry.getValue();
						subs.add(sessionId, subsId);
						this.accessCache.put(cachedDestination, new LinkedMultiValueMap<String, String>(subs));
					}
				}
			}
		}

5. findSubscriptionsInternal and getSubscriptions are called when a message is sent.
At this moment, updateCache is empty and accessCache has 3976 elements.
If the destination is the same as before, the result is not null, so the message is sent to the wrong client, one that may already have disconnected.

public MultiValueMap<String, String> getSubscriptions(String destination) {
			return this.accessCache.get(destination);
		}
@Override
	protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) {
		MultiValueMap<String, String> result = this.destinationCache.getSubscriptions(destination);
		if (result != null) {
			return result;
		}
		result = new LinkedMultiValueMap<String, String>();
		for (SessionSubscriptionInfo info : this.subscriptionRegistry.getAllSubscriptions()) {
			for (String destinationPattern : info.getDestinations()) {
				if (this.pathMatcher.match(destinationPattern, destination)) {
					for (String subscriptionId : info.getSubscriptions(destinationPattern)) {
						result.add(info.sessionId, subscriptionId);
					}
				}
			}
		}
		if (!result.isEmpty()) {
			this.destinationCache.addSubscriptions(destination, result);
		}
		return result;
	}
6. Our destinations look like "\queue\pc\1", "\queue\pc\2", ... When a client reconnects, its session ID differs from the previous one, but its destination is the same. So when 5000 clients reconnect to the server, updateCache is empty, accessCache still holds 3976 entries, and only 1024 messages reach valid sessions.
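Steps 3 to 5 can be isolated in a hypothetical sketch. `StaleLookup`, `onNewSubscription`, and `lookup` are illustrative names that only loosely mirror `updateAfterNewSubscription` and `findSubscriptionsInternal`. Exact string matching stands in for the `PathMatcher`, and each destination holds a single session ID. The sketch starts from the state left by step 2: an empty `updateCache` and a stale `accessCache` entry.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical reduction of steps 3-5 (not Spring's code): one session ID per destination.
class StaleLookup {
    final Map<String, String> accessCache = new HashMap<>();       // destination -> sessionId
    final Map<String, String> updateCache = new LinkedHashMap<>(); // destination -> sessionId
    final Map<String, String> registry = new HashMap<>();          // live subscriptions

    // Loosely mirrors updateAfterNewSubscription: only destinations already present in
    // updateCache are refreshed (exact match instead of Spring's PathMatcher).
    void onNewSubscription(String destination, String sessionId) {
        registry.put(destination, sessionId);
        for (Map.Entry<String, String> entry : updateCache.entrySet()) {
            if (entry.getKey().equals(destination)) {
                entry.setValue(sessionId);
                accessCache.put(destination, sessionId);
            }
        }
    }

    // Loosely mirrors findSubscriptionsInternal: an accessCache hit skips the registry entirely.
    String lookup(String destination) {
        String cached = accessCache.get(destination);
        if (cached != null) {
            return cached;
        }
        String live = registry.get(destination);
        if (live != null) {
            updateCache.put(destination, live);
            accessCache.put(destination, live);
        }
        return live;
    }

    public static void main(String[] args) {
        StaleLookup cache = new StaleLookup();
        // State after step 2: updateCache is empty, accessCache still holds the old session.
        cache.accessCache.put("/queue/pc/1", "old-session");
        // Steps 3-4: the client reconnects with a new session and subscribes again.
        cache.onNewSubscription("/queue/pc/1", "new-session");
        // Step 5: the lookup returns the disconnected session.
        System.out.println(cache.lookup("/queue/pc/1")); // old-session
    }
}
```

The resubscription only visits keys present in `updateCache`, so nothing refreshes the stale entry, and the lookup never reaches the live subscription.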

@spring-projects-issues
Collaborator Author

flyingsu commented

glodon is my team leader, haha.

@spring-projects-issues
Collaborator Author

Rossen Stoyanchev commented

For the cacheLimit, something like this (as of 4.3.2):

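import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

	@Override
	public void configureMessageBroker(MessageBrokerRegistry registry) {
		// Limit for the destination cache in the simple broker's subscription registry (default 1024)
		registry.setCacheLimit(1024);
		// ...
	}
}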

@spring-projects-issues
Collaborator Author

Rossen Stoyanchev commented

Okay I think I know where the problem is.

I see your point that the accessCache can be larger than the updateCache, but they are intended to remain in sync (same size, same contents). I think the issue is that when getting subscriptions and adding to the cache, we should be putting into the accessCache first. I'll experiment and confirm.

@spring-projects-issues
Collaborator Author

flyingsu commented

Thank you~

@spring-projects-issues
Collaborator Author

Rossen Stoyanchev commented

flyingsu, I thought I had this, but I cannot find a way to grow the access cache beyond the size of the cacheLimit.

In DestinationCache the getSubscriptions method is the only one that can grow the cache size. In that method however when the updateCache reaches the limit, it also removes the same entry from the accessCache. So the two caches always have the same content. The other methods in DestinationCache do not grow the cache but only make updates to what is already in the cache.

Basically in your steps #1 and #2 above:

If the size of accessCache is 5000, 3976 elements still remain.

How did the accessCache get to 5000 in the first place? What sequence leads to this?
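The invariant described in the comment above can be sketched as follows. Entries are added only through the bounded `updateCache` path, and evicting its eldest entry also evicts the same key from `accessCache`. `SyncedDestinationCache` and its `resolver` parameter are hypothetical. The resolver stands in for the scan over all registered subscriptions. Null results are not cached, because `ConcurrentHashMap` rejects null values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch (not Spring's code): eviction from the bounded, access-ordered
// updateCache also removes the same key from the lock-free accessCache.
class SyncedDestinationCache<V> {
    private final int cacheLimit;
    final Map<String, V> accessCache = new ConcurrentHashMap<>();
    final Map<String, V> updateCache;

    SyncedDestinationCache(int cacheLimit) {
        this.cacheLimit = cacheLimit;
        this.updateCache = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                if (size() > SyncedDestinationCache.this.cacheLimit) {
                    accessCache.remove(eldest.getKey()); // keep both maps in sync
                    return true;
                }
                return false;
            }
        };
    }

    // Lock-free read; on a miss, resolve and add to both maps while holding the lock.
    V get(String destination, Function<String, V> resolver) {
        V result = accessCache.get(destination);
        if (result == null) {
            synchronized (updateCache) {
                result = resolver.apply(destination);
                if (result != null) {
                    updateCache.put(destination, result);
                    accessCache.put(destination, result);
                }
            }
        }
        return result;
    }

    // True when both maps hold exactly the same destinations.
    boolean inSync() {
        synchronized (updateCache) {
            return accessCache.keySet().equals(updateCache.keySet());
        }
    }

    public static void main(String[] args) {
        SyncedDestinationCache<String> cache = new SyncedDestinationCache<>(1024);
        for (int i = 0; i < 5000; i++) {
            cache.get("/queue/pc/" + i, d -> "session-for-" + d);
        }
        System.out.println(cache.accessCache.size() + " " + cache.inSync()); // 1024 true
    }
}
```

After 5000 distinct destinations, both maps hold the same 1024 keys. A disconnect that walks `updateCache` can therefore clean up every cached destination.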

@spring-projects-issues
Collaborator Author

Rossen Stoyanchev commented

Hi, with 4.3.8 coming next week, just wondering if you have any further input? I presume you're looking at an actual case as opposed to a theoretical possibility?

@spring-projects-issues
Collaborator Author

spring-projects-issues commented May 19, 2017

Rossen Stoyanchev commented

I believe #20102 may have something to do with this ticket as well. Do you have a way to confirm that in your environment? It would be useful to know whether we can close this ticket, which we would otherwise have to do anyway in the absence of further clues.
