Skip to content

Commit

Permalink
first attempt ot refactor the client push listener
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed Oct 22, 2015
1 parent b6abf65 commit 8a36975
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 96 deletions.
Expand Up @@ -44,19 +44,20 @@
*
* @author Luca Garulli (l.garulli--at--orientechnologies.com)
*/
public class ORemoteConnectionManager implements OChannelListener {
public class ORemoteConnectionManager {
public static final String PARAM_MAX_POOL = "maxpool";

protected final ConcurrentHashMap<String, OResourcePool<String, OChannelBinaryAsynchClient>> connections;
protected final ConcurrentHashMap<String, ORemoteConnectionPool> connections;
protected final long timeout;
protected ORemoteConnectionPushListener listener;

public ORemoteConnectionManager(final int iMaxConnectionPerURL, final long iTimeout) {
connections = new ConcurrentHashMap<String, OResourcePool<String, OChannelBinaryAsynchClient>>();
connections = new ConcurrentHashMap<String, ORemoteConnectionPool>();
timeout = iTimeout;
}

public void close() {
for (Map.Entry<String, OResourcePool<String, OChannelBinaryAsynchClient>> entry : connections.entrySet()) {
for (Map.Entry<String, ORemoteConnectionPool> entry : connections.entrySet()) {
closePool(entry.getValue());
}

Expand All @@ -65,7 +66,7 @@ public void close() {

public OChannelBinaryAsynchClient acquire(String iServerURL, final OContextConfiguration clientConfiguration,
final Map<String, Object> iConfiguration, final ORemoteServerEventListener iListener) {
OResourcePool<String, OChannelBinaryAsynchClient> pool = connections.get(iServerURL);
ORemoteConnectionPool pool = connections.get(iServerURL);
if (pool == null) {
int maxPool = OGlobalConfiguration.CLIENT_CHANNEL_MAX_POOL.getValueAsInteger();

Expand All @@ -74,32 +75,19 @@ public OChannelBinaryAsynchClient acquire(String iServerURL, final OContextConfi
maxPool = Integer.parseInt(iConfiguration.get(PARAM_MAX_POOL).toString());
}

pool = new OResourcePool<String, OChannelBinaryAsynchClient>(maxPool,
new OResourcePoolListener<String, OChannelBinaryAsynchClient>() {
@Override
public OChannelBinaryAsynchClient createNewResource(final String iKey, final Object... iAdditionalArgs) {
return createNetworkConnection(iKey, (OContextConfiguration) iAdditionalArgs[0],
(Map<String, Object>) iAdditionalArgs[1], (ORemoteServerEventListener) iAdditionalArgs[2]);
}

@Override
public boolean reuseResource(final String iKey, final Object[] iAdditionalArgs, final OChannelBinaryAsynchClient iValue) {
return iValue.isConnected();
}

});

final OResourcePool<String, OChannelBinaryAsynchClient> prev = connections.putIfAbsent(iServerURL, pool);
pool = new ORemoteConnectionPool(maxPool);
final ORemoteConnectionPool prev = connections.putIfAbsent(iServerURL, pool);
if (prev != null) {
// ALREADY PRESENT, DESTROY IT AND GET THE ALREADY EXISTENT OBJ
pool.close();
pool.getPool().close();
pool = prev;
}
}
pool.addListener(iListener);

try {
// RETURN THE RESOURCE
OChannelBinaryAsynchClient ret = pool.getResource(iServerURL, timeout, clientConfiguration, iConfiguration, iListener);
OChannelBinaryAsynchClient ret = pool.getPool().getResource(iServerURL, timeout, clientConfiguration, iConfiguration);
return ret;

} catch (RuntimeException e) {
Expand All @@ -113,13 +101,13 @@ public boolean reuseResource(final String iKey, final Object[] iAdditionalArgs,
}

public void release(final OChannelBinaryAsynchClient conn) {
final OResourcePool<String, OChannelBinaryAsynchClient> pool = connections.get(conn.getServerURL());
final ORemoteConnectionPool pool = connections.get(conn.getServerURL());
if (pool != null) {
if (!conn.isConnected()) {
OLogManager.instance().debug(this, "Network connection pool is receiving a closed connection to reuse: discard it");
remove(conn);
} else {
pool.returnResource(conn);
pool.getPool().returnResource(conn);
}
}
}
Expand All @@ -137,23 +125,11 @@ public void remove(final OChannelBinaryAsynchClient conn) {
OLogManager.instance().debug(this, "Cannot close connection", e);
}

final OResourcePool<String, OChannelBinaryAsynchClient> pool = connections.get(conn.getServerURL());
final ORemoteConnectionPool pool = connections.get(conn.getServerURL());
if (pool == null)
throw new IllegalStateException("Connection cannot be released because the pool doesn't exist anymore");

pool.remove(conn);

}

@Override
public void onChannelClose(final OChannel channel) {
OChannelBinaryAsynchClient conn = (OChannelBinaryAsynchClient) channel;

final OResourcePool<String, OChannelBinaryAsynchClient> pool = connections.get(conn.getServerURL());
if (pool == null)
throw new IllegalStateException("Connection cannot be released because the pool doesn't exist anymore");

pool.remove(conn);
pool.getPool().remove(conn);

}

Expand All @@ -162,97 +138,55 @@ public Set<String> getURLs() {
}

public int getMaxResources(final String url) {
final OResourcePool<String, OChannelBinaryAsynchClient> pool = connections.get(url);
final ORemoteConnectionPool pool = connections.get(url);
if (pool == null)
return 0;

return pool.getMaxResources();
return pool.getPool().getMaxResources();
}

public int getAvailableConnections(final String url) {
final OResourcePool<String, OChannelBinaryAsynchClient> pool = connections.get(url);
final ORemoteConnectionPool pool = connections.get(url);
if (pool == null)
return 0;

return pool.getAvailableResources();
return pool.getPool().getAvailableResources();
}

public int getReusableConnections(final String url) {
final OResourcePool<String, OChannelBinaryAsynchClient> pool = connections.get(url);
public int getReusableConnections(final String url){
final ORemoteConnectionPool pool = connections.get(url);
if (pool == null)
return 0;

return pool.getInPoolResources();
return pool.getPool().getInPoolResources();
}

public int getCreatedInstancesInPool(final String url) {
final OResourcePool<String, OChannelBinaryAsynchClient> pool = connections.get(url);
final ORemoteConnectionPool pool = connections.get(url);
if (pool == null)
return 0;

return pool.getCreatedInstances();
return pool.getPool().getCreatedInstances();
}

public void closePool(final String url) {
final OResourcePool<String, OChannelBinaryAsynchClient> pool = connections.remove(url);
final ORemoteConnectionPool pool = connections.remove(url);
if (pool == null)
return;

closePool(pool);
}

protected void closePool(final OResourcePool<String, OChannelBinaryAsynchClient> pool) {
final List<OChannelBinaryAsynchClient> conns = new ArrayList<OChannelBinaryAsynchClient>(pool.getAllResources());
protected void closePool(ORemoteConnectionPool pool) {
final List<OChannelBinaryAsynchClient> conns = new ArrayList<OChannelBinaryAsynchClient>(pool.getPool().getAllResources());
for (OChannelBinaryAsynchClient c : conns)
try {
// Unregister the listener that make the connection return to the closing pool.
c.unregisterListener(this);
//Unregister the listener that make the connection return to the closing pool.
c.unregisterListener(pool);
c.close();
} catch (Exception e) {
OLogManager.instance().debug(this, "Cannot close binary channel", e);
}
pool.close();
}

protected OChannelBinaryAsynchClient createNetworkConnection(String iServerURL, final OContextConfiguration clientConfiguration,
Map<String, Object> iAdditionalArg, final ORemoteServerEventListener asynchEventListener) throws OIOException {
if (iServerURL == null)
throw new IllegalArgumentException("server url is null");

// TRY WITH CURRENT URL IF ANY
try {
OLogManager.instance().debug(this, "Trying to connect to the remote host %s...", iServerURL);

final String serverURL;
final String databaseName;
int sepPos = iServerURL.indexOf("/");
if (sepPos > -1) {
// REMOVE DATABASE NAME IF ANY
serverURL = iServerURL.substring(0, sepPos);
databaseName = iServerURL.substring(sepPos + 1);
} else {
serverURL = iServerURL;
databaseName = null;
}

sepPos = serverURL.indexOf(":");
final String remoteHost = serverURL.substring(0, sepPos);
final int remotePort = Integer.parseInt(serverURL.substring(sepPos + 1));

final OChannelBinaryAsynchClient ch = new OChannelBinaryAsynchClient(remoteHost, remotePort, databaseName,
clientConfiguration, OChannelBinaryProtocol.CURRENT_PROTOCOL_VERSION, asynchEventListener);

// REGISTER MYSELF AS LISTENER TO REMOVE THE CHANNEL FROM THE POOL IN CASE OF CLOSING
ch.registerListener(this);

return ch;

} catch (OIOException e) {
// RE-THROW IT
throw e;
} catch (Exception e) {
OLogManager.instance().debug(this, "Error on connecting to %s", e, iServerURL);
throw OException.wrapException(new OIOException("Error on connecting to " + iServerURL), e);
}
pool.getPool().close();
}
}
@@ -0,0 +1,93 @@
package com.orientechnologies.orient.client.remote;

import com.orientechnologies.common.concur.resource.OResourcePool;
import com.orientechnologies.common.concur.resource.OResourcePoolListener;
import com.orientechnologies.common.io.OIOException;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.config.OContextConfiguration;
import com.orientechnologies.orient.enterprise.channel.OChannel;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryAsynchClient;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelListener;

import java.util.Map;

/**
* Created by tglman on 01/10/15.
*/
public class ORemoteConnectionPool extends ORemoteConnectionPushListener implements OResourcePoolListener<String, OChannelBinaryAsynchClient>, OChannelListener {


private OResourcePool<String, OChannelBinaryAsynchClient> pool;

public ORemoteConnectionPool(int iMaxResources) {
pool = new OResourcePool<String, OChannelBinaryAsynchClient>(iMaxResources, this);
}

protected OChannelBinaryAsynchClient createNetworkConnection(String iServerURL, final OContextConfiguration clientConfiguration, Map<String, Object> iAdditionalArg)
throws OIOException {
if (iServerURL == null)
throw new IllegalArgumentException("server url is null");

// TRY WITH CURRENT URL IF ANY
try {
OLogManager.instance().debug(this, "Trying to connect to the remote host %s...", iServerURL);

final String serverURL;
final String databaseName;
int sepPos = iServerURL.indexOf("/");
if (sepPos > -1) {
// REMOVE DATABASE NAME IF ANY
serverURL = iServerURL.substring(0, sepPos);
databaseName = iServerURL.substring(sepPos + 1);
} else {
serverURL = iServerURL;
databaseName = null;
}

sepPos = serverURL.indexOf(":");
final String remoteHost = serverURL.substring(0, sepPos);
final int remotePort = Integer.parseInt(serverURL.substring(sepPos + 1));

final OChannelBinaryAsynchClient ch = new OChannelBinaryAsynchClient(remoteHost, remotePort, databaseName, clientConfiguration, OChannelBinaryProtocol.CURRENT_PROTOCOL_VERSION, this);

// REGISTER MYSELF AS LISTENER TO REMOVE THE CHANNEL FROM THE POOL IN CASE OF CLOSING
ch.registerListener(this);

return ch;

} catch (OIOException e) {
// RE-THROW IT
throw e;
} catch (Exception e) {
OLogManager.instance().debug(this, "Error on connecting to %s", e, iServerURL);
throw new OIOException("Error on connecting to " + iServerURL, e);
}
}

@Override
public OChannelBinaryAsynchClient createNewResource(String iKey, Object... iAdditionalArgs) {
return createNetworkConnection(iKey, (OContextConfiguration) iAdditionalArgs[0], (Map<String, Object>) iAdditionalArgs[1]);
}

@Override
public boolean reuseResource(String iKey, Object[] iAdditionalArgs, OChannelBinaryAsynchClient iValue) {
return iValue.isConnected();
}

public OResourcePool<String, OChannelBinaryAsynchClient> getPool() {
return pool;
}

@Override
public void onChannelClose(final OChannel channel) {
OChannelBinaryAsynchClient conn = (OChannelBinaryAsynchClient) channel;

if (pool == null)
throw new IllegalStateException("Connection cannot be released because the pool doesn't exist anymore");

pool.remove(conn);

}

}
@@ -0,0 +1,43 @@
package com.orientechnologies.orient.client.remote;

import com.orientechnologies.orient.core.sql.query.OLiveResultListener;
import com.orientechnologies.orient.enterprise.channel.binary.ORemoteServerEventListener;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Created by tglman on 01/10/15.
*/
public class ORemoteConnectionPushListener implements ORemoteServerEventListener {

private Set<ORemoteServerEventListener> listeners = Collections.synchronizedSet(new HashSet<ORemoteServerEventListener>());

public void addListener(ORemoteServerEventListener listener) {
this.listeners.add(listener);
}

public void removeListener(ORemoteServerEventListener listener) {
this.listeners.remove(listener);
}

public void onRequest(final byte iRequestCode, Object obj) {
for (ORemoteServerEventListener listener : listeners) {
listener.onRequest(iRequestCode, obj);
}
}

@Override
public void registerLiveListener(Integer id, OLiveResultListener listener) {

}

@Override
public void unregisterLiveListener(Integer id) {

}


}

0 comments on commit 8a36975

Please sign in to comment.