Skip to content

Commit

Permalink
refactoring write-behind persist mode #927
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jun 23, 2017
1 parent 01cefc5 commit 4c80f29
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 30 deletions.
25 changes: 14 additions & 11 deletions redisson/src/main/java/org/redisson/MapWriteBehindListener.java
Expand Up @@ -16,8 +16,7 @@
package org.redisson; package org.redisson;


import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;


import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand All @@ -36,16 +35,19 @@ public class MapWriteBehindListener<R> implements FutureListener<R> {


private static final Logger log = LoggerFactory.getLogger(MapWriteBehindListener.class); private static final Logger log = LoggerFactory.getLogger(MapWriteBehindListener.class);


private static final AtomicBoolean sent = new AtomicBoolean(); private final AtomicInteger writeBehindCurrentThreads;
private static final Queue<Runnable> operations = new ConcurrentLinkedQueue<Runnable>(); private final Queue<Runnable> writeBehindTasks;

private final int threadsAmount;
private final MapWriterTask<R> task; private final MapWriterTask<R> task;
private final CommandAsyncExecutor commandExecutor; private final CommandAsyncExecutor commandExecutor;


public MapWriteBehindListener(CommandAsyncExecutor commandExecutor, MapWriterTask<R> task) { public MapWriteBehindListener(CommandAsyncExecutor commandExecutor, MapWriterTask<R> task, AtomicInteger writeBehindCurrentThreads, Queue<Runnable> writeBehindTasks, int threadsAmount) {
super(); super();
this.threadsAmount = threadsAmount;
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
this.task = task; this.task = task;
this.writeBehindCurrentThreads = writeBehindCurrentThreads;
this.writeBehindTasks = writeBehindTasks;
} }


@Override @Override
Expand All @@ -66,30 +68,31 @@ public void run() {


private void enqueueRunnable(Runnable runnable) { private void enqueueRunnable(Runnable runnable) {
if (runnable != null) { if (runnable != null) {
operations.add(runnable); writeBehindTasks.add(runnable);
} }


if (sent.compareAndSet(false, true)) { if (writeBehindCurrentThreads.incrementAndGet() <= threadsAmount) {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
while (true) { while (true) {
Runnable runnable = operations.poll(); Runnable runnable = writeBehindTasks.poll();
if (runnable != null) { if (runnable != null) {
runnable.run(); runnable.run();
} else { } else {
break; break;
} }
} }
} finally { } finally {
sent.set(false); if (writeBehindCurrentThreads.decrementAndGet() == 0 && !writeBehindTasks.isEmpty()) {
if (!operations.isEmpty()) {
enqueueRunnable(null); enqueueRunnable(null);
} }
} }
} }
}); });
} else {
writeBehindCurrentThreads.decrementAndGet();
} }
} }


Expand Down
18 changes: 16 additions & 2 deletions redisson/src/main/java/org/redisson/RedissonMap.java
Expand Up @@ -28,7 +28,9 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


import org.redisson.api.MapOptions; import org.redisson.api.MapOptions;
Expand Down Expand Up @@ -71,19 +73,31 @@
*/ */
public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> { public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {


final AtomicInteger writeBehindCurrentThreads = new AtomicInteger();
final Queue<Runnable> writeBehindTasks;
final RedissonClient redisson; final RedissonClient redisson;
final MapOptions<K, V> options; final MapOptions<K, V> options;


protected RedissonMap(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapOptions<K, V> options) { protected RedissonMap(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapOptions<K, V> options) {
super(commandExecutor, name); super(commandExecutor, name);
this.redisson = redisson; this.redisson = redisson;
this.options = options; this.options = options;
if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) {
writeBehindTasks = new ConcurrentLinkedQueue<Runnable>();
} else {
writeBehindTasks = null;
}
} }


public RedissonMap(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapOptions<K, V> options) { public RedissonMap(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapOptions<K, V> options) {
super(codec, commandExecutor, name); super(codec, commandExecutor, name);
this.redisson = redisson; this.redisson = redisson;
this.options = options; this.options = options;
if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) {
writeBehindTasks = new ConcurrentLinkedQueue<Runnable>();
} else {
writeBehindTasks = null;
}
} }


@Override @Override
Expand Down Expand Up @@ -274,7 +288,7 @@ public void execute() {


protected <M> RFuture<M> mapWriterFuture(RFuture<M> future, MapWriterTask<M> listener) { protected <M> RFuture<M> mapWriterFuture(RFuture<M> future, MapWriterTask<M> listener) {
if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) { if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) {
future.addListener(new MapWriteBehindListener<M>(commandExecutor, listener)); future.addListener(new MapWriteBehindListener<M>(commandExecutor, listener, writeBehindCurrentThreads, writeBehindTasks, options.getWriteBehindThreads()));
return future; return future;
} }


Expand Down Expand Up @@ -876,7 +890,7 @@ public void execute() {
options.getWriter().deleteAll(deletedKeys); options.getWriter().deleteAll(deletedKeys);
} }
}; };
future.addListener(new MapWriteBehindListener<List<Long>>(commandExecutor, listener)); future.addListener(new MapWriteBehindListener<List<Long>>(commandExecutor, listener, writeBehindCurrentThreads, writeBehindTasks, options.getWriteBehindThreads()));
} else { } else {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override @Override
Expand Down
18 changes: 13 additions & 5 deletions redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java
Expand Up @@ -266,15 +266,23 @@ public LocalCachedMapOptions<K, V> maxIdle(long maxIdle, TimeUnit timeUnit) {
} }


@Override @Override
public LocalCachedMapOptions<K, V> writer(MapWriter<K, V> writer, org.redisson.api.MapOptions.WriteMode writeMode) { public LocalCachedMapOptions<K, V> writer(MapWriter<K, V> writer) {
super.writer(writer, writeMode); return (LocalCachedMapOptions<K, V>) super.writer(writer);
return this; }

@Override
public LocalCachedMapOptions<K, V> writeBehindThreads(int writeBehindThreads) {
return (LocalCachedMapOptions<K, V>) super.writeBehindThreads(writeBehindThreads);
}

@Override
public LocalCachedMapOptions<K, V> writeMode(org.redisson.api.MapOptions.WriteMode writeMode) {
return (LocalCachedMapOptions<K, V>) super.writeMode(writeMode);
} }


@Override @Override
public LocalCachedMapOptions<K, V> loader(MapLoader<K, V> loader) { public LocalCachedMapOptions<K, V> loader(MapLoader<K, V> loader) {
super.loader(loader); return (LocalCachedMapOptions<K, V>) super.loader(loader);
return this;
} }


} }
40 changes: 34 additions & 6 deletions redisson/src/main/java/org/redisson/api/MapOptions.java
Expand Up @@ -47,7 +47,8 @@ public enum WriteMode {


private MapLoader<K, V> loader; private MapLoader<K, V> loader;
private MapWriter<K, V> writer; private MapWriter<K, V> writer;
private WriteMode writeMode; private WriteMode writeMode = WriteMode.WRITE_THROUGH;
private int writeBehindThreads = 1;


protected MapOptions() { protected MapOptions() {
} }
Expand Down Expand Up @@ -75,26 +76,53 @@ public static <K, V> MapOptions<K, V> defaults() {
} }


/** /**
* Sets map writer object used for write-through operations. * Sets {@link MapWriter} object.
* *
* @param writer object * @param writer object
* @param writeMode for writer
* @return MapOptions instance * @return MapOptions instance
*/ */
public MapOptions<K, V> writer(MapWriter<K, V> writer, WriteMode writeMode) { public MapOptions<K, V> writer(MapWriter<K, V> writer) {
this.writer = writer; this.writer = writer;
this.writeMode = writeMode;
return this; return this;
} }
public MapWriter<K, V> getWriter() { public MapWriter<K, V> getWriter() {
return writer; return writer;
} }

/**
* Sets threads amount used in write behind mode.
* <p>
* Default is <code>1</code>
*
* @param writeBehindThreads - threads amount
* @return MapOptions instance
*/
public MapOptions<K, V> writeBehindThreads(int writeBehindThreads) {
this.writeBehindThreads = writeBehindThreads;
return this;
}
public int getWriteBehindThreads() {
return writeBehindThreads;
}

/**
* Sets write mode.
* <p>
* Default is <code>{@link WriteMode#WRITE_THROUGH}</code>
*
* @param writeMode - write mode
* @return MapOptions instance
*/
public MapOptions<K, V> writeMode(WriteMode writeMode) {
this.writeMode = writeMode;
return this;
}
public WriteMode getWriteMode() { public WriteMode getWriteMode() {
return writeMode; return writeMode;
} }


/** /**
* Sets map reader object used for write-through operations. * Sets {@link MapLoader} object.
* *
* @param loader object * @param loader object
* @return MapOptions instance * @return MapOptions instance
Expand Down
Expand Up @@ -18,7 +18,6 @@
import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy; import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy; import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.RLocalCachedMap; import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.cache.Cache; import org.redisson.cache.Cache;
Expand Down Expand Up @@ -51,7 +50,7 @@ public void testPerf() {


@Override @Override
protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) { protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) {
LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults().writer(createMapWriter(map), WriteMode.WRITE_THROUGH); LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults().writer(createMapWriter(map));
return redisson.getLocalCachedMap("test", options); return redisson.getLocalCachedMap("test", options);
} }


Expand Down
Expand Up @@ -19,7 +19,6 @@
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.redisson.api.MapOptions; import org.redisson.api.MapOptions;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RMapCache; import org.redisson.api.RMapCache;
Expand Down Expand Up @@ -141,7 +140,7 @@ public boolean equals(Object obj) {


@Override @Override
protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) { protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) {
MapOptions<K, V> options = MapOptions.<K, V>defaults().writer(createMapWriter(map), WriteMode.WRITE_THROUGH); MapOptions<K, V> options = MapOptions.<K, V>defaults().writer(createMapWriter(map));
return redisson.getMapCache("test", options); return redisson.getMapCache("test", options);
} }


Expand Down
3 changes: 1 addition & 2 deletions redisson/src/test/java/org/redisson/RedissonMapTest.java
Expand Up @@ -19,7 +19,6 @@
import org.junit.Assume; import org.junit.Assume;
import org.junit.Test; import org.junit.Test;
import org.redisson.api.MapOptions; import org.redisson.api.MapOptions;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
Expand Down Expand Up @@ -139,7 +138,7 @@ protected <K, V> RMap<K, V> getLoaderTestMap(String name, Map<K, V> map) {


@Override @Override
protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) { protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) {
MapOptions<K, V> options = MapOptions.<K, V>defaults().writer(createMapWriter(map), WriteMode.WRITE_THROUGH); MapOptions<K, V> options = MapOptions.<K, V>defaults().writer(createMapWriter(map));
return redisson.getMap("test", options); return redisson.getMap("test", options);
} }


Expand Down

0 comments on commit 4c80f29

Please sign in to comment.