Skip to content

Commit

Permalink
implemented listener for correct propagation of message cross user an…
Browse files Browse the repository at this point in the history
…d connection drop handling for live query
  • Loading branch information
tglman committed Oct 23, 2015
1 parent 3295eb4 commit 55e7610
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 187 deletions.
Expand Up @@ -41,15 +41,15 @@
/**
* Manages network connections against OrientDB servers. All the connection pools are managed in a Map<url,pool>, but in the future
* we could have a unique pool per sever and manage database connections over the protocol.
*
*
* @author Luca Garulli (l.garulli--at--orientechnologies.com)
*/
public class ORemoteConnectionManager {
public static final String PARAM_MAX_POOL = "maxpool";
public static final String PARAM_MAX_POOL = "maxpool";

protected final ConcurrentHashMap<String, ORemoteConnectionPool> connections;
protected final long timeout;
protected ORemoteConnectionPushListener listener;
protected final long timeout;
protected ORemoteConnectionPushListener listener;

public ORemoteConnectionManager(final int iMaxConnectionPerURL, final long iTimeout) {
connections = new ConcurrentHashMap<String, ORemoteConnectionPool>();
Expand All @@ -64,8 +64,7 @@ public void close() {
connections.clear();
}

public OChannelBinaryAsynchClient acquire(String iServerURL, final OContextConfiguration clientConfiguration,
final Map<String, Object> iConfiguration, final ORemoteServerEventListener iListener) {
public OChannelBinaryAsynchClient acquire(String iServerURL, final OContextConfiguration clientConfiguration, final Map<String, Object> iConfiguration, final OStorageRemoteAsynchEventListener iListener) {
ORemoteConnectionPool pool = connections.get(iServerURL);
if (pool == null) {
int maxPool = OGlobalConfiguration.CLIENT_CHANNEL_MAX_POOL.getValueAsInteger();
Expand All @@ -83,11 +82,10 @@ public OChannelBinaryAsynchClient acquire(String iServerURL, final OContextConfi
pool = prev;
}
}
pool.addListener(iListener);

try {
// RETURN THE RESOURCE
OChannelBinaryAsynchClient ret = pool.getPool().getResource(iServerURL, timeout, clientConfiguration, iConfiguration);
OChannelBinaryAsynchClient ret = pool.acquire(iServerURL, timeout, clientConfiguration, iConfiguration, iListener);
return ret;

} catch (RuntimeException e) {
Expand Down Expand Up @@ -153,7 +151,7 @@ public int getAvailableConnections(final String url) {
return pool.getPool().getAvailableResources();
}

public int getReusableConnections(final String url){
public int getReusableConnections(final String url) {
final ORemoteConnectionPool pool = connections.get(url);
if (pool == null)
return 0;
Expand Down Expand Up @@ -189,4 +187,9 @@ protected void closePool(ORemoteConnectionPool pool) {
}
pool.getPool().close();
}

public ORemoteConnectionPool getPool(String url) {
return connections.get(url);
}

}
Expand Up @@ -16,17 +16,17 @@
/**
* Created by tglman on 01/10/15.
*/
public class ORemoteConnectionPool extends ORemoteConnectionPushListener implements OResourcePoolListener<String, OChannelBinaryAsynchClient>, OChannelListener {
public class ORemoteConnectionPool implements OResourcePoolListener<String, OChannelBinaryAsynchClient>, OChannelListener {


private OResourcePool<String, OChannelBinaryAsynchClient> pool;
private ORemoteConnectionPushListener listener = new ORemoteConnectionPushListener();

public ORemoteConnectionPool(int iMaxResources) {
pool = new OResourcePool<String, OChannelBinaryAsynchClient>(iMaxResources, this);
}

protected OChannelBinaryAsynchClient createNetworkConnection(String iServerURL, final OContextConfiguration clientConfiguration, Map<String, Object> iAdditionalArg)
throws OIOException {
protected OChannelBinaryAsynchClient createNetworkConnection(String iServerURL, final OContextConfiguration clientConfiguration, Map<String, Object> iAdditionalArg) throws OIOException {
if (iServerURL == null)
throw new IllegalArgumentException("server url is null");

Expand All @@ -50,7 +50,7 @@ protected OChannelBinaryAsynchClient createNetworkConnection(String iServerURL,
final String remoteHost = serverURL.substring(0, sepPos);
final int remotePort = Integer.parseInt(serverURL.substring(sepPos + 1));

final OChannelBinaryAsynchClient ch = new OChannelBinaryAsynchClient(remoteHost, remotePort, databaseName, clientConfiguration, OChannelBinaryProtocol.CURRENT_PROTOCOL_VERSION, this);
final OChannelBinaryAsynchClient ch = new OChannelBinaryAsynchClient(remoteHost, remotePort, databaseName, clientConfiguration, OChannelBinaryProtocol.CURRENT_PROTOCOL_VERSION, listener);

// REGISTER MYSELF AS LISTENER TO REMOVE THE CHANNEL FROM THE POOL IN CASE OF CLOSING
ch.registerListener(this);
Expand Down Expand Up @@ -91,4 +91,9 @@ public void onChannelClose(final OChannel channel) {

}

public OChannelBinaryAsynchClient acquire(String iServerURL, long timeout, OContextConfiguration clientConfiguration, Map<String, Object> iConfiguration, OStorageRemoteAsynchEventListener iListener) {
OChannelBinaryAsynchClient ret = pool.getResource(iServerURL, timeout, clientConfiguration, iConfiguration);
listener.addListener(this, ret, iListener);
return ret;
}
}
@@ -1,22 +1,46 @@
package com.orientechnologies.orient.client.remote;

import com.orientechnologies.orient.core.sql.query.OLiveResultListener;
import com.orientechnologies.orient.enterprise.channel.OChannel;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryAsynchClient;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelListener;
import com.orientechnologies.orient.enterprise.channel.binary.ORemoteServerEventListener;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Created by tglman on 01/10/15.
*/
public class ORemoteConnectionPushListener implements ORemoteServerEventListener {

private Set<ORemoteServerEventListener> listeners = Collections.synchronizedSet(new HashSet<ORemoteServerEventListener>());
private Set<ORemoteServerEventListener> listeners = Collections.synchronizedSet(new HashSet<ORemoteServerEventListener>());
private ConcurrentMap<ORemoteServerEventListener, Set<OChannelBinaryAsynchClient>> conns = new ConcurrentHashMap<ORemoteServerEventListener, Set<OChannelBinaryAsynchClient>>();

public void addListener(ORemoteServerEventListener listener) {
public void addListener(final ORemoteConnectionPool pool, final OChannelBinaryAsynchClient connection, final OStorageRemoteAsynchEventListener listener) {
this.listeners.add(listener);
Set<OChannelBinaryAsynchClient> ans = conns.get(listener);
if (ans == null) {
ans =Collections.synchronizedSet(new HashSet<OChannelBinaryAsynchClient>());
Set<OChannelBinaryAsynchClient> putRet = conns.putIfAbsent(listener, ans);
if(putRet != null)
ans = putRet;
}
if (!ans.contains(connection)) {
ans.add(connection);
connection.registerListener(new OChannelListener() {
@Override
public void onChannelClose(OChannel iChannel) {
Set<OChannelBinaryAsynchClient> all = conns.get(listener);
all.remove(iChannel);
if (all.isEmpty()){
listener.onEndUsedConnections(pool);
}
connection.unregisterListener(this);
}
});
}
}

public void removeListener(ORemoteServerEventListener listener) {
Expand All @@ -28,16 +52,4 @@ public void onRequest(final byte iRequestCode, Object obj) {
listener.onRequest(iRequestCode, obj);
}
}

@Override
public void registerLiveListener(Integer id, OLiveResultListener listener) {

}

@Override
public void unregisterLiveListener(Integer id) {

}


}

0 comments on commit 55e7610

Please sign in to comment.