forked from elastic/elasticsearch
/
RemoteClusterConnection.java
740 lines (668 loc) · 35.1 KB
/
RemoteClusterConnection.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
 * Represents a connection to a single remote cluster. In contrast to a local cluster, a remote cluster is not joined, so the
 * current node does not become part of it and won't receive cluster state updates from it. Remote clusters are also not
 * fully connected with the current node. From a connection perspective a local cluster forms a bi-directional star network,
 * while in the remote case we only connect to a subset of the nodes in the cluster in a uni-directional fashion.
 *
 * This class also handles the discovery of nodes from the remote cluster. The initial list of seed nodes is only used to discover
 * the nodes of the remote cluster and to connect to eligible nodes (up to the configured maximum number of connections); for
 * details see {@link RemoteClusterService#REMOTE_NODE_ATTRIBUTE}.
 *
 * In the case of a disconnection, this class will issue a re-connect task to establish at most
 * {@link RemoteClusterService#REMOTE_CONNECTIONS_PER_CLUSTER} connections until either all eligible nodes are exhausted or the
 * maximum number of connections per cluster has been reached.
 */
final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener, Closeable {

    private final TransportService transportService;
    private final ConnectionManager connectionManager;
    /** Profile used for the connections to eligible remote nodes (only REG and PING channels are opened). */
    private final ConnectionProfile remoteProfile;
    /** The remote nodes this instance has registered as connected. */
    private final ConnectedNodes connectedNodes;
    private final String clusterAlias;
    private final int maxNumRemoteConnections;
    /** Filter deciding which discovered remote nodes are eligible to be connected to. */
    private final Predicate<DiscoveryNode> nodePredicate;
    private final ThreadPool threadPool;
    private volatile String proxyAddress;
    private volatile List<Supplier<DiscoveryNode>> seedNodes;
    private volatile boolean skipUnavailable;
    private final ConnectHandler connectHandler;
    /** Name of the remote cluster; set once from the first successful handshake or cluster state response. */
    private final SetOnce<ClusterName> remoteClusterName = new SetOnce<>();

    /**
     * Creates a new {@link RemoteClusterConnection}
     * @param settings the nodes settings object
     * @param clusterAlias the configured alias of the cluster to connect to
     * @param seedNodes a list of seed nodes to discover eligible nodes from
     * @param transportService the local nodes transport service
     * @param connectionManager the connection manager to use for this remote connection
     * @param maxNumRemoteConnections the maximum number of connections to the remote cluster
     * @param nodePredicate a predicate to filter eligible remote nodes to connect to
     * @param proxyAddress the proxy address (may be {@code null} or empty if no proxy is used)
     */
    RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
                            TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
                            Predicate<DiscoveryNode> nodePredicate, String proxyAddress) {
        super(settings);
        this.transportService = transportService;
        this.maxNumRemoteConnections = maxNumRemoteConnections;
        this.nodePredicate = nodePredicate;
        this.clusterAlias = clusterAlias;
        ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
        builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
        builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
        builder.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING); // TODO make this configurable?
        builder.addConnections(0, // we don't want this to be used for anything else but search
            TransportRequestOptions.Type.BULK,
            TransportRequestOptions.Type.STATE,
            TransportRequestOptions.Type.RECOVERY);
        remoteProfile = builder.build();
        connectedNodes = new ConnectedNodes(clusterAlias);
        this.seedNodes = Collections.unmodifiableList(seedNodes);
        this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
            .getConcreteSettingForNamespace(clusterAlias).get(settings);
        this.connectHandler = new ConnectHandler();
        this.threadPool = transportService.threadPool;
        this.connectionManager = connectionManager;
        connectionManager.addListener(this);
        // we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
        connectionManager.addListener(transportService);
        this.proxyAddress = proxyAddress;
    }

    /**
     * Returns {@code node} unchanged when no proxy is configured; otherwise returns a copy of {@code node} whose transport
     * address is replaced by the (lazily resolved) proxy address.
     */
    private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) {
        if (proxyAddress == null || proxyAddress.isEmpty()) {
            return node;
        } else {
            // resolve the proxy address lazily here
            InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress);
            return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node
                .getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion());
        }
    }

    /**
     * Updates the list of seed nodes and the proxy address for this cluster connection and triggers a connect round;
     * {@code connectListener} is notified when that round completes (or is rejected).
     */
    synchronized void updateSeedNodes(String proxyAddress, List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
        this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
        this.proxyAddress = proxyAddress;
        connectHandler.connect(connectListener);
    }

    /**
     * Updates the skipUnavailable flag that can be dynamically set for each remote cluster
     */
    void updateSkipUnavailable(boolean skipUnavailable) {
        this.skipUnavailable = skipUnavailable;
    }

    /**
     * Removes the node from the connected set and, if we dropped below the maximum number of connections,
     * forces a connect round to fill up the freed slot.
     */
    @Override
    public void onNodeDisconnected(DiscoveryNode node) {
        boolean remove = connectedNodes.remove(node);
        if (remove && connectedNodes.size() < maxNumRemoteConnections) {
            // try to reconnect and fill up the slot of the disconnected node
            connectHandler.forceConnect();
        }
    }

    /**
     * Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end.
     * If {@code skip_unavailable} is set, connection and request failures are reported as
     * {@link ClusterSearchShardsResponse#EMPTY} instead of a failure.
     */
    public void fetchSearchShards(ClusterSearchShardsRequest searchRequest,
                                  ActionListener<ClusterSearchShardsResponse> listener) {
        final ActionListener<ClusterSearchShardsResponse> searchShardsListener;
        final Consumer<Exception> onConnectFailure;
        if (skipUnavailable) {
            onConnectFailure = (exception) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY);
            searchShardsListener = ActionListener.wrap(listener::onResponse, (e) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY));
        } else {
            onConnectFailure = listener::onFailure;
            searchShardsListener = listener;
        }
        // in case we have no connected nodes we try to connect and if we fail we either notify the listener or not depending on
        // the skip_unavailable setting
        ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, searchShardsListener), onConnectFailure));
    }

    /**
     * Ensures that this cluster is connected. If the cluster is connected this operation
     * will invoke the listener immediately.
     */
    public void ensureConnected(ActionListener<Void> voidActionListener) {
        if (connectedNodes.size() == 0) {
            connectHandler.connect(voidActionListener);
        } else {
            voidActionListener.onResponse(null);
        }
    }

    /** Sends the search shards request to any connected remote node; the response is handled on the SEARCH pool. */
    private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
                                     final ActionListener<ClusterSearchShardsResponse> listener) {
        final DiscoveryNode node = getAnyConnectedNode();
        Transport.Connection connection = connectionManager.getConnection(node);
        transportService.sendRequest(connection, ClusterSearchShardsAction.NAME, searchShardsRequest, TransportRequestOptions.EMPTY,
            new TransportResponseHandler<ClusterSearchShardsResponse>() {

                @Override
                public ClusterSearchShardsResponse newInstance() {
                    return new ClusterSearchShardsResponse();
                }

                @Override
                public void handleResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
                    listener.onResponse(clusterSearchShardsResponse);
                }

                @Override
                public void handleException(TransportException e) {
                    listener.onFailure(e);
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SEARCH;
                }
            });
    }

    /**
     * Collects all nodes on the connected cluster and passes a node ID to {@link DiscoveryNode} lookup function to the
     * listener. The lookup function returns <code>null</code> if the node ID is not found.
     */
    void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
        Runnable runnable = () -> {
            final ClusterStateRequest request = new ClusterStateRequest();
            request.clear();
            request.nodes(true);
            request.local(true); // run this on the node that gets the request it's as good as any other
            final DiscoveryNode node = getAnyConnectedNode();
            Transport.Connection connection = connectionManager.getConnection(node);
            transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
                new TransportResponseHandler<ClusterStateResponse>() {

                    @Override
                    public ClusterStateResponse read(StreamInput in) throws IOException {
                        ClusterStateResponse response = new ClusterStateResponse();
                        response.readFrom(in);
                        return response;
                    }

                    @Override
                    public void handleResponse(ClusterStateResponse response) {
                        DiscoveryNodes nodes = response.getState().nodes();
                        listener.onResponse(nodes::get);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        listener.onFailure(exp);
                    }

                    @Override
                    public String executor() {
                        return ThreadPool.Names.SAME;
                    }
                });
        };
        try {
            // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
            // this will cause some back pressure on the search end and eventually will cause rejections but that's fine
            // we can't proceed with a search on a cluster level.
            // in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the
            // caller end since they provide the listener.
            ensureConnected(ActionListener.wrap((x) -> runnable.run(), listener::onFailure));
        } catch (Exception ex) {
            listener.onFailure(ex);
        }
    }

    /**
     * Returns a connection to the remote cluster, preferably a direct connection to the provided {@link DiscoveryNode}.
     * If such node is not connected, the returned connection will be a proxy connection that redirects to it.
     */
    Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
        if (connectionManager.nodeConnected(remoteClusterNode)) {
            return connectionManager.getConnection(remoteClusterNode);
        }
        DiscoveryNode discoveryNode = getAnyConnectedNode();
        Transport.Connection connection = connectionManager.getConnection(discoveryNode);
        return new ProxyConnection(connection, remoteClusterNode);
    }

    /**
     * A connection that reports {@code targetNode} as its node but sends every request through {@code proxyConnection},
     * rewriting the action to its proxy action and wrapping the request for the target node. It shares the underlying
     * connection, so it must never be closed itself.
     */
    static final class ProxyConnection implements Transport.Connection {
        private final Transport.Connection proxyConnection;
        private final DiscoveryNode targetNode;

        private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) {
            this.proxyConnection = proxyConnection;
            this.targetNode = targetNode;
        }

        @Override
        public DiscoveryNode getNode() {
            return targetNode;
        }

        @Override
        public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
            throws IOException, TransportException {
            proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
                TransportActionProxy.wrapRequest(targetNode, request), options);
        }

        @Override
        public boolean sendPing() {
            return proxyConnection.sendPing();
        }

        @Override
        public void close() {
            assert false: "proxy connections must not be closed";
        }

        @Override
        public void addCloseListener(ActionListener<Void> listener) {
            proxyConnection.addCloseListener(listener);
        }

        @Override
        public boolean isClosed() {
            return proxyConnection.isClosed();
        }

        @Override
        public Version getVersion() {
            return proxyConnection.getVersion();
        }
    }

    /** Returns a connection to any of the currently connected remote nodes. */
    Transport.Connection getConnection() {
        return connectionManager.getConnection(getAnyConnectedNode());
    }

    @Override
    public void close() throws IOException {
        IOUtils.close(connectHandler, connectionManager);
    }

    public boolean isClosed() {
        return connectHandler.isClosed();
    }

    /**
     * The connect handler manages node discovery and the actual connect to the remote cluster.
     * There is at most one connect job running at any time. If such a connect job is triggered
     * while another job is running, the provided listeners are queued and batched up until the currently running job returns.
     *
     * The handler has a built-in queue that can hold up to 100 connect attempts and will reject requests once the queue is full.
     * In a scenario where a remote cluster becomes unavailable we will queue requests up, but if we can't connect quickly enough
     * we will just reject the connect trigger, which will lead to failing searches.
     */
    private class ConnectHandler implements Closeable {
        /** Single permit held by the currently running connect round. */
        private final Semaphore running = new Semaphore(1);
        private final AtomicBoolean closed = new AtomicBoolean(false);
        /** Listeners waiting for the next connect round; guarded by its own monitor together with {@link #running}. */
        private final BlockingQueue<ActionListener<Void>> queue = new ArrayBlockingQueue<>(100);
        private final CancellableThreads cancellableThreads = new CancellableThreads();

        /**
         * Triggers a connect round iff there are pending requests queued up and if there is no
         * connect round currently running.
         */
        void maybeConnect() {
            connect(null);
        }

        /**
         * Triggers a connect round unless there is one running already. If there is a connect round running, the listener will either
         * be queued or rejected and failed.
         */
        void connect(ActionListener<Void> connectListener) {
            connect(connectListener, false);
        }

        /**
         * Triggers a connect round unless there is one already running. In contrast to {@link #maybeConnect()}, this method will also
         * trigger a connect round if there is no listener queued up.
         */
        void forceConnect() {
            connect(null, true);
        }

        private void connect(ActionListener<Void> connectListener, boolean forceRun) {
            final boolean runConnect;
            final Collection<ActionListener<Void>> toNotify;
            final ActionListener<Void> listener = connectListener == null ? null :
                ContextPreservingActionListener.wrapPreservingContext(connectListener, threadPool.getThreadContext());
            synchronized (queue) {
                if (listener != null && queue.offer(listener) == false) {
                    listener.onFailure(new RejectedExecutionException("connect queue is full"));
                    return;
                }
                if (forceRun == false && queue.isEmpty()) {
                    return;
                }
                runConnect = running.tryAcquire();
                if (runConnect) {
                    // this round takes responsibility for every listener queued so far
                    toNotify = new ArrayList<>();
                    queue.drainTo(toNotify);
                    if (closed.get()) {
                        running.release();
                        ActionListener.onFailure(toNotify, new AlreadyClosedException("connect handler is already closed"));
                        return;
                    }
                } else {
                    toNotify = Collections.emptyList();
                }
            }
            if (runConnect) {
                forkConnect(toNotify);
            }
        }

        /** Runs a connect round on the MANAGEMENT pool and notifies {@code toNotify} once it completes. */
        private void forkConnect(final Collection<ActionListener<Void>> toNotify) {
            ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
            executor.submit(new AbstractRunnable() {
                @Override
                public void onFailure(Exception e) {
                    finishConnectRound(() -> ActionListener.onFailure(toNotify, e));
                }

                @Override
                protected void doRun() {
                    ActionListener<Void> listener = ActionListener.wrap(
                        (x) -> finishConnectRound(() -> ActionListener.onResponse(toNotify, x)),
                        (e) -> finishConnectRound(() -> ActionListener.onFailure(toNotify, e)));
                    collectRemoteNodes(seedNodes.iterator(), transportService, connectionManager, listener);
                }
            });
        }

        /**
         * Completes a connect round: releases the connect permit, runs {@code notifyListeners}, and finally triggers
         * another round if listeners were queued while this round was running.
         */
        private void finishConnectRound(Runnable notifyListeners) {
            synchronized (queue) {
                running.release();
            }
            try {
                notifyListeners.run();
            } finally {
                maybeConnect();
            }
        }

        /**
         * Tries the given seed nodes one after another. For each seed it opens a single-channel connection and performs a
         * handshake, verifying the remote cluster name once it is known. It connects to the seed itself if the seed is eligible,
         * then asks the seed for its cluster state to discover further nodes. A failure moves on to the next seed; once the
         * seeds are exhausted the listener is failed.
         */
        private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
                                        final TransportService transportService, final ConnectionManager manager, ActionListener<Void> listener) {
            if (Thread.currentThread().isInterrupted()) {
                listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
                // the listener has been notified - do not continue connecting, otherwise it would be notified a second time
                return;
            }
            try {
                if (seedNodes.hasNext()) {
                    cancellableThreads.executeIO(() -> {
                        final DiscoveryNode seedNode = maybeAddProxyAddress(proxyAddress, seedNodes.next().get());
                        logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode,
                            proxyAddress);
                        final TransportService.HandshakeResponse handshakeResponse;
                        Transport.Connection connection = manager.openConnection(seedNode,
                            ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
                        boolean success = false;
                        try {
                            try {
                                handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
                                    (c) -> {
                                        // accept any cluster name until we learned the remote one, afterwards require a match
                                        final ClusterName expected = remoteClusterName.get();
                                        return expected == null || c.equals(expected);
                                    });
                            } catch (IllegalStateException ex) {
                                logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected " +
                                    "cluster name {}", connection.getNode(), remoteClusterName.get()), ex);
                                throw ex;
                            }

                            final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode());
                            if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
                                manager.connectToNode(handshakeNode, remoteProfile, transportService.connectionValidator(handshakeNode));
                                if (remoteClusterName.get() == null) {
                                    assert handshakeResponse.getClusterName().value() != null;
                                    remoteClusterName.set(handshakeResponse.getClusterName());
                                }
                                connectedNodes.add(handshakeNode);
                            }
                            ClusterStateRequest request = new ClusterStateRequest();
                            request.clear();
                            request.nodes(true);
                            // here we pass on the connection since we can only close it once the sendRequest returns otherwise
                            // due to the async nature (it will return before it's actually sent) this can cause the request to fail
                            // due to an already closed connection.
                            ThreadPool threadPool = transportService.getThreadPool();
                            ThreadContext threadContext = threadPool.getThreadContext();
                            TransportService.ContextRestoreResponseHandler<ClusterStateResponse> responseHandler = new TransportService
                                .ContextRestoreResponseHandler<>(threadContext.newRestorableContext(false),
                                new SniffClusterStateResponseHandler(connection, listener, seedNodes,
                                    cancellableThreads));
                            try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
                                // we stash any context here since this is an internal execution and should not leak any
                                // existing context information.
                                threadContext.markAsSystemContext();
                                transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
                                    responseHandler);
                            }
                            success = true;
                        } finally {
                            if (success == false) {
                                connection.close();
                            }
                        }
                    });
                } else {
                    listener.onFailure(new IllegalStateException("no seed node left"));
                }
            } catch (CancellableThreads.ExecutionCancelledException ex) {
                logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), ex);
                listener.onFailure(ex); // we got canceled - fail the listener and step out
            } catch (ConnectTransportException | IOException | IllegalStateException ex) {
                // ISE if we fail the handshake with an version incompatible node
                if (seedNodes.hasNext()) {
                    logger.debug(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed moving to next node",
                        clusterAlias), ex);
                    collectRemoteNodes(seedNodes, transportService, manager, listener);
                } else {
                    logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), ex);
                    listener.onFailure(ex);
                }
            }
        }

        /**
         * Cancels any in-flight connect work, waits for the running round (if any) to release its permit and then
         * fails all still-queued listeners via a final connect attempt that observes the closed flag.
         */
        @Override
        public void close() throws IOException {
            try {
                if (closed.compareAndSet(false, true)) {
                    cancellableThreads.cancel("connect handler is closed");
                    running.acquire(); // acquire the semaphore to ensure all connections are closed and all thread joined
                    running.release();
                    maybeConnect(); // now go and notify pending listeners
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        final boolean isClosed() {
            return closed.get();
        }

        /* This class handles the _state response from the remote cluster when sniffing nodes to connect to */
        private class SniffClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {

            private final Transport.Connection connection;
            private final ActionListener<Void> listener;
            private final Iterator<Supplier<DiscoveryNode>> seedNodes;
            private final CancellableThreads cancellableThreads;

            SniffClusterStateResponseHandler(Transport.Connection connection, ActionListener<Void> listener,
                                             Iterator<Supplier<DiscoveryNode>> seedNodes,
                                             CancellableThreads cancellableThreads) {
                this.connection = connection;
                this.listener = listener;
                this.seedNodes = seedNodes;
                this.cancellableThreads = cancellableThreads;
            }

            @Override
            public ClusterStateResponse newInstance() {
                return new ClusterStateResponse();
            }

            @Override
            public void handleResponse(ClusterStateResponse response) {
                try {
                    if (remoteClusterName.get() == null) {
                        assert response.getClusterName().value() != null;
                        remoteClusterName.set(response.getClusterName());
                    }
                    try (Closeable theConnection = connection) { // the connection is unused - see comment in #collectRemoteNodes
                        // we have to close this connection before we notify listeners - this is mainly needed for test correctness
                        // since if we do it afterwards we might fail assertions that check if all high level connections are closed.
                        // from a code correctness perspective we could also close it afterwards. This try/with block will
                        // maintain the possibly exceptions thrown from within the try block and suppress the ones that are possible thrown
                        // by closing the connection
                        cancellableThreads.executeIO(() -> {
                            DiscoveryNodes nodes = response.getState().nodes();
                            Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
                            for (DiscoveryNode n : nodesIter) {
                                DiscoveryNode node = maybeAddProxyAddress(proxyAddress, n);
                                if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
                                    try {
                                        connectionManager.connectToNode(node, remoteProfile,
                                            transportService.connectionValidator(node)); // noop if node is connected
                                        connectedNodes.add(node);
                                    } catch (ConnectTransportException | IllegalStateException ex) {
                                        // ISE if we fail the handshake with an version incompatible node
                                        // fair enough we can't connect just move on
                                        logger.debug(() -> new ParameterizedMessage("failed to connect to node {}", node), ex);
                                    }
                                }
                            }
                        });
                    }
                    listener.onResponse(null);
                } catch (CancellableThreads.ExecutionCancelledException ex) {
                    listener.onFailure(ex); // we got canceled - fail the listener and step out
                } catch (Exception ex) {
                    logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), ex);
                    collectRemoteNodes(seedNodes, transportService, connectionManager, listener);
                }
            }

            @Override
            public void handleException(TransportException exp) {
                logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), exp);
                try {
                    IOUtils.closeWhileHandlingException(connection);
                } finally {
                    // once the connection is closed lets try the next node
                    collectRemoteNodes(seedNodes, transportService, connectionManager, listener);
                }
            }

            @Override
            public String executor() {
                return ThreadPool.Names.MANAGEMENT;
            }
        }
    }

    boolean assertNoRunningConnections() { // for testing only
        assert connectHandler.running.availablePermits() == 1;
        return true;
    }

    boolean isNodeConnected(final DiscoveryNode node) {
        return connectedNodes.contains(node);
    }

    /**
     * Returns one of the connected nodes, rotating through them on successive calls.
     * @throws IllegalStateException if no node is connected
     */
    DiscoveryNode getAnyConnectedNode() {
        return connectedNodes.getAny();
    }

    void addConnectedNode(DiscoveryNode node) {
        connectedNodes.add(node);
    }

    /**
     * Get the information about remote nodes to be rendered on {@code _remote/info} requests.
     */
    public RemoteConnectionInfo getConnectionInfo() {
        List<TransportAddress> seedNodeAddresses = seedNodes.stream()
            .map(node -> node.get().getAddress())
            .collect(Collectors.toList());
        TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
        return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(),
            initialConnectionTimeout, skipUnavailable);
    }

    int getNumNodesConnected() {
        return connectedNodes.size();
    }

    /**
     * Synchronized set of connected remote nodes that hands out nodes in iteration order. The iterator is reset
     * whenever the set changes and rolls over once it is exhausted.
     */
    private static final class ConnectedNodes {

        private final Set<DiscoveryNode> nodeSet = new HashSet<>();
        private final String clusterAlias;

        private Iterator<DiscoveryNode> currentIterator = null;

        private ConnectedNodes(String clusterAlias) {
            this.clusterAlias = clusterAlias;
        }

        public synchronized DiscoveryNode getAny() {
            ensureIteratorAvailable();
            if (currentIterator.hasNext()) {
                return currentIterator.next();
            } else {
                throw new IllegalStateException("No node available for cluster: " + clusterAlias);
            }
        }

        synchronized boolean remove(DiscoveryNode node) {
            final boolean setRemoval = nodeSet.remove(node);
            if (setRemoval) {
                currentIterator = null;
            }
            return setRemoval;
        }

        synchronized boolean add(DiscoveryNode node) {
            final boolean added = nodeSet.add(node);
            if (added) {
                currentIterator = null;
            }
            return added;
        }

        synchronized int size() {
            return nodeSet.size();
        }

        synchronized boolean contains(DiscoveryNode node) {
            return nodeSet.contains(node);
        }

        private synchronized void ensureIteratorAvailable() {
            if (currentIterator == null) {
                currentIterator = nodeSet.iterator();
            } else if (currentIterator.hasNext() == false && nodeSet.isEmpty() == false) {
                // iterator rollover
                currentIterator = nodeSet.iterator();
            }
        }
    }

    ConnectionManager getConnectionManager() {
        return connectionManager;
    }
}