Permalink
Browse files

Moved IOutputCollector::reportError into its own interface IErrorRepo…

…rter
  • Loading branch information...
1 parent f1a4cf3 commit 69ada7049d110ff4aad5c5acc4471e3bf0383a6c Jason Jackson committed Nov 2, 2012
@@ -11,7 +11,7 @@
(defn impl-init [] [[] (atom [])])
-(defn impl-prepare [this conf {:keys [ns var-name]} ctx]
+(defn impl-prepare [this conf {:keys [ns var-name]} ctx error-reporter]
(reset! (.state this) @(intern ns var-name))
(reset! @(.state this) []))
@@ -1,8 +1,9 @@
package backtype.storm.metric;
-import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.Config;
+import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IBolt;
+import backtype.storm.task.IErrorReporter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
@@ -28,7 +29,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
throw new RuntimeException("Could not instantiate a class listed in config under section " +
Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
}
- _metricsConsumer.prepare(stormConf, _registrationArgument, context, collector);
+ _metricsConsumer.prepare(stormConf, _registrationArgument, context, (IErrorReporter)collector);
_collector = collector;
}
@@ -1,6 +1,6 @@
package backtype.storm.metric.api;
-import backtype.storm.task.OutputCollector;
+import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;
import java.util.Collection;
import java.util.Map;
@@ -35,7 +35,7 @@ public String toString() {
public Object value;
}
- void prepare(Map stormConf, Object registrationArgument, TopologyContext context, OutputCollector outputCollector);
+ void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter);
void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
void cleanup();
}
@@ -0,0 +1,5 @@
+package backtype.storm.task;
+
+public interface IErrorReporter {
+ void reportError(Throwable error);
+}
@@ -4,13 +4,12 @@
import java.util.Collection;
import java.util.List;
-public interface IOutputCollector {
+public interface IOutputCollector extends IErrorReporter {
/**
* Returns the task ids that received the tuples.
*/
List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);
void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
void ack(Tuple input);
void fail(Tuple input);
- void reportError(Throwable error);
}

0 comments on commit 69ada70

Please sign in to comment.