Skip to content

Commit

Permalink
MapWriter (write-behind) support for RMap added. #927
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jun 22, 2017
1 parent eae170f commit c3ef66b
Show file tree
Hide file tree
Showing 15 changed files with 502 additions and 256 deletions.
97 changes: 97 additions & 0 deletions redisson/src/main/java/org/redisson/MapWriteBehindListener.java
@@ -0,0 +1,97 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.redisson.command.CommandAsyncExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

/**
*
* @author Nikita Koksharov
*
* @param <R> return type
*/
public class MapWriteBehindListener<R> implements FutureListener<R> {

private static final Logger log = LoggerFactory.getLogger(MapWriteBehindListener.class);

private static final AtomicBoolean sent = new AtomicBoolean();
private static final Queue<Runnable> operations = new ConcurrentLinkedQueue<Runnable>();

private final MapWriterTask<R> task;
private final CommandAsyncExecutor commandExecutor;

public MapWriteBehindListener(CommandAsyncExecutor commandExecutor, MapWriterTask<R> task) {
super();
this.commandExecutor = commandExecutor;
this.task = task;
}

@Override
public void operationComplete(Future<R> future) throws Exception {
if (future.isSuccess() && task.condition(future)) {
enqueueRunnable(new Runnable() {
@Override
public void run() {
try {
task.execute();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
});
}
}

private void enqueueRunnable(Runnable runnable) {
if (runnable != null) {
operations.add(runnable);
}

if (sent.compareAndSet(false, true)) {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
while (true) {
Runnable runnable = operations.poll();
if (runnable != null) {
runnable.run();
} else {
break;
}
}
} finally {
sent.set(false);
if (!operations.isEmpty()) {
enqueueRunnable(null);
}
}
}
});
}
}


}
63 changes: 0 additions & 63 deletions redisson/src/main/java/org/redisson/MapWriterExecutorPromise.java

This file was deleted.

32 changes: 21 additions & 11 deletions redisson/src/main/java/org/redisson/MapWriterPromise.java
Expand Up @@ -15,8 +15,6 @@
*/
package org.redisson;

import java.util.concurrent.ExecutorService;

import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
Expand All @@ -30,23 +28,35 @@
*
* @param <R> result type
*/
public abstract class MapWriterPromise<R> extends RedissonPromise<R> {
public class MapWriterPromise<R> extends RedissonPromise<R> {

public MapWriterPromise(RFuture<R> f, final CommandAsyncExecutor commandExecutor) {
super();
public MapWriterPromise(RFuture<R> f, final CommandAsyncExecutor commandExecutor, final MapWriterTask<R> task) {
f.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
public void operationComplete(final Future<R> future) throws Exception {
if (!future.isSuccess()) {
tryFailure(future.cause());
return;
}

execute(future, commandExecutor.getConnectionManager().getExecutor());

if (task.condition(future)) {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
task.execute();
} catch (Exception e) {
tryFailure(e);
return;
}
trySuccess(future.getNow());
}
});
} else {
trySuccess(future.getNow());
}
}
});
}

public abstract void execute(Future<R> future, ExecutorService executorService);


}
34 changes: 34 additions & 0 deletions redisson/src/main/java/org/redisson/MapWriterTask.java
@@ -0,0 +1,34 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;

import io.netty.util.concurrent.Future;

/**
*
* @author Nikita Koksharov
*
* @param <R> return type
*/
public abstract class MapWriterTask<R> {

protected boolean condition(Future<R> future) {
return true;
}

protected abstract void execute();

}
25 changes: 13 additions & 12 deletions redisson/src/main/java/org/redisson/Redisson.java
Expand Up @@ -21,6 +21,7 @@

import org.redisson.api.ClusterNodesGroup;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.MapOptions;
import org.redisson.api.Node;
import org.redisson.api.NodesGroup;
import org.redisson.api.RAtomicDouble;
Expand Down Expand Up @@ -265,12 +266,12 @@ public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, Codec codec,

@Override
public <K, V> RMap<K, V> getMap(String name) {
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name, this, null, null);
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name, this, null);
}

@Override
public <K, V> RMap<K, V> getMap(String name, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name, this, mapLoader, mapWriter);
public <K, V> RMap<K, V> getMap(String name, MapOptions<K, V> options) {
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name, this, options);
}

@Override
Expand Down Expand Up @@ -315,32 +316,32 @@ public <V> RSetCache<V> getSetCache(String name, Codec codec) {

@Override
public <K, V> RMapCache<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this, null, null);
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this, null);
}

@Override
public <K, V> RMapCache<K, V> getMapCache(String name, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this, mapLoader, mapWriter);
public <K, V> RMapCache<K, V> getMapCache(String name, MapOptions<K, V> options) {
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this, options);
}

@Override
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, null, null);
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, null);
}

@Override
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, mapLoader, mapWriter);
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec, MapOptions<K, V> options) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, options);
}

@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name, this, null, null);
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name, this, null);
}

@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name, this, mapLoader, mapWriter);
public <K, V> RMap<K, V> getMap(String name, Codec codec, MapOptions<K, V> options) {
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name, this, options);
}

@Override
Expand Down
8 changes: 4 additions & 4 deletions redisson/src/main/java/org/redisson/RedissonBatch.java
Expand Up @@ -102,12 +102,12 @@ public <V> RListAsync<V> getList(String name, Codec codec) {

@Override
public <K, V> RMapAsync<K, V> getMap(String name) {
return new RedissonMap<K, V>(executorService, name, null, null, null);
return new RedissonMap<K, V>(executorService, name, null, null);
}

@Override
public <K, V> RMapAsync<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(codec, executorService, name, null, null, null);
return new RedissonMap<K, V>(codec, executorService, name, null, null);
}

@Override
Expand Down Expand Up @@ -202,12 +202,12 @@ public RBitSetAsync getBitSet(String name) {

@Override
public <K, V> RMapCacheAsync<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, executorService, name, null, null, null);
return new RedissonMapCache<K, V>(codec, evictionScheduler, executorService, name, null, null);
}

@Override
public <K, V> RMapCacheAsync<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(evictionScheduler, executorService, name, null, null, null);
return new RedissonMapCache<K, V>(evictionScheduler, executorService, name, null, null);
}

@Override
Expand Down
Expand Up @@ -206,12 +206,12 @@ public String toString() {
private volatile long lastInvalidate;

protected RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions<K, V> options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(commandExecutor, name, redisson, options.getMapLoader(), options.getMapWriter());
super(commandExecutor, name, redisson, options);
init(name, options, redisson, evictionScheduler);
}

protected RedissonLocalCachedMap(Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions<K, V> options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(codec, connectionManager, name, redisson, options.getMapLoader(), options.getMapWriter());
super(codec, connectionManager, name, redisson, options);
init(name, options, redisson, evictionScheduler);
}

Expand Down

0 comments on commit c3ef66b

Please sign in to comment.