Skip to content

Commit

Permalink
ISPN-6354 Remove Map/Reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
wburns authored and tristantarrant committed Mar 17, 2016
1 parent 7cee967 commit 799fc1a
Show file tree
Hide file tree
Showing 96 changed files with 64 additions and 6,966 deletions.
Expand Up @@ -2,17 +2,16 @@

import org.infinispan.Cache;
import org.infinispan.distexec.DistributedExecutorService;
import org.infinispan.distexec.mapreduce.Mapper;

/**
* ContextInputCache keeps track of {@link Input} cache to be injected into Callables from
* {@link DistributedExecutorService} and {@link Mapper} from {@link MapReduceTask} using CDI
* {@link DistributedExecutorService} using CDI
* mechanism. The cache injected will be the cache used to construct
* {@link DistributedExecutorService} and/or {@link MapReduceTask}
*
* {@link DistributedExecutorService}
*
* @author Vladimir Blagoejvic
* @since 5.2
* @see InfinispanExtension#registerInputCacheCustomBean(javax.enterprise.inject.spi.AfterBeanDiscovery,
* @see InfinispanExtensionEmbedded#registerInputCacheCustomBean(javax.enterprise.inject.spi.AfterBeanDiscovery,
* javax.enterprise.inject.spi.BeanManager)
*
*/
Expand Down

This file was deleted.

3 changes: 1 addition & 2 deletions cdi/embedded/src/main/java/org/infinispan/cdi/Input.java
@@ -1,7 +1,6 @@
package org.infinispan.cdi;

import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.distexec.mapreduce.MapReduceTask;

import javax.inject.Qualifier;
import java.lang.annotation.Documented;
Expand All @@ -13,7 +12,7 @@

/**
* Qualifier indicating the injected Cache should be the input Cache used to create
* {@link DefaultExecutorService} or {@link MapReduceTask}
* {@link DefaultExecutorService}
*
* @author Vladimir Blagojevic
* @since 5.2
Expand Down

This file was deleted.

This file was deleted.

Expand Up @@ -101,22 +101,6 @@ private void populateCache(String cacheName) {
clients.get(i % NUM_SERVERS).getCache(cacheName).put(String.format("Key %d", i), String.format("Value %d", i));
}

public void testRemoteMapReduce() throws Exception {
String cacheName = "testRemoteMapReduce";
ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true);
builder.dataContainer().keyEquivalence(new AnyServerEquivalence()).valueEquivalence(new AnyServerEquivalence()).compatibility().enable().marshaller(new GenericJBossMarshaller());
defineInAll(cacheName, builder);
RemoteCache<String, String> cache = clients.get(0).getCache(cacheName);
RemoteCache<String, String> scriptCache = clients.get(0).getCache(SCRIPT_CACHE);
loadData(cache, "/macbeth.txt");
loadScript(scriptCache, "/wordCountMapper.js");
loadScript(scriptCache, "/wordCountReducer.js");
loadScript(scriptCache, "/wordCountCollator.js");
Map<String, Double> results = cache.execute("wordCountMapper.js", new HashMap<String, String>());
assertEquals(20, results.size());
assertTrue(results.get("macbeth").equals(Double.valueOf(287)));
}

@Test(enabled = false, description = "Disabling this test until the distributed scripts in DIST mode are fixed - ISPN-6173")
public void testRemoteMapReduceWithStreams() throws Exception {
String cacheName = "testRemoteMapReduce_Streams";
Expand Down
26 changes: 0 additions & 26 deletions core/src/main/java/org/infinispan/commands/CommandsFactory.java
Expand Up @@ -18,8 +18,6 @@
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.read.KeySetCommand;
import org.infinispan.commands.read.MapCombineCommand;
import org.infinispan.commands.read.ReduceCommand;
import org.infinispan.commands.read.SizeCommand;
import org.infinispan.commands.remote.ClusteredGetAllCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
Expand Down Expand Up @@ -49,8 +47,6 @@
import org.infinispan.commons.api.functional.EntryView.ReadWriteEntryView;
import org.infinispan.commons.api.functional.EntryView.WriteEntryView;
import org.infinispan.context.Flag;
import org.infinispan.distexec.mapreduce.Mapper;
import org.infinispan.distexec.mapreduce.Reducer;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.functional.impl.Params;
Expand Down Expand Up @@ -362,28 +358,6 @@ public interface CommandsFactory {
*/
<T>DistributedExecuteCommand<T> buildDistributedExecuteCommand(Callable<T> callable, Address sender, Collection keys);

/**
* Builds a MapCombineCommand used for migration and map phase execution of MapReduce tasks.
*
* @param m Mapper for MapReduceTask
* @param r Combiner for MapReduceTask
* @param keys keys used in MapReduceTask
* @return created MapCombineCommand
*/
<KIn, VIn, KOut, VOut> MapCombineCommand<KIn, VIn, KOut, VOut> buildMapCombineCommand(
String taskId, Mapper<KIn, VIn, KOut, VOut> m, Reducer<KOut, VOut> r,
Collection<KIn> keys);

/**
* Builds a ReduceCommand used for migration and reduce phase execution of MapReduce tasks.
*
* @param r Reducer for MapReduceTask
* @param keys keys used in MapReduceTask
* @return created ReduceCommand
*/
<KOut, VOut> ReduceCommand<KOut, VOut> buildReduceCommand(String taskId,
String destinationCache, Reducer<KOut, VOut> r, Collection<KOut> keys);

/**
* @see GetInDoubtTxInfoCommand
*/
Expand Down

0 comments on commit 799fc1a

Please sign in to comment.