From c349297af9f215290894ac466385d5034cc0530e Mon Sep 17 00:00:00 2001 From: Johno Crawford Date: Mon, 13 Feb 2017 21:38:38 +0100 Subject: [PATCH] WaitFreeExecutionSerializer memory footprint (#216) Motivation: WaitFreeExecutionSerializer is created for every actor instance, AtomicBoolean and AtomicInteger can be substituted with primitive types to save memory. Modifications: Replaced AtomicBoolean and AtomicInteger with volatile int and static AtomicIntegerFieldUpdaters. Result: Fixes #214. --- .../WaitFreeExecutionSerializer.java | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/actors/runtime/src/main/java/cloud/orbit/actors/concurrent/WaitFreeExecutionSerializer.java b/actors/runtime/src/main/java/cloud/orbit/actors/concurrent/WaitFreeExecutionSerializer.java index f52efa0e0..d29cd17c0 100644 --- a/actors/runtime/src/main/java/cloud/orbit/actors/concurrent/WaitFreeExecutionSerializer.java +++ b/actors/runtime/src/main/java/cloud/orbit/actors/concurrent/WaitFreeExecutionSerializer.java @@ -28,17 +28,16 @@ package cloud.orbit.actors.concurrent; -import cloud.orbit.actors.runtime.InternalUtils; -import cloud.orbit.concurrent.Task; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import cloud.orbit.actors.runtime.InternalUtils; +import cloud.orbit.concurrent.Task; + import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Supplier; /** @@ -51,10 +50,16 @@ public class WaitFreeExecutionSerializer implements ExecutionSerializer, Executo private static final Logger logger = LoggerFactory.getLogger(WaitFreeExecutionSerializer.class); private static final boolean DEBUG_ENABLED = logger.isDebugEnabled(); + private static final AtomicIntegerFieldUpdater lockUpdater = AtomicIntegerFieldUpdater.newUpdater(WaitFreeExecutionSerializer.class, "lock"); + private static final AtomicIntegerFieldUpdater sizeUpdater = AtomicIntegerFieldUpdater.newUpdater(WaitFreeExecutionSerializer.class, "size"); + private final ExecutorService executorService; private final ConcurrentLinkedQueue>> queue = new ConcurrentLinkedQueue<>(); - private final AtomicBoolean lock = new AtomicBoolean(); - private final AtomicInteger size = new AtomicInteger(); + + @SuppressWarnings("FieldCanBeLocal") + private volatile int lock = 0; + @SuppressWarnings("FieldCanBeLocal") + private volatile int size = 0; private final Object key; public WaitFreeExecutionSerializer(final ExecutorService executorService) @@ -73,7 +78,7 @@ public Task executeSerialized(Supplier> taskSupplier, int maxQueu { final Task completion = new Task<>(); - int queueSize = size.get(); + int queueSize = size; if (DEBUG_ENABLED && queueSize >= maxQueueSize / 10) { logger.debug("Queued " + queueSize + " / " + maxQueueSize + " for " + key); @@ -89,7 +94,7 @@ public Task executeSerialized(Supplier> taskSupplier, int maxQueu } // managing the size like this to avoid using ConcurrentLinkedQueue.size() - size.incrementAndGet(); + sizeUpdater.incrementAndGet(this); tryExecute(false); @@ -99,7 +104,7 @@ public Task executeSerialized(Supplier> taskSupplier, int maxQueu @Override public boolean isBusy() { - return lock.get() || !queue.isEmpty(); + return lock == 1 || !queue.isEmpty(); } /** @@ -119,7 +124,7 @@ private void tryExecute(boolean local) final Supplier> toRun = queue.poll(); if (toRun != null) { - size.decrementAndGet(); + sizeUpdater.decrementAndGet(this); try { Task taskFuture; @@ -209,7 +214,7 @@ private void wrapExecution(final Supplier> toRun, final Task taskFutu private void unlock() { - if (!lock.compareAndSet(true, false)) + if (!lockUpdater.compareAndSet(this, 1, 0)) { logger.error("Unlocking without having the lock"); } @@ -217,7 +222,7 @@ private void unlock() private boolean lock() { - return lock.compareAndSet(false, true); + return lockUpdater.compareAndSet(this, 0, 1); } private void whenCompleteAsync(T result, Throwable error)