Skip to content

Commit

Permalink
Issue #3219 was fixed. But additional tests should be added.
Browse files Browse the repository at this point in the history
  • Loading branch information
laa committed Jan 9, 2015
1 parent cf7a091 commit 2275b84
Show file tree
Hide file tree
Showing 23 changed files with 437 additions and 152 deletions.
Expand Up @@ -21,24 +21,29 @@


import com.orientechnologies.orient.client.remote.OStorageRemoteThreadLocal.OStorageRemoteSession; import com.orientechnologies.orient.client.remote.OStorageRemoteThreadLocal.OStorageRemoteSession;
import com.orientechnologies.orient.core.OOrientListenerAbstract; import com.orientechnologies.orient.core.OOrientListenerAbstract;
import com.orientechnologies.orient.core.OOrientShutdownListener;
import com.orientechnologies.orient.core.OOrientStartupListener;
import com.orientechnologies.orient.core.Orient; import com.orientechnologies.orient.core.Orient;


public class OStorageRemoteThreadLocal extends ThreadLocal<OStorageRemoteSession> { public class OStorageRemoteThreadLocal extends ThreadLocal<OStorageRemoteSession> {
public static volatile OStorageRemoteThreadLocal INSTANCE = new OStorageRemoteThreadLocal(); public static volatile OStorageRemoteThreadLocal INSTANCE = new OStorageRemoteThreadLocal();


static { static {
Orient.instance().registerListener(new OOrientListenerAbstract() {
@Override
public void onShutdown() {
INSTANCE = null;
}


Orient.instance().registerWeakOrientStartupListener(new OOrientStartupListener() {
@Override @Override
public void onStartup() { public void onStartup() {
if (INSTANCE == null) if (INSTANCE == null)
INSTANCE = new OStorageRemoteThreadLocal(); INSTANCE = new OStorageRemoteThreadLocal();
} }
}); });

Orient.instance().registerWeakOrientShutdownListener(new OOrientShutdownListener() {
@Override
public void onShutdown() {
INSTANCE = null;
}
});
} }


public class OStorageRemoteSession { public class OStorageRemoteSession {
Expand Down
Expand Up @@ -21,6 +21,9 @@
package com.orientechnologies.common.concur.lock; package com.orientechnologies.common.concur.lock;


import com.orientechnologies.common.types.OModifiableInteger; import com.orientechnologies.common.types.OModifiableInteger;
import com.orientechnologies.orient.core.OOrientShutdownListener;
import com.orientechnologies.orient.core.OOrientStartupListener;
import com.orientechnologies.orient.core.Orient;


import java.util.HashSet; import java.util.HashSet;
import java.util.Queue; import java.util.Queue;
Expand All @@ -34,30 +37,33 @@
* @author Andrey Lomakin (a.lomakin-at-orientechnologies.com) * @author Andrey Lomakin (a.lomakin-at-orientechnologies.com)
* @since 8/18/14 * @since 8/18/14
*/ */
public class OReadersWriterSpinLock extends AbstractOwnableSynchronizer { public class OReadersWriterSpinLock extends AbstractOwnableSynchronizer implements OOrientStartupListener, OOrientShutdownListener {
private final OThreadCountersHashTable threadCountersHashTable = new OThreadCountersHashTable(); private final OThreadCountersHashTable threadCountersHashTable = new OThreadCountersHashTable();


private final AtomicReference<WNode> tail = new AtomicReference<WNode>(); private final AtomicReference<WNode> tail = new AtomicReference<WNode>();
private final ThreadLocal<OModifiableInteger> lockHolds = new ThreadLocal<OModifiableInteger>() { private volatile ThreadLocal<OModifiableInteger> lockHolds = new ThreadLocal<OModifiableInteger>() {
@Override @Override
protected OModifiableInteger initialValue() { protected OModifiableInteger initialValue() {
return new OModifiableInteger(); return new OModifiableInteger();
} }
}; };


private final ThreadLocal<WNode> myNode = new ThreadLocal<WNode>() { private volatile ThreadLocal<WNode> myNode = new ThreadLocal<WNode>() {
@Override @Override
protected WNode initialValue() { protected WNode initialValue() {
return new WNode(); return new WNode();
} }
}; };
private final ThreadLocal<WNode> predNode = new ThreadLocal<WNode>(); private volatile ThreadLocal<WNode> predNode = new ThreadLocal<WNode>();


public OReadersWriterSpinLock() { public OReadersWriterSpinLock() {
final WNode wNode = new WNode(); final WNode wNode = new WNode();
wNode.locked = false; wNode.locked = false;


tail.set(wNode); tail.set(wNode);

Orient.instance().registerWeakOrientStartupListener(this);
Orient.instance().registerWeakOrientShutdownListener(this);
} }


public void acquireReadLock() { public void acquireReadLock() {
Expand Down Expand Up @@ -176,6 +182,35 @@ public void releaseWriteLock() {
assert lHolds.intValue() == 0; assert lHolds.intValue() == 0;
} }


@Override
public void onShutdown() {
lockHolds = null;
myNode = null;
predNode = null;
}

@Override
public void onStartup() {
if (lockHolds == null)
lockHolds = new ThreadLocal<OModifiableInteger>() {
@Override
protected OModifiableInteger initialValue() {
return new OModifiableInteger();
}
};

if (myNode == null)
myNode = new ThreadLocal<WNode>() {
@Override
protected WNode initialValue() {
return new WNode();
}
};
if (predNode == null)
predNode = new ThreadLocal<WNode>();

}

private final static class WNode { private final static class WNode {
private final Queue<Thread> waitingReaders = new ConcurrentLinkedQueue<Thread>(); private final Queue<Thread> waitingReaders = new ConcurrentLinkedQueue<Thread>();


Expand Down
@@ -1,27 +1,30 @@
/* /*
* *
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com) * * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* * * *
* * Licensed under the Apache License, Version 2.0 (the "License"); * * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License. * * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at * * You may obtain a copy of the License at
* * * *
* * http://www.apache.org/licenses/LICENSE-2.0 * * http://www.apache.org/licenses/LICENSE-2.0
* * * *
* * Unless required by applicable law or agreed to in writing, software * * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS, * * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and * * See the License for the specific language governing permissions and
* * limitations under the License. * * limitations under the License.
* * * *
* * For more information: http://www.orientechnologies.com * * For more information: http://www.orientechnologies.com
* *
*/ */


package com.orientechnologies.common.concur.lock; package com.orientechnologies.common.concur.lock;


import com.orientechnologies.common.hash.OMurmurHash3; import com.orientechnologies.common.hash.OMurmurHash3;
import com.orientechnologies.common.serialization.types.OLongSerializer; import com.orientechnologies.common.serialization.types.OLongSerializer;
import com.orientechnologies.orient.core.OOrientShutdownListener;
import com.orientechnologies.orient.core.OOrientStartupListener;
import com.orientechnologies.orient.core.Orient;


import java.util.Arrays; import java.util.Arrays;
import java.util.Random; import java.util.Random;
Expand All @@ -34,7 +37,7 @@
* @author Andrey Lomakin (a.lomakin-at-orientechnologies.com) * @author Andrey Lomakin (a.lomakin-at-orientechnologies.com)
* @since 8/20/14 * @since 8/20/14
*/ */
public final class OThreadCountersHashTable { public final class OThreadCountersHashTable implements OOrientStartupListener, OOrientShutdownListener {
private static final int SEED = 362498820; private static final int SEED = 362498820;


private static final int NCPU = Runtime.getRuntime().availableProcessors(); private static final int NCPU = Runtime.getRuntime().availableProcessors();
Expand All @@ -44,7 +47,7 @@ public final class OThreadCountersHashTable {
public static final int THRESHOLD = 10; public static final int THRESHOLD = 10;
private final boolean deadThreadsAreAllowed; private final boolean deadThreadsAreAllowed;


private final ThreadLocal<HashEntry> hashEntry = new ThreadLocal<HashEntry>(); private volatile ThreadLocal<HashEntry> hashEntry = new ThreadLocal<HashEntry>();


private volatile int activeTableIndex = 0; private volatile int activeTableIndex = 0;


Expand Down Expand Up @@ -77,6 +80,9 @@ public OThreadCountersHashTable(int initialSize, boolean deadThreadsAreAllowed)
busyCounters = counters; busyCounters = counters;


this.tables = tables; this.tables = tables;

Orient.instance().registerWeakOrientStartupListener(this);
Orient.instance().registerWeakOrientShutdownListener(this);
} }


public void increment() { public void increment() {
Expand Down Expand Up @@ -462,6 +468,17 @@ private static int[] hashCodesByThreadId(final long threadId) {
return new int[] { (int) (hashCode & 0xFFFFFFFFL), (int) (hashCode >>> 32) }; return new int[] { (int) (hashCode & 0xFFFFFFFFL), (int) (hashCode >>> 32) };
} }


@Override
public void onShutdown() {
hashEntry = null;
}

@Override
public void onStartup() {
if (hashEntry == null)
hashEntry = new ThreadLocal<HashEntry>();
}

static final class HashEntry { static final class HashEntry {
private final Thread thread; private final Thread thread;
private final int[] hashCodes; private final int[] hashCodes;
Expand Down
Expand Up @@ -48,7 +48,8 @@ public OPartitionedObjectPool(ObjectFactory factory, int maxSize) {


partitions = pts; partitions = pts;


Orient.instance().registerListener(this); Orient.instance().registerWeakOrientStartupListener(this);
Orient.instance().registerWeakOrientShutdownListener(this);
} }


public PoolEntry<T> acquire() { public PoolEntry<T> acquire() {
Expand Down
Expand Up @@ -41,7 +41,8 @@ public OPartitionedObjectPoolFactory(ObjectFactoryFactory<K, T> objectFactoryFac
poolStore = new ConcurrentLinkedHashMap.Builder<K, OPartitionedObjectPool<T>>().maximumWeightedCapacity(capacity) poolStore = new ConcurrentLinkedHashMap.Builder<K, OPartitionedObjectPool<T>>().maximumWeightedCapacity(capacity)
.listener(evictionListener).build(); .listener(evictionListener).build();


Orient.instance().registerListener(this); Orient.instance().registerWeakOrientStartupListener(this);
Orient.instance().registerWeakOrientShutdownListener(this);
} }


public int getMaxPoolSize() { public int getMaxPoolSize() {
Expand Down
Expand Up @@ -27,10 +27,10 @@
* @author Luca Garulli (l.garulli--at--orientechnologies.com) * @author Luca Garulli (l.garulli--at--orientechnologies.com)
* *
*/ */
public interface OOrientListener { public interface OOrientListener extends OOrientShutdownListener {
public void onShutdown(); public void onShutdown();


public void onStorageRegistered(final OStorage iStorage); public void onStorageRegistered(final OStorage storage);


public void onStorageUnregistered(final OStorage iStorage); public void onStorageUnregistered(final OStorage storage);
} }
Expand Up @@ -27,7 +27,7 @@
* @author Luca Garulli (l.garulli--at--orientechnologies.com) * @author Luca Garulli (l.garulli--at--orientechnologies.com)
* *
*/ */
public abstract class OOrientListenerAbstract implements OOrientListener, OOrientStartupListener { public abstract class OOrientListenerAbstract implements OOrientListener, OOrientStartupListener, OOrientShutdownListener {
@Override @Override
public void onStartup() { public void onStartup() {
} }
Expand Down
@@ -0,0 +1,9 @@
package com.orientechnologies.orient.core;

/**
* @author Andrey Lomakin <a href="mailto:lomakin.andrey@gmail.com">Andrey Lomakin</a>
* @since 09/01/15
*/
public interface OOrientShutdownListener {
void onShutdown();
}

0 comments on commit 2275b84

Please sign in to comment.