Skip to content

Commit

Permalink
Merging of the 2.2.x and develop distributed changes (take 2).
Browse files Browse the repository at this point in the history
  • Loading branch information
SDIPro committed Feb 9, 2017
1 parent e092c9b commit 41edb7e
Show file tree
Hide file tree
Showing 121 changed files with 3,990 additions and 3,118 deletions.
Expand Up @@ -64,6 +64,7 @@
import com.orientechnologies.orient.core.tx.OTransactionAbstract;
import com.orientechnologies.orient.core.tx.OTransactionOptimistic;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol;
import com.orientechnologies.orient.enterprise.channel.binary.ODistributedRedirectException;
import com.orientechnologies.orient.enterprise.channel.binary.OTokenSecurityException;

import javax.naming.NamingException;
Expand Down Expand Up @@ -245,11 +246,15 @@ public <T> T baseNetworkOperation(final OStorageRemoteOperation<T> operation, fi
if (session.commandExecuting)
throw new ODatabaseException(
"Cannot execute the request because an asynchronous operation is in progress. Please use a different connection");


String serverUrl = null;
do {
session.commandExecuting = true;
OChannelBinaryAsynchClient network = null;
String serverUrl = getNextAvailableServerURL(false, session);

if (serverUrl == null)
serverUrl = getNextAvailableServerURL(false, session);

do {
try {
network = getNetwork(serverUrl);
Expand All @@ -270,19 +275,30 @@ public <T> T baseNetworkOperation(final OStorageRemoteOperation<T> operation, fi
}

return operation.execute(network, session);
} catch (ODistributedRedirectException e) {
connectionManager.release(network);
OLogManager.instance()
.debug(this, "Redirecting the request from server '%s' to the server '%s' because %s", e.getFromServer(), e.toString(),
e.getMessage());

// RECONNECT TO THE SERVER SUGGESTED IN THE EXCEPTION
serverUrl = e.getToServerAddress();
} catch (OModificationOperationProhibitedException mope) {
connectionManager.release(network);
handleDBFreeze();
serverUrl = null;
} catch (OTokenException e) {
connectionManager.release(network);
session.removeServerSession(network.getServerURL());
if (--retry <= 0)
throw OException.wrapException(new OStorageException(errorMessage), e);
serverUrl = null;
} catch (OTokenSecurityException e) {
connectionManager.release(network);
session.removeServerSession(network.getServerURL());
if (--retry <= 0)
throw OException.wrapException(new OStorageException(errorMessage), e);
serverUrl = null;
} catch (OOfflineNodeException e) {
connectionManager.release(network);
// Remove the current url because the node is offline
Expand All @@ -293,13 +309,15 @@ public <T> T baseNetworkOperation(final OStorageRemoteOperation<T> operation, fi
// Not thread Safe ...
activeSession.removeServerSession(serverUrl);
}

serverUrl = null;
} catch (IOException e) {
connectionManager.release(network);
retry = handleIOException(retry, network, e);
serverUrl = null;
} catch (OIOException e) {
connectionManager.release(network);
retry = handleIOException(retry, network, e);
serverUrl = null;
} catch (OException e) {
connectionManager.release(network);
throw e;
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/java/com/orientechnologies/common/io/OUtils.java
@@ -1,6 +1,6 @@
/*
*
* * Copyright 2010-2016 OrientDB LTD (http://orientdb.com)
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
Expand All @@ -14,12 +14,20 @@
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://orientdb.com
* * For more information: http://www.orientechnologies.com
*
*/
package com.orientechnologies.common.io;

import com.orientechnologies.orient.core.serialization.serializer.OStringSerializerHelper;

public class OUtils {
public static String getDatabaseNameFromURL(final String name) {
if (OStringSerializerHelper.contains(name, '/'))
return name.substring(name.lastIndexOf("/") + 1);
return name;
}

public static boolean equals(final Object a, final Object b) {
if (a == b)
return true;
Expand Down
Expand Up @@ -53,6 +53,40 @@ public static <T> int indexOf(final List<T> list, final T object, final Comparat
return -1;
}

/**
* This method is used to find an item in an array.
*
* @param array Array in which value should be found.
* @param object Object to find.
*
* @return Index of found item or <code>-1</code> otherwise.
*/
public static int indexOf(final Object[] array, final Comparable object) {
for (int i = 0; i < array.length; ++i) {
if (object.compareTo(array[i]) == 0)
// FOUND
return i;
}
return -1;
}

/**
* This method is used to find a number in an array.
*
* @param array Array of integers in which value should be found.
* @param object number to find.
*
* @return Index of found item or <code>-1</code> otherwise.
*/
public static int indexOf(final int[] array, final int object) {
for (int i = 0; i < array.length; ++i) {
if (array[i] == object)
// FOUND
return i;
}
return -1;
}

/**
* Create a string representation of all objects in the given Iterable. example : [value1,value2,value3]
*
Expand Down
Expand Up @@ -21,8 +21,8 @@

/**
* Interface to know if the command must be distributed in clustered scenario.
*
* @author Luca Garulli (l.garulli--(at)--orientdb.com)
*
* @author Luca Garulli
*/
public interface OCommandDistributedReplicateRequest {

Expand Down Expand Up @@ -77,4 +77,9 @@ enum QUORUM_TYPE {
* Returns the undo command if any.
*/
String getUndoCommand();

/**
* Returns true if the command is executed on local node first and then distributed, or false if it's executed to all the servers at the same time.
*/
boolean isDistributedExecutingOnLocalNodeFirst();
}
Expand Up @@ -201,4 +201,7 @@ else if (nodeResult instanceof OIdentifiable) {
return aggregatedResult;
}

public boolean isDistributedExecutingOnLocalNodeFirst(){
return true;
}
}
Expand Up @@ -659,6 +659,9 @@ public void change(final Object iCurrentValue, final Object iNewValue) {
DISTRIBUTED_CRUD_TASK_SYNCH_TIMEOUT("distributed.crudTaskTimeout", "Maximum timeout (in ms) to wait for CRUD remote tasks",
Long.class, 3000l, true),

DISTRIBUTED_MAX_STARTUP_DELAY("distributed.maxStartupDelay", "Maximum delay time (in ms) to wait for a server to start",
Long.class, 10000l, true),

DISTRIBUTED_COMMAND_TASK_SYNCH_TIMEOUT("distributed.commandTaskTimeout",
"Maximum timeout (in ms) to wait for command distributed tasks", Long.class, 2 * 60 * 1000l, true),

Expand Down Expand Up @@ -784,6 +787,12 @@ public void change(final Object iCurrentValue, final Object iNewValue) {
"Directory where the copy of an existent database is saved, before it is downloaded from the cluster. Leave it empty to avoid the backup.",
String.class, "../backup/databases"),

/**
* @Since 2.2.15
*/
@OApi(maturity = OApi.MATURITY.NEW)DISTRIBUTED_BACKUP_TRY_INCREMENTAL_FIRST("distributed.backupTryIncrementalFirst",
"Try to execute an incremental backup first.", Boolean.class, true),

/**
* @Since 2.1
*/
Expand Down
Expand Up @@ -101,13 +101,11 @@ public static <T> Object executeAsDefault(final Callable<T> iCallback) {
public void setRunMode(final RUN_MODE value) {
final RunContext context = get();
context.runMode = value;
super.set(context);
}

public void setInDatabaseLock(final boolean value) {
final RunContext context = get();
context.inDatabaseLock = value;
super.set(context);
}

public RUN_MODE getRunMode() {
Expand All @@ -123,10 +121,7 @@ public boolean isInDatabaseLock() {
}

@Override
public RunContext get() {
RunContext result = super.get();
if (result == null)
result = new RunContext();
return result;
protected RunContext initialValue() {
return new RunContext();
}
}
Expand Up @@ -19,6 +19,7 @@
*/
package com.orientechnologies.orient.core.db.record;

import com.orientechnologies.orient.core.id.ORID;
import com.orientechnologies.orient.core.record.ORecord;

/**
Expand Down Expand Up @@ -75,6 +76,10 @@ public ORecord getRecord() {
return record != null ? record.getRecord() : null;
}

public ORID getRID() {
return record != null ? record.getIdentity() : null;
}

public static String getName(final int type) {
String operation = "?";
switch (type) {
Expand Down
Expand Up @@ -52,4 +52,8 @@ public boolean equals(final Object obj) {

return rid != null ? rid.equals(((ORecordNotFoundException) obj).rid) : ((ORecordNotFoundException) obj).rid.equals(rid);
}

public ORID getRid() {
return rid;
}
}
Expand Up @@ -881,18 +881,18 @@ public void dropProperty(final String propertyName) {
if (storage instanceof OStorageProxy) {
database.command(new OCommandSQL("drop property " + name + '.' + propertyName)).execute();
} else if (isDistributedCommand()) {
final OCommandSQL commandSQL = new OCommandSQL("drop property " + name + '.' + propertyName);
commandSQL.addExcludedNode(((OAutoshardedStorage) storage).getNodeId());

database.command(commandSQL).execute();

OScenarioThreadLocal.executeAsDistributed(new Callable<OProperty>() {
@Override
public OProperty call() throws Exception {
dropPropertyInternal(propertyName);
return null;
}
});

final OCommandSQL commandSQL = new OCommandSQL("drop property " + name + '.' + propertyName);
commandSQL.addExcludedNode(((OAutoshardedStorage) storage).getNodeId());

database.command(commandSQL).execute();
} else
OScenarioThreadLocal.executeAsDistributed(new Callable<OProperty>() {
@Override
Expand Down Expand Up @@ -2578,17 +2578,19 @@ private OProperty addProperty(final String propertyName, final OType type, final

return getProperty(propertyName);
} else if (isDistributedCommand()) {
final OCommandSQL commandSQL = new OCommandSQL(cmd.toString());
commandSQL.addExcludedNode(((OAutoshardedStorage) storage).getNodeId());

database.command(commandSQL).execute();

return (OProperty) OScenarioThreadLocal.executeAsDistributed(new Callable<OProperty>() {
final OProperty prop = (OProperty) OScenarioThreadLocal.executeAsDistributed(new Callable<OProperty>() {
@Override
public OProperty call() throws Exception {
return addPropertyInternal(propertyName, type, linkedType, linkedClass, unsafe);
}
});

final OCommandSQL commandSQL = new OCommandSQL(cmd.toString());
commandSQL.addExcludedNode(((OAutoshardedStorage) storage).getNodeId());

database.command(commandSQL).execute();

return prop;
} else
return (OProperty) OScenarioThreadLocal.executeAsDistributed(new Callable<OProperty>() {
@Override
Expand Down
Expand Up @@ -447,7 +447,7 @@ public void dropClass(final String className) {
cmd.append(className);
cmd.append(" unsafe");

if (isDistributedCommand()) {
if (executeThroughDistributedStorage()) {
final OAutoshardedStorage autoshardedStorage = (OAutoshardedStorage) storage;
OCommandSQL commandSQL = new OCommandSQL(cmd.toString());
commandSQL.addExcludedNode(autoshardedStorage.getNodeId());
Expand Down Expand Up @@ -806,6 +806,9 @@ public void create(final ODatabaseDocumentInternal database) {
@Override
public void close() {
classes.clear();
clustersToClasses.clear();
blobClusters.clear();
properties.clear();
document.clear();
}

Expand Down Expand Up @@ -885,7 +888,7 @@ protected OGlobalProperty findOrCreateGlobalProperty(final String name, final OT
return global;
}

private OClass doCreateClass(final String className, final int[] clusterIds, int retry, OClass... superClasses)
private OClass doCreateClass(final String className, int[] clusterIds, int retry, OClass... superClasses)
throws ClusterIdsAreEmptyException {
OClass result;

Expand All @@ -904,9 +907,13 @@ private OClass doCreateClass(final String className, final int[] clusterIds, int
if (classes.containsKey(key) && retry == 0)
throw new OSchemaException("Class '" + className + "' already exists in current database");

if (!isDistributedCommand())
if (!executeThroughDistributedStorage())
checkClustersAreAbsent(clusterIds);

if (clusterIds == null || clusterIds.length == 0) {
clusterIds = createClusters(className, getDatabase().getStorage().getConfiguration().getMinimumClusters());
}

cmd = new StringBuilder("create class ");
if (getDatabase().getStorage().getConfiguration().isStrictSql())
cmd.append('`');
Expand Down Expand Up @@ -947,7 +954,7 @@ private OClass doCreateClass(final String className, final int[] clusterIds, int
}
}

if (isDistributedCommand()) {
if (executeThroughDistributedStorage()) {
createClassInternal(className, clusterIds, superClassesList);

final OAutoshardedStorage autoshardedStorage = (OAutoshardedStorage) storage;
Expand Down Expand Up @@ -1027,7 +1034,7 @@ private OClass doCreateClass(final String className, final int clusters, final i
cmd.append(clusters);
}

if (isDistributedCommand()) {
if (executeThroughDistributedStorage()) {

final int[] clusterIds = createClusters(className, clusters);
createClassInternal(className, clusterIds, superClassesList);
Expand Down Expand Up @@ -1064,7 +1071,7 @@ private OClass doCreateClass(final String className, final int clusters, final i
return result;
}

private boolean isDistributedCommand() {
private boolean executeThroughDistributedStorage() {
return getDatabase().getStorage() instanceof OAutoshardedStorage && !OScenarioThreadLocal.INSTANCE.isRunModeDistributed();
}

Expand Down
Expand Up @@ -163,7 +163,7 @@ public Object execute(final Map<Object, Object> iArgs) {
}
cls.set(attribute, value);

return null;
return Boolean.TRUE;
}

@Override
Expand Down

0 comments on commit 41edb7e

Please sign in to comment.