Skip to content

Commit

Permalink
Issue #5405: make OrientGraphNoTx.addEdge() reliable & support new tx…
Browse files Browse the repository at this point in the history
…RequiredForSQLGraphOperations to decide when SQL operations against graph require a transaction
  • Loading branch information
lvca committed Dec 1, 2015
1 parent b0a8a96 commit 7a0f1e2
Show file tree
Hide file tree
Showing 24 changed files with 746 additions and 285 deletions.
Expand Up @@ -19,10 +19,7 @@
*/
package com.orientechnologies.common.concur;

import com.orientechnologies.common.exception.OErrorCode;
import com.orientechnologies.orient.core.exception.OCoreException;
import com.orientechnologies.orient.core.storage.impl.local.paginated.atomicoperations.ONestedRollbackException;
import com.orientechnologies.orient.core.storage.impl.local.paginated.base.ODurableComponent;

/**
* Abstract base exception to extend for all the exception that report to the user it has been thrown but re-executing it could
Expand All @@ -34,11 +31,11 @@
public abstract class ONeedRetryException extends OCoreException {
private static final long serialVersionUID = 1L;

public ONeedRetryException(ONeedRetryException exception) {
protected ONeedRetryException(ONeedRetryException exception) {
super(exception);
}

public ONeedRetryException(String message) {
protected ONeedRetryException(String message) {
super(message);
}
}
Expand Up @@ -24,7 +24,7 @@ public abstract class OException extends RuntimeException {

private static final long serialVersionUID = 3882447822497861424L;

public static OException wrapException(OException exception, Exception cause) {
public static OException wrapException(final OException exception, final Throwable cause) {
if (cause instanceof OHighLevelException)
return (OException) cause;

Expand All @@ -40,21 +40,21 @@ public OException(final String message) {
* This constructor is needed to restore and reproduce exception on client side in case of remote storage exception handling.
* Please create "copy constructor" for each exception which has current one as a parent.
*/
public OException(OException exception) {
public OException(final OException exception) {
super(exception.getMessage(), exception);
}

/**
* Passing of root exceptions directly is prohibited use {@link #wrapException(OException, Exception)} instead.
* Passing of root exceptions directly is prohibited use {@link #wrapException(OException, Throwable)} instead.
*/
private OException(Throwable cause) {
private OException(final Throwable cause) {
super(cause);
}

/**
* Passing of root exceptions directly is prohibited use {@link #wrapException(OException, Exception)} instead.
* Passing of root exceptions directly is prohibited use {@link #wrapException(OException, Throwable)} instead.
*/
private OException(String message, Throwable cause) {
private OException(final String message, final Throwable cause) {
super(message, cause);
}

Expand Down
Expand Up @@ -104,6 +104,7 @@ public class OStorageConfiguration implements OSerializableStream {
private volatile String recordSerializer;
private volatile int recordSerializerVersion;
private volatile boolean strictSQL;
private volatile boolean txRequiredForSQLGraphOperations;
private volatile Map<String, Object> loadProperties;
private volatile ConcurrentMap<String, IndexEngineData> indexEngines;
private volatile transient boolean validation = true;
Expand Down Expand Up @@ -146,6 +147,7 @@ public void clear() {
recordSerializer = null;
recordSerializerVersion = 0;
strictSQL = false;
txRequiredForSQLGraphOperations = true;
indexEngines = new ConcurrentHashMap<String, IndexEngineData>();
validation = OGlobalConfiguration.DB_VALIDATION.getValueAsBoolean();

Expand Down Expand Up @@ -766,14 +768,22 @@ public boolean isStrictSql() {
return strictSQL;
}

public boolean isTxRequiredForSQLGraphOperations() {
return txRequiredForSQLGraphOperations;
}

public List<OStorageEntryConfiguration> getProperties() {
return Collections.unmodifiableList(properties);
}

public void setProperty(final String iName, final String iValue) {
if (OStatement.CUSTOM_STRICT_SQL.equalsIgnoreCase(iName))
// SET STRICT SQL VARIABLE
strictSQL = "true".equalsIgnoreCase("" + iValue);
strictSQL = "true".equalsIgnoreCase(iValue);

if ("txRequiredForSQLGraphOperations".equalsIgnoreCase(iName))
// SET TX SQL GRAPH OPERATIONS
txRequiredForSQLGraphOperations = "true".equalsIgnoreCase(iValue);

for (Iterator<OStorageEntryConfiguration> it = properties.iterator(); it.hasNext();) {
final OStorageEntryConfiguration e = it.next();
Expand Down
Expand Up @@ -35,9 +35,9 @@ public class OIntentMassiveInsert implements OIntent {
private boolean previousRetainRecords;
private boolean previousRetainObjects;
private boolean previousValidation;
private boolean previousTxRequiredForSQLGraphOperations;
private Map<ORecordHook, ORecordHook.HOOK_POSITION> removedHooks;
private OSecurityUser currentUser;

private boolean disableValidation = true;
private boolean disableSecurity = true;
private boolean disableHooks = true;
Expand All @@ -51,6 +51,11 @@ public void begin(final ODatabaseDocumentInternal iDatabase) {
}
ODatabaseInternal<?> ownerDb = iDatabase.getDatabaseOwner();

// DISABLE TX IN GRAPH SQL OPERATIONS
previousTxRequiredForSQLGraphOperations = ownerDb.getStorage().getConfiguration().isTxRequiredForSQLGraphOperations();
if (previousTxRequiredForSQLGraphOperations)
ownerDb.getStorage().getConfiguration().setProperty("txRequiredForSQLGraphOperations", Boolean.FALSE.toString());

if (!enableCache) {
ownerDb.getLocalCache().setEnable(enableCache);
}
Expand Down Expand Up @@ -90,12 +95,15 @@ public void begin(final ODatabaseDocumentInternal iDatabase) {
}

public void end(final ODatabaseDocumentInternal iDatabase) {
ODatabaseInternal<?> ownerDb = iDatabase.getDatabaseOwner();

if (disableSecurity)
if (currentUser != null)
// RE-ENABLE CHECK OF SECURITY
iDatabase.getDatabaseOwner().setUser(currentUser);
ownerDb.setUser(currentUser);

ODatabaseInternal<?> ownerDb = iDatabase.getDatabaseOwner();
if (previousTxRequiredForSQLGraphOperations)
ownerDb.getStorage().getConfiguration().setProperty("txRequiredForSQLGraphOperations", Boolean.TRUE.toString());

if (!enableCache) {
ownerDb.getLocalCache().setEnable(!enableCache);
Expand Down
Expand Up @@ -28,11 +28,8 @@
import com.hazelcast.core.IMap;
import com.hazelcast.core.IQueue;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
<<<<<<< HEAD
import com.orientechnologies.common.exception.OException;
=======
import com.orientechnologies.common.concur.lock.OModificationOperationProhibitedException;
>>>>>>> 5db0482... Fixed issue about backup on distributed node github:5758
import com.orientechnologies.common.exception.OException;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.db.OScenarioThreadLocal;
Expand Down
Expand Up @@ -16,6 +16,19 @@

package com.orientechnologies.orient.server.distributed;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;

import com.orientechnologies.common.concur.ONeedRetryException;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.db.record.OIdentifiable;
Expand All @@ -33,18 +46,6 @@
import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery;
import com.orientechnologies.orient.core.storage.ORecordDuplicatedException;
import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph;
import org.junit.Assert;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* Insert records concurrently against the cluster
Expand Down Expand Up @@ -165,11 +166,7 @@ protected void checkRecord(ODatabaseDocumentTx database, int i) {
Assert.assertEquals(doc.field("updated"), Boolean.TRUE);
}

<<<<<<< HEAD
private void checkIndex(ODatabaseDocumentTx database, final String key, final ORID rid) {
=======
protected void checkIndex(ODatabaseDocumentTx database, final String key, final ORID rid) {
>>>>>>> 5db0482... Fixed issue about backup on distributed node github:5758
final List<OIdentifiable> result = database.command(new OCommandSQL("select from index:Person.name where key = ?"))
.execute(key);
Assert.assertNotNull(result);
Expand Down
Expand Up @@ -30,28 +30,40 @@
import com.orientechnologies.orient.core.exception.OConcurrentModificationException;
import com.orientechnologies.orient.core.metadata.OMetadataInternal;
import com.orientechnologies.orient.core.metadata.schema.OClass;
import com.orientechnologies.orient.core.sql.*;
import com.orientechnologies.orient.core.sql.OCommandExecutorSQLRetryAbstract;
import com.orientechnologies.orient.core.sql.OCommandParameters;
import com.orientechnologies.orient.core.sql.OCommandSQLParsingException;
import com.orientechnologies.orient.core.sql.OSQLEngine;
import com.orientechnologies.orient.core.sql.OSQLHelper;
import com.orientechnologies.orient.core.sql.functions.OSQLFunctionRuntime;
import com.tinkerpop.blueprints.impls.orient.*;

import java.util.*;
import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph;
import com.tinkerpop.blueprints.impls.orient.OrientEdge;
import com.tinkerpop.blueprints.impls.orient.OrientEdgeType;
import com.tinkerpop.blueprints.impls.orient.OrientGraph;
import com.tinkerpop.blueprints.impls.orient.OrientVertex;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* SQL CREATE EDGE command.
*
* @author Luca Garulli
*/
public class OCommandExecutorSQLCreateEdge extends OCommandExecutorSQLRetryAbstract implements OCommandDistributedReplicateRequest {
public static final String NAME = "CREATE EDGE";
private static final String KEYWORD_BATCH = "BATCH";
public static final String NAME = "CREATE EDGE";
private static final String KEYWORD_BATCH = "BATCH";

private String from;
private String to;
private OClass clazz;
private String edgeLabel;
private String clusterName;
private List<OPair<String, Object>> fields;
private int batch = 100;
private int batch = 100;

@SuppressWarnings("unchecked")
public OCommandExecutorSQLCreateEdge parse(final OCommandRequest iRequest) {
Expand All @@ -62,7 +74,7 @@ public OCommandExecutorSQLCreateEdge parse(final OCommandRequest iRequest) {
try {
// System.out.println("NEW PARSER FROM: " + queryText);
queryText = preParse(queryText, iRequest);
// System.out.println("NEW PARSER TO: " + queryText);
// System.out.println("NEW PARSER TO: " + queryText);
textRequest.setText(queryText);

final ODatabaseDocument database = getDatabase();
Expand Down Expand Up @@ -141,9 +153,10 @@ public Object execute(final Map<Object, Object> iArgs) {
if (clazz == null)
throw new OCommandExecutionException("Cannot execute the command because it has not been parsed yet");

return OGraphCommandExecutorSQLFactory.runInTx(new OGraphCommandExecutorSQLFactory.GraphCallBack<List<Object>>() {
return OGraphCommandExecutorSQLFactory.runInConfiguredTxMode(new OGraphCommandExecutorSQLFactory.GraphCallBack<List<Object>>() {
@Override
public List<Object> call(OrientBaseGraph graph) {

final Set<OIdentifiable> fromIds = OSQLEngine.getInstance().parseRIDTarget(graph.getRawGraph(), from, context, iArgs);
final Set<OIdentifiable> toIds = OSQLEngine.getInstance().parseRIDTarget(graph.getRawGraph(), to, context, iArgs);

Expand All @@ -162,8 +175,6 @@ public List<Object> call(OrientBaseGraph graph) {
toVertex = graph.getVertex(to);
}

final String clsName = clazz.getName();

if (fields != null)
// EVALUATE FIELDS
for (final OPair<String, Object> f : fields) {
Expand Down Expand Up @@ -218,8 +229,10 @@ public List<Object> call(OrientBaseGraph graph) {
edges.add(edge);

if (batch > 0 && edges.size() % batch == 0) {
graph.commit();
((OrientGraph) graph).begin();
if (graph instanceof OrientGraph) {
graph.commit();
((OrientGraph) graph).begin();
}
}
}
}
Expand Down
Expand Up @@ -39,7 +39,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -118,7 +117,7 @@ public Object execute(final Map<Object, Object> iArgs) {
if (clazz == null)
throw new OCommandExecutionException("Cannot execute the command because it has not been parsed yet");

return OGraphCommandExecutorSQLFactory.runInTx(new OGraphCommandExecutorSQLFactory.GraphCallBack<ODocument>() {
return OGraphCommandExecutorSQLFactory.runInConfiguredTxMode(new OGraphCommandExecutorSQLFactory.GraphCallBack<ODocument>() {
@Override
public ODocument call(OrientBaseGraph graph) {
final OrientVertex vertex = graph.addTemporaryVertex(clazz.getName());
Expand Down

0 comments on commit 7a0f1e2

Please sign in to comment.