Skip to content

Commit

Permalink
First draft of management of distributed msgs by using binary protoco…
Browse files Browse the repository at this point in the history
…l instead of HZ
  • Loading branch information
lvca committed Feb 29, 2016
1 parent 1e39e78 commit aee8b4a
Show file tree
Hide file tree
Showing 38 changed files with 960 additions and 951 deletions.
Expand Up @@ -41,9 +41,9 @@
* Remote administration class of OrientDB Server instances.
*/
public class OServerAdmin {
private OStorageRemote storage;
private int sessionId = -1;
private byte[] sessionToken = null;
protected OStorageRemote storage;
private int sessionId = -1;
private byte[] sessionToken = null;

/**
* Creates the object passing a remote URL to connect. sessionToken
Expand Down Expand Up @@ -86,7 +86,7 @@ public synchronized OServerAdmin connect(final String iUserName, final String iU
@Override
public Void execute(OChannelBinaryAsynchClient network) throws IOException {
try {
storage.beginRequest(network,OChannelBinaryProtocol.REQUEST_CONNECT);
storage.beginRequest(network, OChannelBinaryProtocol.REQUEST_CONNECT);

storage.sendClientInfo(network);

Expand All @@ -110,7 +110,7 @@ public Void execute(OChannelBinaryAsynchClient network) throws IOException {

return null;
}
},"Cannot connect to the remote server/database '" + storage.getURL() + "'");
}, "Cannot connect to the remote server/database '" + storage.getURL() + "'");
return this;
}

Expand Down Expand Up @@ -210,7 +210,8 @@ public synchronized OServerAdmin createDatabase(final String iDatabaseType, Stri
* @return The instance itself. Useful to execute method in chain
* @throws IOException
*/
public synchronized OServerAdmin createDatabase(final String iDatabaseName, final String iDatabaseType, final String iStorageMode) throws IOException {
public synchronized OServerAdmin createDatabase(final String iDatabaseName, final String iDatabaseType, final String iStorageMode)
throws IOException {

if (iDatabaseName == null || iDatabaseName.length() <= 0) {
final String message = "Cannot create unnamed remote storage. Check your syntax";
Expand Down Expand Up @@ -396,7 +397,7 @@ public synchronized OServerAdmin freezeDatabase(final String storageType) throws
@Override
public Void execute(OChannelBinaryAsynchClient network) throws IOException {
try {
storage.beginRequest(network,OChannelBinaryProtocol.REQUEST_DB_FREEZE);
storage.beginRequest(network, OChannelBinaryProtocol.REQUEST_DB_FREEZE);
network.writeString(storage.getName());
network.writeString(storageType);
} finally {
Expand All @@ -406,7 +407,7 @@ public Void execute(OChannelBinaryAsynchClient network) throws IOException {
storage.getResponse(network);
return null;
}
},"Cannot freeze the remote storage: " + storage.getName());
}, "Cannot freeze the remote storage: " + storage.getName());

return this;
}
Expand Down Expand Up @@ -499,7 +500,6 @@ public Void execute(final OChannelBinaryAsynchClient network) throws IOException
storage.endRequest(network);
}


storage.getResponse(network);
return null;
}
Expand Down Expand Up @@ -532,14 +532,15 @@ public ODocument clusterStatus() {
* @return The instance itself. Useful to execute method in chain
* @throws IOException
*/
public synchronized OServerAdmin copyDatabase(final String databaseName, final String iDatabaseUserName, final String iDatabaseUserPassword, final String iRemoteName, final String iRemoteEngine) throws IOException {
public synchronized OServerAdmin copyDatabase(final String databaseName, final String iDatabaseUserName,
final String iDatabaseUserPassword, final String iRemoteName, final String iRemoteEngine) throws IOException {

networkAdminOperation(new OStorageRemoteOperation<Void>() {
@Override
public Void execute(final OChannelBinaryAsynchClient network) throws IOException {

try {
storage.beginRequest(network,OChannelBinaryProtocol.REQUEST_DB_COPY);
storage.beginRequest(network, OChannelBinaryProtocol.REQUEST_DB_COPY);
network.writeString(databaseName);
network.writeString(iDatabaseUserName);
network.writeString(iDatabaseUserPassword);
Expand All @@ -564,7 +565,7 @@ public synchronized Map<String, String> getGlobalConfigurations() throws IOExcep
@Override
public Map<String, String> execute(OChannelBinaryAsynchClient network) throws IOException {
final Map<String, String> config = new HashMap<String, String>();
storage.beginRequest(network,OChannelBinaryProtocol.REQUEST_CONFIG_LIST);
storage.beginRequest(network, OChannelBinaryProtocol.REQUEST_CONFIG_LIST);
storage.endRequest(network);

try {
Expand All @@ -586,7 +587,7 @@ public synchronized String getGlobalConfiguration(final OGlobalConfiguration con
return networkAdminOperation(new OStorageRemoteOperation<String>() {
@Override
public String execute(OChannelBinaryAsynchClient network) throws IOException {
storage.beginRequest(network,OChannelBinaryProtocol.REQUEST_CONFIG_GET);
storage.beginRequest(network, OChannelBinaryProtocol.REQUEST_CONFIG_GET);
network.writeString(config.getKey());
network.endRequest();

Expand All @@ -597,10 +598,11 @@ public String execute(OChannelBinaryAsynchClient network) throws IOException {
storage.endResponse(network);
}
}
},"Cannot retrieve the configuration value: " + config.getKey());
}, "Cannot retrieve the configuration value: " + config.getKey());
}

public synchronized OServerAdmin setGlobalConfiguration(final OGlobalConfiguration config, final Object iValue) throws IOException {
public synchronized OServerAdmin setGlobalConfiguration(final OGlobalConfiguration config, final Object iValue)
throws IOException {

networkAdminOperation(new OStorageRemoteOperation<Void>() {
@Override
Expand Down Expand Up @@ -637,8 +639,8 @@ public boolean isConnected() {
}

protected ODocument sendRequest(final byte iRequest, final ODocument iPayLoad, final String iActivity) {
//Using here networkOperation because the original retry logic was lik networkOperation
storage.setSessionId(getURL(),sessionId,sessionToken);
// Using here networkOperation because the original retry logic was lik networkOperation
storage.setSessionId(getURL(), sessionId, sessionToken);
return storage.networkOperation(new OStorageRemoteOperation<ODocument>() {
@Override
public ODocument execute(OChannelBinaryAsynchClient network) throws IOException {
Expand Down Expand Up @@ -676,17 +678,16 @@ private boolean handleDBFreeze() {

protected <T> T networkAdminOperation(final OStorageRemoteOperation<T> operation, final String errorMessage) {

OChannelBinaryAsynchClient network=null;
try {
storage.setSessionId(getURL(),sessionId,sessionToken);
//TODO:replace this api with one that get connection for only the specified url.
network = storage.getAvailableNetwork(getURL());
return operation.execute(network);
} catch (Exception e) {
storage.close(true, false);
throw OException.wrapException(new OStorageException(errorMessage), e);
}
OChannelBinaryAsynchClient network = null;
try {
storage.setSessionId(getURL(), sessionId, sessionToken);
// TODO:replace this api with one that get connection for only the specified url.
network = storage.getAvailableNetwork(getURL());
return operation.execute(network);
} catch (Exception e) {
storage.close(true, false);
throw OException.wrapException(new OStorageException(errorMessage), e);
}
}


}
Expand Up @@ -19,22 +19,6 @@
*/
package com.orientechnologies.orient.client.remote;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

import javax.naming.NamingException;
import javax.naming.directory.Attribute;
import javax.naming.directory.Attributes;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;

import com.orientechnologies.common.concur.OOfflineNodeException;
import com.orientechnologies.common.concur.lock.OModificationOperationProhibitedException;
import com.orientechnologies.common.exception.OException;
Expand All @@ -59,13 +43,7 @@
import com.orientechnologies.orient.core.db.record.ORecordOperation;
import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OBonsaiCollectionPointer;
import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OSBTreeCollectionManager;
import com.orientechnologies.orient.core.exception.OCommandExecutionException;
import com.orientechnologies.orient.core.exception.OConfigurationException;
import com.orientechnologies.orient.core.exception.ODatabaseException;
import com.orientechnologies.orient.core.exception.ORecordNotFoundException;
import com.orientechnologies.orient.core.exception.OSecurityException;
import com.orientechnologies.orient.core.exception.OStorageException;
import com.orientechnologies.orient.core.exception.OTransactionException;
import com.orientechnologies.orient.core.exception.*;
import com.orientechnologies.orient.core.id.ORID;
import com.orientechnologies.orient.core.id.ORecordId;
import com.orientechnologies.orient.core.metadata.security.OTokenException;
Expand All @@ -78,20 +56,24 @@
import com.orientechnologies.orient.core.serialization.serializer.stream.OStreamSerializerAnyStreamable;
import com.orientechnologies.orient.core.sql.query.OLiveQuery;
import com.orientechnologies.orient.core.sql.query.OLiveResultListener;
import com.orientechnologies.orient.core.storage.OCluster;
import com.orientechnologies.orient.core.storage.OPhysicalPosition;
import com.orientechnologies.orient.core.storage.ORawBuffer;
import com.orientechnologies.orient.core.storage.ORecordCallback;
import com.orientechnologies.orient.core.storage.ORecordMetadata;
import com.orientechnologies.orient.core.storage.OStorageAbstract;
import com.orientechnologies.orient.core.storage.OStorageOperationResult;
import com.orientechnologies.orient.core.storage.OStorageProxy;
import com.orientechnologies.orient.core.storage.*;
import com.orientechnologies.orient.core.storage.impl.local.paginated.ORecordSerializationContext;
import com.orientechnologies.orient.core.tx.OTransaction;
import com.orientechnologies.orient.core.tx.OTransactionAbstract;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryAsynchClient;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol;

import javax.naming.NamingException;
import javax.naming.directory.Attribute;
import javax.naming.directory.Attributes;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
import java.util.concurrent.*;

/**
* This object is bound to each remote ODatabase instances.
*/
Expand Down Expand Up @@ -2080,7 +2062,7 @@ protected OChannelBinaryAsynchClient beginRequest(final byte iCommand) throws IO
return beginRequest(getAvailableNetwork(getNextAvailableServerURL(false)), iCommand);
}

protected OChannelBinaryAsynchClient beginRequest(final OChannelBinaryAsynchClient network, final byte iCommand)
public OChannelBinaryAsynchClient beginRequest(final OChannelBinaryAsynchClient network, final byte iCommand)
throws IOException {
final OStorageRemoteThreadLocal instance = OStorageRemoteThreadLocal.INSTANCE;
network.beginRequest(iCommand,instance.get(),tokens.get(network.getServerURL()));
Expand Down Expand Up @@ -2189,7 +2171,7 @@ protected OChannelBinaryAsynchClient getAvailableNetwork(final String iCurrentUR
/**
* Starts listening the response.
*/
protected void beginResponse(final OChannelBinaryAsynchClient iNetwork) throws IOException {
public void beginResponse(final OChannelBinaryAsynchClient iNetwork) throws IOException {
byte[] newToken = iNetwork.beginResponse(getSessionId(), true);
if (newToken != null && newToken.length > 0) {
setSessionId(getServerURL(), getSessionId(), newToken);
Expand Down

0 comments on commit aee8b4a

Please sign in to comment.