Skip to content

Commit

Permalink
Fixed issue with binary protocol between servers
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Mar 11, 2016
1 parent 5662acb commit aa6ec94
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 106 deletions.
Expand Up @@ -846,6 +846,23 @@ public void memberRemoved(final MembershipEvent iEvent) {
} }
} }


for (String databaseName : getManagedDatabases()) {
final ODatabaseDocumentTx database = (ODatabaseDocumentTx) serverInstance.openDatabase(databaseName, "internal", "internal",
null, true);
ODistributedConfiguration cfg;
try {
// ASSIGN CLUSTERS AT STARTUP
cfg = getDatabaseConfiguration(databaseName);
final boolean distribCfgDirty = rebalanceClusterOwnership(database, cfg);
if (distribCfgDirty) {
OLogManager.instance().info(this, "Distributed configuration modified");
updateCachedDatabaseConfiguration(databaseName, cfg.serialize(), true, true);
}
} finally {
database.close();
}
}

serverInstance.getClientConnectionManager().pushDistribCfg2Clients(getClusterConfiguration()); serverInstance.getClientConnectionManager().pushDistribCfg2Clients(getClusterConfiguration());
} }


Expand Down Expand Up @@ -1474,7 +1491,7 @@ else if (result instanceof Exception) {


@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
final boolean distribCfgDirty = installDbClustersForLocalNode(db, cfg); final boolean distribCfgDirty = rebalanceClusterOwnership(db, cfg);
if (distribCfgDirty) { if (distribCfgDirty) {
OLogManager.instance().warn(this, "Distributed configuration modified"); OLogManager.instance().warn(this, "Distributed configuration modified");
updateCachedDatabaseConfiguration(db.getName(), cfg.serialize(), true, true); updateCachedDatabaseConfiguration(db.getName(), cfg.serialize(), true, true);
Expand Down Expand Up @@ -1575,7 +1592,7 @@ protected void checkDatabaseEvent(final EntryEvent<String, Object> iEvent, final
updateLastClusterChange(); updateLastClusterChange();
} }


protected boolean installDbClustersForLocalNode(final ODatabaseInternal iDatabase, final ODistributedConfiguration cfg) { protected synchronized boolean rebalanceClusterOwnership(final ODatabaseInternal iDatabase, final ODistributedConfiguration cfg) {
final ODistributedConfiguration.ROLES role = cfg.getServerRole(nodeName); final ODistributedConfiguration.ROLES role = cfg.getServerRole(nodeName);
if (role != ODistributedConfiguration.ROLES.MASTER) if (role != ODistributedConfiguration.ROLES.MASTER)
// NO MASTER, DON'T CREATE LOCAL CLUSTERS // NO MASTER, DON'T CREATE LOCAL CLUSTERS
Expand Down Expand Up @@ -1747,7 +1764,7 @@ protected void loadDistributedDatabases() {
try { try {
// ASSIGN CLUSTERS AT STARTUP // ASSIGN CLUSTERS AT STARTUP
cfg = getDatabaseConfiguration(databaseName); cfg = getDatabaseConfiguration(databaseName);
final boolean distribCfgDirty = installDbClustersForLocalNode(database, cfg); final boolean distribCfgDirty = rebalanceClusterOwnership(database, cfg);
if (distribCfgDirty) { if (distribCfgDirty) {
OLogManager.instance().info(this, "Distributed configuration modified"); OLogManager.instance().info(this, "Distributed configuration modified");
updateCachedDatabaseConfiguration(databaseName, cfg.serialize(), true, true); updateCachedDatabaseConfiguration(databaseName, cfg.serialize(), true, true);
Expand Down Expand Up @@ -2089,9 +2106,8 @@ protected void checkForClusterRebalance(final String iDatabaseName) {
} }
} }


private synchronized boolean rebalanceClusterOwnershipOfClass(final ODatabaseInternal iDatabase, private boolean rebalanceClusterOwnershipOfClass(final ODatabaseInternal iDatabase, final ODistributedConfiguration cfg,
final ODistributedConfiguration cfg, final OClass iClass, final Set<String> availableNodes, final OClass iClass, final Set<String> availableNodes, final Set<String> clustersToReassign) {
final Set<String> clustersToReassign) {


if (availableNodes.isEmpty()) if (availableNodes.isEmpty())
return false; return false;
Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.orientechnologies.orient.core.OConstants; import com.orientechnologies.orient.core.OConstants;
import com.orientechnologies.orient.core.config.OContextConfiguration; import com.orientechnologies.orient.core.config.OContextConfiguration;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.exception.OStorageException;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol; import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinarySynchClient; import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinarySynchClient;


Expand All @@ -37,17 +38,16 @@
*/ */
public class ORemoteServerChannel { public class ORemoteServerChannel {
final ODistributedServerManager manager; final ODistributedServerManager manager;
final String server;
final String url; final String url;
final String remoteHost; final String remoteHost;
final int remotePort; final int remotePort;
final String userName; final String userName;
final String userPassword; final String userPassword;
final String server;
OChannelBinarySynchClient channel; OChannelBinarySynchClient channel;


static final String CLIENT_TYPE = "OrientDB Server"; static final String CLIENT_TYPE = "OrientDB Server";
static final boolean COLLECT_STATS = false; static final boolean COLLECT_STATS = false;
private static final int MAX_RETRY = 3;
int sessionId = -1; int sessionId = -1;
byte[] sessionToken; byte[] sessionToken;
OContextConfiguration contextConfig = new OContextConfiguration(); OContextConfiguration contextConfig = new OContextConfiguration();
Expand All @@ -67,57 +67,20 @@ public ORemoteServerChannel(final ODistributedServerManager manager, final Strin
connect(); connect();
} }


protected synchronized <T> T networkOperation(final byte operationId, final OStorageRemoteOperation<T> operation, protected <T> T networkOperation(final byte operationId, final OStorageRemoteOperation<T> operation, final String errorMessage) {
final String errorMessage) { try {
Exception lastException = null; channel.acquireWriteLock();
for (int retry = 1; retry <= MAX_RETRY; ++retry) {
try {
channel.setWaitResponseTimeout();
channel.beginRequest(operationId, sessionId, sessionToken);

final T result = operation.execute();

channel.flush();

return result;

} catch (IOException e) {
// IO EXCEPTION: RETRY THE CONNECTION AND COMMAND
lastException = e;

ODistributedServerLog.warn(this, manager.getLocalNodeName(), server, ODistributedServerLog.DIRECTION.OUT,
"IO Exception during %s (%s). Retrying (%d/%d)...", operation.toString(), lastException.getMessage(), retry, MAX_RETRY);

// DIRTY CONNECTION, CLOSE IT AND RE-ACQUIRE A NEW ONE
for (; retry <= MAX_RETRY; ++retry) {
try {
close();


channel = new OChannelBinarySynchClient(remoteHost, remotePort, null, contextConfig, channel.setWaitResponseTimeout();
OChannelBinaryProtocol.CURRENT_PROTOCOL_VERSION); channel.beginRequest(operationId, sessionId, sessionToken);


authenticate(); return operation.execute();


// OK } catch (Exception e) {
break; // DIRTY CONNECTION, CLOSE IT AND RE-ACQUIRE A NEW ONE

close();
} catch (IOException e1) { throw OException.wrapException(new OStorageException(errorMessage), e);
// IO EXCEPTION: RETRY THE CONNECTION AND COMMAND
lastException = e;

ODistributedServerLog.warn(this, manager.getLocalNodeName(), server, ODistributedServerLog.DIRECTION.OUT,
"IO Exception during %s (%s). Retrying (%d/%d)...", operation.toString(), lastException.getMessage(), retry,
MAX_RETRY);
}
}
}
} }

ODistributedServerLog.warn(this, manager.getLocalNodeName(), server, ODistributedServerLog.DIRECTION.OUT,
"IO Exception during %s (%s)", operation.toString(), lastException.getMessage());

throw OException.wrapException(new ODistributedException(errorMessage),
new IOException("Cannot connect to remote node " + url, lastException));
} }


public interface OStorageRemoteOperation<T> { public interface OStorageRemoteOperation<T> {
Expand All @@ -128,33 +91,32 @@ public void sendRequest(final ODistributedRequest req, final String node) {
networkOperation(OChannelBinaryProtocol.DISTRIBUTED_REQUEST, new OStorageRemoteOperation<Object>() { networkOperation(OChannelBinaryProtocol.DISTRIBUTED_REQUEST, new OStorageRemoteOperation<Object>() {
@Override @Override
public Object execute() throws IOException { public Object execute() throws IOException {
final byte[] serializedRequest;
final ByteArrayOutputStream out = new ByteArrayOutputStream();
try { try {
final ObjectOutputStream outStream = new ObjectOutputStream(out); final byte[] serializedRequest;
final ByteArrayOutputStream out = new ByteArrayOutputStream();
try { try {
req.writeExternal(outStream); final ObjectOutputStream outStream = new ObjectOutputStream(out);
serializedRequest = out.toByteArray(); try {
req.writeExternal(outStream);
serializedRequest = out.toByteArray();


ODistributedServerLog.debug(this, manager.getLocalNodeName(), node, ODistributedServerLog.DIRECTION.OUT, ODistributedServerLog.debug(this, manager.getLocalNodeName(), node, ODistributedServerLog.DIRECTION.OUT,
"Sending request %s (%d bytes)", req, serializedRequest.length); "Sending request %s (%d bytes)", req, serializedRequest.length);


channel.writeBytes(serializedRequest); channel.writeBytes(serializedRequest);


} finally {
outStream.close();
}
} finally { } finally {
outStream.close(); out.close();
} }
} finally { } finally {
out.close(); endRequest();
} }


return null; return null;
} }

@Override
public String toString() {
return "SEND REQUEST";
}
}, "Cannot send distributed request"); }, "Cannot send distributed request");


} }
Expand All @@ -163,32 +125,31 @@ public void sendResponse(final ODistributedResponse response, final String node)
networkOperation(OChannelBinaryProtocol.DISTRIBUTED_RESPONSE, new OStorageRemoteOperation<Object>() { networkOperation(OChannelBinaryProtocol.DISTRIBUTED_RESPONSE, new OStorageRemoteOperation<Object>() {
@Override @Override
public Object execute() throws IOException { public Object execute() throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
try { try {
final ObjectOutputStream outStream = new ObjectOutputStream(out); final ByteArrayOutputStream out = new ByteArrayOutputStream();
try { try {
response.writeExternal(outStream); final ObjectOutputStream outStream = new ObjectOutputStream(out);
final byte[] serializedResponse = out.toByteArray(); try {
response.writeExternal(outStream);
final byte[] serializedResponse = out.toByteArray();


ODistributedServerLog.debug(this, manager.getLocalNodeName(), node, ODistributedServerLog.DIRECTION.OUT, ODistributedServerLog.debug(this, manager.getLocalNodeName(), node, ODistributedServerLog.DIRECTION.OUT,
"Sending response %s (%d bytes)", response, serializedResponse.length); "Sending response %s (%d bytes)", response, serializedResponse.length);


channel.writeBytes(serializedResponse); channel.writeBytes(serializedResponse);


} finally {
outStream.close();
}
} finally { } finally {
outStream.close(); out.close();
} }
} finally { } finally {
out.close(); endRequest();
} }


return null; return null;
} }

@Override
public String toString() {
return "SEND RESPONSE";
}
}, "Cannot send response back to the sender node '" + response.getSenderNodeName() + "'"); }, "Cannot send response back to the sender node '" + response.getSenderNodeName() + "'");


} }
Expand All @@ -200,13 +161,30 @@ public void connect() throws IOException {
networkOperation(OChannelBinaryProtocol.REQUEST_CONNECT, new OStorageRemoteOperation<Void>() { networkOperation(OChannelBinaryProtocol.REQUEST_CONNECT, new OStorageRemoteOperation<Void>() {
@Override @Override
public Void execute() throws IOException { public Void execute() throws IOException {
authenticate(); try {
return null; channel.writeString(CLIENT_TYPE).writeString(OConstants.ORIENT_VERSION)
} .writeShort((short) OChannelBinaryProtocol.CURRENT_PROTOCOL_VERSION).writeString("0");
channel.writeString(ODatabaseDocumentTx.getDefaultSerializer().toString());
channel.writeBoolean(false);
channel.writeBoolean(false); // SUPPORT PUSH
channel.writeBoolean(COLLECT_STATS); // COLLECT STATS

channel.writeString(userName);
channel.writeString(userPassword);

channel.flush();

channel.beginResponse(false);
sessionId = channel.readInt();
sessionToken = channel.readBytes();
if (sessionToken.length == 0) {
sessionToken = null;
}
} finally {
channel.releaseWriteLock();
}


@Override return null;
public String toString() {
return "CONNECT";
} }
}, "Cannot connect to the remote server '" + url + "'"); }, "Cannot connect to the remote server '" + url + "'");
} }
Expand All @@ -216,21 +194,8 @@ public void close() {
channel.close(); channel.close();
} }


private void authenticate() throws IOException { private void endRequest() throws IOException {
channel.writeString(CLIENT_TYPE).writeString(OConstants.ORIENT_VERSION)
.writeShort((short) OChannelBinaryProtocol.CURRENT_PROTOCOL_VERSION).writeString("0");
channel.writeString(ODatabaseDocumentTx.getDefaultSerializer().toString());
channel.writeBoolean(false); // NO TOKEN
channel.writeBoolean(false); // SUPPORT PUSH
channel.writeBoolean(COLLECT_STATS); // COLLECT STATS

channel.writeString(userName);
channel.writeString(userPassword);

channel.flush(); channel.flush();

channel.releaseWriteLock();
sessionToken = channel.beginResponse(false);
if (sessionToken != null && sessionToken.length == 0)
sessionToken = null;
} }
} }

0 comments on commit aa6ec94

Please sign in to comment.