Skip to content

Commit

Permalink
Distributed: refactored the entire locking system to have true parall…
Browse files Browse the repository at this point in the history
…el transactions

This 2.2.6 rocks! Yeah!
  • Loading branch information
lvca committed Jul 26, 2016
1 parent f4789ff commit ce0e4d5
Show file tree
Hide file tree
Showing 18 changed files with 466 additions and 313 deletions.
Expand Up @@ -264,7 +264,7 @@ public <T> T baseNetworkOperation(final OStorageRemoteOperation<T> operation, fi
} }


private int handleIOException(int retry, final OChannelBinaryAsynchClient network, final Exception e) { private int handleIOException(int retry, final OChannelBinaryAsynchClient network, final Exception e) {
OLogManager.instance().info(this, "Caught I/O errors, trying to reconnect (error: %s)", e.getMessage()); OLogManager.instance().info(this, "Caught Network I/O errors, trying an automatic reconnection... (error: %s)", e.getMessage());
OLogManager.instance().debug(this, "I/O error stack: ", e); OLogManager.instance().debug(this, "I/O error stack: ", e);
connectionManager.remove(network); connectionManager.remove(network);
if (--retry <= 0) if (--retry <= 0)
Expand Down
Expand Up @@ -745,7 +745,7 @@ public void onException(Exception e) {
* @see OGlobalConfiguration#STORAGE_TRACK_CHANGED_RECORDS_IN_WAL * @see OGlobalConfiguration#STORAGE_TRACK_CHANGED_RECORDS_IN_WAL
*/ */
public OLogSequenceNumber recordsChangedAfterLSN(final OLogSequenceNumber lsn, final OutputStream stream, public OLogSequenceNumber recordsChangedAfterLSN(final OLogSequenceNumber lsn, final OutputStream stream,
final Set<String> excludedClusterIds) { final Set<String> excludedClusterIds, final OCommandOutputListener outputListener) {
if (!OGlobalConfiguration.STORAGE_TRACK_CHANGED_RECORDS_IN_WAL.getValueAsBoolean()) if (!OGlobalConfiguration.STORAGE_TRACK_CHANGED_RECORDS_IN_WAL.getValueAsBoolean())
throw new IllegalStateException( throw new IllegalStateException(
"Cannot find records which were changed starting from provided LSN because tracking of rids of changed records in WAL is switched off, " "Cannot find records which were changed starting from provided LSN because tracking of rids of changed records in WAL is switched off, "
Expand Down Expand Up @@ -789,6 +789,7 @@ public OLogSequenceNumber recordsChangedAfterLSN(final OLogSequenceNumber lsn, f
OLogSequenceNumber currentLsn = startLsn; OLogSequenceNumber currentLsn = startLsn;


// all information about changed records is contained in atomic operation metadata // all information about changed records is contained in atomic operation metadata
long read = 0;
while (currentLsn != null && endLsn.compareTo(currentLsn) >= 0) { while (currentLsn != null && endLsn.compareTo(currentLsn) >= 0) {
walRecord = writeAheadLog.read(currentLsn); walRecord = writeAheadLog.read(currentLsn);


Expand Down Expand Up @@ -816,6 +817,11 @@ public OLogSequenceNumber recordsChangedAfterLSN(final OLogSequenceNumber lsn, f
} }


currentLsn = writeAheadLog.next(currentLsn); currentLsn = writeAheadLog.next(currentLsn);

read++;

if (outputListener != null)
outputListener.onMessage("read " + read + " records from WAL and collected " + sortedRids.size() + " records");
} }
} finally { } finally {
writeAheadLog.preventCutTill(null); writeAheadLog.preventCutTill(null);
Expand All @@ -832,6 +838,7 @@ public OLogSequenceNumber recordsChangedAfterLSN(final OLogSequenceNumber lsn, f


dataOutputStream.writeLong(sortedRids.size()); dataOutputStream.writeLong(sortedRids.size());


long exportedRecord = 1;
Iterator<ORID> ridIterator = sortedRids.iterator(); Iterator<ORID> ridIterator = sortedRids.iterator();
while (ridIterator.hasNext()) { while (ridIterator.hasNext()) {
final ORID rid = ridIterator.next(); final ORID rid = ridIterator.next();
Expand All @@ -845,9 +852,14 @@ public OLogSequenceNumber recordsChangedAfterLSN(final OLogSequenceNumber lsn, f


OLogManager.instance().debug(this, "Exporting deleted record %s", rid); OLogManager.instance().debug(this, "Exporting deleted record %s", rid);


if (outputListener != null)
outputListener.onMessage("exporting record " + exportedRecord + "/" + sortedRids.size());

// delete to avoid duplication // delete to avoid duplication
ridIterator.remove(); ridIterator.remove();
} }

exportedRecord++;
} }


ridIterator = sortedRids.iterator(); ridIterator = sortedRids.iterator();
Expand Down Expand Up @@ -875,6 +887,11 @@ public OLogSequenceNumber recordsChangedAfterLSN(final OLogSequenceNumber lsn, f
.debug(this, "Exporting modified record rid=%s type=%d size=%d v=%d - buffer size=%d", rid, rawBuffer.recordType, .debug(this, "Exporting modified record rid=%s type=%d size=%d v=%d - buffer size=%d", rid, rawBuffer.recordType,
rawBuffer.buffer.length, rawBuffer.version, dataOutputStream.size()); rawBuffer.buffer.length, rawBuffer.version, dataOutputStream.size());
} }

if (outputListener != null)
outputListener.onMessage("exporting record " + exportedRecord + "/" + sortedRids.size());

exportedRecord++;
} }
} finally { } finally {
dataOutputStream.close(); dataOutputStream.close();
Expand Down
Expand Up @@ -99,8 +99,8 @@ private void incrementalSyncIteration(String buildDirectory) throws Exception {
final OutputStream outputStream = Channels.newOutputStream(channel); final OutputStream outputStream = Channels.newOutputStream(channel);
final OutputStream bufferedOutputStream = new BufferedOutputStream(outputStream); final OutputStream bufferedOutputStream = new BufferedOutputStream(outputStream);


((OAbstractPaginatedStorage) originalDB.getStorage()) ((OAbstractPaginatedStorage) originalDB.getStorage()).recordsChangedAfterLSN(startLSN, bufferedOutputStream,
.recordsChangedAfterLSN(startLSN, bufferedOutputStream, new HashSet<String>()); new HashSet<String>(),null);
bufferedOutputStream.close(); bufferedOutputStream.close();


dataFile.close(); dataFile.close();
Expand Down
Expand Up @@ -36,6 +36,7 @@
import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION;
import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask; import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask;
import com.orientechnologies.orient.server.distributed.task.OAbstractReplicatedTask; import com.orientechnologies.orient.server.distributed.task.OAbstractReplicatedTask;
import com.orientechnologies.orient.server.distributed.task.ODistributedRecordLockedException;
import com.orientechnologies.orient.server.distributed.task.ORemoteTask; import com.orientechnologies.orient.server.distributed.task.ORemoteTask;
import com.orientechnologies.orient.server.hazelcast.OHazelcastPlugin; import com.orientechnologies.orient.server.hazelcast.OHazelcastPlugin;


Expand All @@ -44,6 +45,8 @@
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;


/** /**
Expand All @@ -62,7 +65,7 @@ public class ODistributedDatabaseImpl implements ODistributedDatabase {
protected final String databaseName; protected final String databaseName;
protected final Lock requestLock; protected final Lock requestLock;
protected ODistributedSyncConfiguration syncConfiguration; protected ODistributedSyncConfiguration syncConfiguration;
protected ConcurrentHashMap<ORID, ODistributedRequestId> lockManager = new ConcurrentHashMap<ORID, ODistributedRequestId>( protected ConcurrentHashMap<ORID, ODistributedLock> lockManager = new ConcurrentHashMap<ORID, ODistributedLock>(
256); 256);
protected ConcurrentHashMap<ODistributedRequestId, ODistributedTxContextImpl> activeTxContexts = new ConcurrentHashMap<ODistributedRequestId, ODistributedTxContextImpl>( protected ConcurrentHashMap<ODistributedRequestId, ODistributedTxContextImpl> activeTxContexts = new ConcurrentHashMap<ODistributedRequestId, ODistributedTxContextImpl>(
64); 64);
Expand All @@ -71,6 +74,17 @@ public class ODistributedDatabaseImpl implements ODistributedDatabase {


private Map<String, OLogSequenceNumber> lastLSN = new ConcurrentHashMap<String, OLogSequenceNumber>(); private Map<String, OLogSequenceNumber> lastLSN = new ConcurrentHashMap<String, OLogSequenceNumber>();
private long lastLSNWrittenOnDisk = 0l; private long lastLSNWrittenOnDisk = 0l;
private AtomicLong totalReceivedRequests = new AtomicLong();

private class ODistributedLock {
final ODistributedRequestId reqId;
final CountDownLatch lock;

private ODistributedLock(ODistributedRequestId reqId) {
this.reqId = reqId;
this.lock = new CountDownLatch(1);
}
}


public ODistributedDatabaseImpl(final OHazelcastPlugin manager, final ODistributedMessageServiceImpl msgService, public ODistributedDatabaseImpl(final OHazelcastPlugin manager, final ODistributedMessageServiceImpl msgService,
final String iDatabaseName) { final String iDatabaseName) {
Expand Down Expand Up @@ -103,6 +117,8 @@ public long getLastLSNWrittenOnDisk() {
public void processRequest(final ODistributedRequest request) { public void processRequest(final ODistributedRequest request) {
final ORemoteTask task = request.getTask(); final ORemoteTask task = request.getTask();


totalReceivedRequests.incrementAndGet();

if (task instanceof OAbstractReplicatedTask) { if (task instanceof OAbstractReplicatedTask) {
final OLogSequenceNumber taskLastLSN = ((OAbstractReplicatedTask) task).getLastLSN(); final OLogSequenceNumber taskLastLSN = ((OAbstractReplicatedTask) task).getLastLSN();
final OLogSequenceNumber lastLSN = getLastLSN(task.getNodeSource()); final OLogSequenceNumber lastLSN = getLastLSN(task.getNodeSource());
Expand Down Expand Up @@ -358,60 +374,90 @@ public void setOnline() {
} }


@Override @Override
public ODistributedRequestId lockRecord(final OIdentifiable iRecord, final ODistributedRequestId iRequestId) { public void lockRecord(final OIdentifiable iRecord, final ODistributedRequestId iRequestId, final long timeout) {
final ORID rid = iRecord.getIdentity(); final ORID rid = iRecord.getIdentity();
if (!rid.isPersistent())
// TEMPORARY RECORD
return null;


final ODistributedRequestId oldReqId = lockManager.putIfAbsent(rid, iRequestId); final ODistributedLock lock = new ODistributedLock(iRequestId);

ODistributedLock currentLock = lockManager.putIfAbsent(rid, lock);
if (currentLock != null && timeout > 0) {
do {
if (iRequestId.equals(currentLock.reqId)) {
// SAME ID, ALREADY LOCKED
ODistributedServerLog.debug(this, localNodeName, null, DIRECTION.NONE,
"Distributed transaction: %s locked record %s in database '%s' owned by %s (thread=%d)", iRequestId, iRecord,
databaseName, currentLock.reqId, Thread.currentThread().getId());
currentLock = null;
break;
}

try {


final boolean locked = oldReqId == null; if (timeout > 0)
currentLock.lock.await(timeout, TimeUnit.MILLISECONDS);
else
currentLock.lock.await();


if (!locked) { } catch (InterruptedException e) {
if (iRequestId.equals(oldReqId)) { Thread.currentThread().interrupt();
// SAME ID, ALREADY LOCKED break;
ODistributedServerLog.debug(this, localNodeName, null, DIRECTION.NONE, }
"Distributed transaction: %s locked record %s in database '%s' owned by %s", iRequestId, iRecord, databaseName, currentLock = lockManager.putIfAbsent(rid, lock);
iRequestId);
return null; } while (currentLock != null);
} }


if (currentLock != null) {
// CHECK THE OWNER SERVER IS ONLINE. THIS AVOIDS ANY "WALKING DEAD" LOCKS // CHECK THE OWNER SERVER IS ONLINE. THIS AVOIDS ANY "WALKING DEAD" LOCKS
final String lockingNodeName = manager.getNodeNameById(oldReqId.getNodeId()); final String lockingNodeName = manager.getNodeNameById(currentLock.reqId.getNodeId());
if (lockingNodeName == null || !manager.isNodeAvailable(lockingNodeName)) { if (lockingNodeName == null || !manager.isNodeAvailable(lockingNodeName)) {
ODistributedServerLog.info(this, localNodeName, null, DIRECTION.NONE, ODistributedServerLog.info(this, localNodeName, null, DIRECTION.NONE,
"Distributed transaction: forcing unlock of record %s in database '%s' because the owner server '%s' is offline (reqId=%s ownerReqId=%s)", "Distributed transaction: forcing unlock of record %s in database '%s' because the owner server '%s' is offline (reqId=%s ownerReqId=%s, thread=%d)",
iRecord.getIdentity(), databaseName, iRequestId, oldReqId); iRecord.getIdentity(), databaseName, iRequestId, currentLock.reqId, Thread.currentThread().getId());


// FORCE THE UNLOCK AND LOCK OF CURRENT REQ-ID // FORCE THE UNLOCK AND LOCK OF CURRENT REQ-ID
lockManager.put(rid, iRequestId); lockManager.put(rid, lock);
return null; currentLock = null;
} }
} }


if (ODistributedServerLog.isDebugEnabled()) if (ODistributedServerLog.isDebugEnabled())
if (locked) if (currentLock == null) {
ODistributedServerLog.debug(this, localNodeName, null, DIRECTION.NONE, ODistributedServerLog.debug(this, localNodeName, null, DIRECTION.NONE,
"Distributed transaction: %s locked record %s in database '%s'", iRequestId, iRecord, databaseName); "Distributed transaction: %s locked record %s in database '%s' (thread=%d)", iRequestId, iRecord, databaseName,
else Thread.currentThread().getId());
} else {
ODistributedServerLog.debug(this, localNodeName, null, DIRECTION.NONE, ODistributedServerLog.debug(this, localNodeName, null, DIRECTION.NONE,
"Distributed transaction: %s cannot lock record %s in database '%s' owned by %s", iRequestId, iRecord, databaseName, "Distributed transaction: %s cannot lock record %s in database '%s' owned by %s (thread=%d)", iRequestId, iRecord,
oldReqId); databaseName, currentLock.reqId, Thread.currentThread().getId());
}


return oldReqId; if (currentLock != null)
throw new ODistributedRecordLockedException(rid, currentLock.reqId);
} }


@Override @Override
public void unlockRecord(final OIdentifiable iRecord, final ODistributedRequestId requestId) { public void unlockRecord(final OIdentifiable iRecord, final ODistributedRequestId requestId) {
if (requestId == null) if (requestId == null)
return; return;


final ODistributedRequestId owner = lockManager.remove(iRecord.getIdentity()); final ODistributedLock owner = lockManager.remove(iRecord.getIdentity());
if (owner != null) {
if (!owner.reqId.equals(requestId)) {
ODistributedServerLog.error(this, localNodeName, null, DIRECTION.NONE,
"Distributed transaction: cannot unlocked record %s in database '%s' because owner %s <> current %s (thread=%d)",
iRecord, databaseName, owner.reqId, requestId, Thread.currentThread().getId());
return;
}

// NOTIFY ANY WAITERS
owner.lock.countDown();
}


if (ODistributedServerLog.isDebugEnabled()) if (ODistributedServerLog.isDebugEnabled())
ODistributedServerLog.debug(this, localNodeName, null, DIRECTION.NONE, ODistributedServerLog.debug(this, localNodeName, null, DIRECTION.NONE,
"Distributed transaction: %s unlocked record %s in database '%s' (owner=%s)", requestId, iRecord, databaseName, owner); "Distributed transaction: %s unlocked record %s in database '%s' (owner=%s, thread=%d)", requestId, iRecord, databaseName,
owner != null ? owner.reqId : "null", Thread.currentThread().getId());
} }


@Override @Override
Expand All @@ -422,12 +468,14 @@ public ODistributedTxContext registerTxContext(final ODistributedRequestId reqId
if (prevCtx != null) { if (prevCtx != null) {
// ALREADY EXISTENT // ALREADY EXISTENT
ODistributedServerLog.debug(this, localNodeName, null, DIRECTION.NONE, ODistributedServerLog.debug(this, localNodeName, null, DIRECTION.NONE,
"Distributed transaction: repeating request %s in database '%s'", reqId, databaseName); "Distributed transaction: repeating request %s in database '%s' (thread=%d)", reqId, databaseName,
Thread.currentThread().getId());
ctx = prevCtx; ctx = prevCtx;
} else } else
// REGISTERED // REGISTERED
ODistributedServerLog.debug(this, localNodeName, null, DIRECTION.NONE, ODistributedServerLog.debug(this, localNodeName, null, DIRECTION.NONE,
"Distributed transaction: registered request %s in database '%s'", reqId, databaseName); "Distributed transaction: registered request %s in database '%s' (thread=%d)", reqId, databaseName,
Thread.currentThread().getId());


return ctx; return ctx;
} }
Expand Down Expand Up @@ -512,6 +560,21 @@ public ODatabaseDocumentTx getDatabaseInstance() {
return (ODatabaseDocumentTx) manager.getServerInstance().openDatabase(databaseName, "internal", "internal", null, true); return (ODatabaseDocumentTx) manager.getServerInstance().openDatabase(databaseName, "internal", "internal", null, true);
} }


@Override
public long getReceivedRequests() {
return totalReceivedRequests.get();
}

@Override
public long getProcessedRequests() {
long total = 0;
for (ODistributedWorker workerThread : workerThreads) {
total += workerThread.getProcessedRequests();
}

return total;
}

public void shutdown() { public void shutdown() {
// SEND THE SHUTDOWN TO ALL THE WORKER THREADS // SEND THE SHUTDOWN TO ALL THE WORKER THREADS
for (ODistributedWorker workerThread : workerThreads) { for (ODistributedWorker workerThread : workerThreads) {
Expand Down
Expand Up @@ -273,4 +273,24 @@ public void updateMessageStats(final String message) {
counter.incrementAndGet(); counter.incrementAndGet();
} }
} }

@Override
public long getReceivedRequests() {
long total = 0;
for (ODistributedDatabaseImpl db : databases.values()) {
total += db.getReceivedRequests();
}

return total;
}

@Override
public long getProcessedRequests() {
long total = 0;
for (ODistributedDatabaseImpl db : databases.values()) {
total += db.getProcessedRequests();
}

return total;
}
} }
Expand Up @@ -22,6 +22,8 @@
import com.orientechnologies.common.io.OFileUtils; import com.orientechnologies.common.io.OFileUtils;
import com.orientechnologies.common.log.OAnsiCode; import com.orientechnologies.common.log.OAnsiCode;
import com.orientechnologies.orient.console.OTableFormatter; import com.orientechnologies.orient.console.OTableFormatter;
import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal;
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
import com.orientechnologies.orient.core.db.document.ODatabaseDocument; import com.orientechnologies.orient.core.db.document.ODatabaseDocument;
import com.orientechnologies.orient.core.db.record.OIdentifiable; import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.metadata.schema.OClass; import com.orientechnologies.orient.core.metadata.schema.OClass;
Expand Down Expand Up @@ -412,9 +414,14 @@ public void onMessage(final String text, final Object... args) {
} }
}); });


ODatabaseDocumentInternal db = ODatabaseRecordThreadLocal.INSTANCE.getIfDefined();
if( db != null && db.isClosed())
db = null;

table.setColumnSorting("CLUSTER", true); table.setColumnSorting("CLUSTER", true);
table.setColumnHidden("#"); table.setColumnHidden("#");

if (db != null)
table.setColumnAlignment("id", OTableFormatter.ALIGNMENT.RIGHT);
table.setColumnAlignment("writeQuorum", OTableFormatter.ALIGNMENT.CENTER); table.setColumnAlignment("writeQuorum", OTableFormatter.ALIGNMENT.CENTER);
table.setColumnAlignment("readQuorum", OTableFormatter.ALIGNMENT.CENTER); table.setColumnAlignment("readQuorum", OTableFormatter.ALIGNMENT.CENTER);


Expand Down Expand Up @@ -444,7 +451,10 @@ public void onMessage(final String text, final Object... args) {
rows.add(row); rows.add(row);


row.field("CLUSTER", cluster); row.field("CLUSTER", cluster);

if (db != null) {
final int clId = db.getClusterIdByName(cluster);
row.field("id", clId > -1 ? clId : "");
}
row.field("writeQuorum", wQ); row.field("writeQuorum", wQ);
row.field("readQuorum", rQ); row.field("readQuorum", rQ);


Expand Down

0 comments on commit ce0e4d5

Please sign in to comment.