Skip to content

Commit

Permalink
Issue #3631 array based record lock manager was replaced by map based.
Browse files Browse the repository at this point in the history
  • Loading branch information
laa committed Apr 27, 2015
1 parent a033fa8 commit 2790583
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 66 deletions.
Expand Up @@ -21,6 +21,7 @@
package com.orientechnologies.orient.core.storage.impl.local; package com.orientechnologies.orient.core.storage.impl.local;


import com.orientechnologies.common.concur.OTimeoutException; import com.orientechnologies.common.concur.OTimeoutException;
import com.orientechnologies.common.concur.lock.OLockManager;
import com.orientechnologies.common.concur.lock.OModificationLock; import com.orientechnologies.common.concur.lock.OModificationLock;
import com.orientechnologies.common.concur.lock.ONewLockManager; import com.orientechnologies.common.concur.lock.ONewLockManager;
import com.orientechnologies.common.exception.OException; import com.orientechnologies.common.exception.OException;
Expand Down Expand Up @@ -90,39 +91,40 @@
*/ */
public abstract class OAbstractPaginatedStorage extends OStorageAbstract implements OLowDiskSpaceListener, public abstract class OAbstractPaginatedStorage extends OStorageAbstract implements OLowDiskSpaceListener,
OFullCheckpointRequestListener { OFullCheckpointRequestListener {
private static final int RECORD_LOCK_TIMEOUT = OGlobalConfiguration.STORAGE_RECORD_LOCK_TIMEOUT private static final int RECORD_LOCK_TIMEOUT = OGlobalConfiguration.STORAGE_RECORD_LOCK_TIMEOUT
.getValueAsInteger(); .getValueAsInteger();


private final ONewLockManager<ORID> lockManager; private final OLockManager<ORID, OAbstractPaginatedStorage> lockManager;
private final String PROFILER_CREATE_RECORD; private final String PROFILER_CREATE_RECORD;
private final String PROFILER_READ_RECORD; private final String PROFILER_READ_RECORD;
private final String PROFILER_UPDATE_RECORD; private final String PROFILER_UPDATE_RECORD;
private final String PROFILER_DELETE_RECORD; private final String PROFILER_DELETE_RECORD;
private final ConcurrentMap<String, OCluster> clusterMap = new ConcurrentHashMap<String, OCluster>(); private final ConcurrentMap<String, OCluster> clusterMap = new ConcurrentHashMap<String, OCluster>();
private final ThreadLocal<OStorageTransaction> transaction = new ThreadLocal<OStorageTransaction>(); private final ThreadLocal<OStorageTransaction> transaction = new ThreadLocal<OStorageTransaction>();
private final OModificationLock modificationLock = new OModificationLock(); private final OModificationLock modificationLock = new OModificationLock();
private final AtomicBoolean checkpointInProgress = new AtomicBoolean(); private final AtomicBoolean checkpointInProgress = new AtomicBoolean();
protected volatile OWriteAheadLog writeAheadLog; protected volatile OWriteAheadLog writeAheadLog;


protected volatile OReadCache readCache; protected volatile OReadCache readCache;
protected volatile OWriteCache writeCache; protected volatile OWriteCache writeCache;


private ORecordConflictStrategy recordConflictStrategy = Orient.instance() private ORecordConflictStrategy recordConflictStrategy = Orient
.getRecordConflictStrategy() .instance()
.newInstanceOfDefaultClass(); .getRecordConflictStrategy()
private CopyOnWriteArrayList<OCluster> clusters = new CopyOnWriteArrayList<OCluster>(); .newInstanceOfDefaultClass();
private volatile int defaultClusterId = -1; private CopyOnWriteArrayList<OCluster> clusters = new CopyOnWriteArrayList<OCluster>();
private volatile OAtomicOperationsManager atomicOperationsManager; private volatile int defaultClusterId = -1;
private volatile boolean wereDataRestoredAfterOpen = false; private volatile OAtomicOperationsManager atomicOperationsManager;
private volatile boolean wereNonTxOperationsPerformedInPreviousOpen = false; private volatile boolean wereDataRestoredAfterOpen = false;
private boolean makeFullCheckPointAfterClusterCreate = OGlobalConfiguration.STORAGE_MAKE_FULL_CHECKPOINT_AFTER_CLUSTER_CREATE private volatile boolean wereNonTxOperationsPerformedInPreviousOpen = false;
.getValueAsBoolean(); private boolean makeFullCheckPointAfterClusterCreate = OGlobalConfiguration.STORAGE_MAKE_FULL_CHECKPOINT_AFTER_CLUSTER_CREATE
private volatile OLowDiskSpaceInformation lowDiskSpace = null; .getValueAsBoolean();
private volatile boolean checkpointRequest = false; private volatile OLowDiskSpaceInformation lowDiskSpace = null;
private volatile boolean checkpointRequest = false;


public OAbstractPaginatedStorage(String name, String filePath, String mode) { public OAbstractPaginatedStorage(String name, String filePath, String mode) {
super(name, filePath, mode, OGlobalConfiguration.STORAGE_LOCK_TIMEOUT.getValueAsInteger()); super(name, filePath, mode, OGlobalConfiguration.STORAGE_LOCK_TIMEOUT.getValueAsInteger());
lockManager = new ONewLockManager<ORID>(); lockManager = new OLockManager<ORID, OAbstractPaginatedStorage>(true, -1);


PROFILER_CREATE_RECORD = "db." + this.name + ".createRecord"; PROFILER_CREATE_RECORD = "db." + this.name + ".createRecord";
PROFILER_READ_RECORD = "db." + this.name + ".readRecord"; PROFILER_READ_RECORD = "db." + this.name + ".readRecord";
Expand Down Expand Up @@ -664,7 +666,7 @@ public ORecordMetadata getRecordMetadata(ORID rid) {
checkOpeness(); checkOpeness();


final OCluster cluster = getClusterById(rid.getClusterId()); final OCluster cluster = getClusterById(rid.getClusterId());
Lock recordLock = lockManager.acquireSharedLock(rid); lockManager.acquireLock(this, rid, OLockManager.LOCK.SHARED);
try { try {
lock.acquireSharedLock(); lock.acquireSharedLock();
try { try {
Expand All @@ -679,7 +681,7 @@ public ORecordMetadata getRecordMetadata(ORID rid) {
} catch (IOException ioe) { } catch (IOException ioe) {
OLogManager.instance().error(this, "Retrieval of record '" + rid + "' cause: " + ioe.getMessage(), ioe); OLogManager.instance().error(this, "Retrieval of record '" + rid + "' cause: " + ioe.getMessage(), ioe);
} finally { } finally {
lockManager.releaseLock(recordLock); lockManager.releaseLock(this, rid, OLockManager.LOCK.SHARED);
} }


return null; return null;
Expand Down Expand Up @@ -716,7 +718,7 @@ public OStorageOperationResult<ORecordVersion> updateRecord(final ORecordId rid,
modificationLock.requestModificationLock(); modificationLock.requestModificationLock();
try { try {
// GET THE SHARED LOCK AND GET AN EXCLUSIVE LOCK AGAINST THE RECORD // GET THE SHARED LOCK AND GET AN EXCLUSIVE LOCK AGAINST THE RECORD
Lock recordLock = lockManager.acquireExclusiveLock(rid); lockManager.acquireLock(this, rid, OLockManager.LOCK.EXCLUSIVE);
try { try {
lock.acquireSharedLock(); lock.acquireSharedLock();
try { try {
Expand All @@ -726,7 +728,7 @@ public OStorageOperationResult<ORecordVersion> updateRecord(final ORecordId rid,
lock.releaseSharedLock(); lock.releaseSharedLock();
} }
} finally { } finally {
lockManager.releaseLock(recordLock); lockManager.releaseLock(this, rid, OLockManager.LOCK.EXCLUSIVE);
} }
} finally { } finally {
modificationLock.releaseModificationLock(); modificationLock.releaseModificationLock();
Expand Down Expand Up @@ -772,7 +774,7 @@ public OStorageOperationResult<Boolean> deleteRecord(final ORecordId rid, final
try { try {
modificationLock.requestModificationLock(); modificationLock.requestModificationLock();
try { try {
Lock recordLock = lockManager.acquireExclusiveLock(rid); lockManager.acquireLock(this, rid, OLockManager.LOCK.EXCLUSIVE);
try { try {
lock.acquireSharedLock(); lock.acquireSharedLock();
try { try {
Expand All @@ -781,7 +783,7 @@ public OStorageOperationResult<Boolean> deleteRecord(final ORecordId rid, final
lock.releaseSharedLock(); lock.releaseSharedLock();
} }
} finally { } finally {
lockManager.releaseLock(recordLock); lockManager.releaseLock(this, rid, OLockManager.LOCK.EXCLUSIVE);
} }
} finally { } finally {
modificationLock.releaseModificationLock(); modificationLock.releaseModificationLock();
Expand Down Expand Up @@ -815,7 +817,7 @@ public OStorageOperationResult<Boolean> hideRecord(final ORecordId rid, final in
try { try {
modificationLock.requestModificationLock(); modificationLock.requestModificationLock();
try { try {
final Lock recordLock = lockManager.acquireExclusiveLock(rid); lockManager.acquireLock(this, rid, OLockManager.LOCK.EXCLUSIVE);
try { try {
lock.acquireSharedLock(); lock.acquireSharedLock();
try { try {
Expand All @@ -824,7 +826,7 @@ public OStorageOperationResult<Boolean> hideRecord(final ORecordId rid, final in
lock.releaseSharedLock(); lock.releaseSharedLock();
} }
} finally { } finally {
lockManager.releaseLock(recordLock); lockManager.releaseLock(this, rid, OLockManager.LOCK.EXCLUSIVE);
} }
} finally { } finally {
modificationLock.releaseModificationLock(); modificationLock.releaseModificationLock();
Expand Down Expand Up @@ -1296,43 +1298,21 @@ public OPhysicalPosition[] floorPhysicalPositions(int clusterId, OPhysicalPositi


public void acquireWriteLock(final ORID rid) { public void acquireWriteLock(final ORID rid) {
assert !lock.assertSharedLockHold() && !lock.assertExclusiveLockHold() : " a record lock should not be taken inside a storage lock"; assert !lock.assertSharedLockHold() && !lock.assertExclusiveLockHold() : " a record lock should not be taken inside a storage lock";

lockManager.acquireLock(this, rid, OLockManager.LOCK.EXCLUSIVE, RECORD_LOCK_TIMEOUT);
boolean result;
try {
result = lockManager.tryAcquireExclusiveLock(rid, RECORD_LOCK_TIMEOUT);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new OTimeoutException("Thread was interrupted during record lock", e);
}

if (!result)
throw new OTimeoutException("Can not lock record for " + RECORD_LOCK_TIMEOUT
+ " ms. seems record is deadlocked by other record");
} }


public void releaseWriteLock(final ORID rid) { public void releaseWriteLock(final ORID rid) {
assert !lock.assertSharedLockHold() && !lock.assertExclusiveLockHold() : " a record lock should not be released inside a storage lock"; assert !lock.assertSharedLockHold() && !lock.assertExclusiveLockHold() : " a record lock should not be released inside a storage lock";
lockManager.releaseExclusiveLock(rid); lockManager.releaseLock(this, rid, OLockManager.LOCK.EXCLUSIVE);
} }


public void acquireReadLock(final ORID rid) { public void acquireReadLock(final ORID rid) {
assert !lock.assertSharedLockHold() && !lock.assertExclusiveLockHold() : " a record lock should not be taken inside a storage lock"; lockManager.acquireLock(this, rid, OLockManager.LOCK.SHARED, RECORD_LOCK_TIMEOUT);
boolean result;
try {
result = lockManager.tryAcquireSharedLock(rid, RECORD_LOCK_TIMEOUT);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new OTimeoutException("Thread was interrupted during record lock", e);
}

if (!result)
throw new OTimeoutException("Can not lock record for " + RECORD_LOCK_TIMEOUT
+ " ms. seems record is deadlocked by other record");
} }


public void releaseReadLock(final ORID iRid) { public void releaseReadLock(final ORID rid) {
assert !lock.assertSharedLockHold() && !lock.assertExclusiveLockHold() : " a record lock should not be released inside a storage lock"; assert !lock.assertSharedLockHold() && !lock.assertExclusiveLockHold() : " a record lock should not be released inside a storage lock";
lockManager.releaseSharedLock(iRid); lockManager.releaseLock(this, rid, OLockManager.LOCK.SHARED);
} }


public ORecordConflictStrategy getConflictStrategy() { public ORecordConflictStrategy getConflictStrategy() {
Expand Down
Expand Up @@ -21,7 +21,6 @@ public static void main(final String[] args) throws Exception {


new OInternalGraphImporter().runImport(inputFile, dbURL); new OInternalGraphImporter().runImport(inputFile, dbURL);
Orient.instance().shutdown(); Orient.instance().shutdown();
System.exit(0);
} }


public void runImport(String inputFile, String dbURL) throws IOException, FileNotFoundException { public void runImport(String inputFile, String dbURL) throws IOException, FileNotFoundException {
Expand Down

0 comments on commit 2790583

Please sign in to comment.