Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #4 from hechaoli/async
Browse files Browse the repository at this point in the history
Make listener and connector fully asynchronous.
  • Loading branch information
hechaoli committed Apr 20, 2018
2 parents c42889d + 2228c9b commit faeb3fe
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.vmware.ovsdb.netty;

import com.vmware.ovsdb.callback.ConnectionCallback;
import com.vmware.ovsdb.service.OvsdbClient;
import com.vmware.ovsdb.util.PropertyManager;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
Expand All @@ -30,6 +31,7 @@
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
Expand All @@ -46,32 +48,33 @@ public class OvsdbChannelInitializer extends ChannelInitializer<SocketChannel> {
private static long READ_IDLE_TIMEOUT = PropertyManager
.getLongProperty(KEY_CHANNEL_READ_IDLE_TIMEOUT_SEC, DEFAULT_READ_IDLE_TIMEOUT_SEC);

private final boolean isServerChannel;

private final SslContext sslContext;

private final ConnectionCallback connectionCallback;

private final CompletableFuture<OvsdbClient> ovsdbClientFuture;

private final ScheduledExecutorService executorService;

/**
* Create a {@link OvsdbChannelInitializer} object.
* This should be used in active mode.
*
* @param sslContext the SSL context
* @param executorService an {@link ScheduledExecutorService} object
* @param connectionCallback will be called then a new connection is established
* @param isServerChannel true if this is a server channel (i.e. used in passive connection)
* @param connectionCallback will be called then a new connection is established.
* Should be null in active mode
* @param ovsdbClientFuture will complete when the connection is established.
* Should be null in passive mode
*/
public OvsdbChannelInitializer(
SslContext sslContext,
ScheduledExecutorService executorService,
ConnectionCallback connectionCallback,
boolean isServerChannel
private OvsdbChannelInitializer(
SslContext sslContext, ScheduledExecutorService executorService,
ConnectionCallback connectionCallback, CompletableFuture<OvsdbClient> ovsdbClientFuture
) {
this.sslContext = sslContext;
this.executorService = executorService;
this.connectionCallback = connectionCallback;
this.isServerChannel = isServerChannel;
this.ovsdbClientFuture = ovsdbClientFuture;
}

@Override
Expand All @@ -85,7 +88,7 @@ protected void initChannel(SocketChannel ch) {
);
if (sslContext != null) {
SSLEngine engine = sslContext.newEngine(ch.alloc());
if (isServerChannel) {
if (isPassiveMode()) {
engine.setUseClientMode(false);
engine.setNeedClientAuth(true);
} else {
Expand All @@ -97,9 +100,48 @@ protected void initChannel(SocketChannel ch) {
pipeline.addLast("decoder", new JsonNodeDecoder());
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("heartbeatHandler", new HeartBeatHandler());
pipeline.addLast("ovsdbClientHandler",
new OvsdbClientHandler(connectionCallback, executorService));
OvsdbClientHandler ovsdbClientHandler;
if (isPassiveMode()) {
ovsdbClientHandler = new OvsdbClientHandler(connectionCallback, executorService);
} else {
ovsdbClientHandler = new OvsdbClientHandler(ovsdbClientFuture, executorService);
}
pipeline.addLast("ovsdbClientHandler", ovsdbClientHandler);
pipeline.addLast("exceptionHandler", new ExceptionHandler());
}

/**
* Returns true if this is initializer is for passive connection.
*/
private boolean isPassiveMode() {
return connectionCallback != null;
}

/**
* Create an {@link OvsdbChannelInitializer} with passive mode.
*
* @param sslContext the SSL context. Can be null if SSL is not enabled
* @param executorService an {@link ScheduledExecutorService} object
* @param connectionCallback will be called then a new connection is established
*/
public static OvsdbChannelInitializer newOvsdbChannelInitializer(
SslContext sslContext, ScheduledExecutorService executorService,
ConnectionCallback connectionCallback
) {
return new OvsdbChannelInitializer(sslContext, executorService, connectionCallback, null);
}

/**
* Create an {@link OvsdbChannelInitializer} with active mode.
*
* @param sslContext the SSL context. Can be null if SSL is not enabled
* @param executorService an {@link ScheduledExecutorService} object
* @param ovsdbClientFuture will complete when the connection is established.
*/
public static OvsdbChannelInitializer newOvsdbChannelInitializer(
SslContext sslContext, ScheduledExecutorService executorService,
CompletableFuture<OvsdbClient> ovsdbClientFuture
) {
return new OvsdbChannelInitializer(sslContext, executorService, null, ovsdbClientFuture);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.cert.Certificate;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -58,7 +55,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLPeerUnverifiedException;

// TODO: Move the OVSDB logic out of this handler if possible
class OvsdbClientHandler extends ChannelInboundHandlerAdapter {
Expand All @@ -74,6 +70,8 @@ class OvsdbClientHandler extends ChannelInboundHandlerAdapter {

private final ConnectionCallback connectionCallback;

private final CompletableFuture<OvsdbClient> ovsdbClientFuture;

private final ScheduledExecutorService executorService;

private final ConcurrentMap<String, MonitorCallback> monitorCallbacks = new ConcurrentHashMap<>();
Expand All @@ -86,9 +84,37 @@ class OvsdbClientHandler extends ChannelInboundHandlerAdapter {

private OvsdbClient ovsdbClient;

/**
* Create an {@link OvsdbClientHandler} with a connection callback.
* This should be called in passive mode.
*
* @param connectionCallback will be called when there is a new connection
* @param executorService a {@link ScheduledExecutorService} used for asynchronous operations
*/
OvsdbClientHandler(
ConnectionCallback connectionCallback, ScheduledExecutorService executorService
) {
this(null, connectionCallback, executorService);
}

/**
* Create an {@link OvsdbClientHandler} with a {@link CompletableFuture}.
* This should be called in active mode.
*
* @param ovsdbClientFuture will complete with an {@link OvsdbClient} after the connection is done
* @param executorService a {@link ScheduledExecutorService} used for asynchronous operations
*/
OvsdbClientHandler(
CompletableFuture<OvsdbClient> ovsdbClientFuture, ScheduledExecutorService executorService
) {
this(ovsdbClientFuture, null, executorService);
}

private OvsdbClientHandler(
CompletableFuture<OvsdbClient> ovsdbClientFuture, ConnectionCallback connectionCallback,
ScheduledExecutorService executorService
) {
this.ovsdbClientFuture = ovsdbClientFuture;
this.connectionCallback = connectionCallback;
this.executorService = executorService;
}
Expand All @@ -108,13 +134,15 @@ public void channelActive(final ChannelHandlerContext ctx) {
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public void channelInactive(ChannelHandlerContext ctx) {
LOGGER.info("Channel {} is now inactive", ctx.channel());
executorService.submit(() -> connectionCallback.disconnected(ovsdbClient));
if (connectionCallback != null) {
executorService.submit(() -> connectionCallback.disconnected(ovsdbClient));
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
JsonNode jsonNode = (JsonNode) msg;
if (isRequestOrNotification(jsonNode)) {
executorService.submit(() -> {
Expand Down Expand Up @@ -180,7 +208,11 @@ public void close() {

ovsdbClient = new OvsdbClientImpl(OvsdbConnectionInfo.fromChannel(channel));

executorService.submit(() -> connectionCallback.connected(ovsdbClient));
if (connectionCallback != null) {
executorService.submit(() -> connectionCallback.connected(ovsdbClient));
} else {
ovsdbClientFuture.complete(ovsdbClient);
}
}

private String getNextId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,30 @@

package com.vmware.ovsdb.service;

import com.vmware.ovsdb.callback.ConnectionCallback;
import io.netty.handler.ssl.SslContext;

import java.util.concurrent.CompletableFuture;

public interface OvsdbActiveConnectionConnector {

/**
* Connect to the OVSDB server on ip:port.
*
* @param ip the OVSDB server ip
* @param port port to which the OVSDB is listening
* @param connectionCallback called when the connection is established
* @return a {@link CompletableFuture} that will complete with an {@link OvsdbClient}
* object when the connection is established
*/
void connect(String ip, int port, ConnectionCallback connectionCallback);
CompletableFuture<OvsdbClient> connect(String ip, int port);

/**
* Connect to the OVSDB server on ip:port with SSL enabled.
*
* @param ip the OVSDB server ip
* @param port port to which the OVSDB is listening
* @param sslContext the SSL context
* @param connectionCallback called when the connection is established
* @return a {@link CompletableFuture} that will complete with an {@link OvsdbClient}
* object when the connection is established
*/
void connectWithSsl(
String ip, int port, SslContext sslContext, ConnectionCallback connectionCallback
);
CompletableFuture<OvsdbClient> connectWithSsl(String ip, int port, SslContext sslContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,39 @@
import com.vmware.ovsdb.callback.ConnectionCallback;
import io.netty.handler.ssl.SslContext;

import java.util.concurrent.CompletableFuture;

public interface OvsdbPassiveConnectionListener {

/**
* Start listening on the specified port.
*
* @param port port to listen. Usually it is 6640
* @param connectionCallback called when there is a connection from the OVSDB server
* @return a {@link CompletableFuture} that will complete with true if listening starts
* successfully or false otherwise
*/
void startListening(int port, ConnectionCallback connectionCallback);
CompletableFuture<Boolean> startListening(int port, ConnectionCallback connectionCallback);

/**
* Start listening on the specified port with SSL enabled.
*
* @param port port to listen. Usually it should be 6640
* @param sslContext the SSL context used for SSL connection
* @param connectionCallback called when there is a connection from the OVSDB server
* @return a {@link CompletableFuture} that will complete with true if listening starts
* successfully or false otherwise
*/
void startListeningWithSsl(
CompletableFuture<Boolean> startListeningWithSsl(
int port, SslContext sslContext, ConnectionCallback connectionCallback
);

/**
* Stop the OVSDB manager.
* Stop listening on the given port.
*
* @param port the port to stop listening
* @return a {@link CompletableFuture} that will complete with true if listening stops
* successfully or false otherwise
*/
void stopListening(int port);
CompletableFuture<Boolean> stopListening(int port);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,63 +14,50 @@

package com.vmware.ovsdb.service.impl;

import com.vmware.ovsdb.callback.ConnectionCallback;
import com.vmware.ovsdb.netty.OvsdbChannelInitializer;
import static com.vmware.ovsdb.netty.OvsdbChannelInitializer.newOvsdbChannelInitializer;

import com.vmware.ovsdb.service.OvsdbActiveConnectionConnector;
import com.vmware.ovsdb.service.OvsdbClient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;

// TODO: Integration test for it
public class OvsdbActiveConnectionConnectorImpl implements OvsdbActiveConnectionConnector {

private static final Logger LOGGER = LoggerFactory.getLogger(
MethodHandles.lookup().lookupClass());

private final ScheduledExecutorService executorService;

public OvsdbActiveConnectionConnectorImpl(ScheduledExecutorService executorService) {
this.executorService = executorService;
}

@Override
public void connect(String ip, int port, ConnectionCallback connectionCallback) {
connectTo(ip, port, null, connectionCallback);
public CompletableFuture<OvsdbClient> connect(String ip, int port) {
return doConnect(ip, port, null);
}

@Override
public void connectWithSsl(
String ip, int port, SslContext sslContext, ConnectionCallback connectionCallback
) {
connectTo(ip, port, sslContext, connectionCallback);
public CompletableFuture<OvsdbClient> connectWithSsl(String ip, int port, SslContext sslContext) {
return doConnect(ip, port, sslContext);
}

private void connectTo(
String ip, int port, SslContext sslContext, ConnectionCallback connectionCallback
) {
private CompletableFuture<OvsdbClient> doConnect(String ip, int port, SslContext sslContext) {
CompletableFuture<OvsdbClient> ovsdbClientFuture = new CompletableFuture<>();
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new OvsdbChannelInitializer(
sslContext, executorService, connectionCallback, false
));
bootstrap.connect(ip, port).sync().channel().closeFuture()
.addListener(future -> group.shutdownGracefully());
} catch (InterruptedException ex) {
LOGGER.error("Failed to connect to " + ip + ":" + port + " with ssl " + sslContext, ex);
group.shutdownGracefully();
}
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(newOvsdbChannelInitializer(sslContext, executorService, ovsdbClientFuture));
ChannelFuture channelFuture = bootstrap.connect(ip, port);
channelFuture.channel().closeFuture()
.addListener(future -> group.shutdownGracefully());
return ovsdbClientFuture;
}
}
Loading

0 comments on commit faeb3fe

Please sign in to comment.