Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Apr 7, 2017
1 parent 5d4d66c commit 242c738
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 124 deletions.
Expand Up @@ -85,7 +85,8 @@
public interface RMapReduce<KIn, VIn, KOut, VOut> extends RMapReduceExecutor<VIn, KOut, VOut> {

/**
* Defines timeout for MapReduce process
* Defines timeout for MapReduce process.
* <code>0</code> means infinity timeout.
*
* @param timeout
* @param unit
Expand Down
17 changes: 13 additions & 4 deletions redisson/src/main/java/org/redisson/mapreduce/BaseMapperTask.java
Expand Up @@ -16,6 +16,8 @@
package org.redisson.mapreduce;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
Expand All @@ -35,7 +37,7 @@ public abstract class BaseMapperTask<KOut, VOut> implements Runnable, Serializab
protected RedissonClient redisson;

protected Class<?> objectClass;
protected String objectName;
protected List<String> objectNames = new ArrayList<String>();
protected Class<?> objectCodecClass;

protected int workersAmount;
Expand All @@ -45,13 +47,20 @@ public abstract class BaseMapperTask<KOut, VOut> implements Runnable, Serializab
public BaseMapperTask() {
}

public BaseMapperTask(Class<?> objectClass, String objectName, Class<?> objectCodecClass) {
public BaseMapperTask(Class<?> objectClass, Class<?> objectCodecClass) {
super();
this.objectClass = objectClass;
this.objectName = objectName;
this.objectCodecClass = objectCodecClass;
}

public void addObjectName(String objectName) {
this.objectNames.add(objectName);
}

public void clearObjectNames() {
this.objectNames.clear();
}

public void setTimeout(long timeout) {
this.timeout = timeout;
}
Expand All @@ -63,5 +72,5 @@ public void setWorkersAmount(int workersAmount) {
public void setCollectorMapName(String collatorMapName) {
this.collectorMapName = collatorMapName;
}

}
Expand Up @@ -43,8 +43,8 @@ public class CollectionMapperTask<VIn, KOut, VOut> extends BaseMapperTask<KOut,
public CollectionMapperTask() {
}

public CollectionMapperTask(RCollectionMapper<VIn, KOut, VOut> mapper, Class<?> objectClass, String objectName, Class<?> objectCodecClass) {
super(objectClass, objectName, objectCodecClass);
public CollectionMapperTask(RCollectionMapper<VIn, KOut, VOut> mapper, Class<?> objectClass, Class<?> objectCodecClass) {
super(objectClass, objectCodecClass);
this.mapper = mapper;
}

Expand All @@ -59,31 +59,33 @@ public void run() {

Injector.inject(mapper, redisson);

Iterable<VIn> collection = null;
if (RSetCache.class.isAssignableFrom(objectClass)) {
collection = redisson.getSetCache(objectName, codec);
} else if (RSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getSet(objectName, codec);
} else if (RSortedSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getSortedSet(objectName, codec);
} else if (RScoredSortedSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getScoredSortedSet(objectName, codec);
} else if (RLexSortedSet.class.isAssignableFrom(objectClass)) {
collection = (Iterable<VIn>) redisson.getLexSortedSet(objectName);
} else if (RList.class.isAssignableFrom(objectClass)) {
collection = redisson.getList(objectName, codec);
} else {
throw new IllegalStateException("Unable to work with " + objectClass);
}

RCollector<KOut, VOut> collector = new Collector<KOut, VOut>(codec, redisson, collectorMapName, workersAmount, timeout);

for (VIn value : collection) {
if (Thread.currentThread().isInterrupted()) {
return;
for (String objectName : objectNames) {
Iterable<VIn> collection = null;
if (RSetCache.class.isAssignableFrom(objectClass)) {
collection = redisson.getSetCache(objectName, codec);
} else if (RSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getSet(objectName, codec);
} else if (RSortedSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getSortedSet(objectName, codec);
} else if (RScoredSortedSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getScoredSortedSet(objectName, codec);
} else if (RLexSortedSet.class.isAssignableFrom(objectClass)) {
collection = (Iterable<VIn>) redisson.getLexSortedSet(objectName);
} else if (RList.class.isAssignableFrom(objectClass)) {
collection = redisson.getList(objectName, codec);
} else {
throw new IllegalStateException("Unable to work with " + objectClass);
}

RCollector<KOut, VOut> collector = new Collector<KOut, VOut>(codec, redisson, collectorMapName, workersAmount, timeout);

for (VIn value : collection) {
if (Thread.currentThread().isInterrupted()) {
return;
}

mapper.map(value, collector);
}

mapper.map(value, collector);
}
}

Expand Down
91 changes: 16 additions & 75 deletions redisson/src/main/java/org/redisson/mapreduce/CoordinatorTask.java
Expand Up @@ -16,11 +16,8 @@
package org.redisson.mapreduce;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -34,9 +31,6 @@
import org.redisson.api.mapreduce.RReducer;
import org.redisson.client.codec.Codec;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

/**
*
* @author Nikita Koksharov
Expand All @@ -46,25 +40,6 @@
*/
public class CoordinatorTask<KOut, VOut> implements Callable<Object>, Serializable {

public static class LatchListener implements FutureListener<Object> {

private CountDownLatch latch;

public LatchListener() {
}

public LatchListener(CountDownLatch latch) {
super();
this.latch = latch;
}

@Override
public void operationComplete(Future<Object> future) throws Exception {
latch.countDown();
}

}

private static final long serialVersionUID = 7559371478909848610L;

@RInject
Expand Down Expand Up @@ -124,59 +99,31 @@ public Object call() throws Exception {
if (timeout > 0) {
mapperTask.setTimeout(timeout - timeSpent);
}
RFuture<?> mapperFuture = executor.submit(mapperTask);
if (timeout > 0 && !mapperFuture.await(timeout - timeSpent)) {
mapperFuture.cancel(true);
throw new MapReduceTimeoutException();
}
if (timeout == 0) {
try {
mapperFuture.await();
} catch (InterruptedException e) {
return null;
}
}

List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
final CountDownLatch latch = new CountDownLatch(workersAmount);
for (int i = 0; i < workersAmount; i++) {
String name = collectorMapName + ":" + i;
Runnable runnable = new ReducerTask<KOut, VOut>(name, reducer, objectCodecClass, resultMapName, timeout - timeSpent);
RFuture<?> future = executor.submitAsync(runnable);
future.addListener(new LatchListener(latch));
futures.add(future);
}

if (Thread.currentThread().isInterrupted()) {
cancelReduce(futures);
return null;
}

timeSpent = System.currentTimeMillis() - startTime;
if (isTimeoutExpired(timeSpent)) {
cancelReduce(futures);
throw new MapReduceTimeoutException();
}
mapperTask.addObjectName(objectName);
RFuture<?> mapperFuture = executor.submitAsync(mapperTask);
try {
if (timeout > 0 && !latch.await(timeout - timeSpent, TimeUnit.MILLISECONDS)) {
cancelReduce(futures);
if (timeout > 0 && !mapperFuture.await(timeout - timeSpent)) {
mapperFuture.cancel(true);
throw new MapReduceTimeoutException();
}
if (timeout == 0) {
try {
latch.await();
} catch (InterruptedException e) {
return null;
}
mapperFuture.await();
}
} catch (InterruptedException e) {
cancelReduce(futures);
mapperFuture.cancel(true);
return null;
}
for (RFuture<?> rFuture : futures) {
if (!rFuture.isSuccess()) {
throw (Exception) rFuture.cause();
}

SubTasksExecutor reduceExecutor = new SubTasksExecutor(executor, workersAmount, startTime, timeout);
for (int i = 0; i < workersAmount; i++) {
String name = collectorMapName + ":" + i;
Runnable runnable = new ReducerTask<KOut, VOut>(name, reducer, objectCodecClass, resultMapName, timeout - timeSpent);
reduceExecutor.submit(runnable);
}

if (!reduceExecutor.await()) {
return null;
}

return executeCollator();
Expand Down Expand Up @@ -215,10 +162,4 @@ private boolean isTimeoutExpired(long timeSpent) {
return timeSpent > timeout && timeout > 0;
}

private void cancelReduce(List<RFuture<?>> futures) {
for (RFuture<?> future : futures) {
future.cancel(true);
}
}

}
31 changes: 16 additions & 15 deletions redisson/src/main/java/org/redisson/mapreduce/MapperTask.java
Expand Up @@ -42,8 +42,8 @@ public class MapperTask<KIn, VIn, KOut, VOut> extends BaseMapperTask<KOut, VOut>
public MapperTask() {
}

public MapperTask(RMapper<KIn, VIn, KOut, VOut> mapper, Class<?> objectClass, String objectName, Class<?> objectCodecClass) {
super(objectClass, objectName, objectCodecClass);
public MapperTask(RMapper<KIn, VIn, KOut, VOut> mapper, Class<?> objectClass, Class<?> objectCodecClass) {
super(objectClass, objectCodecClass);
this.mapper = mapper;
}

Expand All @@ -57,22 +57,23 @@ public void run() {
}

Injector.inject(mapper, redisson);

RMap<KIn, VIn> map = null;
if (RMapCache.class.isAssignableFrom(objectClass)) {
map = redisson.getMapCache(objectName, codec);
} else {
map = redisson.getMap(objectName, codec);
}

RCollector<KOut, VOut> collector = new Collector<KOut, VOut>(codec, redisson, collectorMapName, workersAmount, timeout);

for (Entry<KIn, VIn> entry : map.entrySet()) {
if (Thread.currentThread().isInterrupted()) {
return;

for (String objectName : objectNames) {
RMap<KIn, VIn> map = null;
if (RMapCache.class.isAssignableFrom(objectClass)) {
map = redisson.getMapCache(objectName, codec);
} else {
map = redisson.getMap(objectName, codec);
}

mapper.map(entry.getKey(), entry.getValue(), collector);
for (Entry<KIn, VIn> entry : map.entrySet()) {
if (Thread.currentThread().isInterrupted()) {
return;
}

mapper.map(entry.getKey(), entry.getValue(), collector);
}
}
}

Expand Down
Expand Up @@ -63,7 +63,7 @@ public RCollectionMapReduce<VIn, KOut, VOut> reducer(RReducer<KOut, VOut> reduce

@Override
protected Callable<Object> createTask(String resultMapName, RCollator<KOut, VOut, Object> collator) {
CollectionMapperTask<VIn, KOut, VOut> mapperTask = new CollectionMapperTask<VIn, KOut, VOut>(mapper, objectClass, objectName, objectCodec.getClass());
CollectionMapperTask<VIn, KOut, VOut> mapperTask = new CollectionMapperTask<VIn, KOut, VOut>(mapper, objectClass, objectCodec.getClass());
return new CoordinatorTask<KOut, VOut>(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator, timeout, System.currentTimeMillis());
}

Expand Down
Expand Up @@ -21,7 +21,6 @@
import org.redisson.api.RObject;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollator;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.api.mapreduce.RMapper;
import org.redisson.api.mapreduce.RReducer;
Expand Down Expand Up @@ -65,7 +64,7 @@ public RMapReduce<KIn, VIn, KOut, VOut> reducer(RReducer<KOut, VOut> reducer) {

@Override
protected Callable<Object> createTask(String resultMapName, RCollator<KOut, VOut, Object> collator) {
MapperTask<KIn, VIn, KOut, VOut> mapperTask = new MapperTask<KIn, VIn, KOut, VOut>(mapper, objectClass, objectName, objectCodec.getClass());
MapperTask<KIn, VIn, KOut, VOut> mapperTask = new MapperTask<KIn, VIn, KOut, VOut>(mapper, objectClass, objectCodec.getClass());
return new CoordinatorTask<KOut, VOut>(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator, timeout, System.currentTimeMillis());
}

Expand Down

0 comments on commit 242c738

Please sign in to comment.