Skip to content

Commit

Permalink
LockManager was changed to avoid exclusive locking in case of massive…
Browse files Browse the repository at this point in the history
… data reads.
  • Loading branch information
laa committed Jul 9, 2015
1 parent 480f55c commit 9554eb4
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 106 deletions.
Expand Up @@ -21,6 +21,8 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class OLockManager<RESOURCE_TYPE, REQUESTER_TYPE> {
Expand All @@ -32,16 +34,32 @@ public enum LOCK {
protected long acquireTimeout;
protected final ConcurrentHashMap<RESOURCE_TYPE, CountableLock> map;
private final boolean enabled;
private final int shift;
private final int mask;
private final Object[] locks;

@SuppressWarnings("serial")
protected static class CountableLock extends ReentrantReadWriteLock {
protected int countLocks = 0;
protected static class CountableLock {
private final AtomicInteger countLocks = new AtomicInteger(1);
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof CountableLock))
return false;

CountableLock that = (CountableLock) o;

if (!countLocks.equals(that.countLocks))
return false;
return readWriteLock.equals(that.readWriteLock);

public CountableLock() {
super(false);
}

@Override
public int hashCode() {
int result = countLocks.hashCode();
result = 31 * result + readWriteLock.hashCode();
return result;
}
}

Expand All @@ -52,21 +70,11 @@ public OLockManager(final boolean iEnabled, final int iAcquireTimeout) {
public OLockManager(final boolean iEnabled, final int iAcquireTimeout, final int concurrencyLevel) {
int cL = 1;

int sh = 0;
while (cL < concurrencyLevel) {
cL <<= 1;
sh++;
}

shift = 32 - sh;
mask = cL - 1;

map = new ConcurrentHashMap<RESOURCE_TYPE, CountableLock>(cL);
locks = new Object[cL];
for (int i = 0; i < locks.length; i++) {
locks[i] = new Object();
}

acquireTimeout = iAcquireTimeout;
enabled = iEnabled;
}
Expand All @@ -79,33 +87,44 @@ public void acquireLock(final REQUESTER_TYPE iRequester, final RESOURCE_TYPE iRe
if (!enabled)
return;

final RESOURCE_TYPE immutableResource = getImmutableResourceId(iResourceId);

CountableLock lock;
final Object internalLock = internalLock(iResourceId);
synchronized (internalLock) {
lock = map.get(iResourceId);
if (lock == null) {
final CountableLock newLock = new CountableLock();
lock = map.putIfAbsent(getImmutableResourceId(iResourceId), newLock);
if (lock == null)
lock = newLock;

while (true) {
lock = new CountableLock();

CountableLock oldLock = map.putIfAbsent(immutableResource, lock);
if (oldLock == null)
break;

lock = oldLock;
final int oldValue = lock.countLocks.get();

if (oldValue > 0) {
if (lock.countLocks.compareAndSet(oldValue, oldValue + 1)) {
if (map.get(immutableResource) == lock)
break;
}

// otherwise wait till lock will be removed by release
}
lock.countLocks++;
}

try {
if (iTimeout <= 0) {
if (iLockType == LOCK.SHARED)
lock.readLock().lock();
lock.readWriteLock.readLock().lock();
else
lock.writeLock().lock();
lock.readWriteLock.writeLock().lock();
} else {
try {
if (iLockType == LOCK.SHARED) {
if (!lock.readLock().tryLock(iTimeout, TimeUnit.MILLISECONDS))
if (!lock.readWriteLock.readLock().tryLock(iTimeout, TimeUnit.MILLISECONDS))
throw new OLockException("Timeout (" + iTimeout + "ms) on acquiring resource '" + iResourceId
+ "' because is locked from another thread");
} else {
if (!lock.writeLock().tryLock(iTimeout, TimeUnit.MILLISECONDS))
if (!lock.readWriteLock.writeLock().tryLock(iTimeout, TimeUnit.MILLISECONDS))
throw new OLockException("Timeout (" + iTimeout + "ms) on acquiring resource '" + iResourceId
+ "' because is locked from another thread");
}
Expand All @@ -115,14 +134,13 @@ public void acquireLock(final REQUESTER_TYPE iRequester, final RESOURCE_TYPE iRe
}
}
} catch (RuntimeException e) {
synchronized (internalLock) {
lock.countLocks--;
if (lock.countLocks == 0)
map.remove(iResourceId);
}
lock.countLocks.decrementAndGet();

if (lock.countLocks.get() == 0)
map.remove(iResourceId);

throw e;
}

}

public void releaseLock(final REQUESTER_TYPE iRequester, final RESOURCE_TYPE iResourceId, final LOCK iLockType)
Expand All @@ -131,21 +149,20 @@ public void releaseLock(final REQUESTER_TYPE iRequester, final RESOURCE_TYPE iRe
return;

final CountableLock lock;
final Object internalLock = internalLock(iResourceId);
synchronized (internalLock) {
lock = map.get(iResourceId);
if (lock == null)
throw new OLockException("Error on releasing a non acquired lock by the requester '" + iRequester
+ "' against the resource: '" + iResourceId + "'");

lock.countLocks--;
if (lock.countLocks == 0)
map.remove(iResourceId);
}
lock = map.get(iResourceId);
if (lock == null)
throw new OLockException("Error on releasing a non acquired lock by the requester '" + iRequester
+ "' against the resource: '" + iResourceId + "'");

lock.countLocks.decrementAndGet();

if (lock.countLocks.get() == 0)
map.remove(iResourceId);

if (iLockType == LOCK.SHARED)
lock.readLock().unlock();
lock.readWriteLock.readLock().unlock();
else
lock.writeLock().unlock();
lock.readWriteLock.writeLock().unlock();
}

public void clear() {
Expand All @@ -164,12 +181,6 @@ protected RESOURCE_TYPE getImmutableResourceId(final RESOURCE_TYPE iResourceId)
return iResourceId;
}

private Object internalLock(final RESOURCE_TYPE iResourceId) {
final int hashCode = iResourceId.hashCode();
final int index = (hashCode ^ (hashCode >>> 16)) & mask;
return locks[index];
}

private static int defaultConcurrency() {
return Runtime.getRuntime().availableProcessors() * 8 > DEFAULT_CONCURRENCY_LEVEL ? Runtime.getRuntime().availableProcessors() * 8
: DEFAULT_CONCURRENCY_LEVEL;
Expand Down
Expand Up @@ -29,21 +29,25 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.ConsoleHandler;
import java.util.logging.FileHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;

public class OLogManager {
private static final String DEFAULT_LOG = "com.orientechnologies";
private static final String ENV_INSTALL_CUSTOM_FORMATTER = "orientdb.installCustomFormatter";
private static final OLogManager instance = new OLogManager();
private boolean debug = true;
private boolean info = true;
private boolean warn = true;
private boolean error = true;
private Level minimumLevel = Level.SEVERE;
private static final String DEFAULT_LOG = "com.orientechnologies";
private static final String ENV_INSTALL_CUSTOM_FORMATTER = "orientdb.installCustomFormatter";
private static final OLogManager instance = new OLogManager();
private boolean debug = true;
private boolean info = true;
private boolean warn = true;
private boolean error = true;
private Level minimumLevel = Level.SEVERE;

private final ConcurrentMap<String, Logger> loggersCache = new ConcurrentHashMap<String, Logger>();

protected OLogManager() {
}
Expand Down Expand Up @@ -100,7 +104,25 @@ public void log(final Object iRequester, final Level iLevel, String iMessage, fi
} catch (Throwable e) {
}

final Logger log = iRequester != null ? Logger.getLogger(iRequester.getClass().getName()) : Logger.getLogger(DEFAULT_LOG);
final String requesterName;
if (iRequester != null) {
requesterName = iRequester.getClass().getName();
} else {
requesterName = DEFAULT_LOG;
}

Logger log = loggersCache.get(requesterName);
if (log == null) {
log = Logger.getLogger(requesterName);

if (log != null) {
Logger oldLogger = loggersCache.putIfAbsent(requesterName, log);

if (oldLogger != null)
log = oldLogger;
}
}

if (log == null) {
// USE SYSERR
try {
Expand Down
Expand Up @@ -31,7 +31,7 @@ public class OOperationUnitId {
private static final AtomicLong sharedId = new AtomicLong();

private static volatile ThreadLocal<OModifiableLong> localId = new ThreadLocal<OModifiableLong>();
private static volatile ThreadLocal<OModifiableLong> sharedIdCopy = new ThreadLocal<OModifiableLong>();
private static volatile ThreadLocal<Long> sharedIdCopy = new ThreadLocal<Long>();

public static final int SERIALIZED_SIZE = 2 * OLongSerializer.LONG_SIZE;

Expand All @@ -43,7 +43,7 @@ public void onStartup() {
localId = new ThreadLocal<OModifiableLong>();

if (sharedIdCopy == null)
sharedIdCopy = new ThreadLocal<OModifiableLong>();
sharedIdCopy = new ThreadLocal<Long>();
}

@Override
Expand Down Expand Up @@ -72,14 +72,14 @@ public static OOperationUnitId generateId() {
}
lId.increment();

OModifiableLong sId = sharedIdCopy.get();
Long sId = sharedIdCopy.get();
if (sId == null) {
sId = new OModifiableLong(sharedId.incrementAndGet());
sId = sharedId.incrementAndGet();
sharedIdCopy.set(sId);
}

operationUnitId.lId = lId.getValue();
operationUnitId.sId = sId.getValue();
operationUnitId.sId = sId;

return operationUnitId;
}
Expand Down

0 comments on commit 9554eb4

Please sign in to comment.