Skip to content

Commit

Permalink
Add RG support for Spark, add MR implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mcimbora authored and rvansa committed Jul 25, 2016
1 parent fb56ced commit b63de0e
Show file tree
Hide file tree
Showing 42 changed files with 1,515 additions and 188 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/org/radargun/config/Cluster.java
Expand Up @@ -140,7 +140,7 @@ public Set<Integer> getSlaves(String group) {
}
index += g.size;
}
throw new IllegalArgumentException("Group " + group + " was not defined");
return Collections.EMPTY_SET;
}

public class Group implements Serializable {
Expand Down
47 changes: 47 additions & 0 deletions core/src/main/java/org/radargun/utils/KeyValueProperty.java
@@ -0,0 +1,47 @@
package org.radargun.utils;

import org.radargun.config.DefinitionElement;
import org.radargun.config.Property;

/**
* @author Matej Cimbora
*/
@DefinitionElement(name = "property", doc = "Holder for key-value pairs.")
public class KeyValueProperty {

@Property(doc = "String key.")
private String key;
@Property(doc = "String value.")
private String value;

public String getKey() {
return key;
}

public void setKey(String key) {
this.key = key;
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

@Override
public String toString() {
return "KeyValueProperty{" +
"key='" + key + '\'' +
", value='" + value + '\'' +
'}';
}

public static class KeyValuePairListConverter extends ReflexiveConverters.ListConverter {

public KeyValuePairListConverter() {
super(new Class[] {KeyValueProperty.class});
}
}
}
26 changes: 26 additions & 0 deletions core/src/main/java/org/radargun/utils/Utils.java
Expand Up @@ -476,6 +476,32 @@ public static <T> T invokeMethodWithString(T object, Map<String, String> propert
return object;
}

/**
*
* Invoke a public method with a String argument on an Object
*
* @param object
* the Object where the method is invoked
* @param properties
* a Collection of key-value pairs where the public method name is the key, and the String parameter is the value
* @return the modified Object, or <code>null</code> if the field can't be changed
*/
public static <T> T invokeMethodWithProperties(T object, Collection<KeyValueProperty> properties) {
Class<?> clazz = object.getClass();
if (properties != null) {
for (KeyValueProperty property : properties) {
try {
Method method = clazz.getDeclaredMethod(property.getKey(), String.class);
method.invoke(object, property.getValue());
} catch (Exception e) {
log.error("Error invoking method named " + property.getKey() + " with value " + property.getValue(), e);
return null;
}
}
}
return object;
}

public static void dumpHeap(String file) throws Exception {
if (hotspotMBean == null) {
synchronized (Utils.class) {
Expand Down
16 changes: 16 additions & 0 deletions extensions/cache/pom.xml
Expand Up @@ -22,4 +22,20 @@
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Expand Up @@ -26,7 +26,6 @@ public static Map<Class<?>, Object> getAllTraits() {
traitMap.put(org.radargun.traits.ConditionalOperations.class, new ConditionalOperations(new ConditionalOperations.ConditionalOperationsCache(concurrentHashMap)));
traitMap.put(org.radargun.traits.Iterable.class, new Iterable<>(concurrentHashMap));
traitMap.put(org.radargun.traits.CacheInformation.class, new CacheInformation(new CacheInformation.Cache(concurrentHashMap)));
traitMap.put(org.radargun.traits.MapReducer.class, new MapReducer(concurrentHashMap));
traitMap.put(org.radargun.traits.DistributedTaskExecutor.class, new DistributedTaskExecutor(concurrentHashMap));
traitMap.put(org.radargun.traits.TopologyHistory.class, new TopologyHistory());
traitMap.put(org.radargun.traits.Debugable.class, new Debuggable());
Expand Down Expand Up @@ -584,118 +583,6 @@ public org.radargun.traits.TopologyHistory.Event copy() {
}
}

public static class MapReducer implements org.radargun.traits.MapReducer {

private ConcurrentHashMap cache;

public MapReducer(ConcurrentHashMap cache) {
this.cache = cache;
}

@Override
public Builder builder(String cacheName) {
return new Builder(cache);
}

@Override
public boolean supportsResultCacheName() {
return true;
}

@Override
public boolean supportsIntermediateSharedCache() {
return true;
}

@Override
public boolean supportsCombiner() {
return true;
}

@Override
public boolean supportsTimeout() {
return true;
}

@Override
public boolean supportsDistributedReducePhase() {
return true;
}

private static class Builder implements org.radargun.traits.MapReducer.Builder {

private ConcurrentHashMap cache;

public Builder(ConcurrentHashMap cache) {
this.cache = cache;
}

@Override
public org.radargun.traits.MapReducer.Builder distributedReducePhase(boolean distributedReducePhase) {
return this;
}

@Override
public org.radargun.traits.MapReducer.Builder useIntermediateSharedCache(boolean useIntermediateSharedCache) {
return this;
}

@Override
public org.radargun.traits.MapReducer.Builder timeout(long timeout, TimeUnit unit) {
return this;
}

@Override
public org.radargun.traits.MapReducer.Builder resultCacheName(String resultCacheName) {
return this;
}

@Override
public Task build() {
return new Task(cache);
}

@Override
public org.radargun.traits.MapReducer.Builder collator(String collatorFqn, Map collatorParameters) {
return this;
}

@Override
public org.radargun.traits.MapReducer.Builder combiner(String combinerFqn, Map combinerParameters) {
return this;
}

@Override
public org.radargun.traits.MapReducer.Builder reducer(String reducerFqn, Map reducerParameters) {
return this;
}

@Override
public org.radargun.traits.MapReducer.Builder mapper(String mapperFqn, Map mapperParameters) {
return this;
}
}

private static class Task implements org.radargun.traits.MapReducer.Task {

private ConcurrentHashMap cache;

public Task(ConcurrentHashMap cache) {
this.cache = cache;
}

@Override
public Map execute() {
return Collections.unmodifiableMap(cache);
}

@Override
public Object executeWithCollator() {
return cache.size();
}
}
}

public static class DistributedTaskExecutor implements org.radargun.traits.DistributedTaskExecutor {

private ConcurrentHashMap cache;
Expand Down
42 changes: 42 additions & 0 deletions extensions/mapreduce/pom.xml
@@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.radargun</groupId>
<artifactId>radargun-extensions</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>radargun-mapreduce</artifactId>
<name>Radargun Map-reduce support</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.radargun</groupId>
<artifactId>radargun-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.radargun</groupId>
<artifactId>radargun-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.radargun</groupId>
<artifactId>radargun-cache</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.radargun</groupId>
<artifactId>radargun-cache</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

</project>

0 comments on commit b63de0e

Please sign in to comment.