Skip to content

Commit

Permalink
add tests for client connection manager
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed Sep 7, 2015
1 parent 065e281 commit 191d6bf
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 31 deletions.
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

import com.orientechnologies.common.concur.resource.OCloseable;
Expand Down
Expand Up @@ -60,15 +60,12 @@ public void run() {
}
}, delay, delay);

Orient
.instance()
.getProfiler()
.registerHookValue("server.connections.actives", "Number of active network connections", METRIC_TYPE.COUNTER,
new OProfilerHookValue() {
public Object getValue() {
return connections.size();
}
});
Orient.instance().getProfiler().registerHookValue("server.connections.actives", "Number of active network connections",
METRIC_TYPE.COUNTER, new OProfilerHookValue() {
public Object getValue() {
return connections.size();
}
});
}

public void cleanExpiredConnections() {
Expand Down Expand Up @@ -103,7 +100,7 @@ public void cleanExpiredConnections() {

/**
* Create a connection.
*
*
* @param iProtocol
* protocol which will be used by connection
* @return new connection
Expand Down Expand Up @@ -132,8 +129,11 @@ public OClientConnection connect(final ONetworkProtocol iProtocol) throws IOExce
public OClientConnection connect(final ONetworkProtocol iProtocol, final OClientConnection connection, final byte[] tokenBytes,
final OToken token) throws IOException {

OClientSessions session = new OClientSessions(tokenBytes, token);
sessions.put(new OHashToken(tokenBytes), session);
OClientSessions session;
synchronized (sessions) {
session = new OClientSessions(tokenBytes, token);
sessions.put(new OHashToken(tokenBytes), session);
}
session.addConnection(connection);
OLogManager.instance().config(this, "Remote client connected from: " + connection);

Expand All @@ -142,22 +142,28 @@ public OClientConnection connect(final ONetworkProtocol iProtocol, final OClient

public OClientConnection reConnect(final ONetworkProtocol iProtocol, final byte[] tokenBytes, final OToken token)
throws IOException {
OClientSessions sess = sessions.get(new OHashToken(tokenBytes));
if (sess == null) {
// todo : fail or reconnect.
throw new OException("no session there ?#?#?");
}

final OClientConnection connection;
connection = new OClientConnection(connectionSerial.incrementAndGet(), iProtocol);

connections.put(connection.id, connection);
OHashToken key = new OHashToken(tokenBytes);
OClientSessions sess;
synchronized (sessions) {
sess = sessions.get(key);
if (sess == null) {
// RECONNECT
sess = new OClientSessions(tokenBytes, token);
sessions.put(new OHashToken(tokenBytes), sess);
}
}
sess.addConnection(connection);
return connection;
}

/**
* Retrieves the connection by id.
*
*
* @param iChannelId
* id of connection
* @return The connection if any, otherwise null
Expand All @@ -173,7 +179,7 @@ public OClientConnection getConnection(final int iChannelId, ONetworkProtocol pr

/**
* Retrieves the connection by address/port.
*
*
* @param iAddress
* The address as string in the format address as format <ip>:<port>
* @return The connection if any, otherwise null
Expand All @@ -188,7 +194,7 @@ public OClientConnection getConnection(final String iAddress) {

/**
* Disconnects and kill the associated network manager.
*
*
* @param iChannelId
* id of connection
*/
Expand All @@ -198,7 +204,7 @@ public void kill(final int iChannelId) {

/**
* Disconnects and kill the associated network manager.
*
*
* @param connection
* connection to kill
*/
Expand Down Expand Up @@ -226,7 +232,7 @@ public boolean has(final int id) {

/**
* Interrupt the associated network manager.
*
*
* @param iChannelId
* id of connection
*/
Expand All @@ -242,7 +248,7 @@ public void interrupt(final int iChannelId) {

/**
* Disconnects a client connections
*
*
* @param iChannelId
* id of connection
* @return true if was last one, otherwise false
Expand All @@ -259,8 +265,8 @@ public boolean disconnect(final int iChannelId) {
// CHECK IF THERE ARE OTHER CONNECTIONS
for (Entry<Integer, OClientConnection> entry : connections.entrySet()) {
if (entry.getValue().getProtocol().equals(connection.getProtocol())) {
OLogManager.instance()
.debug(this, "Disconnected connection with id=%d but are present other active channels", iChannelId);
OLogManager.instance().debug(this, "Disconnected connection with id=%d but are present other active channels",
iChannelId);
return false;
}
}
Expand All @@ -278,11 +284,13 @@ private void removeConnectFromSession(OClientConnection connection) {
OBinaryNetworkProtocolAbstract proto = (OBinaryNetworkProtocolAbstract) connection.getProtocol();
byte[] tokenBytes = proto.getTokenBytes();
OHashToken hashToken = new OHashToken(tokenBytes);
OClientSessions sess = sessions.get(hashToken);
if (sess != null) {
sess.removeConnection(connection);
if (!sess.isActive()) {
sessions.remove(hashToken);
synchronized (sessions) {
OClientSessions sess = sessions.get(hashToken);
if (sess != null) {
sess.removeConnection(connection);
if (!sess.isActive()) {
sessions.remove(hashToken);
}
}
}
}
Expand Down Expand Up @@ -446,6 +454,9 @@ public void killAllChannels() {
}

public OClientSessions getSession(ONetworkProtocolBinary iNetworkProtocolBinary) {
return sessions.get(new OHashToken(iNetworkProtocolBinary.getTokenBytes()));
OHashToken key = new OHashToken(iNetworkProtocolBinary.getTokenBytes());
synchronized (sessions) {
return sessions.get(key);
}
}
}
@@ -0,0 +1,70 @@
package com.orientechnologies.orient.server;

import static org.testng.AssertJUnit.*;

import java.io.IOException;

import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.orientechnologies.orient.core.metadata.security.OToken;
import com.orientechnologies.orient.server.network.protocol.binary.ONetworkProtocolBinary;

public class OClientConnectionManagerTest {

@Mock
private ONetworkProtocolBinary protocol;

@Mock
private OToken token;

@BeforeMethod
public void before() {
MockitoAnnotations.initMocks(this);
}

@Test
public void testSimpleConnectDisconnect() throws IOException {
OClientConnectionManager manager = new OClientConnectionManager();
OClientConnection ret = manager.connect(protocol);
assertNotNull(ret);
OClientConnection ret1 = manager.getConnection(ret.id, protocol);
assertSame(ret, ret1);
manager.disconnect(ret);

OClientConnection ret2 = manager.getConnection(ret.id, protocol);
assertNull(ret2);
}

@Test
public void testTokenConnectDisconnect() throws IOException {
byte[] atoken = new byte[] {};
Mockito.when(protocol.getTokenBytes()).thenReturn(atoken);
OClientConnectionManager manager = new OClientConnectionManager();
OClientConnection ret = manager.connect(protocol);
manager.connect(protocol, ret, atoken, token);
assertNotNull(ret);
OClientSessions sess = manager.getSession(protocol);
assertNotNull(sess);
assertEquals(sess.getConnections().size(), 1);
OClientConnection ret1 = manager.getConnection(ret.id, protocol);
assertSame(ret, ret1);
OClientConnection ret2 = manager.reConnect(protocol, atoken, token);
assertNotSame(ret1, ret2);
assertEquals(sess.getConnections().size(), 2);
manager.disconnect(ret);

assertEquals(sess.getConnections().size(), 1);
OClientConnection ret3 = manager.getConnection(ret.id, protocol);
assertNull(ret3);

manager.disconnect(ret2);
assertEquals(sess.getConnections().size(), 0);
OClientConnection ret4 = manager.getConnection(ret2.id, protocol);
assertNull(ret4);
}

}

0 comments on commit 191d6bf

Please sign in to comment.