Skip to content

Commit

Permalink
rid bag locking fixes + index key locking fixes + update identity fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
taburet committed Sep 1, 2016
1 parent 237df39 commit 28e6afc
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 54 deletions.
Expand Up @@ -26,6 +26,7 @@
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -83,8 +84,7 @@ public OOneEntryPerKeyLockManager(final boolean iEnabled, final int iAcquireTime

@Override
public Lock acquireSharedLock(final T key) {
acquireLock(key, LOCK.SHARED);
return null;
return acquireLock(key, LOCK.SHARED);
}

@Override
Expand All @@ -94,22 +94,21 @@ public void releaseSharedLock(final T key) {

@Override
public Lock acquireExclusiveLock(final T key) {
acquireLock(key, LOCK.EXCLUSIVE);
return null;
return acquireLock(key, LOCK.EXCLUSIVE);
}

@Override
public void releaseExclusiveLock(final T key) {
releaseLock(Thread.currentThread(), key, LOCK.EXCLUSIVE);
}

public void acquireLock(final T iResourceId, final LOCK iLockType) {
acquireLock(iResourceId, iLockType, acquireTimeout);
public Lock acquireLock(final T iResourceId, final LOCK iLockType) {
return acquireLock(iResourceId, iLockType, acquireTimeout);
}

public void acquireLock(final T iResourceId, final LOCK iLockType, long iTimeout) {
public Lock acquireLock(final T iResourceId, final LOCK iLockType, long iTimeout) {
if (!enabled)
return;
return null;

T immutableResource = getImmutableResourceId(iResourceId);
if (immutableResource == null)
Expand Down Expand Up @@ -192,6 +191,8 @@ public void acquireLock(final T iResourceId, final LOCK iLockType, long iTimeout
.wrapException(new OLockException("Thread interrupted while waiting for resource '" + iResourceId + "'"), e);
}
}

return new CountableLockWrapper(lock, iLockType == LOCK.SHARED);
} catch (RuntimeException e) {
final int usages = lock.countLocks.decrementAndGet();
if (usages == 0)
Expand Down Expand Up @@ -303,4 +304,51 @@ protected T getImmutableResourceId(final T iResourceId) {
private static int closestInteger(int value) {
return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
}

@SuppressWarnings("NullableProblems")
private static class CountableLockWrapper implements Lock {

private final CountableLock countableLock;
private final boolean read;

public CountableLockWrapper(CountableLock countableLock, boolean read) {
this.countableLock = countableLock;
this.read = read;
}

@Override
public void lock() {
throw new UnsupportedOperationException();
}

@Override
public void lockInterruptibly() throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public boolean tryLock() {
throw new UnsupportedOperationException();
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public void unlock() {
countableLock.countLocks.decrementAndGet();

if (read)
countableLock.readWriteLock.readLock().unlock();
else
countableLock.readWriteLock.writeLock().unlock();
}

@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
}
Expand Up @@ -20,11 +20,6 @@

package com.orientechnologies.orient.core.db.record.ridbag.sbtree;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;

import com.orientechnologies.common.profiler.OProfiler;
import com.orientechnologies.common.serialization.types.OBooleanSerializer;
import com.orientechnologies.orient.core.Orient;
Expand All @@ -37,16 +32,33 @@
import com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage;
import com.orientechnologies.orient.core.storage.impl.local.paginated.atomicoperations.OAtomicOperation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;

/**
* Persistent Set<OIdentifiable> implementation that uses the SBTree to handle entries in persistent way.
*
*
* @author Artem Orobets (enisher-at-gmail.com)
*/
public class OIndexRIDContainerSBTree implements Set<OIdentifiable> {
public static final String INDEX_FILE_EXTENSION = ".irs";
public static final String INDEX_FILE_EXTENSION = ".irs";

/**
* Generates a lock name for the given index name.
*
* @param indexName the index name to generate the lock name for.
*
* @return the generated lock name.
*/
public static String generateLockName(String indexName) {
return indexName + INDEX_FILE_EXTENSION;
}

private OSBTreeBonsaiLocal<OIdentifiable, Boolean> tree;

protected static final OProfiler PROFILER = Orient.instance().getProfiler();
protected static final OProfiler PROFILER = Orient.instance().getProfiler();

public OIndexRIDContainerSBTree(long fileId, OAbstractPaginatedStorage storage) {
String fileName;
Expand Down Expand Up @@ -206,8 +218,8 @@ public String getName() {
}

private static class TreeKeyIterator implements Iterator<OIdentifiable> {
private final boolean autoConvertToRecord;
private OSBTreeMapEntryIterator<OIdentifiable, Boolean> entryIterator;
private final boolean autoConvertToRecord;
private OSBTreeMapEntryIterator<OIdentifiable, Boolean> entryIterator;

public TreeKeyIterator(OTreeInternal<OIdentifiable, Boolean> tree, boolean autoConvertToRecord) {
entryIterator = new OSBTreeMapEntryIterator<OIdentifiable, Boolean>(tree);
Expand Down
Expand Up @@ -20,29 +20,39 @@

package com.orientechnologies.orient.core.db.record.ridbag.sbtree;

import java.util.UUID;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.orientechnologies.common.concur.resource.OCloseable;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.index.sbtreebonsai.local.OSBTreeBonsai;

import java.util.UUID;

/**
* @author Artem Orobets (enisher-at-gmail.com)
*/
public abstract class OSBTreeCollectionManagerAbstract implements OCloseable, OSBTreeCollectionManager {
public static final String FILE_NAME_PREFIX = "collections_";
public static final String DEFAULT_EXTENSION = ".sbc";
protected final int evictionThreshold;
protected final int cacheMaxSize;
protected final int shift;
protected final int mask;
protected final Object[] locks;
private final ConcurrentLinkedHashMap<OBonsaiCollectionPointer, SBTreeBonsaiContainer> treeCache = new ConcurrentLinkedHashMap.Builder<OBonsaiCollectionPointer, SBTreeBonsaiContainer>()
.maximumWeightedCapacity(
Long.MAX_VALUE)
.build();
public static final String FILE_NAME_PREFIX = "collections_";
public static final String DEFAULT_EXTENSION = ".sbc";

/**
* Generates a lock name for the given cluster ID.
*
* @param clusterId the cluster ID to generate the lock name for.
*
* @return the generated lock name.
*/
public static String generateLockName(int clusterId) {
return FILE_NAME_PREFIX + clusterId + DEFAULT_EXTENSION;
}

protected final int evictionThreshold;
protected final int cacheMaxSize;
protected final int shift;
protected final int mask;
protected final Object[] locks;
private final ConcurrentLinkedHashMap<OBonsaiCollectionPointer, SBTreeBonsaiContainer> treeCache = new ConcurrentLinkedHashMap.Builder<OBonsaiCollectionPointer, SBTreeBonsaiContainer>()
.maximumWeightedCapacity(Long.MAX_VALUE).build();

public OSBTreeCollectionManagerAbstract() {
this(OGlobalConfiguration.SBTREEBONSAI_LINKBAG_CACHE_EVICTION_SIZE.getValueAsInteger(),
Expand Down Expand Up @@ -106,7 +116,6 @@ public OSBTreeBonsai<OIdentifiable, Integer> loadSBTree(OBonsaiCollectionPointer
}
}


}

evict();
Expand Down Expand Up @@ -181,7 +190,7 @@ private Object treesSubsetLock(OBonsaiCollectionPointer collectionPointer) {

private static final class SBTreeBonsaiContainer {
private final OSBTreeBonsai<OIdentifiable, Integer> tree;
private int usagesCounter = 0;
private int usagesCounter = 0;

private SBTreeBonsaiContainer(OSBTreeBonsai<OIdentifiable, Integer> tree) {
this.tree = tree;
Expand Down
Expand Up @@ -645,7 +645,7 @@ private void unlockKeys() {
try {
lock.unlock();
} catch (RuntimeException e) {
OLogManager.instance().error(this, "Error during unlock of index key");
OLogManager.instance().error(this, "Error during unlock of index key", e);
}
}
}
Expand Down
Expand Up @@ -46,9 +46,7 @@
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
import com.orientechnologies.orient.core.db.record.OCurrentStorageComponentsFactory;
import com.orientechnologies.orient.core.db.record.ORecordOperation;
import com.orientechnologies.orient.core.db.record.ridbag.sbtree.ORidBagDeleter;
import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OSBTreeCollectionManager;
import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OSBTreeCollectionManagerShared;
import com.orientechnologies.orient.core.db.record.ridbag.sbtree.*;
import com.orientechnologies.orient.core.exception.*;
import com.orientechnologies.orient.core.id.ORID;
import com.orientechnologies.orient.core.id.ORecordId;
Expand Down Expand Up @@ -1292,6 +1290,7 @@ public int compare(final ORecordOperation o1, final ORecordOperation o2) {
startStorageTx(clientTx);

lockClusters(clustersToLock);
lockRidBags(clustersToLock, indexesToCommit);
lockIndexes(indexesToCommit);

Map<ORecordOperation, OPhysicalPosition> positions = new IdentityHashMap<ORecordOperation, OPhysicalPosition>();
Expand Down Expand Up @@ -4146,6 +4145,9 @@ private void lockIndexKeys(OIndexManagerProxy manager, TreeMap<String, OTransact
if (index == null)
throw new OTransactionException("Cannot find index '" + indexName + "' while committing transaction");

if (!(index instanceof OIndexUnique))
continue;

if (!changes.nullKeyChanges.entries.isEmpty())
index.lockKeysForUpdate((Object) null);
lockList.add(index.lockKeysForUpdate(changes.changesPerKey.keySet()));
Expand All @@ -4158,6 +4160,9 @@ private void unlockIndexKeys(TreeMap<String, OTransactionIndexChanges> indexes,
if (index == null) // index may be unresolved at this point (and its keys are not locked) due to failure in lockIndexKeys
break;

if (!(index instanceof OIndexUnique))
continue;

if (!changes.nullKeyChanges.entries.isEmpty())
index.releaseKeysForUpdate((Object) null);
}
Expand Down Expand Up @@ -4200,6 +4205,23 @@ private void lockClusters(final TreeMap<Integer, OCluster> clustersToLock) {
cluster.acquireAtomicExclusiveLock();
}

private void lockRidBags(final TreeMap<Integer, OCluster> clusters, final TreeMap<String, OTransactionIndexChanges> indexes) {
final OAtomicOperation atomicOperation = atomicOperationsManager.getCurrentOperation();

for (Integer clusterId : clusters.keySet())
atomicOperationsManager
.acquireExclusiveLockTillOperationComplete(atomicOperation, OSBTreeCollectionManagerAbstract.generateLockName(clusterId));

for (Map.Entry<String, OTransactionIndexChanges> entry : indexes.entrySet()) {
final String indexName = entry.getKey();
final OIndexInternal<?> index = entry.getValue().getAssociatedIndex();

if (!index.isUnique())
atomicOperationsManager
.acquireExclusiveLockTillOperationComplete(atomicOperation, OIndexRIDContainerSBTree.generateLockName(indexName));
}
}

private class UncompletedCommit implements OUncompletedCommit<List<ORecordOperation>> {
private final OTransaction clientTx;
private final Iterable<ORecordOperation> entries;
Expand Down
Expand Up @@ -453,12 +453,18 @@ public OUncompletedCommit<OAtomicOperation> initiateCommit() throws IOException
return new UncompletedCommit(operation, operation.initiateCommit(useWal() ? writeAheadLog : null));
}

private void acquireExclusiveLockTillOperationComplete(OAtomicOperation operation, String fullName) {
if (operation.containsInLockedObjects(fullName))
/**
* Acquires exclusive lock with the given lock name in the given atomic operation.
*
* @param operation the atomic operation to acquire the lock in.
* @param lockName the lock name to acquire.
*/
public void acquireExclusiveLockTillOperationComplete(OAtomicOperation operation, String lockName) {
if (operation.containsInLockedObjects(lockName))
return;

lockManager.acquireLock(fullName, OOneEntryPerKeyLockManager.LOCK.EXCLUSIVE);
operation.addLockedObject(fullName);
lockManager.acquireLock(lockName, OOneEntryPerKeyLockManager.LOCK.EXCLUSIVE);
operation.addLockedObject(lockName);
}

/**
Expand Down
Expand Up @@ -153,17 +153,7 @@ public Collection<ORecordOperation> getAllRecordEntries() {
}

public ORecordOperation getRecordEntry(ORID rid) {
ORecordOperation e = allEntries.get(rid);
if (e != null)
return e;

if (!updatedRids.isEmpty()) {
ORID r = updatedRids.get(rid);
if (r != null)
return allEntries.get(r);
}

return null;
return allEntries.get(translateRid(rid));
}

public ORecord getRecord(final ORID rid) {
Expand Down Expand Up @@ -361,7 +351,7 @@ public void updateIdentityAfterCommit(final ORID oldRid, final ORID newRid) {

final ORecordOperation rec = getRecordEntry(oldRid);
if (rec != null) {
updatedRids.put(newRid, oldRid.copy());
updatedRids.put(newRid.copy(), oldRid.copy());

if (!rec.getRecord().getIdentity().equals(newRid)) {
ORecordInternal.onBeforeIdentityChanged(rec.getRecord());
Expand All @@ -385,7 +375,7 @@ public void updateIdentityAfterCommit(final ORID oldRid, final ORID newRid) {

// Update the indexes.

final List<OTransactionRecordIndexOperation> transactionIndexOperations = recordIndexOperations.get(oldRid);
final List<OTransactionRecordIndexOperation> transactionIndexOperations = recordIndexOperations.get(translateRid(oldRid));
if (transactionIndexOperations != null) {
for (final OTransactionRecordIndexOperation indexOperation : transactionIndexOperations) {
OTransactionIndexChanges indexEntryChanges = indexEntries.get(indexOperation.index);
Expand Down Expand Up @@ -474,6 +464,18 @@ public Object getCustomData(String iName) {
return userData.get(iName);
}

private ORID translateRid(ORID rid) {
while (true) {
final ORID translatedRid = updatedRids.get(rid);
if (translatedRid == null)
break;

rid = translatedRid;
}

return rid;
}

private static Dependency[] getIndexFieldRidDependencies(OIndex<?> index) {
final OIndexDefinition definition = index.getDefinition();

Expand Down

0 comments on commit 28e6afc

Please sign in to comment.