Add map wrapper for RecordWriter in POJOs

1 parent cdbf02e, commit 800824c5136af15da5f96ec42e27fbd6c56fbe20, @dsyer committed Jan 25, 2011
README.md
@@ -56,15 +56,15 @@ API. For example:
public class PojoMapReducer {
@Mapper
- public void map(String value, RecordWriter<String, Integer> writer) throws InterruptedException, IOException {
+ public void map(String value, Map<String, Integer> writer) {
StringTokenizer itr = new StringTokenizer(value);
while (itr.hasMoreTokens()) {
- writer.write(itr.nextToken(), 1);
+ writer.put(itr.nextToken(), 1);
}
}
@Reducer
- public int reduce(Iterable<Integer> values) throws InterruptedException, IOException {
+ public int reduce(Iterable<Integer> values) {
int sum = 0;
for (Integer val : values) {
sum += val;
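
For reference, the complete POJO after this change looks roughly like the following. This is a sketch: the diff above cuts off before the reduce method ends, so the trailing return sum; is assumed, and the annotation imports are taken from the test class further down.

import java.util.Map;
import java.util.StringTokenizer;

import org.springframework.hadoop.annotation.Mapper;
import org.springframework.hadoop.annotation.Reducer;

public class PojoMapReducer {

    @Mapper
    public void map(String value, Map<String, Integer> writer) {
        // The Map parameter replaces RecordWriter, so the checked
        // IOException/InterruptedException vanish from the signature.
        StringTokenizer itr = new StringTokenizer(value);
        while (itr.hasMoreTokens()) {
            writer.put(itr.nextToken(), 1);
        }
    }

    @Reducer
    public int reduce(Iterable<Integer> values) {
        int sum = 0;
        for (Integer val : values) {
            sum += val;
        }
        return sum; // assumed: the diff truncates before the method closes
    }
}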
...p-core/src/main/java/org/springframework/hadoop/mapreduce/ExpressionEvaluatingMapper.java
@@ -88,12 +88,15 @@ protected void map(Writable key, Writable value, Context context) throws IOExcep
private Writable value;
+ private Map<Object, Object> map;
+
public ParametersWrapper(Writable key, Writable value,
Mapper<Writable, Writable, Writable, Writable>.Context context, ConversionService conversionService, Class<? extends Writable> outputKeyType, Class<? extends Writable> outputValueType) {
this.key = key;
this.context = context;
this.value = value;
this.writer = new ConversionServiceRecordWriter(context, conversionService, outputKeyType, outputValueType);
+ this.map = new RecordWriterMap(writer);
}
public Writable getKey() {
@@ -108,6 +111,10 @@ public Writable getKey() {
return writer;
}
+ public Map<Object, Object> getMap() {
+ return map;
+ }
+
public Writable getValue() {
return value;
}
...-core/src/main/java/org/springframework/hadoop/mapreduce/ExpressionEvaluatingReducer.java
@@ -97,6 +97,8 @@ protected void reduce(Writable key, Iterable<Writable> values, Context context)
private Iterable<?> values;
+ private Map<Object, Object> map;
+
public ParametersWrapper(Writable key, Iterable<Writable> values, Class<?> targetValueType,
Reducer<Writable, Writable, Writable, Writable>.Context context, ConversionService conversionService,
Class<? extends Writable> outputKeyType, Class<? extends Writable> outputValueType) {
@@ -105,6 +107,7 @@ public ParametersWrapper(Writable key, Iterable<Writable> values, Class<?> targe
this.values = new ConversionServiceIterableAdapter<Writable, Object>(values, targetValueType,
conversionService);
this.writer = new ConversionServiceRecordWriter(context, conversionService, outputKeyType, outputValueType);
+ this.map = new RecordWriterMap(writer);
}
public Writable getKey() {
@@ -118,6 +121,10 @@ public Writable getKey() {
public RecordWriter<Object, Object> getWriter() {
return writer;
}
+
+ public Map<Object, Object> getMap() {
+ return map;
+ }
public Iterable<?> getValues() {
return values;
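
In both ParametersWrapper classes the new map field is just the existing writer wrapped once, so Map-style and RecordWriter-style POJO methods share one output path. Schematically (names as in the diff; context and conversionService come from the enclosing constructor):

RecordWriter<Object, Object> writer = new ConversionServiceRecordWriter(
        context, conversionService, outputKeyType, outputValueType);
// put() calls on the map delegate straight to writer.write()
Map<Object, Object> map = new RecordWriterMap(writer);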
spring-hadoop-core/src/main/java/org/springframework/hadoop/mapreduce/RecordWriterMap.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2006-2011 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.hadoop.mapreduce;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.springframework.hadoop.HadoopException;
+
+/**
+ * Write-through {@link Map} adapter over a Hadoop {@link RecordWriter}.
+ * Each put() is forwarded to the writer immediately; read operations such
+ * as get() and containsKey() are unsupported.
+ *
+ * @author Dave Syer
+ */
+public class RecordWriterMap implements Map<Object, Object> {
+
+ private final RecordWriter<Object, Object> writer;
+ private Object key;
+ private Object value;
+ private int size;
+
+ public RecordWriterMap(RecordWriter<Object, Object> writer) {
+ this.writer = writer;
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ public boolean containsKey(Object key) {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean containsValue(Object value) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object get(Object key) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object put(Object key, Object value) {
+ this.key = key;
+ this.value = value;
+ this.size++;
+ try {
+ writer.write(key, value);
+ }
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new HadoopException("Could not write to record writer", e);
+ }
+ return null;
+ }
+
+ public Object remove(Object key) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void putAll(Map<? extends Object, ? extends Object> m) {
+ for (Entry<? extends Object, ? extends Object> entry : m.entrySet()) {
+ put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public void clear() {
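+ // Intentionally a no-op: entries are written through to the
+ // RecordWriter immediately, so there is nothing to clear.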
+ }
+
+ public Set<Object> keySet() {
+ return Collections.singleton(key);
+ }
+
+ public Collection<Object> values() {
+ return Collections.singleton(value);
+ }
+
+ public Set<Entry<Object, Object>> entrySet() {
+ throw new UnsupportedOperationException();
+ }
+
+}
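
A minimal usage sketch (not part of the commit; the CollectingWriter stub and the demo class are hypothetical) showing that put() writes straight through to the underlying RecordWriter:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.springframework.hadoop.mapreduce.RecordWriterMap;

public class RecordWriterMapDemo {

    // Hypothetical stub that collects writes in memory instead of
    // emitting them through an OutputFormat.
    static class CollectingWriter extends RecordWriter<Object, Object> {
        final List<String> records = new ArrayList<String>();

        @Override
        public void write(Object key, Object value) {
            records.add(key + "=" + value);
        }

        @Override
        public void close(TaskAttemptContext context) {
        }
    }

    public static void main(String[] args) {
        CollectingWriter writer = new CollectingWriter();
        Map<Object, Object> map = new RecordWriterMap(writer);
        map.put("hello", 1);
        map.put("world", 2);
        // Each put() is forwarded to write() immediately; nothing is
        // retained for lookup, which is why get() and containsKey() throw.
        System.out.println(writer.records); // prints [hello=1, world=2]
    }
}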
spring-hadoop-core/src/test/java/org/springframework/hadoop/test/PojoConfiguration.java
@@ -15,11 +15,10 @@
*/
package org.springframework.hadoop.test;
-import java.io.IOException;
+import java.util.Map;
import java.util.StringTokenizer;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
@@ -75,15 +74,15 @@ public PojoMapReducer pojo() {
public class PojoMapReducer {
@org.springframework.hadoop.annotation.Mapper
- public void map(String value, RecordWriter<String, Integer> writer) throws InterruptedException, IOException {
+ public void map(String value, Map<String, Integer> writer) {
StringTokenizer itr = new StringTokenizer(value);
while (itr.hasMoreTokens()) {
- writer.write(itr.nextToken(), 1);
+ writer.put(itr.nextToken(), 1);
}
}
@org.springframework.hadoop.annotation.Reducer
- public int reduce(Iterable<Integer> values) throws InterruptedException, IOException {
+ public int reduce(Iterable<Integer> values) {
int sum = 0;
for (Integer val : values) {
sum += val;
