Skip to content

Commit

Permalink
Issue #2029 , mechanics of freezing new atomic operations was added. …
Browse files Browse the repository at this point in the history
…Modification lock was replaced by new mechanics.
  • Loading branch information
laa committed Sep 15, 2015
1 parent 99765cc commit d10ab9b
Show file tree
Hide file tree
Showing 16 changed files with 857 additions and 1,600 deletions.
Expand Up @@ -19,7 +19,6 @@
*/ */
package com.orientechnologies.orient.client.remote; package com.orientechnologies.orient.client.remote;


import com.orientechnologies.common.concur.lock.OModificationLock;
import com.orientechnologies.orient.core.config.OStorageClusterConfiguration; import com.orientechnologies.orient.core.config.OStorageClusterConfiguration;
import com.orientechnologies.orient.core.conflict.ORecordConflictStrategy; import com.orientechnologies.orient.core.conflict.ORecordConflictStrategy;
import com.orientechnologies.orient.core.exception.ORecordNotFoundException; import com.orientechnologies.orient.core.exception.ORecordNotFoundException;
Expand Down Expand Up @@ -83,11 +82,6 @@ public void open() throws IOException {
public void close() throws IOException { public void close() throws IOException {
} }


@Override
public OModificationLock getExternalModificationLock() {
throw new UnsupportedOperationException("getExternalModificationLock");
}

@Override @Override
public void close(boolean flush) throws IOException { public void close(boolean flush) throws IOException {
} }
Expand Down
Expand Up @@ -17,7 +17,7 @@ public class ODistributedCounter extends OOrientListenerAbstract {
private static final int MAX_RETRIES = 8; private static final int MAX_RETRIES = 8;


private static final AtomicInteger nextHashCode = new AtomicInteger(); private static final AtomicInteger nextHashCode = new AtomicInteger();
private final AtomicBoolean poolBusy = new AtomicBoolean(); private final AtomicBoolean isBusy = new AtomicBoolean();
private final int maxPartitions = Runtime.getRuntime().availableProcessors() << 3; private final int maxPartitions = Runtime.getRuntime().availableProcessors() << 3;




Expand Down Expand Up @@ -60,7 +60,8 @@ public void add(long delta) {
} }


public void clear() { public void clear() {
while (!poolBusy.compareAndSet(false, true)) ; while (!isBusy.compareAndSet(false, true))
;


final AtomicLong[] cts = new AtomicLong[counters.length]; final AtomicLong[] cts = new AtomicLong[counters.length];
for (int i = 0; i < counters.length; i++) { for (int i = 0; i < counters.length; i++) {
Expand All @@ -69,7 +70,7 @@ public void clear() {


counters = cts; counters = cts;


poolBusy.set(false); isBusy.set(false);
} }


private void updateCounter(long delta) { private void updateCounter(long delta) {
Expand All @@ -82,15 +83,15 @@ private void updateCounter(long delta) {
AtomicLong counter = cts[index]; AtomicLong counter = cts[index];


if (counter == null) { if (counter == null) {
if (!poolBusy.get() && poolBusy.compareAndSet(false, true)) { if (!isBusy.get() && isBusy.compareAndSet(false, true)) {
if (cts == counters) { if (cts == counters) {
counter = cts[index]; counter = cts[index];


if (counter == null) if (counter == null)
cts[index] = new AtomicLong(); cts[index] = new AtomicLong();
} }


poolBusy.set(false); isBusy.set(false);
} }


continue; continue;
Expand All @@ -112,7 +113,7 @@ private void updateCounter(long delta) {
return; return;
} }


if (!poolBusy.get() && poolBusy.compareAndSet(false, true)) { if (!isBusy.get() && isBusy.compareAndSet(false, true)) {
if (cts == counters) { if (cts == counters) {
if (cts.length < maxPartitions) { if (cts.length < maxPartitions) {
final AtomicLong[] newCts = new AtomicLong[cts.length << 1]; final AtomicLong[] newCts = new AtomicLong[cts.length << 1];
Expand All @@ -121,7 +122,7 @@ private void updateCounter(long delta) {
} }
} }


poolBusy.set(false); isBusy.set(false);
} }


continue; continue;
Expand Down

This file was deleted.

Expand Up @@ -433,46 +433,7 @@ enum ATTRIBUTES {
*/ */
void freeze(boolean throwException); void freeze(boolean throwException);


/**
* Flush cached cluster content to the disk.
*
* After this call users can perform only select queries. All write-related commands will queued till {@link #releaseCluster(int)}
* command will be called.
*
* Given command waits till all on going modifications in indexes or DB will be finished.
*
* IMPORTANT: This command is not reentrant.
*
* @param iClusterId
* that must be released
*/
void freezeCluster(int iClusterId);

/**
* Allows to execute write-related commands on the cluster
*
* @param iClusterId
* that must be released
*/
void releaseCluster(int iClusterId);


/**
* Flush cached cluster content to the disk.
*
* After this call users can perform only select queries. All write-related commands will queued till {@link #releaseCluster(int)}
* command will be called.
*
* Given command waits till all on going modifications in indexes or DB will be finished.
*
* IMPORTANT: This command is not reentrant.
*
* @param iClusterId
* that must be released
* @param throwException
* If <code>true</code> {@link com.orientechnologies.common.concur.lock.OModificationOperationProhibitedException}
* exception will be thrown in case of write command will be performed.
*/
void freezeCluster(int iClusterId, boolean throwException);


enum OPERATION_MODE { enum OPERATION_MODE {
SYNCHRONOUS, ASYNCHRONOUS, ASYNCHRONOUS_NOANSWER SYNCHRONOUS, ASYNCHRONOUS, ASYNCHRONOUS_NOANSWER
Expand Down
Expand Up @@ -349,21 +349,6 @@ public void release() {
underlying.release(); underlying.release();
} }


@Override
public void freezeCluster(int iClusterId, boolean throwException) {
underlying.freezeCluster(iClusterId, throwException);
}

@Override
public void freezeCluster(int iClusterId) {
underlying.freezeCluster(iClusterId);
}

@Override
public void releaseCluster(int iClusterId) {
underlying.releaseCluster(iClusterId);
}

protected void checkOpeness() { protected void checkOpeness() {
if (isClosed()) if (isClosed())
throw new ODatabaseException("Database '" + getURL() + "' is closed"); throw new ODatabaseException("Database '" + getURL() + "' is closed");
Expand Down
Expand Up @@ -1421,35 +1421,6 @@ public ORecordMetadata getRecordMetadata(final ORID rid) {
return storage.getRecordMetadata(rid); return storage.getRecordMetadata(rid);
} }


@Override
public void freezeCluster(final int iClusterId) {
freezeCluster(iClusterId, false);
}

@Override
public void releaseCluster(final int iClusterId) {
checkIfActive();
final OLocalPaginatedStorage storage;
if (getStorage() instanceof OLocalPaginatedStorage)
storage = ((OLocalPaginatedStorage) getStorage());
else {
OLogManager.instance().error(this, "Only local paginated storage supports release of cluster");
return;
}

storage.release(iClusterId);
}

@Override
public void freezeCluster(final int iClusterId, final boolean throwException) {
checkIfActive();
if (getStorage() instanceof OLocalPaginatedStorage) {
final OLocalPaginatedStorage paginatedStorage = ((OLocalPaginatedStorage) getStorage());
paginatedStorage.freeze(throwException, iClusterId);
} else {
OLogManager.instance().error(this, "Only local paginated storage supports freeze of cluster");
}
}


public OTransaction getTransaction() { public OTransaction getTransaction() {
checkIfActive(); checkIfActive();
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/ */
package com.orientechnologies.orient.core.storage; package com.orientechnologies.orient.core.storage;


import com.orientechnologies.common.concur.lock.OModificationLock;
import com.orientechnologies.orient.core.config.OStorageClusterConfiguration; import com.orientechnologies.orient.core.config.OStorageClusterConfiguration;
import com.orientechnologies.orient.core.conflict.ORecordConflictStrategy; import com.orientechnologies.orient.core.conflict.ORecordConflictStrategy;
import com.orientechnologies.orient.core.exception.ORecordNotFoundException; import com.orientechnologies.orient.core.exception.ORecordNotFoundException;
Expand Down Expand Up @@ -47,8 +46,6 @@ enum ATTRIBUTES {


void delete() throws IOException; void delete() throws IOException;


OModificationLock getExternalModificationLock();

Object set(ATTRIBUTES iAttribute, Object iValue) throws IOException; Object set(ATTRIBUTES iAttribute, Object iValue) throws IOException;


String encryption(); String encryption();
Expand Down

0 comments on commit d10ab9b

Please sign in to comment.