Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

fixed serialization exception due to metrics classes not being regist…

…ered properly
  • Loading branch information...
commit 863669e0d6838d78bfbf7d7b2a40292dafb6ae78 1 parent 69ada70
Jason Jackson authored
View
5 conf/defaults.yaml
@@ -103,9 +103,4 @@ topology.max.error.report.per.interval: 5
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
topology.trident.batch.emit.interval.millis: 500
-### register classes used in implementation of metrics api.
-topology.kryo.register:
- - backtype.storm.metric.api.IMetricsConsumer$TaskInfo
- - backtype.storm.metric.api.IMetricsConsumer$DataPoint
-
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
View
2  src/clj/backtype/storm/daemon/common.clj
@@ -239,7 +239,7 @@
(->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)
(map #(get % "class"))
(number-duplicates)
- (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))
+ (map #(str Constants/METRICS_COMPONENT_ID_PREFIX "_" %))))
(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf]
(let [inputs (->> (for [comp-id components-ids-that-emit-metrics]
View
2  src/jvm/backtype/storm/metric/api/IMetricsConsumer.java
@@ -7,6 +7,7 @@
public interface IMetricsConsumer {
public static class TaskInfo {
+ public TaskInfo() {}
public TaskInfo(String srcWorkerHost, int srcWorkerPort, String srcComponentId, int srcTaskId, long timestamp, int updateIntervalSecs) {
this.srcWorkerHost = srcWorkerHost;
this.srcWorkerPort = srcWorkerPort;
@@ -23,6 +24,7 @@ public TaskInfo(String srcWorkerHost, int srcWorkerPort, String srcComponentId,
public int updateIntervalSecs;
}
public static class DataPoint {
+ public DataPoint() {}
public DataPoint(String name, Object value) {
this.name = name;
this.value = value;
View
2  src/jvm/backtype/storm/serialization/SerializationFactory.java
@@ -38,6 +38,8 @@ public static Kryo getKryo(Map conf) {
k.register(BigInteger.class, new BigIntegerSerializer());
k.register(TransactionAttempt.class);
k.register(Values.class);
+ k.register(backtype.storm.metric.api.IMetricsConsumer.DataPoint.class);
+ k.register(backtype.storm.metric.api.IMetricsConsumer.TaskInfo.class);
try {
JavaBridge.registerPrimitives(k);
JavaBridge.registerCollections(k);
Please sign in to comment.
Something went wrong with that request. Please try again.