Skip to content

Commit

Permalink
[FLINK-4025] RMQ Streaming: Possibility to customize queue
Browse files Browse the repository at this point in the history
This patch adds the possibilty for th user of the RabbitMQ
Streaming Connector to customize the queue which is used. There
are use-cases in which you want to set custom parameters for the
queue (i.e. TTL of the messages if Flink reboots) or the
possibility to bind the queue to an exchange afterwards.

The commit doesn't change the actual behaviour but makes it
possible for users to override the newly create `setupQueue`
method and cutomize their implementation. This was not possible
before.

This closes apache#2073
  • Loading branch information
Dominik Bruhn authored and rmetzger committed Jun 7, 2016
1 parent 7160a68 commit 52c11f1
Showing 1 changed file with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
private final Integer port;
private final String username;
private final String password;
private final String queueName;
protected final String queueName;
private final boolean usesCorrelationId;
protected DeserializationSchema<OUT> schema;

Expand Down Expand Up @@ -177,6 +177,15 @@ protected ConnectionFactory setupConnectionFactory() {
return new ConnectionFactory();
}

/**
* Sets up the queue. The default implementation just declares the queue. The user may override
* this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
* defining custom queue parameters)
*/
protected void setupQueue() throws IOException {
channel.queueDeclare(queueName, true, false, false, null);
}

/**
* Initializes the connection to RMQ.
*/
Expand All @@ -195,7 +204,7 @@ private void initializeConnection() {
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(queueName, true, false, false, null);
setupQueue();
consumer = new QueueingConsumer(channel);

RuntimeContext runtimeContext = getRuntimeContext();
Expand Down

0 comments on commit 52c11f1

Please sign in to comment.