forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 0
/
RMQConnectionConfig.java
448 lines (400 loc) · 14.1 KB
/
RMQConnectionConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.rabbitmq.common;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
/**
* Connection Configuration for RMQ.
* If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer,
* Boolean, Boolean, Integer, Integer, Integer, Integer)}
* will be used for initialize the RMQ connection or
* {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, Integer, Boolean,
* Boolean, Integer, Integer, Integer, Integer)}
* will be used for initialize the RMQ connection
*/
public class RMQConnectionConfig implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class);
private String host;
private Integer port;
private String virtualHost;
private String username;
private String password;
private String uri;
private Integer networkRecoveryInterval;
private Boolean automaticRecovery;
private Boolean topologyRecovery;
private Integer connectionTimeout;
private Integer requestedChannelMax;
private Integer requestedFrameMax;
private Integer requestedHeartbeat;
/**
*
* @param host host name
* @param port port
* @param virtualHost virtual host
* @param username username
* @param password password
* @param networkRecoveryInterval connection recovery interval in milliseconds
* @param automaticRecovery if automatic connection recovery
* @param topologyRecovery if topology recovery
* @param connectionTimeout connection timeout
* @param requestedChannelMax requested maximum channel number
* @param requestedFrameMax requested maximum frame size
* @param requestedHeartbeat requested heartbeat interval
* @throws NullPointerException if host or virtual host or username or password is null
*/
private RMQConnectionConfig(String host, Integer port, String virtualHost, String username, String password,
Integer networkRecoveryInterval, Boolean automaticRecovery,
Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
Integer requestedFrameMax, Integer requestedHeartbeat){
Preconditions.checkNotNull(host, "host can not be null");
Preconditions.checkNotNull(port, "port can not be null");
Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
Preconditions.checkNotNull(username, "username can not be null");
Preconditions.checkNotNull(password, "password can not be null");
this.host = host;
this.port = port;
this.virtualHost = virtualHost;
this.username = username;
this.password = password;
this.networkRecoveryInterval = networkRecoveryInterval;
this.automaticRecovery = automaticRecovery;
this.topologyRecovery = topologyRecovery;
this.connectionTimeout = connectionTimeout;
this.requestedChannelMax = requestedChannelMax;
this.requestedFrameMax = requestedFrameMax;
this.requestedHeartbeat = requestedHeartbeat;
}
/**
*
* @param uri the connection URI
* @param networkRecoveryInterval connection recovery interval in milliseconds
* @param automaticRecovery if automatic connection recovery
* @param topologyRecovery if topology recovery
* @param connectionTimeout connection timeout
* @param requestedChannelMax requested maximum channel number
* @param requestedFrameMax requested maximum frame size
* @param requestedHeartbeat requested heartbeat interval
* @throws NullPointerException if URI is null
*/
private RMQConnectionConfig(String uri, Integer networkRecoveryInterval, Boolean automaticRecovery,
Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
Integer requestedFrameMax, Integer requestedHeartbeat){
Preconditions.checkNotNull(uri, "Uri can not be null");
this.uri = uri;
this.networkRecoveryInterval = networkRecoveryInterval;
this.automaticRecovery = automaticRecovery;
this.topologyRecovery = topologyRecovery;
this.connectionTimeout = connectionTimeout;
this.requestedChannelMax = requestedChannelMax;
this.requestedFrameMax = requestedFrameMax;
this.requestedHeartbeat = requestedHeartbeat;
}
/** @return the host to use for connections */
public String getHost() {
return host;
}
/** @return the port to use for connections */
public int getPort() {
return port;
}
/**
* Retrieve the virtual host.
* @return the virtual host to use when connecting to the broker
*/
public String getVirtualHost() {
return virtualHost;
}
/**
* Retrieve the user name.
* @return the AMQP user name to use when connecting to the broker
*/
public String getUsername() {
return username;
}
/**
* Retrieve the password.
* @return the password to use when connecting to the broker
*/
public String getPassword() {
return password;
}
/**
* Retrieve the URI.
* @return the connection URI when connecting to the broker
*/
public String getUri() {
return uri;
}
/**
* Returns automatic connection recovery interval in milliseconds.
* @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000
*/
public Integer getNetworkRecoveryInterval() {
return networkRecoveryInterval;
}
/**
* Returns true if automatic connection recovery is enabled, false otherwise
* @return true if automatic connection recovery is enabled, false otherwise
*/
public Boolean isAutomaticRecovery() {
return automaticRecovery;
}
/**
* Returns true if topology recovery is enabled, false otherwise
* @return true if topology recovery is enabled, false otherwise
*/
public Boolean isTopologyRecovery() {
return topologyRecovery;
}
/**
* Retrieve the connection timeout.
* @return the connection timeout, in milliseconds; zero for infinite
*/
public Integer getConnectionTimeout() {
return connectionTimeout;
}
/**
* Retrieve the requested maximum channel number
* @return the initially requested maximum channel number; zero for unlimited
*/
public Integer getRequestedChannelMax() {
return requestedChannelMax;
}
/**
* Retrieve the requested maximum frame size
* @return the initially requested maximum frame size, in octets; zero for unlimited
*/
public Integer getRequestedFrameMax() {
return requestedFrameMax;
}
/**
* Retrieve the requested heartbeat interval.
* @return the initially requested heartbeat interval, in seconds; zero for none
*/
public Integer getRequestedHeartbeat() {
return requestedHeartbeat;
}
/**
*
* @return Connection Factory for RMQ
* @throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException if Malformed URI has been passed
*/
public ConnectionFactory getConnectionFactory() throws URISyntaxException,
NoSuchAlgorithmException, KeyManagementException {
ConnectionFactory factory = new ConnectionFactory();
if (this.uri != null && !this.uri.isEmpty()){
try {
factory.setUri(getUri());
}catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e){
LOG.error("Failed to parse uri {}", e.getMessage());
throw e;
}
} else {
factory.setHost(this.host);
factory.setPort(this.port);
factory.setVirtualHost(this.virtualHost);
factory.setUsername(this.username);
factory.setPassword(this.password);
}
if (this.automaticRecovery != null) {
factory.setAutomaticRecoveryEnabled(this.automaticRecovery);
}
if (this.connectionTimeout != null) {
factory.setConnectionTimeout(this.connectionTimeout);
}
if (this.networkRecoveryInterval != null) {
factory.setNetworkRecoveryInterval(this.networkRecoveryInterval);
}
if (this.requestedHeartbeat != null) {
factory.setRequestedHeartbeat(this.requestedHeartbeat);
}
if (this.topologyRecovery != null) {
factory.setTopologyRecoveryEnabled(this.topologyRecovery);
}
if (this.requestedChannelMax != null) {
factory.setRequestedChannelMax(this.requestedChannelMax);
}
if (this.requestedFrameMax != null) {
factory.setRequestedFrameMax(this.requestedFrameMax);
}
return factory;
}
/**
* The Builder Class for {@link RMQConnectionConfig}
*/
public static class Builder {
private String host;
private Integer port;
private String virtualHost;
private String username;
private String password;
private Integer networkRecoveryInterval;
private Boolean automaticRecovery;
private Boolean topologyRecovery;
private Integer connectionTimeout;
private Integer requestedChannelMax;
private Integer requestedFrameMax;
private Integer requestedHeartbeat;
private String uri;
/**
* Set the target port.
* @param port the default port to use for connections
* @return the Builder
*/
public Builder setPort(int port) {
this.port = port;
return this;
}
/** @param host the default host to use for connections
* @return the Builder
*/
public Builder setHost(String host) {
this.host = host;
return this;
}
/**
* Set the virtual host.
* @param virtualHost the virtual host to use when connecting to the broker
* @return the Builder
*/
public Builder setVirtualHost(String virtualHost) {
this.virtualHost = virtualHost;
return this;
}
/**
* Set the user name.
* @param username the AMQP user name to use when connecting to the broker
* @return the Builder
*/
public Builder setUserName(String username) {
this.username = username;
return this;
}
/**
* Set the password.
* @param password the password to use when connecting to the broker
* @return the Builder
*/
public Builder setPassword(String password) {
this.password = password;
return this;
}
/**
* Convenience method for setting the fields in an AMQP URI: host,
* port, username, password and virtual host. If any part of the
* URI is ommited, the ConnectionFactory's corresponding variable
* is left unchanged.
* @param uri is the AMQP URI containing the data
* @return the Builder
*/
public Builder setUri(String uri) {
this.uri = uri;
return this;
}
/**
* Enables or disables topology recovery
* @param topologyRecovery if true, enables topology recovery
* @return the Builder
*/
public Builder setTopologyRecoveryEnabled(boolean topologyRecovery) {
this.topologyRecovery = topologyRecovery;
return this;
}
/**
* Set the requested heartbeat.
* @param requestedHeartbeat the initially requested heartbeat interval, in seconds; zero for none
* @return the Builder
*/
public Builder setRequestedHeartbeat(int requestedHeartbeat) {
this.requestedHeartbeat = requestedHeartbeat;
return this;
}
/**
* Set the requested maximum frame size
* @param requestedFrameMax initially requested maximum frame size, in octets; zero for unlimited
* @return the Builder
*/
public Builder setRequestedFrameMax(int requestedFrameMax) {
this.requestedFrameMax = requestedFrameMax;
return this;
}
/**
* Set the requested maximum channel number
* @param requestedChannelMax initially requested maximum channel number; zero for unlimited
*/
public Builder setRequestedChannelMax(int requestedChannelMax) {
this.requestedChannelMax = requestedChannelMax;
return this;
}
/**
* Sets connection recovery interval. Default is 5000.
* @param networkRecoveryInterval how long will automatic recovery wait before attempting to reconnect, in ms
* @return the Builder
*/
public Builder setNetworkRecoveryInterval(int networkRecoveryInterval) {
this.networkRecoveryInterval = networkRecoveryInterval;
return this;
}
/**
* Set the connection timeout.
* @param connectionTimeout connection establishment timeout in milliseconds; zero for infinite
* @return the Builder
*/
public Builder setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
return this;
}
/**
* Enables or disables automatic connection recovery
* @param automaticRecovery if true, enables connection recovery
* @return the Builder
*/
public Builder setAutomaticRecovery(boolean automaticRecovery) {
this.automaticRecovery = automaticRecovery;
return this;
}
/**
* The Builder method
* If URI is NULL we use host, port, vHost, username, password combination
* to initialize connection. using {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String,
* Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
*
* else URI will be used to initialize the client connection
* {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
* @return RMQConnectionConfig
*/
public RMQConnectionConfig build(){
if(this.uri != null) {
return new RMQConnectionConfig(this.uri, this.networkRecoveryInterval,
this.automaticRecovery, this.topologyRecovery, this.connectionTimeout, this.requestedChannelMax,
this.requestedFrameMax, this.requestedHeartbeat);
} else {
return new RMQConnectionConfig(this.host, this.port, this.virtualHost, this.username, this.password,
this.networkRecoveryInterval, this.automaticRecovery, this.topologyRecovery,
this.connectionTimeout, this.requestedChannelMax, this.requestedFrameMax, this.requestedHeartbeat);
}
}
}
}