Skip to content

Commit

Permalink
WaitFreeExecutionSerializer memory footprint (#216)
Browse files Browse the repository at this point in the history
Motivation:

WaitFreeExecutionSerializer is created for every actor instance, AtomicBoolean and AtomicInteger can be substituted with primitive types to save memory.

Modifications:

Replaced AtomicBoolean and AtomicInteger with volatile int and static AtomicIntegerFieldUpdaters.

Result:

Fixes #214.
  • Loading branch information
johnou authored and JoeHegarty committed Feb 13, 2017
1 parent 1a18185 commit c349297
Showing 1 changed file with 18 additions and 13 deletions.
Expand Up @@ -28,17 +28,16 @@


package cloud.orbit.actors.concurrent; package cloud.orbit.actors.concurrent;


import cloud.orbit.actors.runtime.InternalUtils;
import cloud.orbit.concurrent.Task;

import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import cloud.orbit.actors.runtime.InternalUtils;
import cloud.orbit.concurrent.Task;

import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier; import java.util.function.Supplier;


/** /**
Expand All @@ -51,10 +50,16 @@ public class WaitFreeExecutionSerializer implements ExecutionSerializer, Executo
private static final Logger logger = LoggerFactory.getLogger(WaitFreeExecutionSerializer.class); private static final Logger logger = LoggerFactory.getLogger(WaitFreeExecutionSerializer.class);
private static final boolean DEBUG_ENABLED = logger.isDebugEnabled(); private static final boolean DEBUG_ENABLED = logger.isDebugEnabled();


private static final AtomicIntegerFieldUpdater<WaitFreeExecutionSerializer> lockUpdater = AtomicIntegerFieldUpdater.newUpdater(WaitFreeExecutionSerializer.class, "lock");
private static final AtomicIntegerFieldUpdater<WaitFreeExecutionSerializer> sizeUpdater = AtomicIntegerFieldUpdater.newUpdater(WaitFreeExecutionSerializer.class, "size");

private final ExecutorService executorService; private final ExecutorService executorService;
private final ConcurrentLinkedQueue<Supplier<Task<?>>> queue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Supplier<Task<?>>> queue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean lock = new AtomicBoolean();
private final AtomicInteger size = new AtomicInteger(); @SuppressWarnings("FieldCanBeLocal")
private volatile int lock = 0;
@SuppressWarnings("FieldCanBeLocal")
private volatile int size = 0;
private final Object key; private final Object key;


public WaitFreeExecutionSerializer(final ExecutorService executorService) public WaitFreeExecutionSerializer(final ExecutorService executorService)
Expand All @@ -73,7 +78,7 @@ public <R> Task<R> executeSerialized(Supplier<Task<R>> taskSupplier, int maxQueu
{ {
final Task<R> completion = new Task<>(); final Task<R> completion = new Task<>();


int queueSize = size.get(); int queueSize = size;
if (DEBUG_ENABLED && queueSize >= maxQueueSize / 10) if (DEBUG_ENABLED && queueSize >= maxQueueSize / 10)
{ {
logger.debug("Queued " + queueSize + " / " + maxQueueSize + " for " + key); logger.debug("Queued " + queueSize + " / " + maxQueueSize + " for " + key);
Expand All @@ -89,7 +94,7 @@ public <R> Task<R> executeSerialized(Supplier<Task<R>> taskSupplier, int maxQueu
} }


// managing the size like this to avoid using ConcurrentLinkedQueue.size() // managing the size like this to avoid using ConcurrentLinkedQueue.size()
size.incrementAndGet(); sizeUpdater.incrementAndGet(this);


tryExecute(false); tryExecute(false);


Expand All @@ -99,7 +104,7 @@ public <R> Task<R> executeSerialized(Supplier<Task<R>> taskSupplier, int maxQueu
@Override @Override
public boolean isBusy() public boolean isBusy()
{ {
return lock.get() || !queue.isEmpty(); return lock == 1 || !queue.isEmpty();
} }


/** /**
Expand All @@ -119,7 +124,7 @@ private void tryExecute(boolean local)
final Supplier<Task<?>> toRun = queue.poll(); final Supplier<Task<?>> toRun = queue.poll();
if (toRun != null) if (toRun != null)
{ {
size.decrementAndGet(); sizeUpdater.decrementAndGet(this);
try try
{ {
Task<?> taskFuture; Task<?> taskFuture;
Expand Down Expand Up @@ -209,15 +214,15 @@ private void wrapExecution(final Supplier<Task<?>> toRun, final Task<?> taskFutu


private void unlock() private void unlock()
{ {
if (!lock.compareAndSet(true, false)) if (!lockUpdater.compareAndSet(this, 1, 0))
{ {
logger.error("Unlocking without having the lock"); logger.error("Unlocking without having the lock");
} }
} }


private boolean lock() private boolean lock()
{ {
return lock.compareAndSet(false, true); return lockUpdater.compareAndSet(this, 0, 1);
} }


private <T> void whenCompleteAsync(T result, Throwable error) private <T> void whenCompleteAsync(T result, Throwable error)
Expand Down

0 comments on commit c349297

Please sign in to comment.