Skip to content

Commit

Permalink
Merge pull request #239 from microstream-one/volatile_to_atomic
Browse files Browse the repository at this point in the history
Volatile to atomic
  • Loading branch information
zdenek-jonas committed Sep 15, 2021
2 parents bceb46a + aaecfbf commit 0f144e4
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 77 deletions.
21 changes: 11 additions & 10 deletions base/src/main/java/one/microstream/concurrency/Threaded.java
Expand Up @@ -24,6 +24,7 @@
import static java.lang.Thread.currentThread;

import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -151,7 +152,7 @@ public static final <E> Threaded<E> New(final E currentThreadsElement)
////////////////////

private volatile Entry<E>[] slots;
private volatile int size;
private AtomicInteger size = new AtomicInteger();
private volatile Consumer<E> cleanUpOperation;


Expand All @@ -170,7 +171,7 @@ public Threaded()
{
super();
this.slots = new Entry[1]; // in this case small memory need is preferable to good low-grow performance
this.size = 0;
this.size.set(0);
this.cleanUpOperation = null;
}

Expand All @@ -194,7 +195,7 @@ public Threaded(final int initialCapacity)
{
super();
this.slots = Threaded.<E>createSlots(initialCapacity);
this.size = 0;
this.size.set(0);
this.cleanUpOperation = null;
}

Expand Down Expand Up @@ -362,7 +363,7 @@ private synchronized void addEntry(final int threadIdHashCode, final Entry<E> en
(slots = this.slots)[threadIdHashCode & slots.length - 1] = entry.next(slots[threadIdHashCode & slots.length - 1]);

// increase size and rebuild slots array if necessary
if(this.size++ == slots.length)
if(this.size.getAndIncrement() == slots.length)
{
this.internalOptimize();
}
Expand Down Expand Up @@ -405,7 +406,7 @@ private synchronized void removeForCurrentThread(final int threadIdHashCode)
@SuppressWarnings("unchecked")
private int internalOptimize()
{
final Entry<E>[] slots, buffer = new Entry[this.size];
final Entry<E>[] slots, buffer = new Entry[this.size.get()];
final int slotsLength = (slots = this.slots).length;
final Consumer<E> cleanUpOp = this.cleanUpOperation;
int count = 0;
Expand Down Expand Up @@ -464,7 +465,7 @@ else if(cleanUpOp != null)
newSlots[index = identityHashCode(thread) & modulo] = new Entry<>(thread, buffer[i].value, newSlots[index]);
}

this.size = count;
this.size.set(count);
this.slots = newSlots;
return newSlots.length - count;

Expand Down Expand Up @@ -533,9 +534,9 @@ public synchronized long optimize()
@Override
public synchronized long consolidate()
{
final int oldSize = this.size;
final int oldSize = this.size.get();
this.internalOptimize();
return oldSize - this.size;
return oldSize - this.size.get();
}

/**
Expand All @@ -553,7 +554,7 @@ public synchronized long consolidate()
@Override
public boolean isEmpty()
{
return this.size == 0;
return this.size.get() == 0;
}

/**
Expand All @@ -570,7 +571,7 @@ public boolean isEmpty()
@Override
public long size()
{
return this.size;
return this.size.get();
}

public synchronized boolean containsSearched(final Predicate<? super E> predicate)
Expand Down
55 changes: 28 additions & 27 deletions cache/cache/src/main/java/one/microstream/cache/types/Cache.java
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import javax.cache.CacheException;
Expand Down Expand Up @@ -183,8 +184,8 @@ public static class Default<K, V> implements Cache<K, V>
private final ExecutorService executorService ;
private final CacheConfigurationMXBean cacheConfigurationMXBean;
private final CacheStatisticsMXBean cacheStatisticsMXBean ;
private volatile boolean isStatisticsEnabled ;
private volatile boolean isClosed ;
private AtomicBoolean isStatisticsEnabled = new AtomicBoolean();
private AtomicBoolean isClosed = new AtomicBoolean();

/*
* According to spec cache and configuration, which may be mutable,
Expand Down Expand Up @@ -352,18 +353,18 @@ public void deregisterCacheEntryListener(final CacheEntryListenerConfiguration<K
@Override
public boolean isClosed()
{
return this.isClosed;
return this.isClosed.get();
}

@Override
public synchronized void close()
{
if(this.isClosed)
if(this.isClosed.get())
{
return;
}

this.isClosed = true;
this.isClosed.set(true);

this.manager.removeCache(this.name);

Expand Down Expand Up @@ -555,8 +556,8 @@ public void put(final K key, final V value)
this.keyValidator.validate(key);
this.valueValidator.validate(value);

final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final long start = isStatisticsEnabled
final AtomicBoolean isStatisticsEnabled = this.isStatisticsEnabled;
final long start = isStatisticsEnabled.get()
? System.nanoTime()
: 0;

Expand Down Expand Up @@ -641,7 +642,7 @@ public void put(final K key, final V value)
{
eventDispatcher.dispatch(this.listenerRegistrations);
}
if(isStatisticsEnabled && putCount > 0)
if(isStatisticsEnabled.get() && putCount > 0)
{
final CacheStatisticsMXBean cacheStatisticsMXBean = this.cacheStatisticsMXBean;
cacheStatisticsMXBean.increaseCachePuts(putCount);
Expand All @@ -657,8 +658,8 @@ public V getAndPut(final K key, final V value)
this.keyValidator.validate(key);
this.valueValidator.validate(value);

final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final long start = isStatisticsEnabled
final AtomicBoolean isStatisticsEnabled = this.isStatisticsEnabled;
final long start = isStatisticsEnabled.get()
? System.nanoTime()
: 0;

Expand Down Expand Up @@ -744,7 +745,7 @@ public V getAndPut(final K key, final V value)
{
eventDispatcher.dispatch(this.listenerRegistrations);
}
if(isStatisticsEnabled)
if(isStatisticsEnabled.get())
{
final CacheStatisticsMXBean cacheStatisticsMXBean = this.cacheStatisticsMXBean;
if(result == null)
Expand Down Expand Up @@ -780,7 +781,7 @@ public void putAll(
map.keySet().forEach(this.keyValidator::validate);
map.values().forEach(this.valueValidator::validate);

final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final boolean isStatisticsEnabled = this.isStatisticsEnabled.get();
final long start = isStatisticsEnabled
? System.nanoTime()
: 0;
Expand Down Expand Up @@ -938,7 +939,7 @@ public boolean putIfAbsent(final K key, final V value)
this.keyValidator.validate(key);
this.valueValidator.validate(value);

final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final boolean isStatisticsEnabled = this.isStatisticsEnabled.get();
final long start = isStatisticsEnabled
? System.nanoTime()
: 0;
Expand Down Expand Up @@ -1036,7 +1037,7 @@ public boolean remove(final K key)

this.keyValidator.validate(key);

final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final boolean isStatisticsEnabled = this.isStatisticsEnabled.get();
final long start = isStatisticsEnabled
? System.nanoTime()
: 0;
Expand Down Expand Up @@ -1101,7 +1102,7 @@ public boolean remove(final K key, final V oldValue)
this.keyValidator.validate(key);
this.valueValidator.validate(oldValue);

final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final boolean isStatisticsEnabled = this.isStatisticsEnabled.get();
final long start = isStatisticsEnabled
? System.nanoTime()
: 0;
Expand Down Expand Up @@ -1187,7 +1188,7 @@ public V getAndRemove(final K key)

this.keyValidator.validate(key);

final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final boolean isStatisticsEnabled = this.isStatisticsEnabled.get();
final long start = isStatisticsEnabled
? System.nanoTime()
: 0;
Expand Down Expand Up @@ -1256,7 +1257,7 @@ public boolean replace(final K key, final V oldValue, final V newValue)
this.valueValidator.validate(newValue);
this.valueValidator.validate(oldValue);

final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final boolean isStatisticsEnabled = this.isStatisticsEnabled.get();
final long start = isStatisticsEnabled
? System.nanoTime()
: 0;
Expand Down Expand Up @@ -1348,7 +1349,7 @@ public boolean replace(final K key, final V value)
this.keyValidator.validate(key);
this.valueValidator.validate(value);

final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final boolean isStatisticsEnabled = this.isStatisticsEnabled.get();
final long start = isStatisticsEnabled
? System.nanoTime()
: 0;
Expand Down Expand Up @@ -1423,7 +1424,7 @@ public V getAndReplace(final K key, final V value)
this.keyValidator.validate(key);
this.valueValidator.validate(value);

final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final boolean isStatisticsEnabled = this.isStatisticsEnabled.get();
final long start = isStatisticsEnabled
? System.nanoTime()
: 0;
Expand Down Expand Up @@ -1499,7 +1500,7 @@ public void removeAll(final Set<? extends K> keys)

keys.forEach(this.keyValidator::validate);

final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final boolean isStatisticsEnabled = this.isStatisticsEnabled.get();
final long now = System.currentTimeMillis();
final CacheEventDispatcher<K, V> eventDispatcher = this.listenerRegistrations.size() > 0L
? CacheEventDispatcher.New()
Expand Down Expand Up @@ -1615,7 +1616,7 @@ public void removeAll()
{
this.ensureOpen();

final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final boolean isStatisticsEnabled = this.isStatisticsEnabled.get();
int removed = 0;
final long now = System.currentTimeMillis();
final CacheEventDispatcher<K, V> eventDispatcher = this.listenerRegistrations.size() > 0L
Expand Down Expand Up @@ -1734,7 +1735,7 @@ public Iterator<Cache.Entry<K, V>> iterator()
@Override
public void setStatisticsEnabled(final boolean enabled)
{
this.isStatisticsEnabled = enabled;
this.isStatisticsEnabled.set(enabled);

this.updateConfiguration(c -> c.setStatisticsEnabled(enabled));

Expand Down Expand Up @@ -1822,7 +1823,7 @@ public <T> T invoke(

this.keyValidator.validate(key);

final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final boolean isStatisticsEnabled = this.isStatisticsEnabled.get();
long start = isStatisticsEnabled
? System.nanoTime()
: 0;
Expand Down Expand Up @@ -2096,7 +2097,7 @@ private void finishInvocationRemove(

private void ensureOpen()
{
if(this.isClosed)
if(this.isClosed.get())
{
throw new IllegalStateException("Cache is closed");
}
Expand All @@ -2108,7 +2109,7 @@ private V getValue(
final Reference<CachedValue> cachedValueReference
)
{
final boolean isStatisticsEnabled = this.isStatisticsEnabled;
final boolean isStatisticsEnabled = this.isStatisticsEnabled.get();
final long start = isStatisticsEnabled
? System.nanoTime()
: 0;
Expand Down Expand Up @@ -2241,7 +2242,7 @@ public void evict(final Iterable<KeyValue<Object, CachedValue>> entriesToEvict)
}
}

if(this.isStatisticsEnabled && evictionCount > 0)
if(this.isStatisticsEnabled.get() && evictionCount > 0)
{
if(eventDispatcher != null)
{
Expand Down Expand Up @@ -2441,7 +2442,7 @@ private final class EntryIterator implements Iterator<Cache.Entry<K, V>>
this.nextEntry = null;
this.lastEntry = null;
this.now = System.currentTimeMillis();
this.isStatisticsEnabled = Cache.Default.this.isStatisticsEnabled;
this.isStatisticsEnabled = Cache.Default.this.isStatisticsEnabled.get();
}

private void fetch()
Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.lang.ref.WeakReference;
import java.net.URI;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.cache.CacheException;
import javax.cache.configuration.CompleteConfiguration;
Expand Down Expand Up @@ -84,7 +85,7 @@ public static class Default implements CacheManager
private final WeakReference<ClassLoader> classLoaderReference;
private final Properties properties;
private final EqHashTable<String, Cache<?, ?>> caches = EqHashTable.New();
private volatile boolean isClosed = false;
private final AtomicBoolean isClosed = new AtomicBoolean(false);

Default(
final CachingProvider cachingProvider,
Expand Down Expand Up @@ -113,7 +114,7 @@ public CachingProvider getCachingProvider()
@Override
public boolean isClosed()
{
return this.isClosed;
return this.isClosed.get();
}

@Override
Expand Down Expand Up @@ -276,13 +277,13 @@ public void enableStatistics(final String cacheName, final boolean enabled)
@Override
public synchronized void close()
{
if(this.isClosed)
if(this.isClosed.get())
{
// no-op, according to spec
return;
}

this.isClosed = true;
this.isClosed.set(true);

this.cachingProvider.remove(
this.getURI(),
Expand Down Expand Up @@ -311,7 +312,7 @@ public synchronized void close()

private void ensureOpen()
{
if(this.isClosed)
if(this.isClosed.get())
{
throw new IllegalStateException("CacheManager is closed");
}
Expand Down

0 comments on commit 0f144e4

Please sign in to comment.