Skip to content

Commit

Permalink
implemented not thread bound lock manager and used for pessimistic re…
Browse files Browse the repository at this point in the history
…cord locking
  • Loading branch information
tglman committed Sep 5, 2018
1 parent 601a389 commit e6cbd7a
Show file tree
Hide file tree
Showing 8 changed files with 311 additions and 30 deletions.
@@ -0,0 +1,105 @@
package com.orientechnologies.common.concur.lock;

import com.orientechnologies.common.exception.OException;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ONotThreadRWLockManager<T> implements OSimpleRWLockManager<T> {

private class LockGuard {
private int count;
private Condition condition;
private boolean shared;

public LockGuard(int count, Condition condition, boolean shared) {
this.count = count;
this.condition = condition;
this.shared = shared;
}
}

private final Lock lock = new ReentrantLock();
private final Map<T, LockGuard> map = new HashMap<>();

public ONotThreadRWLockManager() {
}

public void lock(T key, boolean shared, long timeout) {

lock.lock();
try {
try {

LockGuard c;
do {
c = map.get(key);
if (c != null) {
if (c.shared && shared) {
c.count++;
return;
} else {
if (timeout == 0) {
c.condition.await();
} else {
if (!c.condition.await(timeout, TimeUnit.MILLISECONDS)) {
throw new OLockException(String.format("Time out acquire lock for resource: '%s' ", key));
}
}
}
}
} while (c != null);
c = new LockGuard(1, lock.newCondition(), shared);
map.put(key, c);
} catch (InterruptedException e) {
throw OException.wrapException(new OInterruptedException("Interrupted Lock"), e);
}
} finally {
lock.unlock();
}

}

public void unlock(T key, boolean shared) {
lock.lock();
try {
LockGuard c = map.get(key);
assert c != null;
if (c.shared != shared) {
throw new OLockException("Impossible to release a not acquired lock");
}
c.count--;
if (c.count == 0) {
map.remove(key);
c.condition.signalAll();
}
} finally {
lock.unlock();
}
}

@Override
public void acquireReadLock(T key, long timeout) {
lock(key, true, timeout);
}

@Override
public void acquireWriteLock(T key, long timeout) {
lock(key, false, timeout);
}

@Override
public void releaseReadLock(T key) {
unlock(key, true);
}

@Override
public void releaseWriteLock(T key) {
unlock(key, false);
}

}
@@ -0,0 +1,13 @@
package com.orientechnologies.common.concur.lock;

public interface OSimpleRWLockManager<T> {

void acquireReadLock(T key, long timeout);

void acquireWriteLock(T key, long timeout);

void releaseReadLock(T key);

void releaseWriteLock(T key);

}
Expand Up @@ -1202,14 +1202,17 @@ record = iRecord;


@Override @Override
public void internalLockRecord(OIdentifiable iRecord, OStorage.LOCKING_STRATEGY lockingStrategy) { public void internalLockRecord(OIdentifiable iRecord, OStorage.LOCKING_STRATEGY lockingStrategy) {
internalLockRecord(iRecord, lockingStrategy, 0);
}


public void internalLockRecord(OIdentifiable iRecord, OStorage.LOCKING_STRATEGY lockingStrategy, long timeout) {
final ORID rid = new ORecordId(iRecord.getIdentity()); final ORID rid = new ORecordId(iRecord.getIdentity());
OTransactionAbstract transaction = (OTransactionAbstract) getTransaction(); OTransactionAbstract transaction = (OTransactionAbstract) getTransaction();
if (!transaction.isLockedRecord(iRecord)) { if (!transaction.isLockedRecord(iRecord)) {
if (lockingStrategy == OStorage.LOCKING_STRATEGY.EXCLUSIVE_LOCK) if (lockingStrategy == OStorage.LOCKING_STRATEGY.EXCLUSIVE_LOCK)
((OAbstractPaginatedStorage) getStorage().getUnderlying()).acquireWriteLock(rid); ((OAbstractPaginatedStorage) getStorage().getUnderlying()).acquireWriteLock(rid, timeout);
else if (lockingStrategy == OStorage.LOCKING_STRATEGY.SHARED_LOCK) else if (lockingStrategy == OStorage.LOCKING_STRATEGY.SHARED_LOCK)
((OAbstractPaginatedStorage) getStorage().getUnderlying()).acquireReadLock(rid); ((OAbstractPaginatedStorage) getStorage().getUnderlying()).acquireReadLock(rid, timeout);
else else
throw new IllegalStateException("Unsupported locking strategy " + lockingStrategy); throw new IllegalStateException("Unsupported locking strategy " + lockingStrategy);
} }
Expand Down Expand Up @@ -1244,8 +1247,7 @@ public <RET extends ORecord> RET lock(ORID recordId, long timeout, TimeUnit time
checkOpenness(); checkOpenness();
checkIfActive(); checkIfActive();
pessimisticLockChecks(recordId); pessimisticLockChecks(recordId);
//TODO: add support for customizable timeout internalLockRecord(recordId, OStorage.LOCKING_STRATEGY.EXCLUSIVE_LOCK, timeoutUnit.toMillis(timeout));
internalLockRecord(recordId, OStorage.LOCKING_STRATEGY.EXCLUSIVE_LOCK);
return load(recordId, null, true); return load(recordId, null, true);
} }


Expand Down
Expand Up @@ -21,10 +21,7 @@
package com.orientechnologies.orient.core.storage.impl.local; package com.orientechnologies.orient.core.storage.impl.local;


import com.orientechnologies.common.concur.ONeedRetryException; import com.orientechnologies.common.concur.ONeedRetryException;
import com.orientechnologies.common.concur.lock.OComparableLockManager; import com.orientechnologies.common.concur.lock.*;
import com.orientechnologies.common.concur.lock.OLockManager;
import com.orientechnologies.common.concur.lock.OModificationOperationProhibitedException;
import com.orientechnologies.common.concur.lock.OPartitionedLockManager;
import com.orientechnologies.common.exception.OException; import com.orientechnologies.common.exception.OException;
import com.orientechnologies.common.exception.OHighLevelException; import com.orientechnologies.common.exception.OHighLevelException;
import com.orientechnologies.common.io.OIOException; import com.orientechnologies.common.io.OIOException;
Expand Down Expand Up @@ -264,7 +261,7 @@ public abstract class OAbstractPaginatedStorage extends OStorageAbstract
fuzzyCheckpointExecutor.setMaximumPoolSize(1); fuzzyCheckpointExecutor.setMaximumPoolSize(1);
} }


private final OComparableLockManager<ORID> lockManager; private final OSimpleRWLockManager<ORID> lockManager;


/** /**
* Lock is used to atomically update record versions. * Lock is used to atomically update record versions.
Expand Down Expand Up @@ -337,7 +334,7 @@ public OAbstractPaginatedStorage(String name, String filePath, String mode, int
super(name, filePath, mode); super(name, filePath, mode);


this.id = id; this.id = id;
lockManager = new ORIDOLockManager(); lockManager = new ONotThreadRWLockManager<>();
recordVersionManager = new OPartitionedLockManager<>(); recordVersionManager = new OPartitionedLockManager<>();


registerProfilerHooks(); registerProfilerHooks();
Expand Down Expand Up @@ -1071,13 +1068,12 @@ public void onException(Throwable e) {
/** /**
* This method finds all the records which were updated starting from (but not including) current LSN and write result in provided * This method finds all the records which were updated starting from (but not including) current LSN and write result in provided
* output stream. In output stream will be included all thw records which were updated/deleted/created since passed in LSN till * output stream. In output stream will be included all thw records which were updated/deleted/created since passed in LSN till
* the current moment. * the current moment. Deleted records are written in output stream first, then created/updated records. All records are sorted by
* Deleted records are written in output stream first, then created/updated records. All records are sorted by record id. * record id. Data format: <ol> <li>Amount of records (single entry) - 8 bytes</li> <li>Record's cluster id - 4 bytes</li>
* Data format: <ol> <li>Amount of records (single entry) - 8 bytes</li> <li>Record's cluster id - 4 bytes</li> <li>Record's * <li>Record's cluster position - 8 bytes</li> <li>Delete flag, 1 if record is deleted - 1 byte</li> <li>Record version , only
* cluster position - 8 bytes</li> <li>Delete flag, 1 if record is deleted - 1 byte</li> <li>Record version , only if record is * if record is not deleted - 4 bytes</li> <li>Record type, only if record is not deleted - 1 byte</li> <li>Length of binary
* not deleted - 4 bytes</li> <li>Record type, only if record is not deleted - 1 byte</li> <li>Length of binary presentation of * presentation of record, only if record is not deleted - 4 bytes</li> <li>Binary presentation of the record, only if record is
* record, only if record is not deleted - 4 bytes</li> <li>Binary presentation of the record, only if record is not deleted - * not deleted - length of content is provided in above entity</li> </ol>
* length of content is provided in above entity</li> </ol>
* *
* @param lsn LSN from which we should find changed records * @param lsn LSN from which we should find changed records
* @param stream Stream which will contain found records * @param stream Stream which will contain found records
Expand Down Expand Up @@ -2075,6 +2071,10 @@ private List<ORecordOperation> commit(final OTransactionInternal transaction, bo
recordLocks.add(recordOperation.getRID()); recordLocks.add(recordOperation.getRID());
} }
} }
Set<ORID> locked = transaction.getLockedRecords();
if (locked != null) {
recordLocks.removeAll(locked);
}
Collections.sort(recordLocks); Collections.sort(recordLocks);
for (ORID rid : recordLocks) { for (ORID rid : recordLocks) {
acquireWriteLock(rid); acquireWriteLock(rid);
Expand Down Expand Up @@ -2182,11 +2182,21 @@ private List<ORecordOperation> commit(final OTransactionInternal transaction, bo
} finally { } finally {
try { try {
if (pessimisticLock) { if (pessimisticLock) {
List<ORID> recordLocks = new ArrayList<>();
for (ORecordOperation recordOperation : recordOperations) { for (ORecordOperation recordOperation : recordOperations) {
if (recordOperation.type == ORecordOperation.UPDATED || recordOperation.type == ORecordOperation.DELETED) { if (recordOperation.type == ORecordOperation.UPDATED || recordOperation.type == ORecordOperation.DELETED) {
releaseWriteLock(recordOperation.getRID()); recordLocks.add(recordOperation.getRID());
} }
} }

Set<ORID> locked = transaction.getLockedRecords();
if (locked != null) {
recordLocks.removeAll(locked);
}

for (ORID rid : recordLocks) {
releaseWriteLock(rid);
}
} }
} finally { } finally {
stateLock.releaseReadLock(); stateLock.releaseReadLock();
Expand Down Expand Up @@ -2385,7 +2395,7 @@ public int addIndexEngine(String engineName, final String algorithm, final Strin
engine.create(valueSerializer, isAutomatic, keyTypes, nullValuesSupport, keySerializer, keySize, clustersToIndex, engine.create(valueSerializer, isAutomatic, keyTypes, nullValuesSupport, keySerializer, keySize, clustersToIndex,
engineProperties, metadata, encryption); engineProperties, metadata, encryption);


if(writeAheadLog != null) { if (writeAheadLog != null) {
writeAheadLog.flush(); writeAheadLog.flush();
} }


Expand Down Expand Up @@ -3945,9 +3955,21 @@ public OPhysicalPosition[] floorPhysicalPositions(int clusterId, OPhysicalPositi
} }
} }


public void acquireWriteLock(final ORID rid, long timeout) {
try {
lockManager.acquireWriteLock(rid, timeout);
} catch (RuntimeException ee) {
throw logAndPrepareForRethrow(ee);
} catch (Error ee) {
throw logAndPrepareForRethrow(ee);
} catch (Throwable t) {
throw logAndPrepareForRethrow(t);
}
}

public void acquireWriteLock(final ORID rid) { public void acquireWriteLock(final ORID rid) {
try { try {
lockManager.acquireLock(rid, OComparableLockManager.LOCK.EXCLUSIVE, RECORD_LOCK_TIMEOUT); lockManager.acquireWriteLock(rid, 0);
} catch (RuntimeException ee) { } catch (RuntimeException ee) {
throw logAndPrepareForRethrow(ee); throw logAndPrepareForRethrow(ee);
} catch (Error ee) { } catch (Error ee) {
Expand All @@ -3959,7 +3981,7 @@ public void acquireWriteLock(final ORID rid) {


public void releaseWriteLock(final ORID rid) { public void releaseWriteLock(final ORID rid) {
try { try {
lockManager.releaseLock(this, rid, OComparableLockManager.LOCK.EXCLUSIVE); lockManager.releaseWriteLock(rid);
} catch (RuntimeException ee) { } catch (RuntimeException ee) {
throw logAndPrepareForRethrow(ee); throw logAndPrepareForRethrow(ee);
} catch (Error ee) { } catch (Error ee) {
Expand All @@ -3971,7 +3993,19 @@ public void releaseWriteLock(final ORID rid) {


public void acquireReadLock(final ORID rid) { public void acquireReadLock(final ORID rid) {
try { try {
lockManager.acquireLock(rid, OComparableLockManager.LOCK.SHARED, RECORD_LOCK_TIMEOUT); lockManager.acquireReadLock(rid, 0);
} catch (RuntimeException ee) {
throw logAndPrepareForRethrow(ee);
} catch (Error ee) {
throw logAndPrepareForRethrow(ee);
} catch (Throwable t) {
throw logAndPrepareForRethrow(t);
}
}

public void acquireReadLock(final ORID rid, long timeout) {
try {
lockManager.acquireReadLock(rid, timeout);
} catch (RuntimeException ee) { } catch (RuntimeException ee) {
throw logAndPrepareForRethrow(ee); throw logAndPrepareForRethrow(ee);
} catch (Error ee) { } catch (Error ee) {
Expand All @@ -3983,7 +4017,7 @@ public void acquireReadLock(final ORID rid) {


public void releaseReadLock(final ORID rid) { public void releaseReadLock(final ORID rid) {
try { try {
lockManager.releaseLock(this, rid, OComparableLockManager.LOCK.SHARED); lockManager.releaseReadLock(rid);
} catch (RuntimeException ee) { } catch (RuntimeException ee) {
throw logAndPrepareForRethrow(ee); throw logAndPrepareForRethrow(ee);
} catch (Error ee) { } catch (Error ee) {
Expand Down Expand Up @@ -4691,7 +4725,7 @@ private int addClusterInternal(String clusterName, int clusterPos, Object... par
cluster.open(); cluster.open();
} }


if(writeAheadLog != null) { if (writeAheadLog != null) {
writeAheadLog.flush(); writeAheadLog.flush();
} }


Expand Down
Expand Up @@ -732,4 +732,8 @@ public boolean isUsingLog() {
public ORecordOperation getRecordEntry(ORID currentRid) { public ORecordOperation getRecordEntry(ORID currentRid) {
return recordOperations.get(currentRid); return recordOperations.get(currentRid);
} }

public Set<ORID> getLockedRecords() {
return null;
}
} }
Expand Up @@ -33,6 +33,7 @@


import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set;


public abstract class OTransactionAbstract implements OTransaction { public abstract class OTransactionAbstract implements OTransaction {
protected final ODatabaseDocumentInternal database; protected final ODatabaseDocumentInternal database;
Expand Down Expand Up @@ -106,13 +107,9 @@ public void close() {
final LockedRecordMetadata lockedRecordMetadata = lock.getValue(); final LockedRecordMetadata lockedRecordMetadata = lock.getValue();


if (lockedRecordMetadata.strategy.equals(OStorage.LOCKING_STRATEGY.EXCLUSIVE_LOCK)) { if (lockedRecordMetadata.strategy.equals(OStorage.LOCKING_STRATEGY.EXCLUSIVE_LOCK)) {
for (int i = 0; i < lockedRecordMetadata.locksCount; i++) { ((OAbstractPaginatedStorage) getDatabase().getStorage().getUnderlying()).releaseWriteLock(lock.getKey());
((OAbstractPaginatedStorage) getDatabase().getStorage().getUnderlying()).releaseWriteLock(lock.getKey());
}
} else if (lockedRecordMetadata.strategy.equals(OStorage.LOCKING_STRATEGY.SHARED_LOCK)) { } else if (lockedRecordMetadata.strategy.equals(OStorage.LOCKING_STRATEGY.SHARED_LOCK)) {
for (int i = 0; i < lockedRecordMetadata.locksCount; i++) { ((OAbstractPaginatedStorage) getDatabase().getStorage().getUnderlying()).releaseReadLock(lock.getKey());
((OAbstractPaginatedStorage) getDatabase().getStorage().getUnderlying()).releaseReadLock(lock.getKey());
}
} }
} catch (Exception e) { } catch (Exception e) {
OLogManager.instance().debug(this, "Error on releasing lock against record " + lock.getKey(), e); OLogManager.instance().debug(this, "Error on releasing lock against record " + lock.getKey(), e);
Expand Down Expand Up @@ -196,4 +193,8 @@ public Map<ORID, LockedRecordMetadata> getInternalLocks() {
protected void setLocks(Map<ORID, LockedRecordMetadata> locks) { protected void setLocks(Map<ORID, LockedRecordMetadata> locks) {
this.locks = locks; this.locks = locks;
} }

public Set<ORID> getLockedRecords() {
return locks.keySet();
}
} }
Expand Up @@ -26,6 +26,7 @@


import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Set;


/** /**
* Expose the api for extract the internal details needed by the storage for perform the transaction commit * Expose the api for extract the internal details needed by the storage for perform the transaction commit
Expand Down Expand Up @@ -85,4 +86,6 @@ public interface OTransactionInternal extends OBasicTransaction {
*/ */
ORecordOperation getRecordEntry(ORID currentRid); ORecordOperation getRecordEntry(ORID currentRid);


Set<ORID> getLockedRecords();

} }

0 comments on commit e6cbd7a

Please sign in to comment.