Skip to content

Commit

Permalink
First commit for issue #5049
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Oct 2, 2015
1 parent a2f1c92 commit de95971
Show file tree
Hide file tree
Showing 10 changed files with 468 additions and 224 deletions.

Large diffs are not rendered by default.

Expand Up @@ -19,8 +19,21 @@
*/
package com.orientechnologies.orient.server.hazelcast;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;

import com.hazelcast.core.IQueue;
import com.orientechnologies.common.exception.OException;
import com.orientechnologies.common.util.OPair;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
Expand All @@ -44,24 +57,12 @@
import com.orientechnologies.orient.server.distributed.task.OTxTask;
import com.orientechnologies.orient.server.distributed.task.OUpdateRecordTask;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;

/**
* Hazelcast implementation of distributed peer. There is one instance per database. Each node creates own instance to talk with
* each others.
*
*
* @author Luca Garulli (l.garulli--at--orientechnologies.com)
*
*
*/
public class OHazelcastDistributedDatabase implements ODistributedDatabase {

Expand Down Expand Up @@ -111,7 +112,7 @@ public ODistributedResponse send2Nodes(final ODistributedRequest iRequest, final
final ODistributedConfiguration cfg = manager.getDatabaseConfiguration(databaseName);

// TODO: REALLY STILL MATTERS THE NUMBER OF THE QUEUES?
final IQueue<ODistributedRequest>[] reqQueues = getRequestQueues(databaseName, iNodes, iRequest.getTask());
final OPair<String, IQueue<ODistributedRequest>>[] reqQueues = getRequestQueues(databaseName, iNodes, iRequest.getTask());

iRequest.setSenderNodeName(getLocalNodeName());

Expand All @@ -121,21 +122,21 @@ public ODistributedResponse send2Nodes(final ODistributedRequest iRequest, final
availableNodes = 0;
int i = 0;
for (String node : iNodes) {
if (reqQueues[i] != null && manager.isNodeAvailable(node, databaseName))
if (reqQueues[i].getValue() != null && manager.isNodeAvailable(node, databaseName))
availableNodes++;
else {
if (ODistributedServerLog.isDebugEnabled())
ODistributedServerLog.debug(this, getLocalNodeName(), node, DIRECTION.OUT,
"skip expected response from node '%s' for request %s because it's not online (queue=%s)", node, iRequest,
reqQueues[i] != null);
reqQueues[i].getValue() != null);
}
++i;
}
} else {
// EXPECT ANSWER FROM ALL NODES WITH A QUEUE
availableNodes = 0;
for (IQueue<ODistributedRequest> q : reqQueues)
if (q != null)
for (OPair<String, IQueue<ODistributedRequest>> q : reqQueues)
if (q.getValue() != null)
availableNodes++;
}

Expand All @@ -162,6 +163,8 @@ public ODistributedResponse send2Nodes(final ODistributedRequest iRequest, final

final long timeout = OGlobalConfiguration.DISTRIBUTED_QUEUE_TIMEOUT.getValueAsLong();

final int queueMaxSize = OGlobalConfiguration.DISTRIBUTED_QUEUE_MAXSIZE.getValueAsInteger();

try {
requestLock.lock();
try {
Expand All @@ -177,9 +180,24 @@ public ODistributedResponse send2Nodes(final ODistributedRequest iRequest, final
// TODO: CAN I MOVE THIS OUTSIDE?
msgService.registerRequest(iRequest.getId(), currentResponseMgr);

for (IQueue queue : reqQueues) {
if (queue != null)
queue.offer(iRequest, timeout, TimeUnit.MILLISECONDS);
for (OPair<String, IQueue<ODistributedRequest>> entry : reqQueues) {
final IQueue queue = entry.getValue();

if (queue != null) {
System.out.println(String.format("%10s -> %10s queue size: %d", getLocalNodeName(), entry.getKey(), queue.size()));

if (queueMaxSize > 0 && queue.size() > queueMaxSize) {
ODistributedServerLog.warn(this, getLocalNodeName(), iNodes.toString(), DIRECTION.OUT,
"queue has too many messages (%d), treating the node as in stall: trying to restart it...", queue.size());
queue.clear();

manager.unjoinNode(entry.getKey());

} else {
// SEND THE MESSAGE
queue.offer(iRequest, timeout, TimeUnit.MILLISECONDS);
}
}
}

} finally {
Expand Down Expand Up @@ -408,16 +426,17 @@ protected ODistributedResponse waitForResponse(final ODistributedRequest iReques
return currentResponseMgr.getFinalResponse();
}

protected IQueue<ODistributedRequest>[] getRequestQueues(final String iDatabaseName, final Collection<String> nodes,
final OAbstractRemoteTask iTask) {
final IQueue<ODistributedRequest>[] queues = new IQueue[nodes.size()];
protected OPair<String, IQueue<ODistributedRequest>>[] getRequestQueues(final String iDatabaseName,
final Collection<String> nodes, final OAbstractRemoteTask iTask) {
final OPair<String, IQueue<ODistributedRequest>>[] queues = new OPair[nodes.size()];

int i = 0;

// GET ALL THE EXISTENT QUEUES
for (String node : nodes) {
final String queueName = OHazelcastDistributedMessageService.getRequestQueueName(node, iDatabaseName);
final IQueue<ODistributedRequest> queue = msgService.getQueue(queueName);
queues[i++] = queue;
queues[i++] = new OPair<String, IQueue<ODistributedRequest>>(node, queue);
}

return queues;
Expand Down Expand Up @@ -446,7 +465,7 @@ protected void restoreMessagesBeforeFailure(final boolean iRestoreMessages) {
/**
* Checks if last pending operation must be re-executed or not. In some circustamces the exception
* OHotAlignmentNotPossibleExeption is raised because it's not possible to recover the database state.
*
*
* @throws OHotAlignmentNotPossibleExeption
*/
protected void hotAlignmentError(final ODistributedRequest iLastPendingRequest, final String iMessage, final Object... iParams)
Expand Down Expand Up @@ -494,7 +513,10 @@ protected void checkLocalNodeInConfiguration() {
}

protected void removeNodeInConfiguration(final String iNode, final boolean iForce) {
final Lock lock = manager.getLock("orientdb.clusterEvents");
lock.lock();
try {

// GET DATABASE CFG
final ODistributedConfiguration cfg = manager.getDatabaseConfiguration(databaseName);

Expand All @@ -516,6 +538,9 @@ protected void removeNodeInConfiguration(final String iNode, final boolean iForc
} catch (Exception e) {
ODistributedServerLog.debug(this, getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE,
"unable to remove node or change mastership for '%s' in distributed configuration, db=%s", e, iNode, databaseName);

} finally {
lock.unlock();
}
}

Expand Down
Expand Up @@ -36,6 +36,7 @@
import com.orientechnologies.orient.server.distributed.ODistributedResponseManager;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION;
import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask;

import java.util.ArrayList;
import java.util.Iterator;
Expand Down Expand Up @@ -110,10 +111,19 @@ public void run() {

if (message != null) {
senderNode = message.getSenderNodeName();
final long responseTime = dispatchResponseToThread(message);

if (responseTime > -1)
collectMetric(responseTime);
final long reqId = message.getRequestId();
if (reqId < 0) {
// REQUEST
final OAbstractRemoteTask task = (OAbstractRemoteTask) message.getPayload();
task.execute(manager.getServerInstance(), manager, null);
} else {
// RESPONSE
final long responseTime = dispatchResponseToThread(message);

if (responseTime > -1)
collectMetric(responseTime);
}
}

} catch (InterruptedException e) {
Expand Down

0 comments on commit de95971

Please sign in to comment.