Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Rework createReconnectingClient()
We seem to have competing interests going on, where we want some
information from the first connection attempt propagated outwards.

JIRA: NETCONF-784
Change-Id: I232d523b60c36de85fea909c6405b1d7cac39c57
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
  • Loading branch information
rovarga committed Sep 15, 2021
1 parent ba423ca commit 86ea92e
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 35 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.opendaylight.netconf.client.NetconfClientSession;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
import org.opendaylight.netconf.nettyutil.ReconnectFuture;
import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
Expand Down Expand Up @@ -92,18 +93,16 @@ public Future<NetconfClientSession> createClient(final NetconfClientConfiguratio
}

@Override
public Future<Void> createReconnectingClient(final NetconfReconnectingClientConfiguration clientConfiguration) {
return activateChannel(clientConfiguration);
public ReconnectFuture createReconnectingClient(final NetconfReconnectingClientConfiguration clientConfiguration) {
return new SingleReconnectFuture(eventExecutor, activateChannel(clientConfiguration));
}

private <V> Future<V> activateChannel(final NetconfClientConfiguration conf) {
private Future<NetconfClientSession> activateChannel(final NetconfClientConfiguration conf) {
final InetSocketAddress remoteAddr = conf.getAddress();
final CallHomeMountSessionContext context = getSessionManager().getByAddress(remoteAddr);
LOG.info("Activating NETCONF channel for ip {} device context {}", remoteAddr, context);
if (context == null) {
return new FailedFuture<>(eventExecutor, new NullPointerException());
}
return context.activateNetconfChannel(conf.getSessionListener());
return context == null ? new FailedFuture<>(eventExecutor, new NullPointerException())
: context.activateNetconfChannel(conf.getSessionListener());
}

void createTopology() {
Expand Down
Expand Up @@ -77,9 +77,8 @@ Node getConfigNode() {
.build();
}

@SuppressWarnings("unchecked")
<V> Promise<V> activateNetconfChannel(final NetconfClientSessionListener sessionListener) {
return (Promise<V>) activator.activate(wrap(sessionListener));
Promise<NetconfClientSession> activateNetconfChannel(final NetconfClientSessionListener sessionListener) {
return activator.activate(wrap(sessionListener));
}

@Override
Expand Down
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.netconf.callhome.mount;

import static java.util.Objects.requireNonNull;

import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import org.opendaylight.netconf.client.NetconfClientSession;
import org.opendaylight.netconf.nettyutil.ReconnectFuture;
import org.opendaylight.yangtools.yang.common.Empty;

final class SingleReconnectFuture extends DefaultPromise<Empty> implements ReconnectFuture {
private final Future<NetconfClientSession> sessionFuture;

SingleReconnectFuture(final EventExecutor eventExecutor, final Future<NetconfClientSession> sessionFuture) {
super(eventExecutor);
this.sessionFuture = requireNonNull(sessionFuture);
sessionFuture.addListener(future -> {
if (!isDone()) {
if (future.isCancelled()) {
cancel(false);
} else if (future.isSuccess()) {
setSuccess(Empty.getInstance());
} else {
setFailure(future.cause());
}
}
});
}

@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
if (super.cancel(mayInterruptIfRunning)) {
if (!sessionFuture.isDone()) {
sessionFuture.cancel(mayInterruptIfRunning);
}
return true;
}
return false;
}

@Override
public Future<?> firstSessionFuture() {
return sessionFuture;
}
}
Expand Up @@ -10,6 +10,7 @@
import io.netty.util.concurrent.Future;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
import org.opendaylight.netconf.nettyutil.ReconnectFuture;

public interface NetconfClientDispatcher {

Expand All @@ -22,5 +23,5 @@ public interface NetconfClientDispatcher {
*/
Future<NetconfClientSession> createClient(NetconfClientConfiguration clientConfiguration);

Future<Void> createReconnectingClient(NetconfReconnectingClientConfiguration clientConfiguration);
ReconnectFuture createReconnectingClient(NetconfReconnectingClientConfiguration clientConfiguration);
}
Expand Up @@ -20,6 +20,7 @@
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
import org.opendaylight.netconf.nettyutil.AbstractNetconfDispatcher;
import org.opendaylight.netconf.nettyutil.ReconnectFuture;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
Expand Down Expand Up @@ -64,7 +65,7 @@ public Future<NetconfClientSession> createClient(final NetconfClientConfiguratio
}

@Override
public Future<Void> createReconnectingClient(final NetconfReconnectingClientConfiguration clientConfiguration) {
public ReconnectFuture createReconnectingClient(final NetconfReconnectingClientConfiguration clientConfiguration) {
switch (clientConfiguration.getProtocol()) {
case TCP:
return createReconnectingTcpClient(clientConfiguration);
Expand All @@ -84,7 +85,7 @@ private Future<NetconfClientSession> createTcpClient(final NetconfClientConfigur
currentConfiguration.getSessionListener()).initialize(ch, promise));
}

private Future<Void> createReconnectingTcpClient(
private ReconnectFuture createReconnectingTcpClient(
final NetconfReconnectingClientConfiguration currentConfiguration) {
LOG.debug("Creating reconnecting TCP client with configuration: {}", currentConfiguration);
final TcpClientChannelInitializer init =
Expand All @@ -103,7 +104,7 @@ private Future<NetconfClientSession> createSshClient(final NetconfClientConfigur
currentConfiguration.getSshClient()).initialize(ch, sessionPromise));
}

private Future<Void> createReconnectingSshClient(
private ReconnectFuture createReconnectingSshClient(
final NetconfReconnectingClientConfiguration currentConfiguration) {
LOG.debug("Creating reconnecting SSH client with configuration: {}", currentConfiguration);
final SshClientChannelInitializer init = new SshClientChannelInitializer(currentConfiguration.getAuthHandler(),
Expand All @@ -122,7 +123,7 @@ private Future<NetconfClientSession> createTlsClient(final NetconfClientConfigur
.initialize(ch, sessionPromise));
}

private Future<Void> createReconnectingTlsClient(
private ReconnectFuture createReconnectingTlsClient(
final NetconfReconnectingClientConfiguration currentConfiguration) {
LOG.debug("Creating reconnecting TLS client with configuration: {}", currentConfiguration);
final TlsClientChannelInitializer init = new TlsClientChannelInitializer(
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
import org.opendaylight.netconf.nettyutil.ReconnectFuture;
import org.opendaylight.netconf.nettyutil.ReconnectStrategy;
import org.opendaylight.netconf.nettyutil.ReconnectStrategyFactory;
import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
Expand Down Expand Up @@ -89,8 +90,8 @@ public void testNetconfClientDispatcherImpl() throws Exception {
Future<NetconfClientSession> sshSession = dispatcher.createClient(cfg);
Future<NetconfClientSession> tcpSession = dispatcher.createClient(cfg2);

Future<Void> sshReconn = dispatcher.createReconnectingClient(cfg);
final Future<Void> tcpReconn = dispatcher.createReconnectingClient(cfg2);
ReconnectFuture sshReconn = dispatcher.createReconnectingClient(cfg);
final ReconnectFuture tcpReconn = dispatcher.createReconnectingClient(cfg2);

assertNotNull(sshSession);
assertNotNull(tcpSession);
Expand All @@ -109,7 +110,7 @@ public void testNetconfClientDispatcherImpl() throws Exception {
.withSslHandlerFactory(sslHandlerFactory).build();

Future<NetconfClientSession> tlsSession = dispatcher.createClient(cfg3);
Future<Void> tlsReconn = dispatcher.createReconnectingClient(cfg3);
ReconnectFuture tlsReconn = dispatcher.createReconnectingClient(cfg3);

assertNotNull(tlsSession);
assertNotNull(tlsReconn);
Expand Down
Expand Up @@ -216,7 +216,7 @@ protected void initChannel(final SocketChannel ch) {
* @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
* success is never reported, only failure when it runs out of reconnection attempts.
*/
protected Future<Void> createReconnectingClient(final InetSocketAddress address,
protected ReconnectFuture createReconnectingClient(final InetSocketAddress address,
final ReconnectStrategyFactory connectStrategyFactory, final PipelineInitializer<S> initializer) {
final Bootstrap b = new Bootstrap();

Expand Down
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.netconf.nettyutil;

import com.google.common.annotations.Beta;
import io.netty.util.concurrent.Future;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.yangtools.yang.common.Empty;

/**
* A future representing the task of reconnecting of a certain channel. This future never completes successfully, it
* either fails when the underlying strategy gives up, or when it is cancelled. It additionally exposes an additional
* future, which completes when the session is established for the first time.
*/
@Beta
public interface ReconnectFuture extends Future<Empty> {
/**
* Return a Future which completes when the first session is established.
*
* @return First session establishment future
*/
@NonNull Future<?> firstSessionFuture();
}
Expand Up @@ -15,25 +15,28 @@
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.opendaylight.netconf.api.NetconfSession;
import org.opendaylight.netconf.api.NetconfSessionListener;
import org.opendaylight.netconf.nettyutil.AbstractNetconfDispatcher.PipelineInitializer;
import org.opendaylight.yangtools.yang.common.Empty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
final class ReconnectPromise<S extends NetconfSession, L extends NetconfSessionListener<? super S>>
extends DefaultPromise<Void> {
extends DefaultPromise<Empty> implements ReconnectFuture {
private static final Logger LOG = LoggerFactory.getLogger(ReconnectPromise.class);

private final AbstractNetconfDispatcher<S, L> dispatcher;
private final InetSocketAddress address;
private final ReconnectStrategyFactory strategyFactory;
private final Bootstrap bootstrap;
private final PipelineInitializer<S> initializer;
private final Promise<Empty> firstSessionFuture;
/**
* Channel handler that responds to channelInactive event and reconnects the session unless the promise is
* cancelled.
Expand Down Expand Up @@ -66,6 +69,7 @@ public void channelInactive(final ChannelHandlerContext ctx) {
final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
final Bootstrap bootstrap, final PipelineInitializer<S> initializer) {
super(executor);
this.firstSessionFuture = new DefaultPromise<>(executor);
this.bootstrap = requireNonNull(bootstrap);
this.initializer = requireNonNull(initializer);
this.dispatcher = requireNonNull(dispatcher);
Expand All @@ -76,12 +80,18 @@ public void channelInactive(final ChannelHandlerContext ctx) {
@Override
public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
if (super.cancel(mayInterruptIfRunning)) {
firstSessionFuture.cancel(mayInterruptIfRunning);
pending.cancel(mayInterruptIfRunning);
return true;
}
return false;
}

@Override
public Future<?> firstSessionFuture() {
return firstSessionFuture;
}

synchronized void connect() {
lockedConnect();
}
Expand All @@ -104,10 +114,12 @@ private void lockedConnect() {
channel.pipeline().addLast(inboundHandler);
});

pending.addListener(future -> {
if (!future.isSuccess() && !isDone()) {
setFailure(future.cause());
}
});
if (!firstSessionFuture.isDone()) {
pending.addListener(future -> {
if (!future.isSuccess() && !firstSessionFuture.isDone()) {
firstSessionFuture.setFailure(future.cause());
}
});
}
}
}
Expand Up @@ -37,7 +37,6 @@
import com.google.common.util.concurrent.SettableFuture;
import com.typesafe.config.ConfigFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.File;
import java.util.AbstractMap.SimpleEntry;
Expand Down Expand Up @@ -99,6 +98,7 @@
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
import org.opendaylight.netconf.nettyutil.ReconnectFuture;
import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
import org.opendaylight.netconf.sal.connect.impl.DefaultSchemaResourceManager;
Expand Down Expand Up @@ -249,7 +249,7 @@ public void setUp() throws Exception {

yangNodeInstanceId = bindingToNormalized.toYangInstanceIdentifier(NODE_INSTANCE_ID);

doReturn(mock(Future.class)).when(mockClientDispatcher).createReconnectingClient(any());
doReturn(mock(ReconnectFuture.class)).when(mockClientDispatcher).createReconnectingClient(any());

LOG.info("****** Setup complete");
}
Expand Down
Expand Up @@ -64,7 +64,7 @@ public class NetconfDeviceCommunicator
private NetconfClientSession currentSession;

private final SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
private Future<?> initFuture;
private Future<?> taskFuture;

// isSessionClosing indicates a close operation on the session is issued and
// tearDown will surely be called later to finish the close.
Expand Down Expand Up @@ -150,17 +150,26 @@ public void onSessionUp(final NetconfClientSession session) {
*/
public ListenableFuture<NetconfDeviceCapabilities> initializeRemoteConnection(
final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) {

final Future<?> connectFuture;
if (config instanceof NetconfReconnectingClientConfiguration) {
initFuture = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
// FIXME: This is weird. If I understand it correctly we want to know about the first connection so as to
// forward error state. Analyze the call graph to understand what is going on here. We really want
// to move reconnection away from the socket layer, so that it can properly interface with sessions
// and generally has some event-driven state (as all good network glue does). There is a second story
// which is we want to avoid duplicate code, so it depends on other users as well.
final var future = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
taskFuture = future;
connectFuture = future.firstSessionFuture();
} else {
initFuture = dispatcher.createClient(config);
taskFuture = connectFuture = dispatcher.createClient(config);
}

initFuture.addListener(future -> {
connectFuture.addListener(future -> {
if (!future.isSuccess() && !future.isCancelled()) {
LOG.debug("{}: Connection failed", id, future.cause());
NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause());
if (firstConnectionFuture.isDone()) {
remoteDevice.onRemoteSessionFailed(future.cause());
if (!firstConnectionFuture.isDone()) {
firstConnectionFuture.setException(future.cause());
}
}
Expand Down Expand Up @@ -254,8 +263,8 @@ public void onSessionTerminated(final NetconfClientSession session, final Netcon
@Override
public void close() {
// Cancel reconnect if in progress
if (initFuture != null) {
initFuture.cancel(false);
if (taskFuture != null) {
taskFuture.cancel(false);
}
// Disconnect from device
// tear down not necessary, called indirectly by the close in disconnect()
Expand Down

0 comments on commit 86ea92e

Please sign in to comment.