Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge ce63dea into b0ddb57
Browse files Browse the repository at this point in the history
  • Loading branch information
hechaoli committed Apr 20, 2018
2 parents b0ddb57 + ce63dea commit 3024bdc
Show file tree
Hide file tree
Showing 13 changed files with 1,097 additions and 674 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ System.out.println(Arrays.toString(dbs));
From above example we can see the steps of getting an `OvsdbClient` object.

(1) Construct a `OvsdbPassiveConnectionListener`. The `OvsdbPassiveConnectionListenerImpl`
takes a `ScheduledExecutorService` for asynchronous operations.
takes a `ScheduledExecutorService` for asynchronous operations.
(2) Implement the `ConnectionCallback` interface and construct a callback object.
(3) Start listening on the port.
(4) Get the `OvsdbClient` object from the callback and use it for operations on the OVSDB server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public OvsdbChannelInitializer(
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
protected void initChannel(SocketChannel ch) {
LOGGER.info("New channel created: {}", ch);

ChannelPipeline pipeline = ch.pipeline();
Expand All @@ -93,9 +93,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
}
pipeline.addLast("ssl", new SslHandler(engine));
}
pipeline.addLast("logger", new LoggingHandler(LogLevel.TRACE));
pipeline.addLast("decoder", new JsonNodeDecoder());
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("logger", new LoggingHandler(LogLevel.TRACE));
pipeline.addLast("heartbeatHandler", new HeartBeatHandler());
pipeline.addLast("ovsdbClientHandler",
new OvsdbClientHandler(connectionCallback, executorService));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ private void connectTo(
.handler(new OvsdbChannelInitializer(
sslContext, executorService, connectionCallback, false
));
ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
channelFuture.channel().closeFuture().addListener(future -> group.shutdownGracefully());
bootstrap.connect(ip, port).sync().channel().closeFuture()
.addListener(future -> group.shutdownGracefully());
} catch (InterruptedException ex) {
LOGGER.error("Failed to connect to " + ip + ":" + port + " with ssl " + sslContext, ex);
group.shutdownGracefully();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,8 @@ private void startListeningOnPort(
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new OvsdbChannelInitializer(
sslContext, executorService, connectionCallback, true
));

serverBootstrap.option(
ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(65535, 65535, 65535)
);
)).option(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));

// Start the server.
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright (c) 2018 VMware, Inc. All Rights Reserved.
*
* This product is licensed to you under the BSD-2 license (the "License").
* You may not use this product except in compliance with the BSD-2 License.
*
* This product may include a number of subcomponents with separate copyright
* notices and license terms. Your use of these subcomponents is subject to the
* terms and conditions of the subcomponent's license, as noted in the LICENSE
* file.
*
* SPDX-License-Identifier: BSD-2-Clause
*/

package com.vmware.ovsdb.service;

import static com.vmware.ovsdb.utils.SslUtil.newSelfSignedSslContextPair;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import com.vmware.ovsdb.callback.ConnectionCallback;
import com.vmware.ovsdb.service.impl.OvsdbActiveConnectionConnectorImpl;
import com.vmware.ovsdb.util.PropertyManager;
import com.vmware.ovsdb.utils.PassiveOvsdbServerEmulator;
import com.vmware.ovsdb.utils.SslUtil.SelfSignedSslContextPair;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OvsdbActiveConnectionConnectorTest {

private static final String HOST = "127.0.0.1";

private static final int PORT = 6641; // Use port 6641 for testing purpose

private static final int TEST_TIMEOUT_MILLIS = 60 * 1000; // 60 seconds

private static final int VERIFY_TIMEOUT_MILLIS = 5000; // 5 seconds

private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);

private final OvsdbActiveConnectionConnector activeConnectionConnector =
new OvsdbActiveConnectionConnectorImpl(executorService);

@Test(timeout = TEST_TIMEOUT_MILLIS)
public void testTcpConnection() {
testConnectionBasic(null);
testWriteInvalidJson(null);
testChannelTimeout(null);
}

@Test(timeout = TEST_TIMEOUT_MILLIS)
public void testSslConnection() throws Exception {
SelfSignedSslContextPair sslContextPair = newSelfSignedSslContextPair();
// In active connection test, the controller is the client and the ovsdb-server is the server
testConnectionBasic(sslContextPair);
testWriteInvalidJson(sslContextPair);
testChannelTimeout(sslContextPair);
}

private void testConnectionBasic(SelfSignedSslContextPair sslCtxPair) {
final int ovsdbServerCnt = 10;
final List<Integer> ports = IntStream.range(PORT, PORT + ovsdbServerCnt)
.boxed().collect(Collectors.toList());

final List<PassiveOvsdbServerEmulator> passiveOvsdbServers = new ArrayList<>();

for (int port : ports) {
PassiveOvsdbServerEmulator passiveOvsdbServer = new PassiveOvsdbServerEmulator(port);
passiveOvsdbServers.add(passiveOvsdbServer);
if (sslCtxPair == null) {
passiveOvsdbServer.startListening().join();
} else {
passiveOvsdbServer.startListeningWithSsl(sslCtxPair.getServerSslCtx()).join();
}
}

final List<ConnectionCallback> connectionCallbacks = new ArrayList<>();

for (int port : ports) {
ConnectionCallback connectionCallback = mock(ConnectionCallback.class);
connectionCallbacks.add(connectionCallback);
if (sslCtxPair == null) {
activeConnectionConnector.connect(HOST, port, connectionCallback);
} else {
activeConnectionConnector.connectWithSsl(HOST, port,
sslCtxPair.getClientSslCtx(), connectionCallback);
}
}

for (ConnectionCallback connectionCallback : connectionCallbacks) {
verify(connectionCallback, timeout(VERIFY_TIMEOUT_MILLIS)).connected(any());
}

passiveOvsdbServers.forEach(PassiveOvsdbServerEmulator::disconnect);

for (ConnectionCallback connectionCallback : connectionCallbacks) {
verify(connectionCallback, timeout(VERIFY_TIMEOUT_MILLIS)).disconnected(any());
}

passiveOvsdbServers.forEach(passiveOvsdbServer -> passiveOvsdbServer.stopListening().join());
}

private void testWriteInvalidJson(SelfSignedSslContextPair sslCtxPair) {
final PassiveOvsdbServerEmulator passiveOvsdbServer = new PassiveOvsdbServerEmulator(PORT);
ConnectionCallback mockConnectionCallback = mock(ConnectionCallback.class);
if (sslCtxPair == null) {
passiveOvsdbServer.startListening().join();
activeConnectionConnector.connect(HOST, PORT, mockConnectionCallback);
} else {
passiveOvsdbServer.startListeningWithSsl(sslCtxPair.getServerSslCtx()).join();
activeConnectionConnector.connectWithSsl(HOST, PORT,
sslCtxPair.getClientSslCtx(), mockConnectionCallback);
}

verify(mockConnectionCallback, timeout(VERIFY_TIMEOUT_MILLIS)).connected(any());

// Write an invalid Json to the channel. The ExceptionHandler should
// close the channel
passiveOvsdbServer.write("}\"msg\":\"IAmInvalidJson\"{");
//passiveOvsdbServer.write("{\"method\":\"echo\",\"params\":[],\"id\":\"1111\"");

verify(mockConnectionCallback, timeout(VERIFY_TIMEOUT_MILLIS)).disconnected(any());

passiveOvsdbServer.stopListening().join();
}

private void testChannelTimeout(SelfSignedSslContextPair sslCtxPair) {
final PassiveOvsdbServerEmulator passiveOvsdbServer = new PassiveOvsdbServerEmulator(PORT);
ConnectionCallback mockConnectionCallback = mock(ConnectionCallback.class);
if (sslCtxPair == null) {
passiveOvsdbServer.startListening().join();
activeConnectionConnector.connect(HOST, PORT, mockConnectionCallback);
} else {
passiveOvsdbServer.startListeningWithSsl(sslCtxPair.getServerSslCtx()).join();
activeConnectionConnector.connectWithSsl(HOST, PORT,
sslCtxPair.getClientSslCtx(), mockConnectionCallback);
}

verify(mockConnectionCallback, timeout(VERIFY_TIMEOUT_MILLIS)).connected(any());

long readIdleTimeout = PropertyManager.getLongProperty("channel.read.idle.timeout.sec", 5);
int readIdleMax = PropertyManager.getIntProperty("channel.read.idle.max", 3);
// Wait until the ovsdb manager closes the channel
verify(mockConnectionCallback, timeout((readIdleTimeout * readIdleMax + 2) * 1000))
.disconnected(any());

passiveOvsdbServer.stopListening().join();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2018 VMware, Inc. All Rights Reserved.
*
* This product is licensed to you under the BSD-2 license (the "License").
* You may not use this product except in compliance with the BSD-2 License.
*
* This product may include a number of subcomponents with separate copyright
* notices and license terms. Your use of these subcomponents is subject to the
* terms and conditions of the subcomponent's license, as noted in the LICENSE
* file.
*
* SPDX-License-Identifier: BSD-2-Clause
*/

package com.vmware.ovsdb.service;

import com.vmware.ovsdb.exception.OvsdbClientException;
import com.vmware.ovsdb.service.impl.OvsdbActiveConnectionConnectorImpl;
import com.vmware.ovsdb.utils.PassiveOvsdbServerEmulator;
import io.netty.handler.ssl.SslContext;
import org.junit.After;
import org.junit.Test;

import java.io.IOException;

public class OvsdbClientActiveConnectionTest extends OvsdbClientTest {

private static final OvsdbActiveConnectionConnector activeConnector
= new OvsdbActiveConnectionConnectorImpl(executorService);

private static final PassiveOvsdbServerEmulator passiveOvsdbServer =
new PassiveOvsdbServerEmulator(PORT);

public OvsdbClientActiveConnectionTest() {
super(passiveOvsdbServer);
}

@After
public void tearDown() {
passiveOvsdbServer.stopListening().join();
}

@Override
void setUp(boolean withSsl) {
if (!withSsl) {
passiveOvsdbServer.startListening().join();
activeConnector.connect(HOST, PORT, connectionCallback);
} else {
// In passive connection test, the controller is the server and the ovsdb-server is the client
SslContext serverSslCtx = sslContextPair.getServerSslCtx();
SslContext clientSslCtx = sslContextPair.getClientSslCtx();
passiveOvsdbServer.startListeningWithSsl(serverSslCtx);
activeConnector.connectWithSsl(HOST, PORT, clientSslCtx, connectionCallback);
}
}

@Test(timeout = TEST_TIMEOUT_MILLIS)
public void testTcpConnection() throws OvsdbClientException, IOException {
super.testTcpConnection();
}

@Test(timeout = TEST_TIMEOUT_MILLIS)
public void testSslConnection() throws OvsdbClientException, IOException {
super.testSslConnection();
}
}
Loading

0 comments on commit 3024bdc

Please sign in to comment.