Skip to content

Commit

Permalink
Distributed: fixed query with sharding and missing clusters because n…
Browse files Browse the repository at this point in the history
…odes down
  • Loading branch information
lvca committed Jul 7, 2015
1 parent aef4b8f commit 0359999
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 21 deletions.
Expand Up @@ -66,7 +66,7 @@ public abstract class ODistributedAbstractPlugin extends OServerPluginAbstract i
protected File defaultDatabaseConfigFile;
protected ConcurrentHashMap<String, ODistributedStorage> storages = new ConcurrentHashMap<String, ODistributedStorage>();

public static Object runInDistributedMode(Callable iCall) throws Exception {
public static Object runInDistributedMode(final Callable iCall) throws Exception {
final OScenarioThreadLocal.RUN_MODE currentRunningMode = OScenarioThreadLocal.INSTANCE.get();
if (currentRunningMode != OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED)
OScenarioThreadLocal.INSTANCE.set(OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED);
Expand All @@ -76,7 +76,21 @@ public static Object runInDistributedMode(Callable iCall) throws Exception {
} finally {

if (currentRunningMode != OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED)
OScenarioThreadLocal.INSTANCE.set(OScenarioThreadLocal.RUN_MODE.DEFAULT);
OScenarioThreadLocal.INSTANCE.set(currentRunningMode);
}
}

public static Object runInDefaultMode(final Callable iCall) throws Exception {
final OScenarioThreadLocal.RUN_MODE currentRunningMode = OScenarioThreadLocal.INSTANCE.get();
if (currentRunningMode != OScenarioThreadLocal.RUN_MODE.DEFAULT)
OScenarioThreadLocal.INSTANCE.set(OScenarioThreadLocal.RUN_MODE.DEFAULT);

try {
return iCall.call();
} finally {

if (currentRunningMode != OScenarioThreadLocal.RUN_MODE.DEFAULT)
OScenarioThreadLocal.INSTANCE.set(currentRunningMode);
}
}

Expand Down
Expand Up @@ -19,6 +19,17 @@
*/
package com.orientechnologies.orient.server.distributed;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

import com.orientechnologies.common.concur.ONeedRetryException;
import com.orientechnologies.common.concur.resource.OSharedResourceAdaptiveExternal;
import com.orientechnologies.common.exception.OException;
Expand Down Expand Up @@ -78,17 +89,6 @@
import com.orientechnologies.orient.server.distributed.ODistributedRequest.EXECUTION_MODE;
import com.orientechnologies.orient.server.distributed.task.*;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
* Distributed storage implementation that routes to the owner node the request.
*
Expand Down Expand Up @@ -333,23 +333,39 @@ else if (result instanceof Throwable)
}
}

protected Map<String, Object> executeOnServers(OCommandRequestText iCommand, Collection<String> involvedClusters,
Map<String, Collection<String>> nodeClusterMap) {
protected Map<String, Object> executeOnServers(final OCommandRequestText iCommand, final Collection<String> involvedClusters,
final Map<String, Collection<String>> nodeClusterMap) {

final Map<String, Object> results = new HashMap<String, Object>(nodeClusterMap.size());

// EXECUTE DIFFERENT TASK ON EACH SERVER
final List<String> nodes = new ArrayList<String>(1);
for (Map.Entry<String, Collection<String>> c : nodeClusterMap.entrySet()) {
final String nodeName = c.getKey();

final OAbstractCommandTask task = iCommand instanceof OCommandScript ? new OScriptTask(iCommand) : new OSQLCommandTask(
iCommand, c.getValue());
task.setResultStrategy(OAbstractRemoteTask.RESULT_STRATEGY.ANY);
if (!dManager.isNodeAvailable(nodeName, getName())) {

nodes.clear();
nodes.add(c.getKey());
ODistributedServerLog.warn(this, dManager.getLocalNodeName(), nodeName, ODistributedServerLog.DIRECTION.OUT,
"Node '%s' is involved in the command '%s' against database '%s', but the node is not active. Excluding it", nodeName,
iCommand, wrapped.getName());

results.put(c.getKey(), dManager.sendRequest(getName(), involvedClusters, nodes, task, EXECUTION_MODE.RESPONSE));
} else {

final OAbstractCommandTask task = iCommand instanceof OCommandScript ? new OScriptTask(iCommand) : new OSQLCommandTask(
iCommand, c.getValue());
task.setResultStrategy(OAbstractRemoteTask.RESULT_STRATEGY.ANY);

nodes.clear();
nodes.add(nodeName);

results.put(nodeName, dManager.sendRequest(getName(), involvedClusters, nodes, task, EXECUTION_MODE.RESPONSE));

}
}

if (results.isEmpty())
throw new ODistributedException("No active nodes found to execute command: " + iCommand);

return results;
}

Expand Down

0 comments on commit 0359999

Please sign in to comment.