Skip to content

Commit b0480c6

Browse files
committed
Add headers example
1 parent a155bb4 commit b0480c6

File tree

6 files changed

+214
-0
lines changed

6 files changed

+214
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package com.devdevx.examples.springbootrabbitmq.headers.config;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.amqp.core.Binding;
6+
import org.springframework.amqp.core.BindingBuilder;
7+
import org.springframework.amqp.core.HeadersExchange;
8+
import org.springframework.amqp.core.Queue;
9+
import org.springframework.beans.factory.annotation.Qualifier;
10+
import org.springframework.beans.factory.annotation.Value;
11+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
12+
import org.springframework.context.annotation.Bean;
13+
import org.springframework.context.annotation.Configuration;
14+
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
18+
@Configuration
19+
@ConditionalOnProperty(name = "app.rabbitmq.example", havingValue = "headers")
20+
public class HeadersRmqConfig {
21+
22+
private static final Logger log = LoggerFactory.getLogger(HeadersRmqConfig.class);
23+
24+
@Value("${app.rabbitmq.headers.exchange}")
25+
private String exchangeName;
26+
27+
@Value("${app.rabbitmq.headers.queue-fast}")
28+
private String queueFast;
29+
30+
@Value("${app.rabbitmq.headers.queue-slow}")
31+
private String queueSlow;
32+
33+
@Bean
34+
HeadersExchange exchange() {
35+
log.info("Creating exchange: {}", exchangeName);
36+
return new HeadersExchange(exchangeName);
37+
}
38+
39+
@Bean
40+
Queue queueFast() {
41+
log.info("Creating queue: {}", queueFast);
42+
return new Queue(queueFast, false);
43+
}
44+
45+
@Bean
46+
Queue queueSlow() {
47+
log.info("Creating queue: {}", queueSlow);
48+
return new Queue(queueSlow, false);
49+
}
50+
51+
@Bean
52+
Binding bindingFast(@Qualifier("queueFast") Queue queue, HeadersExchange exchange) {
53+
Map<String, Object> absolutePriority = new HashMap<>();
54+
absolutePriority.put("priority", "high");
55+
absolutePriority.put("client", "premium");
56+
log.info("Binding queue '{}' to the exchange '{}' with all match '{}'", queueFast, exchangeName, absolutePriority);
57+
return BindingBuilder.bind(queue).to(exchange).whereAll(absolutePriority).match();
58+
}
59+
60+
@Bean
61+
Binding bindingSlow(@Qualifier("queueSlow") Queue queue, HeadersExchange exchange) {
62+
Map<String, Object> otherPriority = new HashMap<>();
63+
otherPriority.put("priority", "normal");
64+
otherPriority.put("client", "basic");
65+
log.info("Binding queue '{}' to the exchange '{}' with any match '{}'", queueSlow, exchangeName, otherPriority);
66+
return BindingBuilder.bind(queue).to(exchange).whereAny(otherPriority).match();
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.devdevx.examples.springbootrabbitmq.headers.config;
2+
3+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
4+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
5+
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
6+
import org.springframework.beans.factory.annotation.Value;
7+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.context.annotation.Profile;
11+
12+
@Profile("producer")
13+
@Configuration
14+
@ConditionalOnProperty(name = "app.rabbitmq.example", havingValue = "headers")
15+
public class HeadersRmqProduConfig {
16+
17+
@Value("${app.rabbitmq.headers.exchange}")
18+
private String exchangeName;
19+
20+
@Bean
21+
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
22+
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
23+
rabbitTemplate.setMessageConverter(converter);
24+
rabbitTemplate.setExchange(exchangeName);
25+
return rabbitTemplate;
26+
}
27+
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.devdevx.examples.springbootrabbitmq.headers.messaging;
2+
3+
public class HeadersMessage {
4+
private Integer id;
5+
private String message;
6+
7+
public HeadersMessage() {
8+
}
9+
10+
public HeadersMessage(Integer id, String message) {
11+
this.id = id;
12+
this.message = message;
13+
}
14+
15+
public Integer getId() {
16+
return id;
17+
}
18+
19+
public void setId(Integer id) {
20+
this.id = id;
21+
}
22+
23+
public String getMessage() {
24+
return message;
25+
}
26+
27+
public void setMessage(String message) {
28+
this.message = message;
29+
}
30+
31+
@Override
32+
public String toString() {
33+
return "HeadersMessage{" +
34+
"id=" + id +
35+
", message='" + message + '\'' +
36+
'}';
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.devdevx.examples.springbootrabbitmq.headers.messaging;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
6+
import org.springframework.beans.factory.annotation.Value;
7+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
8+
import org.springframework.context.annotation.Profile;
9+
import org.springframework.stereotype.Component;
10+
11+
@Profile("consumer")
12+
@Component
13+
@ConditionalOnProperty(name = "app.rabbitmq.example", havingValue = "headers")
14+
public class HeadersMessageReceiver {
15+
16+
private static final Logger log = LoggerFactory.getLogger(HeadersMessageReceiver.class);
17+
18+
@Value("${app.rabbitmq.headers.queue-slow}")
19+
private String queueSlow;
20+
21+
@Value("${app.rabbitmq.headers.queue-fast}")
22+
private String queueFast;
23+
24+
@RabbitListener(queues = "${app.rabbitmq.headers.queue-slow}")
25+
public void receiveFast(HeadersMessage message) {
26+
log.info("Message received on queue '{}' : {}", queueSlow, message);
27+
}
28+
29+
@RabbitListener(queues = "${app.rabbitmq.headers.queue-fast}")
30+
public void receiveSlow(HeadersMessage message) {
31+
log.info("Message received on queue '{}' : {}", queueFast, message);
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.devdevx.examples.springbootrabbitmq.headers.messaging;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
6+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
7+
import org.springframework.context.annotation.Profile;
8+
import org.springframework.scheduling.annotation.Scheduled;
9+
import org.springframework.stereotype.Component;
10+
11+
import java.util.Random;
12+
13+
@Profile("producer")
14+
@Component
15+
@ConditionalOnProperty(name = "app.rabbitmq.example", havingValue = "headers")
16+
public class HeadersMessageSender {
17+
18+
private static final Logger log = LoggerFactory.getLogger(HeadersMessageSender.class);
19+
20+
private RabbitTemplate rabbitTemplate;
21+
22+
private int id = 0;
23+
private Random random = new Random();
24+
private String[] priorities = {"high", "normal"};
25+
private String[] clients = {"premium", "basic"};
26+
27+
public HeadersMessageSender(RabbitTemplate rabbitTemplate) {
28+
this.rabbitTemplate = rabbitTemplate;
29+
}
30+
31+
@Scheduled(fixedDelayString = "${app.rabbitmq.delay-ms}")
32+
public void send() {
33+
String priority = priorities[random.nextInt(priorities.length)];
34+
String client = clients[random.nextInt(clients.length)];
35+
HeadersMessage message = new HeadersMessage(id++, priority + " " + client);
36+
log.info("Sending message: {} with headers: priority='{}', client='{}'", message, priority, client);
37+
rabbitTemplate.convertAndSend(message, m -> {
38+
m.getMessageProperties().getHeaders().put("priority", priority);
39+
m.getMessageProperties().getHeaders().put("client", client);
40+
return m;
41+
});
42+
}
43+
}

src/main/resources/application.properties

+4
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,8 @@ app.rabbitmq.direct.exchange=direct-exchange
99
app.rabbitmq.direct.queue=direct-queue
1010
app.rabbitmq.direct.routing-key=direct-routing-key
1111

12+
app.rabbitmq.headers.exchange=headers-exchange
13+
app.rabbitmq.headers.queue-fast=queue-fast
14+
app.rabbitmq.headers.queue-slow=queue-slow
15+
1216
app.rabbitmq.delay-ms=10000

0 commit comments

Comments
 (0)