From 4c80f29b2a6b93be05d152d45d757b220d5c4950 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 23 Jun 2017 16:16:31 +0300 Subject: [PATCH] refactoring write-behind persist mode #927 --- .../org/redisson/MapWriteBehindListener.java | 25 +++++++----- .../main/java/org/redisson/RedissonMap.java | 18 ++++++++- .../redisson/api/LocalCachedMapOptions.java | 18 ++++++--- .../java/org/redisson/api/MapOptions.java | 40 ++++++++++++++++--- .../redisson/RedissonLocalCachedMapTest.java | 3 +- .../org/redisson/RedissonMapCacheTest.java | 3 +- .../java/org/redisson/RedissonMapTest.java | 3 +- 7 files changed, 80 insertions(+), 30 deletions(-) diff --git a/redisson/src/main/java/org/redisson/MapWriteBehindListener.java b/redisson/src/main/java/org/redisson/MapWriteBehindListener.java index 1ffbe2af9be..84269f62484 100644 --- a/redisson/src/main/java/org/redisson/MapWriteBehindListener.java +++ b/redisson/src/main/java/org/redisson/MapWriteBehindListener.java @@ -16,8 +16,7 @@ package org.redisson; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.redisson.command.CommandAsyncExecutor; import org.slf4j.Logger; @@ -36,16 +35,19 @@ public class MapWriteBehindListener implements FutureListener { private static final Logger log = LoggerFactory.getLogger(MapWriteBehindListener.class); - private static final AtomicBoolean sent = new AtomicBoolean(); - private static final Queue operations = new ConcurrentLinkedQueue(); - + private final AtomicInteger writeBehindCurrentThreads; + private final Queue writeBehindTasks; + private final int threadsAmount; private final MapWriterTask task; private final CommandAsyncExecutor commandExecutor; - public MapWriteBehindListener(CommandAsyncExecutor commandExecutor, MapWriterTask task) { + public MapWriteBehindListener(CommandAsyncExecutor commandExecutor, MapWriterTask task, AtomicInteger writeBehindCurrentThreads, Queue writeBehindTasks, int threadsAmount) { super(); + this.threadsAmount = threadsAmount; this.commandExecutor = commandExecutor; this.task = task; + this.writeBehindCurrentThreads = writeBehindCurrentThreads; + this.writeBehindTasks = writeBehindTasks; } @Override @@ -66,16 +68,16 @@ public void run() { private void enqueueRunnable(Runnable runnable) { if (runnable != null) { - operations.add(runnable); + writeBehindTasks.add(runnable); } - if (sent.compareAndSet(false, true)) { + if (writeBehindCurrentThreads.incrementAndGet() <= threadsAmount) { commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { @Override public void run() { try { while (true) { - Runnable runnable = operations.poll(); + Runnable runnable = writeBehindTasks.poll(); if (runnable != null) { runnable.run(); } else { @@ -83,13 +85,14 @@ public void run() { } } } finally { - sent.set(false); - if (!operations.isEmpty()) { + if (writeBehindCurrentThreads.decrementAndGet() == 0 && !writeBehindTasks.isEmpty()) { enqueueRunnable(null); } } } }); + } else { + writeBehindCurrentThreads.decrementAndGet(); } } diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 0b628e7f417..b8f25d66bf2 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -28,7 +28,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.MapOptions; @@ -71,6 +73,8 @@ */ public class RedissonMap extends RedissonExpirable implements RMap { + final AtomicInteger writeBehindCurrentThreads = new AtomicInteger(); + final Queue writeBehindTasks; final RedissonClient redisson; final MapOptions options; @@ -78,12 +82,22 @@ protected RedissonMap(CommandAsyncExecutor commandExecutor, String name, Redisso super(commandExecutor, name); this.redisson = redisson; this.options = options; + if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) { + writeBehindTasks = new ConcurrentLinkedQueue(); + } else { + writeBehindTasks = null; + } } public RedissonMap(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapOptions options) { super(codec, commandExecutor, name); this.redisson = redisson; this.options = options; + if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) { + writeBehindTasks = new ConcurrentLinkedQueue(); + } else { + writeBehindTasks = null; + } } @Override @@ -274,7 +288,7 @@ public void execute() { protected RFuture mapWriterFuture(RFuture future, MapWriterTask listener) { if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) { - future.addListener(new MapWriteBehindListener(commandExecutor, listener)); + future.addListener(new MapWriteBehindListener(commandExecutor, listener, writeBehindCurrentThreads, writeBehindTasks, options.getWriteBehindThreads())); return future; } @@ -876,7 +890,7 @@ public void execute() { options.getWriter().deleteAll(deletedKeys); } }; - future.addListener(new MapWriteBehindListener>(commandExecutor, listener)); + future.addListener(new MapWriteBehindListener>(commandExecutor, listener, writeBehindCurrentThreads, writeBehindTasks, options.getWriteBehindThreads())); } else { commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { @Override diff --git a/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java b/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java index fb5b25560ab..ac09f2acba0 100644 --- a/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java +++ b/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java @@ -266,15 +266,23 @@ public LocalCachedMapOptions maxIdle(long maxIdle, TimeUnit timeUnit) { } @Override - public LocalCachedMapOptions writer(MapWriter writer, org.redisson.api.MapOptions.WriteMode writeMode) { - super.writer(writer, writeMode); - return this; + public LocalCachedMapOptions writer(MapWriter writer) { + return (LocalCachedMapOptions) super.writer(writer); + } + + @Override + public LocalCachedMapOptions writeBehindThreads(int writeBehindThreads) { + return (LocalCachedMapOptions) super.writeBehindThreads(writeBehindThreads); + } + + @Override + public LocalCachedMapOptions writeMode(org.redisson.api.MapOptions.WriteMode writeMode) { + return (LocalCachedMapOptions) super.writeMode(writeMode); } @Override public LocalCachedMapOptions loader(MapLoader loader) { - super.loader(loader); - return this; + return (LocalCachedMapOptions) super.loader(loader); } } diff --git a/redisson/src/main/java/org/redisson/api/MapOptions.java b/redisson/src/main/java/org/redisson/api/MapOptions.java index 99defc52267..0f49a3fcbf4 100644 --- a/redisson/src/main/java/org/redisson/api/MapOptions.java +++ b/redisson/src/main/java/org/redisson/api/MapOptions.java @@ -47,7 +47,8 @@ public enum WriteMode { private MapLoader loader; private MapWriter writer; - private WriteMode writeMode; + private WriteMode writeMode = WriteMode.WRITE_THROUGH; + private int writeBehindThreads = 1; protected MapOptions() { } @@ -75,26 +76,53 @@ public static MapOptions defaults() { } /** - * Sets map writer object used for write-through operations. + * Sets {@link MapWriter} object. * * @param writer object - * @param writeMode for writer * @return MapOptions instance */ - public MapOptions writer(MapWriter writer, WriteMode writeMode) { + public MapOptions writer(MapWriter writer) { this.writer = writer; - this.writeMode = writeMode; return this; } public MapWriter getWriter() { return writer; } + + /** + * Sets threads amount used in write behind mode. + *

+ * Default is 1 + * + * @param writeBehindThreads - threads amount + * @return MapOptions instance + */ + public MapOptions writeBehindThreads(int writeBehindThreads) { + this.writeBehindThreads = writeBehindThreads; + return this; + } + public int getWriteBehindThreads() { + return writeBehindThreads; + } + + /** + * Sets write mode. + *

+ * Default is {@link WriteMode#WRITE_THROUGH} + * + * @param writeMode - write mode + * @return MapOptions instance + */ + public MapOptions writeMode(WriteMode writeMode) { + this.writeMode = writeMode; + return this; + } public WriteMode getWriteMode() { return writeMode; } /** - * Sets map reader object used for write-through operations. + * Sets {@link MapLoader} object. * * @param loader object * @return MapOptions instance diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index ec5d2f44376..64651b73732 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -18,7 +18,6 @@ import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.LocalCachedMapOptions.EvictionPolicy; import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy; -import org.redisson.api.MapOptions.WriteMode; import org.redisson.api.RLocalCachedMap; import org.redisson.api.RMap; import org.redisson.cache.Cache; @@ -51,7 +50,7 @@ public void testPerf() { @Override protected RMap getWriterTestMap(String name, Map map) { - LocalCachedMapOptions options = LocalCachedMapOptions.defaults().writer(createMapWriter(map), WriteMode.WRITE_THROUGH); + LocalCachedMapOptions options = LocalCachedMapOptions.defaults().writer(createMapWriter(map)); return redisson.getLocalCachedMap("test", options); } diff --git a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java index 89ac8efd486..173b0c49020 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -19,7 +19,6 @@ import org.junit.Assert; import org.junit.Test; import org.redisson.api.MapOptions; -import org.redisson.api.MapOptions.WriteMode; import org.redisson.api.RFuture; import org.redisson.api.RMap; import org.redisson.api.RMapCache; @@ -141,7 +140,7 @@ public boolean equals(Object obj) { @Override protected RMap getWriterTestMap(String name, Map map) { - MapOptions options = MapOptions.defaults().writer(createMapWriter(map), WriteMode.WRITE_THROUGH); + MapOptions options = MapOptions.defaults().writer(createMapWriter(map)); return redisson.getMapCache("test", options); } diff --git a/redisson/src/test/java/org/redisson/RedissonMapTest.java b/redisson/src/test/java/org/redisson/RedissonMapTest.java index cfb64ac33c3..723bc2df6ad 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapTest.java @@ -19,7 +19,6 @@ import org.junit.Assume; import org.junit.Test; import org.redisson.api.MapOptions; -import org.redisson.api.MapOptions.WriteMode; import org.redisson.api.RFuture; import org.redisson.api.RMap; import org.redisson.api.RedissonClient; @@ -139,7 +138,7 @@ protected RMap getLoaderTestMap(String name, Map map) { @Override protected RMap getWriterTestMap(String name, Map map) { - MapOptions options = MapOptions.defaults().writer(createMapWriter(map), WriteMode.WRITE_THROUGH); + MapOptions options = MapOptions.defaults().writer(createMapWriter(map)); return redisson.getMap("test", options); }