Skip to content

Commit

Permalink
first poc implementation for transfer operations to the other node
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed Jul 6, 2018
1 parent a734d76 commit 83dfe32
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 58 deletions.
Expand Up @@ -59,41 +59,41 @@ public OScenarioThreadLocal() {
setRunMode(RUN_MODE.DEFAULT); setRunMode(RUN_MODE.DEFAULT);
} }


public static Object executeAsDistributed(final Callable<? extends Object> iCallback) { public static <T> Object executeAsDefault(final Callable<T> iCallback) {
final OScenarioThreadLocal.RUN_MODE currentDistributedMode = OScenarioThreadLocal.INSTANCE.getRunMode(); final OScenarioThreadLocal.RUN_MODE currentDistributedMode = OScenarioThreadLocal.INSTANCE.getRunMode();
if (currentDistributedMode != OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED) if (currentDistributedMode == OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED)
// ASSURE SCHEMA CHANGES ARE NEVER PROPAGATED ON CLUSTER // ASSURE SCHEMA CHANGES ARE NEVER PROPAGATED ON CLUSTER
OScenarioThreadLocal.INSTANCE.setRunMode(OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED); OScenarioThreadLocal.INSTANCE.setRunMode(RUN_MODE.DEFAULT);


try { try {
return iCallback.call(); return (T) iCallback.call();
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {
if (currentDistributedMode != OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED) if (currentDistributedMode == OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED)
// RESTORE PREVIOUS MODE // RESTORE PREVIOUS MODE
OScenarioThreadLocal.INSTANCE.setRunMode(OScenarioThreadLocal.RUN_MODE.DEFAULT); OScenarioThreadLocal.INSTANCE.setRunMode(OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED);
} }
} }


public static <T> Object executeAsDefault(final Callable<T> iCallback) { public static Object executeAsDistributed(final Callable<? extends Object> iCallback) {
final OScenarioThreadLocal.RUN_MODE currentDistributedMode = OScenarioThreadLocal.INSTANCE.getRunMode(); final OScenarioThreadLocal.RUN_MODE currentDistributedMode = OScenarioThreadLocal.INSTANCE.getRunMode();
if (currentDistributedMode == OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED) if (currentDistributedMode != OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED)
// ASSURE SCHEMA CHANGES ARE NEVER PROPAGATED ON CLUSTER // ASSURE SCHEMA CHANGES ARE NEVER PROPAGATED ON CLUSTER
OScenarioThreadLocal.INSTANCE.setRunMode(RUN_MODE.DEFAULT); OScenarioThreadLocal.INSTANCE.setRunMode(OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED);


try { try {
return (T) iCallback.call(); return iCallback.call();
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {
if (currentDistributedMode == OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED) if (currentDistributedMode != OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED)
// RESTORE PREVIOUS MODE // RESTORE PREVIOUS MODE
OScenarioThreadLocal.INSTANCE.setRunMode(OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED); OScenarioThreadLocal.INSTANCE.setRunMode(OScenarioThreadLocal.RUN_MODE.DEFAULT);
} }
} }


Expand Down
Expand Up @@ -11,20 +11,26 @@
import com.orientechnologies.orient.core.compression.impl.OZIPCompressionUtil; import com.orientechnologies.orient.core.compression.impl.OZIPCompressionUtil;
import com.orientechnologies.orient.core.db.*; import com.orientechnologies.orient.core.db.*;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentEmbedded; import com.orientechnologies.orient.core.db.document.ODatabaseDocumentEmbedded;
import com.orientechnologies.orient.core.db.record.OClassTrigger;
import com.orientechnologies.orient.core.db.record.OIdentifiable; import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.db.record.ORecordOperation; import com.orientechnologies.orient.core.db.record.ORecordOperation;
import com.orientechnologies.orient.core.exception.*; import com.orientechnologies.orient.core.exception.*;
import com.orientechnologies.orient.core.hook.ORecordHook;
import com.orientechnologies.orient.core.id.ORID; import com.orientechnologies.orient.core.id.ORID;
import com.orientechnologies.orient.core.id.ORecordId; import com.orientechnologies.orient.core.id.ORecordId;
import com.orientechnologies.orient.core.index.OClassIndexManager;
import com.orientechnologies.orient.core.index.OIndex; import com.orientechnologies.orient.core.index.OIndex;
import com.orientechnologies.orient.core.metadata.OMetadataDefault; import com.orientechnologies.orient.core.metadata.OMetadataDefault;
import com.orientechnologies.orient.core.metadata.schema.OClass; import com.orientechnologies.orient.core.metadata.schema.OClass;
import com.orientechnologies.orient.core.metadata.security.ORole; import com.orientechnologies.orient.core.metadata.schema.OImmutableClass;
import com.orientechnologies.orient.core.metadata.security.ORule; import com.orientechnologies.orient.core.metadata.security.*;
import com.orientechnologies.orient.core.metadata.sequence.OSequenceLibraryProxy;
import com.orientechnologies.orient.core.query.live.OLiveQueryHook; import com.orientechnologies.orient.core.query.live.OLiveQueryHook;
import com.orientechnologies.orient.core.query.live.OLiveQueryHookV2; import com.orientechnologies.orient.core.query.live.OLiveQueryHookV2;
import com.orientechnologies.orient.core.record.ORecord; import com.orientechnologies.orient.core.record.ORecord;
import com.orientechnologies.orient.core.record.impl.ODocument; import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.record.impl.ODocumentInternal;
import com.orientechnologies.orient.core.schedule.OScheduledEvent;
import com.orientechnologies.orient.core.sql.executor.OExecutionPlan; import com.orientechnologies.orient.core.sql.executor.OExecutionPlan;
import com.orientechnologies.orient.core.sql.executor.OResultSet; import com.orientechnologies.orient.core.sql.executor.OResultSet;
import com.orientechnologies.orient.core.storage.ORecordDuplicatedException; import com.orientechnologies.orient.core.storage.ORecordDuplicatedException;
Expand All @@ -41,6 +47,8 @@
import com.orientechnologies.orient.server.distributed.impl.task.OCopyDatabaseChunkTask; import com.orientechnologies.orient.server.distributed.impl.task.OCopyDatabaseChunkTask;
import com.orientechnologies.orient.server.distributed.impl.task.ORunQueryExecutionPlanTask; import com.orientechnologies.orient.server.distributed.impl.task.ORunQueryExecutionPlanTask;
import com.orientechnologies.orient.server.distributed.impl.task.OSyncClusterTask; import com.orientechnologies.orient.server.distributed.impl.task.OSyncClusterTask;
import com.orientechnologies.orient.server.distributed.impl.task.OToLeaderTransactionTask;
import com.orientechnologies.orient.server.distributed.impl.task.transaction.OToLeaderTransactionTaskResponse;
import com.orientechnologies.orient.server.distributed.task.ORemoteTask; import com.orientechnologies.orient.server.distributed.task.ORemoteTask;
import com.orientechnologies.orient.server.hazelcast.OHazelcastPlugin; import com.orientechnologies.orient.server.hazelcast.OHazelcastPlugin;


Expand Down Expand Up @@ -482,43 +490,57 @@ public void internalCommit(OTransactionInternal iTx) {
//Exclusive for handling schema manipulation, remove after refactor for distributed schema //Exclusive for handling schema manipulation, remove after refactor for distributed schema
super.internalCommit(iTx); super.internalCommit(iTx);
} else { } else {
//This is future may handle a retry OToLeaderTransactionTask message = new OToLeaderTransactionTask(iTx.getRecordOperations());
try { ODistributedServerManager dManager = getStorageDistributed().getDistributedManager();
for (ORecordOperation txEntry : iTx.getRecordOperations()) { // SYNCHRONOUS CALL: REPLICATE IT
if (txEntry.type == ORecordOperation.CREATED || txEntry.type == ORecordOperation.UPDATED) { final Set<String> servers = new HashSet<String>();
final ORecord record = txEntry.getRecord(); servers.add(dManager.getLockManagerServer());
if (record instanceof ODocument) ODistributedResponse response = dManager.sendRequest(getName(), null, servers, message, dManager.getNextMessageIdCounter(),
((ODocument) record).validate(); ODistributedRequest.EXECUTION_MODE.RESPONSE, null, null, null);
} for (Map.Entry<ORID, ORID> entry : ((OToLeaderTransactionTaskResponse) response.getPayload()).getNewIds().entrySet()) {
} iTx.updateIdentityAfterCommit(entry.getValue(), entry.getKey());
final ODistributedConfiguration dbCfg = getStorageDistributed().getDistributedConfiguration(); }
ODistributedServerManager dManager = getStorageDistributed().getDistributedManager(); }
final String localNodeName = dManager.getLocalNodeName(); }
getStorageDistributed().checkNodeIsMaster(localNodeName, dbCfg, "Transaction Commit");
ONewDistributedTransactionManager txManager = new ONewDistributedTransactionManager(getStorageDistributed(), dManager, public void realCommit(OTransactionInternal iTx) {
getStorageDistributed().getLocalDistributedDatabase()); //This is future may handle a retry
Set<String> otherNodesInQuorum = txManager try {
.getAvailableNodesButLocal(dbCfg, txManager.getInvolvedClusters(iTx.getRecordOperations()), getLocalNodeName()); for (ORecordOperation txEntry : iTx.getRecordOperations()) {
List<String> online = dManager.getOnlineNodes(getName()); if (txEntry.type == ORecordOperation.CREATED || txEntry.type == ORecordOperation.UPDATED) {
if (online.size() < ((otherNodesInQuorum.size() + 1) / 2) + 1) { final ORecord record = txEntry.getRecord();
throw new ODistributedException("No enough nodes online to execute the operation, online nodes: " + online); if (record instanceof ODocument)
((ODocument) record).validate();
} }
}
final ODistributedConfiguration dbCfg = getStorageDistributed().getDistributedConfiguration();
ODistributedServerManager dManager = getStorageDistributed().getDistributedManager();
final String localNodeName = dManager.getLocalNodeName();
getStorageDistributed().checkNodeIsMaster(localNodeName, dbCfg, "Transaction Commit");
ONewDistributedTransactionManager txManager = new ONewDistributedTransactionManager(getStorageDistributed(), dManager,
getStorageDistributed().getLocalDistributedDatabase());
Set<String> otherNodesInQuorum = txManager
.getAvailableNodesButLocal(dbCfg, txManager.getInvolvedClusters(iTx.getRecordOperations()), getLocalNodeName());
List<String> online = dManager.getOnlineNodes(getName());
if (online.size() < ((otherNodesInQuorum.size() + 1) / 2) + 1) {
throw new ODistributedException("No enough nodes online to execute the operation, online nodes: " + online);
}


((OAbstractPaginatedStorage) getStorage().getUnderlying()).preallocateRids(iTx); ((OAbstractPaginatedStorage) getStorage().getUnderlying()).preallocateRids(iTx);


txManager.commit(this, iTx, getStorageDistributed().getEventListener()); txManager.commit(this, iTx);
return; return;
} catch (OValidationException e) { } catch (OValidationException e) {
throw e; throw e;
} catch (HazelcastInstanceNotActiveException e) { } catch (HazelcastInstanceNotActiveException e) {
throw new OOfflineNodeException("Hazelcast instance is not available"); throw new OOfflineNodeException("Hazelcast instance is not available");


} catch (HazelcastException e) { } catch (HazelcastException e) {
throw new OOfflineNodeException("Hazelcast instance is not available"); throw new OOfflineNodeException("Hazelcast instance is not available");
} catch (Exception e) { } catch (Exception e) {
getStorageDistributed().handleDistributedException("Cannot route TX operation against distributed node", e); getStorageDistributed().handleDistributedException("Cannot route TX operation against distributed node", e);
}
} }

} }


public void acquireLocksForTx(OTransactionInternal tx, ODistributedTxContext txContext) { public void acquireLocksForTx(OTransactionInternal tx, ODistributedTxContext txContext) {
Expand Down Expand Up @@ -684,4 +706,83 @@ public void internalBegin2pc(ONewDistributedTxContextImpl txContext, boolean loc


} }


public void afterCreateOperations(final OIdentifiable id) {
if (id instanceof ODocument) {
ODocument doc = (ODocument) id;
OImmutableClass clazz = ODocumentInternal.getImmutableSchemaClass(this, doc);
if (clazz != null) {
OClassIndexManager.checkIndexesAfterCreate(doc, this);
if (clazz.isFunction()) {
this.getSharedContext().getFunctionLibrary().createdFunction(doc);
Orient.instance().getScriptManager().close(this.getName());
}
if (clazz.isOuser() || clazz.isOrole()) {
getMetadata().getSecurity().incrementVersion();
}
if (clazz.isSequence()) {
((OSequenceLibraryProxy) getMetadata().getSequenceLibrary()).getDelegate().onSequenceCreated(this, doc);
}
if (clazz.isScheduler()) {
getMetadata().getScheduler().scheduleEvent(new OScheduledEvent(doc));
}
if (clazz.isTriggered()) {
OClassTrigger.onRecordAfterCreate(doc, this);
}
}
}
callbackHooks(ORecordHook.TYPE.AFTER_CREATE, id);
}

public void afterUpdateOperations(final OIdentifiable id) {
if (id instanceof ODocument) {
ODocument doc = (ODocument) id;
OImmutableClass clazz = ODocumentInternal.getImmutableSchemaClass(this, doc);
if (clazz != null) {
OClassIndexManager.checkIndexesAfterUpdate((ODocument) id, this);
if (clazz.isFunction()) {
this.getSharedContext().getFunctionLibrary().updatedFunction(doc);
Orient.instance().getScriptManager().close(this.getName());
}
if (clazz.isOuser() || clazz.isOrole()) {
getMetadata().getSecurity().incrementVersion();
}
if (clazz.isSequence()) {
((OSequenceLibraryProxy) getMetadata().getSequenceLibrary()).getDelegate().onSequenceUpdated(this, doc);
}
if (clazz.isTriggered()) {
OClassTrigger.onRecordAfterUpdate(doc, this);
}
}
}
callbackHooks(ORecordHook.TYPE.AFTER_UPDATE, id);
}

public void afterDeleteOperations(final OIdentifiable id) {
if (id instanceof ODocument) {
ODocument doc = (ODocument) id;
OImmutableClass clazz = ODocumentInternal.getImmutableSchemaClass(this, doc);
if (clazz != null) {
OClassIndexManager.checkIndexesAfterDelete(doc, this);
if (clazz.isFunction()) {
this.getSharedContext().getFunctionLibrary().droppedFunction(doc);
Orient.instance().getScriptManager().close(this.getName());
}
if (clazz.isOuser() || clazz.isOrole()) {
getMetadata().getSecurity().incrementVersion();
}
if (clazz.isSequence()) {
((OSequenceLibraryProxy) getMetadata().getSequenceLibrary()).getDelegate().onSequenceDropped(this, doc);
}
if (clazz.isScheduler()) {
final String eventName = doc.field(OScheduledEvent.PROP_NAME);
getSharedContext().getScheduler().removeEventInternal(eventName);
}
if (clazz.isTriggered()) {
OClassTrigger.onRecordAfterDelete(doc, this);
}
}
}
callbackHooks(ORecordHook.TYPE.AFTER_DELETE, id);
}

} }
Expand Up @@ -19,10 +19,7 @@
*/ */
package com.orientechnologies.orient.server.distributed.impl; package com.orientechnologies.orient.server.distributed.impl;


import com.orientechnologies.common.concur.ONeedRetryException;
import com.orientechnologies.common.log.OLogFormatter;
import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.db.OrientDB;
import com.orientechnologies.orient.core.db.record.ORecordOperation; import com.orientechnologies.orient.core.db.record.ORecordOperation;
import com.orientechnologies.orient.core.exception.*; import com.orientechnologies.orient.core.exception.*;
import com.orientechnologies.orient.core.id.ORID; import com.orientechnologies.orient.core.id.ORID;
Expand All @@ -33,9 +30,7 @@
import com.orientechnologies.orient.core.storage.impl.local.paginated.OLocalPaginatedStorage; import com.orientechnologies.orient.core.storage.impl.local.paginated.OLocalPaginatedStorage;
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OLogSequenceNumber; import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OLogSequenceNumber;
import com.orientechnologies.orient.core.tx.OTransaction; import com.orientechnologies.orient.core.tx.OTransaction;
import com.orientechnologies.orient.core.tx.OTransactionIndexChanges;
import com.orientechnologies.orient.core.tx.OTransactionInternal; import com.orientechnologies.orient.core.tx.OTransactionInternal;
import com.orientechnologies.orient.core.tx.OTransactionOptimistic;
import com.orientechnologies.orient.server.distributed.*; import com.orientechnologies.orient.server.distributed.*;
import com.orientechnologies.orient.server.distributed.ODistributedRequest.EXECUTION_MODE; import com.orientechnologies.orient.server.distributed.ODistributedRequest.EXECUTION_MODE;
import com.orientechnologies.orient.server.distributed.impl.task.*; import com.orientechnologies.orient.server.distributed.impl.task.*;
Expand Down Expand Up @@ -65,8 +60,7 @@ public ONewDistributedTransactionManager(final ODistributedStorage storage, fina
this.localDistributedDatabase = iDDatabase; this.localDistributedDatabase = iDDatabase;
} }


public List<ORecordOperation> commit(final ODatabaseDocumentDistributed database, final OTransactionInternal iTx, public void commit(final ODatabaseDocumentDistributed database, final OTransactionInternal iTx) {
final ODistributedStorageEventListener eventListener) {
final String localNodeName = dManager.getLocalNodeName(); final String localNodeName = dManager.getLocalNodeName();


iTx.setStatus(OTransaction.TXSTATUS.BEGUN); iTx.setStatus(OTransaction.TXSTATUS.BEGUN);
Expand Down Expand Up @@ -97,7 +91,7 @@ public List<ORecordOperation> commit(final ODatabaseDocumentDistributed database
if (nodes.isEmpty()) { if (nodes.isEmpty()) {
// NO FURTHER NODES TO INVOLVE // NO FURTHER NODES TO INVOLVE
localOk(requestId, database); localOk(requestId, database);
return null; return;
} }
//TODO:check the lsn //TODO:check the lsn
txTask.setLastLSN(getLsn()); txTask.setLastLSN(getLsn());
Expand All @@ -117,8 +111,7 @@ public List<ORecordOperation> commit(final ODatabaseDocumentDistributed database
handleResponse(requestId, responseManager, involvedClusters, nodes, database); handleResponse(requestId, responseManager, involvedClusters, nodes, database);


// OK, DISTRIBUTED COMMIT SUCCEED // OK, DISTRIBUTED COMMIT SUCCEED
//TODO:Get the list of result from local ok, if is needed otherwise remove the ruturn return;
return null;


} }


Expand Down
Expand Up @@ -4,6 +4,7 @@
import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal; import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal;
import com.orientechnologies.orient.core.db.record.ORecordOperation; import com.orientechnologies.orient.core.db.record.ORecordOperation;
import com.orientechnologies.orient.core.exception.ORecordNotFoundException; import com.orientechnologies.orient.core.exception.ORecordNotFoundException;
import com.orientechnologies.orient.core.id.ORID;
import com.orientechnologies.orient.core.index.OClassIndexManager; import com.orientechnologies.orient.core.index.OClassIndexManager;
import com.orientechnologies.orient.core.metadata.schema.OImmutableClass; import com.orientechnologies.orient.core.metadata.schema.OImmutableClass;
import com.orientechnologies.orient.core.metadata.sequence.OSequenceLibraryProxy; import com.orientechnologies.orient.core.metadata.sequence.OSequenceLibraryProxy;
Expand All @@ -16,6 +17,7 @@


import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;


public class OTransactionOptimisticDistributed extends OTransactionOptimistic { public class OTransactionOptimisticDistributed extends OTransactionOptimistic {
public OTransactionOptimisticDistributed(ODatabaseDocumentInternal database, List<ORecordOperation> changes) { public OTransactionOptimisticDistributed(ODatabaseDocumentInternal database, List<ORecordOperation> changes) {
Expand Down Expand Up @@ -109,4 +111,9 @@ private void resolveTracking(ORecordOperation change) {


} }
} }

@Override
public Map<ORID, ORID> getUpdatedRids() {
return super.getUpdatedRids();
}
} }
Expand Up @@ -144,6 +144,8 @@ public ORemoteTask createTask(final int code) {


case NodeOperationTask.FACTORYID: //55 case NodeOperationTask.FACTORYID: //55
return new NodeOperationTask(); return new NodeOperationTask();
case OToLeaderTransactionTask.FACTORYID: // 30
return new OToLeaderTransactionTask();
} }


throw new IllegalArgumentException("Task with code " + code + " is not supported"); throw new IllegalArgumentException("Task with code " + code + " is not supported");
Expand Down
Expand Up @@ -33,12 +33,13 @@
*/ */
public class ORemoteTaskFactoryManagerImpl implements ORemoteTaskFactoryManager { public class ORemoteTaskFactoryManagerImpl implements ORemoteTaskFactoryManager {
private final ODistributedServerManager dManager; private final ODistributedServerManager dManager;
private ORemoteTaskFactory[] factories = new ODefaultRemoteTaskFactoryV0[2]; private ORemoteTaskFactory[] factories = new ODefaultRemoteTaskFactoryV0[3];


public ORemoteTaskFactoryManagerImpl(final ODistributedServerManager dManager) { public ORemoteTaskFactoryManagerImpl(final ODistributedServerManager dManager) {
this.dManager = dManager; this.dManager = dManager;
factories[0] = new ODefaultRemoteTaskFactoryV0(); factories[0] = new ODefaultRemoteTaskFactoryV0();
factories[1] = new ODefaultRemoteTaskFactoryV1(); factories[1] = new ODefaultRemoteTaskFactoryV1();
factories[2] = new ODefaultRemoteTaskFactoryV2();
} }


@Override @Override
Expand Down

0 comments on commit 83dfe32

Please sign in to comment.