Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

small fixes

  • Loading branch information...
commit fc8c616f1133617ddd552f21dd74e5ffc5f14b42 1 parent 2acd487
Jonathan Coveney authored
Showing with 5 additions and 14 deletions.
  1. +5 −14 src/org/apache/pig/IteratingAccumulatorEvalFunc.java
19 src/org/apache/pig/IteratingAccumulatorEvalFunc.java
View
@@ -1,16 +1,14 @@
package org.apache.pig;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.data.DataBag;
public abstract class IteratingAccumulatorEvalFunc<T> extends AccumulatorEvalFunc<T> implements TerminatingAccumulator<T> {
private boolean isInitialized = false;
@@ -39,7 +37,7 @@ public void run() {
try {
returnValue = exec(dqi);
isFinished = true;
- } catch (IOException e) {
+ } catch (Exception e) {
executionThreadException = e;
exceptionThrown = true;
}
@@ -55,8 +53,6 @@ public boolean isFinished() {
return isFinished;
}
- //need a mechanism to see if the return value has come yet
-
@Override
public void accumulate(Tuple input) throws IOException {
if (!isInitialized)
@@ -71,7 +67,7 @@ public void accumulate(Tuple input) throws IOException {
try {
added = queue.offer(t, WAIT_TO_OFFER, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- }
+ } //TODO handle the exception?
if (exceptionThrown)
throw new RuntimeException("Exception thrown in thread: ", executionThreadException);
@@ -82,10 +78,6 @@ public void accumulate(Tuple input) throws IOException {
public T getValue() {
noMoreValues = true;
- if (exceptionThrown)
- throw new RuntimeException("Exception thrown in thread: ", executionThreadException);
-
- //TODO handle the exception
do {
if (exceptionThrown)
throw new RuntimeException("Exception thrown in thread: ", executionThreadException);
@@ -93,8 +85,7 @@ public T getValue() {
try {
executionThread.join(WAIT_TO_JOIN);
} catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ } //TODO handle the exception?
} while (executionThread.isAlive());
return returnValue;
@@ -144,7 +135,7 @@ public Tuple next() {
Tuple t = next;
if (t == null)
- throw new RuntimeException("Entered inconsistent state!"); //do I need to do something to surface errors from the thread?
+ throw new RuntimeException("Entered inconsistent state!");
next = null;
Please sign in to comment.
Something went wrong with that request. Please try again.