From 242c73897fef9f1d57982ce5ae14b317e71f0bb5 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 7 Apr 2017 11:02:30 +0300 Subject: [PATCH] refactoring --- .../redisson/api/mapreduce/RMapReduce.java | 3 +- .../redisson/mapreduce/BaseMapperTask.java | 17 ++- .../mapreduce/CollectionMapperTask.java | 54 ++++---- .../redisson/mapreduce/CoordinatorTask.java | 91 +++----------- .../org/redisson/mapreduce/MapperTask.java | 31 ++--- .../RedissonCollectionMapReduce.java | 2 +- .../redisson/mapreduce/RedissonMapReduce.java | 3 +- .../redisson/mapreduce/SubTasksExecutor.java | 115 ++++++++++++++++++ 8 files changed, 192 insertions(+), 124 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/mapreduce/SubTasksExecutor.java diff --git a/redisson/src/main/java/org/redisson/api/mapreduce/RMapReduce.java b/redisson/src/main/java/org/redisson/api/mapreduce/RMapReduce.java index 206e91d358c..4f6ad7b8266 100644 --- a/redisson/src/main/java/org/redisson/api/mapreduce/RMapReduce.java +++ b/redisson/src/main/java/org/redisson/api/mapreduce/RMapReduce.java @@ -85,7 +85,8 @@ public interface RMapReduce extends RMapReduceExecutor { /** - * Defines timeout for MapReduce process + * Defines timeout for MapReduce process. + * 0 means infinity timeout. * * @param timeout * @param unit diff --git a/redisson/src/main/java/org/redisson/mapreduce/BaseMapperTask.java b/redisson/src/main/java/org/redisson/mapreduce/BaseMapperTask.java index 7f04f00204c..a079efe3806 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/BaseMapperTask.java +++ b/redisson/src/main/java/org/redisson/mapreduce/BaseMapperTask.java @@ -16,6 +16,8 @@ package org.redisson.mapreduce; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import org.redisson.api.RedissonClient; import org.redisson.api.annotation.RInject; @@ -35,7 +37,7 @@ public abstract class BaseMapperTask implements Runnable, Serializab protected RedissonClient redisson; protected Class objectClass; - protected String objectName; + protected List objectNames = new ArrayList(); protected Class objectCodecClass; protected int workersAmount; @@ -45,13 +47,20 @@ public abstract class BaseMapperTask implements Runnable, Serializab public BaseMapperTask() { } - public BaseMapperTask(Class objectClass, String objectName, Class objectCodecClass) { + public BaseMapperTask(Class objectClass, Class objectCodecClass) { super(); this.objectClass = objectClass; - this.objectName = objectName; this.objectCodecClass = objectCodecClass; } + public void addObjectName(String objectName) { + this.objectNames.add(objectName); + } + + public void clearObjectNames() { + this.objectNames.clear(); + } + public void setTimeout(long timeout) { this.timeout = timeout; } @@ -63,5 +72,5 @@ public void setWorkersAmount(int workersAmount) { public void setCollectorMapName(String collatorMapName) { this.collectorMapName = collatorMapName; } - + } diff --git a/redisson/src/main/java/org/redisson/mapreduce/CollectionMapperTask.java b/redisson/src/main/java/org/redisson/mapreduce/CollectionMapperTask.java index 6abcf4cdc36..d74fd234c88 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/CollectionMapperTask.java +++ b/redisson/src/main/java/org/redisson/mapreduce/CollectionMapperTask.java @@ -43,8 +43,8 @@ public class CollectionMapperTask extends BaseMapperTask mapper, Class objectClass, String objectName, Class objectCodecClass) { - super(objectClass, objectName, objectCodecClass); + public CollectionMapperTask(RCollectionMapper mapper, Class objectClass, Class objectCodecClass) { + super(objectClass, objectCodecClass); this.mapper = mapper; } @@ -59,31 +59,33 @@ public void run() { Injector.inject(mapper, redisson); - Iterable collection = null; - if (RSetCache.class.isAssignableFrom(objectClass)) { - collection = redisson.getSetCache(objectName, codec); - } else if (RSet.class.isAssignableFrom(objectClass)) { - collection = redisson.getSet(objectName, codec); - } else if (RSortedSet.class.isAssignableFrom(objectClass)) { - collection = redisson.getSortedSet(objectName, codec); - } else if (RScoredSortedSet.class.isAssignableFrom(objectClass)) { - collection = redisson.getScoredSortedSet(objectName, codec); - } else if (RLexSortedSet.class.isAssignableFrom(objectClass)) { - collection = (Iterable) redisson.getLexSortedSet(objectName); - } else if (RList.class.isAssignableFrom(objectClass)) { - collection = redisson.getList(objectName, codec); - } else { - throw new IllegalStateException("Unable to work with " + objectClass); - } - - RCollector collector = new Collector(codec, redisson, collectorMapName, workersAmount, timeout); - - for (VIn value : collection) { - if (Thread.currentThread().isInterrupted()) { - return; + for (String objectName : objectNames) { + Iterable collection = null; + if (RSetCache.class.isAssignableFrom(objectClass)) { + collection = redisson.getSetCache(objectName, codec); + } else if (RSet.class.isAssignableFrom(objectClass)) { + collection = redisson.getSet(objectName, codec); + } else if (RSortedSet.class.isAssignableFrom(objectClass)) { + collection = redisson.getSortedSet(objectName, codec); + } else if (RScoredSortedSet.class.isAssignableFrom(objectClass)) { + collection = redisson.getScoredSortedSet(objectName, codec); + } else if (RLexSortedSet.class.isAssignableFrom(objectClass)) { + collection = (Iterable) redisson.getLexSortedSet(objectName); + } else if (RList.class.isAssignableFrom(objectClass)) { + collection = redisson.getList(objectName, codec); + } else { + throw new IllegalStateException("Unable to work with " + objectClass); + } + + RCollector collector = new Collector(codec, redisson, collectorMapName, workersAmount, timeout); + + for (VIn value : collection) { + if (Thread.currentThread().isInterrupted()) { + return; + } + + mapper.map(value, collector); } - - mapper.map(value, collector); } } diff --git a/redisson/src/main/java/org/redisson/mapreduce/CoordinatorTask.java b/redisson/src/main/java/org/redisson/mapreduce/CoordinatorTask.java index e41358bf00e..5dccb9a1c70 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/CoordinatorTask.java +++ b/redisson/src/main/java/org/redisson/mapreduce/CoordinatorTask.java @@ -16,11 +16,8 @@ package org.redisson.mapreduce; import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; import java.util.UUID; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -34,9 +31,6 @@ import org.redisson.api.mapreduce.RReducer; import org.redisson.client.codec.Codec; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; - /** * * @author Nikita Koksharov @@ -46,25 +40,6 @@ */ public class CoordinatorTask implements Callable, Serializable { - public static class LatchListener implements FutureListener { - - private CountDownLatch latch; - - public LatchListener() { - } - - public LatchListener(CountDownLatch latch) { - super(); - this.latch = latch; - } - - @Override - public void operationComplete(Future future) throws Exception { - latch.countDown(); - } - - } - private static final long serialVersionUID = 7559371478909848610L; @RInject @@ -124,59 +99,31 @@ public Object call() throws Exception { if (timeout > 0) { mapperTask.setTimeout(timeout - timeSpent); } - RFuture mapperFuture = executor.submit(mapperTask); - if (timeout > 0 && !mapperFuture.await(timeout - timeSpent)) { - mapperFuture.cancel(true); - throw new MapReduceTimeoutException(); - } - if (timeout == 0) { - try { - mapperFuture.await(); - } catch (InterruptedException e) { - return null; - } - } - - List> futures = new ArrayList>(); - final CountDownLatch latch = new CountDownLatch(workersAmount); - for (int i = 0; i < workersAmount; i++) { - String name = collectorMapName + ":" + i; - Runnable runnable = new ReducerTask(name, reducer, objectCodecClass, resultMapName, timeout - timeSpent); - RFuture future = executor.submitAsync(runnable); - future.addListener(new LatchListener(latch)); - futures.add(future); - } - if (Thread.currentThread().isInterrupted()) { - cancelReduce(futures); - return null; - } - - timeSpent = System.currentTimeMillis() - startTime; - if (isTimeoutExpired(timeSpent)) { - cancelReduce(futures); - throw new MapReduceTimeoutException(); - } + mapperTask.addObjectName(objectName); + RFuture mapperFuture = executor.submitAsync(mapperTask); try { - if (timeout > 0 && !latch.await(timeout - timeSpent, TimeUnit.MILLISECONDS)) { - cancelReduce(futures); + if (timeout > 0 && !mapperFuture.await(timeout - timeSpent)) { + mapperFuture.cancel(true); throw new MapReduceTimeoutException(); } if (timeout == 0) { - try { - latch.await(); - } catch (InterruptedException e) { - return null; - } + mapperFuture.await(); } } catch (InterruptedException e) { - cancelReduce(futures); + mapperFuture.cancel(true); return null; } - for (RFuture rFuture : futures) { - if (!rFuture.isSuccess()) { - throw (Exception) rFuture.cause(); - } + + SubTasksExecutor reduceExecutor = new SubTasksExecutor(executor, workersAmount, startTime, timeout); + for (int i = 0; i < workersAmount; i++) { + String name = collectorMapName + ":" + i; + Runnable runnable = new ReducerTask(name, reducer, objectCodecClass, resultMapName, timeout - timeSpent); + reduceExecutor.submit(runnable); + } + + if (!reduceExecutor.await()) { + return null; } return executeCollator(); @@ -215,10 +162,4 @@ private boolean isTimeoutExpired(long timeSpent) { return timeSpent > timeout && timeout > 0; } - private void cancelReduce(List> futures) { - for (RFuture future : futures) { - future.cancel(true); - } - } - } diff --git a/redisson/src/main/java/org/redisson/mapreduce/MapperTask.java b/redisson/src/main/java/org/redisson/mapreduce/MapperTask.java index 1937e8f92b6..7927f90d2d7 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/MapperTask.java +++ b/redisson/src/main/java/org/redisson/mapreduce/MapperTask.java @@ -42,8 +42,8 @@ public class MapperTask extends BaseMapperTask public MapperTask() { } - public MapperTask(RMapper mapper, Class objectClass, String objectName, Class objectCodecClass) { - super(objectClass, objectName, objectCodecClass); + public MapperTask(RMapper mapper, Class objectClass, Class objectCodecClass) { + super(objectClass, objectCodecClass); this.mapper = mapper; } @@ -57,22 +57,23 @@ public void run() { } Injector.inject(mapper, redisson); - - RMap map = null; - if (RMapCache.class.isAssignableFrom(objectClass)) { - map = redisson.getMapCache(objectName, codec); - } else { - map = redisson.getMap(objectName, codec); - } - RCollector collector = new Collector(codec, redisson, collectorMapName, workersAmount, timeout); - - for (Entry entry : map.entrySet()) { - if (Thread.currentThread().isInterrupted()) { - return; + + for (String objectName : objectNames) { + RMap map = null; + if (RMapCache.class.isAssignableFrom(objectClass)) { + map = redisson.getMapCache(objectName, codec); + } else { + map = redisson.getMap(objectName, codec); } - mapper.map(entry.getKey(), entry.getValue(), collector); + for (Entry entry : map.entrySet()) { + if (Thread.currentThread().isInterrupted()) { + return; + } + + mapper.map(entry.getKey(), entry.getValue(), collector); + } } } diff --git a/redisson/src/main/java/org/redisson/mapreduce/RedissonCollectionMapReduce.java b/redisson/src/main/java/org/redisson/mapreduce/RedissonCollectionMapReduce.java index a443fc1ea9d..84ef92790e3 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/RedissonCollectionMapReduce.java +++ b/redisson/src/main/java/org/redisson/mapreduce/RedissonCollectionMapReduce.java @@ -63,7 +63,7 @@ public RCollectionMapReduce reducer(RReducer reduce @Override protected Callable createTask(String resultMapName, RCollator collator) { - CollectionMapperTask mapperTask = new CollectionMapperTask(mapper, objectClass, objectName, objectCodec.getClass()); + CollectionMapperTask mapperTask = new CollectionMapperTask(mapper, objectClass, objectCodec.getClass()); return new CoordinatorTask(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator, timeout, System.currentTimeMillis()); } diff --git a/redisson/src/main/java/org/redisson/mapreduce/RedissonMapReduce.java b/redisson/src/main/java/org/redisson/mapreduce/RedissonMapReduce.java index 59cf78a07e7..1af5987367e 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/RedissonMapReduce.java +++ b/redisson/src/main/java/org/redisson/mapreduce/RedissonMapReduce.java @@ -21,7 +21,6 @@ import org.redisson.api.RObject; import org.redisson.api.RedissonClient; import org.redisson.api.mapreduce.RCollator; -import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.api.mapreduce.RMapReduce; import org.redisson.api.mapreduce.RMapper; import org.redisson.api.mapreduce.RReducer; @@ -65,7 +64,7 @@ public RMapReduce reducer(RReducer reducer) { @Override protected Callable createTask(String resultMapName, RCollator collator) { - MapperTask mapperTask = new MapperTask(mapper, objectClass, objectName, objectCodec.getClass()); + MapperTask mapperTask = new MapperTask(mapper, objectClass, objectCodec.getClass()); return new CoordinatorTask(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator, timeout, System.currentTimeMillis()); } diff --git a/redisson/src/main/java/org/redisson/mapreduce/SubTasksExecutor.java b/redisson/src/main/java/org/redisson/mapreduce/SubTasksExecutor.java new file mode 100644 index 00000000000..93a1d1599c9 --- /dev/null +++ b/redisson/src/main/java/org/redisson/mapreduce/SubTasksExecutor.java @@ -0,0 +1,115 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.mapreduce; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.redisson.api.RExecutorService; +import org.redisson.api.RFuture; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + +/** + * + * @author Nikita Koksharov + * + */ +public class SubTasksExecutor { + + public static class LatchListener implements FutureListener { + + private CountDownLatch latch; + + public LatchListener() { + } + + public LatchListener(CountDownLatch latch) { + super(); + this.latch = latch; + } + + @Override + public void operationComplete(Future future) throws Exception { + latch.countDown(); + } + + } + + private final List> futures = new ArrayList>(); + private final CountDownLatch latch; + private final RExecutorService executor; + private final long startTime; + private final long timeout; + + public SubTasksExecutor(RExecutorService executor, int workersAmount, long startTime, long timeout) { + this.executor = executor; + this.latch = new CountDownLatch(workersAmount); + this.startTime = startTime; + this.timeout = timeout; + } + + public void submit(Runnable runnable) { + RFuture future = executor.submitAsync(runnable); + future.addListener(new LatchListener(latch)); + futures.add(future); + } + + private void cancel(List> futures) { + for (RFuture future : futures) { + future.cancel(true); + } + } + + private boolean isTimeoutExpired(long timeSpent) { + return timeSpent > timeout && timeout > 0; + } + + public boolean await() throws Exception { + if (Thread.currentThread().isInterrupted()) { + cancel(futures); + return false; + } + + long timeSpent = System.currentTimeMillis() - startTime; + if (isTimeoutExpired(timeSpent)) { + cancel(futures); + throw new MapReduceTimeoutException(); + } + try { + if (timeout > 0 && !latch.await(timeout - timeSpent, TimeUnit.MILLISECONDS)) { + cancel(futures); + throw new MapReduceTimeoutException(); + } + if (timeout == 0) { + latch.await(); + } + } catch (InterruptedException e) { + cancel(futures); + return false; + } + for (RFuture rFuture : futures) { + if (!rFuture.isSuccess()) { + throw (Exception) rFuture.cause(); + } + } + return true; + } + +}