Skip to content

Commit

Permalink
Issue #4645 was fixed.
Browse files Browse the repository at this point in the history
  • Loading branch information
laa committed Jul 28, 2015
1 parent 4efad7d commit 69ad5ec
Show file tree
Hide file tree
Showing 9 changed files with 625 additions and 469 deletions.
Expand Up @@ -67,6 +67,7 @@
import com.orientechnologies.orient.core.storage.OStorageAbstract;
import com.orientechnologies.orient.core.storage.OStorageOperationResult;
import com.orientechnologies.orient.core.storage.OStorageProxy;
import com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage;
import com.orientechnologies.orient.core.storage.impl.local.paginated.ORecordSerializationContext;
import com.orientechnologies.orient.core.tx.OTransaction;
import com.orientechnologies.orient.core.tx.OTransactionAbstract;
Expand Down Expand Up @@ -211,7 +212,7 @@ public void removeRemoteServerEventListener() {
public void open(final String iUserName, final String iUserPassword, final Map<String, Object> iOptions) {
addUser();

lock.acquireExclusiveLock();
stateLock.acquireWriteLock();
try {

connectionUserName = iUserName;
Expand All @@ -235,15 +236,14 @@ public void open(final String iUserName, final String iUserPassword, final Map<S
throw new OStorageException("Cannot open the remote storage: " + name, e);

} finally {
lock.releaseExclusiveLock();
stateLock.releaseWriteLock();
}
}

public void reload() {

lock.acquireExclusiveLock();
stateLock.acquireWriteLock();
try {

OChannelBinaryAsynchClient network = null;
do {
try {
Expand All @@ -269,9 +269,8 @@ public void reload() {

}
} while (true);

} finally {
lock.releaseExclusiveLock();
stateLock.releaseWriteLock();
}
}

Expand All @@ -291,7 +290,7 @@ public void close(final boolean iForce, boolean onDelete) {

OChannelBinaryAsynchClient network = null;

lock.acquireExclusiveLock();
stateLock.acquireWriteLock();
try {
if (status == STATUS.CLOSED)
return;
Expand Down Expand Up @@ -321,23 +320,50 @@ public void close(final boolean iForce, boolean onDelete) {
network.close();
}
} finally {
lock.releaseExclusiveLock();
stateLock.releaseWriteLock();
}
}

private boolean checkForClose(final boolean force) {
if (status == STATUS.CLOSED)
return false;

if (status == STATUS.CLOSED)
return false;

final int remainingUsers = getUsers() > 0 ? removeUser() : 0;

return force || remainingUsers == 0;
}

@Override
public int getUsers() {
return dataLock.getUsers();
}

@Override
public int addUser() {
return dataLock.addUser();
}

@Override
public int removeUser() {
return dataLock.removeUser();
}

public void delete() {
throw new UnsupportedOperationException(
"Cannot delete a database in a remote server. Please use the console or the OServerAdmin class.");
}

public Set<String> getClusterNames() {
lock.acquireSharedLock();
stateLock.acquireReadLock();
try {

return new HashSet<String>(clusterMap.keySet());

} finally {
lock.releaseSharedLock();
stateLock.releaseReadLock();
}
}

Expand Down Expand Up @@ -1387,7 +1413,7 @@ public void rollback(OTransaction iTx) {
}

public int getClusterIdByName(final String iClusterName) {
lock.acquireSharedLock();
stateLock.acquireReadLock();
try {

if (iClusterName == null)
Expand All @@ -1402,7 +1428,7 @@ public int getClusterIdByName(final String iClusterName) {

return cluster.getId();
} finally {
lock.releaseSharedLock();
stateLock.releaseReadLock();
}
}

Expand All @@ -1422,7 +1448,7 @@ public int addCluster(String iClusterName, int iRequestedId, boolean forceListBa

OChannelBinaryAsynchClient network = null;
do {
lock.acquireExclusiveLock();
stateLock.acquireWriteLock();
try {
try {
network = beginRequest(OChannelBinaryProtocol.REQUEST_DATACLUSTER_ADD);
Expand Down Expand Up @@ -1455,7 +1481,7 @@ public int addCluster(String iClusterName, int iRequestedId, boolean forceListBa
} catch (Exception e) {
handleException(network, "Error on add new cluster", e);
} finally {
lock.releaseExclusiveLock();
stateLock.releaseWriteLock();
}
} while (true);
}
Expand All @@ -1464,7 +1490,7 @@ public boolean dropCluster(final int iClusterId, final boolean iTruncate) {

OChannelBinaryAsynchClient network = null;
do {
lock.acquireExclusiveLock();
stateLock.acquireWriteLock();
try {
try {
network = beginRequest(OChannelBinaryProtocol.REQUEST_DATACLUSTER_DROP);
Expand Down Expand Up @@ -1499,9 +1525,8 @@ public boolean dropCluster(final int iClusterId, final boolean iTruncate) {
handleDBFreeze();
} catch (Exception e) {
handleException(network, "Error on removing of cluster", e);

} finally {
lock.releaseExclusiveLock();
stateLock.releaseWriteLock();
}
} while (true);
}
Expand All @@ -1510,7 +1535,7 @@ public void synch() {
}

public String getPhysicalClusterNameById(final int iClusterId) {
lock.acquireSharedLock();
stateLock.acquireReadLock();
try {

if (iClusterId >= clusters.length)
Expand All @@ -1520,32 +1545,32 @@ public String getPhysicalClusterNameById(final int iClusterId) {
return cluster != null ? cluster.getName() : null;

} finally {
lock.releaseSharedLock();
stateLock.releaseReadLock();
}
}

public int getClusterMap() {
lock.acquireSharedLock();
stateLock.acquireReadLock();
try {
return clusterMap.size();
} finally {
lock.releaseSharedLock();
stateLock.releaseReadLock();
}
}

public Collection<OCluster> getClusterInstances() {
lock.acquireSharedLock();
stateLock.acquireReadLock();
try {

return Arrays.asList(clusters);

} finally {
lock.releaseSharedLock();
stateLock.releaseReadLock();
}
}

public OCluster getClusterById(int iClusterId) {
lock.acquireSharedLock();
stateLock.acquireReadLock();
try {

if (iClusterId == ORID.CLUSTER_ID_INVALID)
Expand All @@ -1555,7 +1580,7 @@ public OCluster getClusterById(int iClusterId) {
return clusters[iClusterId];

} finally {
lock.releaseSharedLock();
stateLock.releaseReadLock();
}
}

Expand Down Expand Up @@ -1659,11 +1684,11 @@ public String getClientId() {
}

public int getClusters() {
lock.acquireSharedLock();
stateLock.acquireReadLock();
try {
return clusterMap.size();
} finally {
lock.releaseSharedLock();
stateLock.releaseReadLock();
}
}

Expand Down
Expand Up @@ -148,15 +148,6 @@ public int addUser() {
}
}

public OSharedResourceAdaptiveExternal getLock() {
pushSession();
try {
return delegate.getLock();
} finally {
popSession();
}
}

public void setSessionId(final String iServerURL, final int iSessionId, byte[] iToken) {
serverURL = iServerURL;
sessionId = iSessionId;
Expand Down
Expand Up @@ -500,7 +500,10 @@ public ODatabaseDocumentTx copy() {
db.metadata = new OMetadataDefault();
db.initialized = true;
db.storage = storage;
db.storage.addUser();

if (storage instanceof OStorageProxy)
((OStorageProxy) db.storage).addUser();

db.setStatus(STATUS.OPEN);
db.activateOnCurrentThread();
db.metadata.load();
Expand Down
Expand Up @@ -83,8 +83,6 @@ enum LOCKING_STRATEGY {

boolean isClosed();

OSharedResourceAdaptiveExternal getLock();

// CRUD OPERATIONS
OStorageOperationResult<OPhysicalPosition> createRecord(ORecordId iRecordId, byte[] iContent, ORecordVersion iRecordVersion,
byte iRecordType, int iMode, ORecordCallback<Long> iCallback);
Expand Down Expand Up @@ -191,11 +189,6 @@ OStorageOperationResult<Boolean> deleteRecord(ORecordId iRecordId, ORecordVersio

void synch();

int getUsers();

int addUser();

int removeUser();

/**
* Execute the command request and return the result back.
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package com.orientechnologies.orient.core.storage;

import com.orientechnologies.common.concur.lock.OReadersWriterSpinLock;
import com.orientechnologies.common.concur.resource.*;
import com.orientechnologies.common.exception.OException;
import com.orientechnologies.orient.core.Orient;
Expand Down Expand Up @@ -65,7 +66,9 @@ public abstract class OStorageAbstract implements OStorage, OSharedContainer {

protected final String url;
protected final String mode;
protected final OSharedResourceAdaptiveExternal lock;
protected final OSharedResourceAdaptiveExternal dataLock;
protected final OReadersWriterSpinLock stateLock;

protected volatile OStorageConfiguration configuration;
protected volatile OCurrentStorageComponentsFactory componentsFactory;
protected String name;
Expand All @@ -86,7 +89,8 @@ public OStorageAbstract(final String name, final String iURL, final String mode,
url = iURL;
this.mode = mode;

lock = new OSharedResourceAdaptiveExternal(OGlobalConfiguration.ENVIRONMENT_CONCURRENT.getValueAsBoolean(), timeout, true);
dataLock = new OSharedResourceAdaptiveExternal(OGlobalConfiguration.ENVIRONMENT_CONCURRENT.getValueAsBoolean(), timeout, true);
stateLock = new OReadersWriterSpinLock();
}

public abstract OCluster getClusterByName(final String iClusterName);
Expand Down Expand Up @@ -149,21 +153,6 @@ public boolean dropCluster(final String iClusterName, final boolean iTruncate) {
return dropCluster(getClusterIdByName(iClusterName), iTruncate);
}

public int getUsers() {
return lock.getUsers();
}

public int addUser() {
return lock.addUser();
}

public int removeUser() {
return lock.removeUser();
}

public OSharedResourceAdaptiveExternal getLock() {
return lock;
}

public long countRecords() {
long tot = 0;
Expand All @@ -176,21 +165,26 @@ public long countRecords() {
}

public <V> V callInLock(final Callable<V> iCallable, final boolean iExclusiveLock) {
if (iExclusiveLock)
lock.acquireExclusiveLock();
else
lock.acquireSharedLock();
stateLock.acquireReadLock();
try {
return iCallable.call();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new OException("Error on nested call in lock", e);
} finally {
if (iExclusiveLock)
lock.releaseExclusiveLock();
dataLock.acquireExclusiveLock();
else
lock.releaseSharedLock();
dataLock.acquireSharedLock();
try {
return iCallable.call();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new OException("Error on nested call in lock", e);
} finally {
if (iExclusiveLock)
dataLock.releaseExclusiveLock();
else
dataLock.releaseSharedLock();
}
} finally {
stateLock.releaseReadLock();
}
}

Expand Down Expand Up @@ -237,20 +231,4 @@ public long getLastOperationId() {
return 0;
}

protected boolean checkForClose(final boolean force) {
if (status == STATUS.CLOSED)
return false;

lock.acquireSharedLock();
try {
if (status == STATUS.CLOSED)
return false;

final int remainingUsers = getUsers() > 0 ? removeUser() : 0;

return force || (!(this instanceof OAbstractPaginatedStorage) && remainingUsers == 0);
} finally {
lock.releaseSharedLock();
}
}
}

0 comments on commit 69ad5ec

Please sign in to comment.