Skip to content

Commit

Permalink
Moved IOutputCollector::reportError into its own interface IErrorRepo…
Browse files Browse the repository at this point in the history
…rter
  • Loading branch information
Jason Jackson committed Nov 2, 2012
1 parent f1a4cf3 commit 69ada70
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/clj/backtype/storm/metric/testing.clj
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


(defn impl-init [] [[] (atom [])]) (defn impl-init [] [[] (atom [])])


(defn impl-prepare [this conf {:keys [ns var-name]} ctx] (defn impl-prepare [this conf {:keys [ns var-name]} ctx error-reporter]
(reset! (.state this) @(intern ns var-name)) (reset! (.state this) @(intern ns var-name))
(reset! @(.state this) [])) (reset! @(.state this) []))


Expand Down
5 changes: 3 additions & 2 deletions src/jvm/backtype/storm/metric/MetricsConsumerBolt.java
Original file line number Original file line Diff line number Diff line change
@@ -1,8 +1,9 @@
package backtype.storm.metric; package backtype.storm.metric;


import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.Config; import backtype.storm.Config;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IBolt; import backtype.storm.task.IBolt;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.OutputCollector; import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext; import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Tuple;
Expand All @@ -28,7 +29,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
throw new RuntimeException("Could not instantiate a class listed in config under section " + throw new RuntimeException("Could not instantiate a class listed in config under section " +
Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e); Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
} }
_metricsConsumer.prepare(stormConf, _registrationArgument, context, collector); _metricsConsumer.prepare(stormConf, _registrationArgument, context, (IErrorReporter)collector);
_collector = collector; _collector = collector;
} }


Expand Down
4 changes: 2 additions & 2 deletions src/jvm/backtype/storm/metric/api/IMetricsConsumer.java
Original file line number Original file line Diff line number Diff line change
@@ -1,6 +1,6 @@
package backtype.storm.metric.api; package backtype.storm.metric.api;


import backtype.storm.task.OutputCollector; import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext; import backtype.storm.task.TopologyContext;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
Expand Down Expand Up @@ -35,7 +35,7 @@ public String toString() {
public Object value; public Object value;
} }


void prepare(Map stormConf, Object registrationArgument, TopologyContext context, OutputCollector outputCollector); void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter);
void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints); void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
void cleanup(); void cleanup();
} }
5 changes: 5 additions & 0 deletions src/jvm/backtype/storm/task/IErrorReporter.java
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,5 @@
package backtype.storm.task;

public interface IErrorReporter {
void reportError(Throwable error);
}
3 changes: 1 addition & 2 deletions src/jvm/backtype/storm/task/IOutputCollector.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;


public interface IOutputCollector { public interface IOutputCollector extends IErrorReporter {
/** /**
* Returns the task ids that received the tuples. * Returns the task ids that received the tuples.
*/ */
List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple); List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);
void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple); void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
void ack(Tuple input); void ack(Tuple input);
void fail(Tuple input); void fail(Tuple input);
void reportError(Throwable error);
} }

0 comments on commit 69ada70

Please sign in to comment.