forked from eugenp/tutorials
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConsumerUnitTest.java
48 lines (37 loc) · 1.86 KB
/
ConsumerUnitTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.baeldung;
import java.io.IOException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
public class ConsumerUnitTest {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test-topic";
private static final String SUBSCRIPTION_NAME = "test-subscription";
public static void main(String[] args) throws IOException {
// Create a Pulsar client instance. A single instance can be shared across many
// producers and consumer within the same application
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVICE_URL)
.build();
//Configure consumer specific settings.
Consumer<byte[]> consumer = client.newConsumer()
.topic(TOPIC_NAME)
// Allow multiple consumers to attach to the same subscription
// and get messages dispatched as a queue
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(SUBSCRIPTION_NAME)
.subscribe();
// Once the consumer is created, it can be used for the entire application lifecycle
System.out.println("Created consumer for the topic "+ TOPIC_NAME);
do {
// Wait until a message is available
Message<byte[]> msg = consumer.receive();
// Extract the message as a printable string and then log
String content = new String(msg.getData());
System.out.println("Received message '"+content+"' with ID "+msg.getMessageId());
// Acknowledge processing of the message so that it can be deleted
consumer.acknowledge(msg);
} while (true);
}
}