Skip to content

Commit

Permalink
Selector header name is exposed for configuration
Browse files Browse the repository at this point in the history
Issue: SPR-16732
  • Loading branch information
rstoyanchev committed Apr 17, 2018
1 parent 8748ba4 commit 246a6db
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 34 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,6 +43,7 @@
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.PathMatcher;
import org.springframework.util.StringUtils;

/**
* Implementation of {@link SubscriptionRegistry} that stores subscriptions
Expand Down Expand Up @@ -113,24 +114,25 @@ public int getCacheLimit() {
}

/**
* Configure the name of a selector header that a subscription message can
* have in order to filter messages based on their headers. The value of the
* header can use Spring EL expressions against message headers.
* <p>For example the following expression expects a header called "foo" to
* have the value "bar":
* Configure the name of a header that a subscription message can have for
* the purpose of filtering messages matched to the subscription. The header
* value is expected to be a Spring EL boolean expression to be applied to
* the headers of messages matched to the subscription.
* <p>For example:
* <pre>
* headers.foo == 'bar'
* </pre>
* <p>By default this is set to "selector".
* <p>By default this is set to "selector". You can set it to a different
* name, or to {@code null} to turn off support for a selector header.
* @param selectorHeaderName the name to use for a selector header
* @since 4.2
*/
public void setSelectorHeaderName(String selectorHeaderName) {
Assert.notNull(selectorHeaderName, "'selectorHeaderName' must not be null");
this.selectorHeaderName = selectorHeaderName;
this.selectorHeaderName = StringUtils.hasText(selectorHeaderName) ? selectorHeaderName : null;
}

/**
* Return the name for the selector header.
* Return the name for the selector header name.
* @since 4.2
*/
public String getSelectorHeaderName() {
Expand All @@ -142,25 +144,31 @@ public String getSelectorHeaderName() {
protected void addSubscriptionInternal(
String sessionId, String subsId, String destination, Message<?> message) {

Expression expression = getSelectorExpression(message.getHeaders());
this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
}

private Expression getSelectorExpression(MessageHeaders headers) {
Expression expression = null;
MessageHeaders headers = message.getHeaders();
String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
if (selector != null) {
try {
expression = this.expressionParser.parseExpression(selector);
this.selectorHeaderInUse = true;
if (logger.isTraceEnabled()) {
logger.trace("Subscription selector: [" + selector + "]");
if (getSelectorHeaderName() != null) {
String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
if (selector != null) {
try {
expression = this.expressionParser.parseExpression(selector);
this.selectorHeaderInUse = true;
if (logger.isTraceEnabled()) {
logger.trace("Subscription selector: [" + selector + "]");
}
}
}
catch (Throwable ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to parse selector: " + selector, ex);
catch (Throwable ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to parse selector: " + selector, ex);
}
}
}
}
this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
return expression;
}

@Override
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,23 +51,27 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {

private static final byte[] EMPTY_PAYLOAD = new byte[0];

private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<String, SessionInfo>();

private SubscriptionRegistry subscriptionRegistry;

private PathMatcher pathMatcher;

private Integer cacheLimit;

private String selectorHeaderName = "selector";

private TaskScheduler taskScheduler;

private long[] heartbeatValue;

private ScheduledFuture<?> heartbeatFuture;

private MessageHeaderInitializer headerInitializer;


private SubscriptionRegistry subscriptionRegistry;

private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<String, SessionInfo>();

private ScheduledFuture<?> heartbeatFuture;


/**
* Create a SimpleBrokerMessageHandler instance with the given message channels
* and destination prefixes.
Expand Down Expand Up @@ -96,12 +100,40 @@ public void setSubscriptionRegistry(SubscriptionRegistry subscriptionRegistry) {
this.subscriptionRegistry = subscriptionRegistry;
initPathMatcherToUse();
initCacheLimitToUse();
initSelectorHeaderNameToUse();
}

public SubscriptionRegistry getSubscriptionRegistry() {
return this.subscriptionRegistry;
}

/**
* Configure the name of a header that a subscription message can have for
* the purpose of filtering messages matched to the subscription. The header
* value is expected to be a Spring EL boolean expression to be applied to
* the headers of messages matched to the subscription.
* <p>For example:
* <pre>
* headers.foo == 'bar'
* </pre>
* <p>By default this is set to "selector". You can set it to a different
* name, or to {@code null} to turn off support for a selector header.
* @param selectorHeaderName the name to use for a selector header
* @since 4.3.17
* @see #setSubscriptionRegistry
* @see DefaultSubscriptionRegistry#setSelectorHeaderName(String)
*/
public void setSelectorHeaderName(String selectorHeaderName) {
this.selectorHeaderName = selectorHeaderName;
initSelectorHeaderNameToUse();
}

private void initSelectorHeaderNameToUse() {
if (this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
((DefaultSubscriptionRegistry) this.subscriptionRegistry).setSelectorHeaderName(this.selectorHeaderName);
}
}

/**
* When configured, the given PathMatcher is passed down to the underlying
* SubscriptionRegistry to use for matching destination to subscriptions.
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,8 @@ public class SimpleBrokerRegistration extends AbstractBrokerRegistration {

private long[] heartbeat;

private String selectorHeaderName = "selector";


public SimpleBrokerRegistration(SubscribableChannel inChannel, MessageChannel outChannel, String[] prefixes) {
super(inChannel, outChannel, prefixes);
Expand Down Expand Up @@ -65,6 +67,24 @@ public SimpleBrokerRegistration setHeartbeatValue(long[] heartbeat) {
return this;
}

/**
* Configure the name of a header that a subscription message can have for
* the purpose of filtering messages matched to the subscription. The header
* value is expected to be a Spring EL boolean expression to be applied to
* the headers of messages matched to the subscription.
* <p>For example:
* <pre>
* headers.foo == 'bar'
* </pre>
* <p>By default this is set to "selector". You can set it to a different
* name, or to {@code null} to turn off support for a selector header.
* @param selectorHeaderName the name to use for a selector header
* @since 4.3.17
*/
public void setSelectorHeaderName(String selectorHeaderName) {
this.selectorHeaderName = selectorHeaderName;
}


@Override
protected SimpleBrokerMessageHandler getMessageHandler(SubscribableChannel brokerChannel) {
Expand All @@ -76,6 +96,7 @@ protected SimpleBrokerMessageHandler getMessageHandler(SubscribableChannel broke
if (this.heartbeat != null) {
handler.setHeartbeatValue(this.heartbeat);
}
handler.setSelectorHeaderName(this.selectorHeaderName);
return handler;
}

Expand Down
Expand Up @@ -264,6 +264,8 @@ public void registerSubscriptionWithSelector() throws Exception {
String destination = "/foo";
String selector = "headers.foo == 'bar'";

// First, try with selector header

this.registry.registerSubscription(subscribeMessage(sessionId, subscriptionId, destination, selector));

SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create();
Expand All @@ -276,11 +278,34 @@ public void registerSubscriptionWithSelector() throws Exception {
assertEquals(1, actual.size());
assertEquals(Collections.singletonList(subscriptionId), actual.get(sessionId));

// Then without

actual = this.registry.findSubscriptions(createMessage(destination));
assertNotNull(actual);
assertEquals(0, actual.size());
}

@Test
public void registerSubscriptionWithSelectorNotSupported() {
String sessionId = "sess01";
String subscriptionId = "subs01";
String destination = "/foo";
String selector = "headers.foo == 'bar'";

this.registry.setSelectorHeaderName(null);
this.registry.registerSubscription(subscribeMessage(sessionId, subscriptionId, destination, selector));

SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create();
accessor.setDestination(destination);
accessor.setNativeHeader("foo", "bazz");
Message<?> message = MessageBuilder.createMessage("", accessor.getMessageHeaders());

MultiValueMap<String, String> actual = this.registry.findSubscriptions(message);
assertNotNull(actual);
assertEquals(1, actual.size());
assertEquals(Collections.singletonList(subscriptionId), actual.get(sessionId));
}

@Test // SPR-11931
public void registerSubscriptionTwiceAndUnregister() {
this.registry.registerSubscription(subscribeMessage("sess01", "subs01", "/foo"));
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -381,6 +381,10 @@ private RootBeanDefinition registerMessageBroker(Element brokerElement,
String heartbeatValue = simpleBrokerElem.getAttribute("heartbeat");
brokerDef.getPropertyValues().add("heartbeatValue", heartbeatValue);
}
if (simpleBrokerElem.hasAttribute("selector-header")) {
String headerName = simpleBrokerElem.getAttribute("selector-header");
brokerDef.getPropertyValues().add("selectorHeaderName", headerName);
}
}
else if (brokerRelayElem != null) {
String prefix = brokerRelayElem.getAttribute("prefix");
Expand Down
Expand Up @@ -384,6 +384,22 @@
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="selector-header" type="xsd:string">
<xsd:annotation>
<xsd:documentation source="java:org.springframework.messaging.simp.stomp.SimpleBrokerMessageHandler"><![CDATA[
Configure the name of a header that a subscription message can have for
the purpose of filtering messages matched to the subscription. The header
value is expected to be a Spring EL boolean expression to be applied to
the headers of messages matched to the subscription.
For example:
headers.foo == 'bar'
By default this is set to "selector". You can set it to a different
name, or to "" to turn off support for a selector header.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>

<xsd:complexType name="channel">
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,6 +45,7 @@
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler;
import org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry;
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.messaging.simp.user.DefaultUserDestinationResolver;
Expand Down Expand Up @@ -199,6 +200,8 @@ public void simpleBroker() throws Exception {
assertNotNull(brokerMessageHandler);
Collection<String> prefixes = brokerMessageHandler.getDestinationPrefixes();
assertEquals(Arrays.asList("/topic", "/queue"), new ArrayList<String>(prefixes));
DefaultSubscriptionRegistry registry = (DefaultSubscriptionRegistry) brokerMessageHandler.getSubscriptionRegistry();
assertEquals("my-selector", registry.getSelectorHeaderName());
assertNotNull(brokerMessageHandler.getTaskScheduler());
assertArrayEquals(new long[] {15000, 15000}, brokerMessageHandler.getHeartbeatValue());

Expand Down
Expand Up @@ -35,7 +35,8 @@

<websocket:stomp-error-handler ref="errorHandler" />

<websocket:simple-broker prefix="/topic, /queue" heartbeat="15000,15000" scheduler="scheduler" />
<websocket:simple-broker prefix="/topic, /queue" selector-header="my-selector"
heartbeat="15000,15000" scheduler="scheduler" />

</websocket:message-broker>

Expand Down

0 comments on commit 246a6db

Please sign in to comment.