Skip to content

Commit

Permalink
Distributed: transactions now can run concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Jul 16, 2016
1 parent 0cc7c41 commit 6cb9b71
Show file tree
Hide file tree
Showing 16 changed files with 436 additions and 303 deletions.
Expand Up @@ -19,6 +19,16 @@
*/
package com.orientechnologies.orient.server.distributed.impl;

import java.io.*;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;

import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.Member;
Expand Down Expand Up @@ -67,15 +77,6 @@
import com.orientechnologies.orient.server.network.OServerNetworkListener;
import com.orientechnologies.orient.server.plugin.OServerPluginAbstract;

import java.io.*;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;

/**
* Abstract plugin to manage the distributed environment.
*
Expand Down Expand Up @@ -105,7 +106,6 @@ public abstract class ODistributedAbstractPlugin extends OServerPluginAbstract
protected OClusterOwnershipAssignmentStrategy clusterAssignmentStrategy = new ODefaultClusterOwnershipAssignmentStrategy(
this);

protected Map<String, Long> lastLSNWriting = new HashMap<String, Long>();
protected static final int DEPLOY_DB_MAX_RETRIES = 10;
protected Map<String, Member> activeNodes = new ConcurrentHashMap<String, Member>();
protected Map<String, String> activeNodesNamesByMemberId = new ConcurrentHashMap<String, String>();
Expand All @@ -116,15 +116,15 @@ public abstract class ODistributedAbstractPlugin extends OServerPluginAbstract
protected Date startedOn = new Date();
protected ORemoteTaskFactory taskFactory = new ODefaultRemoteTaskFactory();
protected String nodeUuid;
protected ODistributedStrategy responseManagerFactory = new ODefaultDistributedStrategy();
protected ODistributedStrategy responseManagerFactory = new ODefaultDistributedStrategy();

private volatile String lastServerDump = "";
protected CountDownLatch serverStarted = new CountDownLatch(1);

protected abstract ODistributedConfiguration getLastDatabaseConfiguration(String databaseName);

public void waitUntilNodeOnline() throws InterruptedException {
while (!status.equals(NODE_STATUS.ONLINE))
Thread.sleep(100);
serverStarted.await();
}

public void waitUntilNodeOnline(final String nodeName, final String databaseName) throws InterruptedException {
Expand Down Expand Up @@ -603,22 +603,11 @@ public Object call() throws Exception {
else {
// OK
final String sourceNodeName = task.getNodeSource();
Long last = lastLSNWriting.get(sourceNodeName);
if (last == null)
last = 0l;

if (task instanceof OAbstractReplicatedTask && System.currentTimeMillis() - last > 2000) {
final ODistributedDatabaseImpl ddb = getMessageService().getDatabase(database.getName());
final OLogSequenceNumber lastLSN = ((OAbstractReplicatedTask) task).getLastLSN();
if (lastLSN != null) {
ddb.getSyncConfiguration().setLSN(task.getNodeSource(), lastLSN);

ODistributedServerLog.debug(this, nodeName, task.getNodeSource(), DIRECTION.NONE,
"Updating LSN table to the value %s", lastLSN);
final ODistributedDatabaseImpl ddb = getMessageService().getDatabase(database.getName());

lastLSNWriting.put(sourceNodeName, System.currentTimeMillis());
}
}
if (!(result instanceof Throwable) && task instanceof OAbstractReplicatedTask)
// UPDATE LSN WITH LAST OPERATION
ddb.setLSN(sourceNodeName, ((OAbstractReplicatedTask) task).getLastLSN());
}

return result;
Expand Down

0 comments on commit 6cb9b71

Please sign in to comment.