Skip to content


Subversion checkout URL

You can clone with
Download ZIP


Storm 0.7.0 changes #2

merged 8 commits into from

2 participants


Minimal changes to make ScalaStorm work with Storm 0.7

Causes an issue with slf4j -- one that can be dealt with fairly easily, but that isn't yet a part of a Storm SNAPSHOT or release:



Thanks Mark. I've been busy, but will merge this today or tomorrow. I'll also have a look at hte slf4j issue.


The slf4j issue is completely resolved with the change to Storm's project.clj that I requested Nathan to pull, but it didn't go into 0.7.1, so it'll be at least a while before it goes into any Storm release. I didn't really look at what it would take to get the ScalaStorm build to back off to slf4j 1.5.8, but that would be the other way to go.

@velvia velvia merged commit c70ab8d into velvia:master

Mark, I see a warning about slf4j, but was able to actually run the topologies (ok I only ran one) in local mode. Just curious if that is what you are seeing, what operational problems you see (submission to cluster?)

For now I'll see about backing off to 1.5.8, if that's easily doable.


Yes, the topologies seem to run fine in local mode even with the slf4j mismatch error messages. I can't recall whether this worked with an actual storm cluster or not. My cluster is down at the moment, but I'll try to get it back running on 0.7.1 tonight to check on this.


The change from IRichSpout to BaseRichSpout already allows 0.7.1 to build and run, so I don't see it being a problem. Of course, neither the prior changes nor a change to 0.7.1 takes any advantage of the new functionality that is available, so there is work to be done to make use of TransactionalSpouts and other stuff, but I don't think moving to 0.7.1 breaks anything that is already working.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Feb 12, 2012
  1. @markhamstra

    updaye to 0.7.0-rc

    markhamstra committed
Commits on Feb 13, 2012
  1. @markhamstra

    update to 0.7.0-rc

    markhamstra committed
  2. @markhamstra
  3. @markhamstra
  4. @markhamstra
  5. @markhamstra

    cleanup imports

    markhamstra committed
Commits on Mar 28, 2012
  1. @markhamstra
Commits on Mar 29, 2012
  1. @markhamstra

    merge to velvia master

    markhamstra committed
This page is out of date. Refresh to see the latest.
Showing with 6 additions and 13 deletions.
  1. +1 −1  build.sbt
  2. +2 −4 dsl/StormBolt.scala
  3. +3 −8 dsl/StormSpout.scala
2  build.sbt
@@ -17,7 +17,7 @@ unmanagedSourceDirectories in Compile <<= Seq( baseDirectory( _ / "examples" ),
resolvers ++= Seq("clojars" at "",
"clojure-releases" at "")
-libraryDependencies += "storm" % "storm" % "0.6.0"
+libraryDependencies += "storm" % "storm" % "0.7.0"
// This is to prevent error [java.lang.OutOfMemoryError: PermGen space]
javaOptions += "-XX:MaxPermSize=1g"
6 dsl/StormBolt.scala
@@ -2,7 +2,7 @@
package storm.scala.dsl
-import backtype.storm.topology.IRichBolt
+import backtype.storm.topology.base.BaseRichBolt
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.tuple.{Fields, Tuple}
import backtype.storm.task.OutputCollector
@@ -19,7 +19,7 @@ import backtype.storm.task.TopologyContext
// <tuple> emit (...)
// using no anchor emit (...)
abstract class StormBolt(val streamToFields: collection.Map[String, List[String]])
- extends IRichBolt with SetupFunc with BoltDsl {
+ extends BaseRichBolt with SetupFunc with BoltDsl {
var _context: TopologyContext = _
var _conf: java.util.Map[_, _] = _
@@ -33,8 +33,6 @@ abstract class StormBolt(val streamToFields: collection.Map[String, List[String]
- def cleanup {}
def declareOutputFields(declarer: OutputFieldsDeclarer) {
streamToFields foreach { case(stream, fields) =>
declarer.declareStream(stream, new Fields(fields:_*))
11 dsl/StormSpout.scala
@@ -5,13 +5,14 @@ package storm.scala.dsl
import java.util.Map
import backtype.storm.task.TopologyContext
import backtype.storm.spout.SpoutOutputCollector
-import backtype.storm.topology.{OutputFieldsDeclarer, IRichSpout}
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichSpout
import backtype.storm.tuple.Fields
import collection.JavaConverters._
import collection.JavaConversions._
abstract class StormSpout(val outputFields: List[String],
- val isDistributed: Boolean = false) extends IRichSpout with SetupFunc {
+ val isDistributed: Boolean = false) extends BaseRichSpout with SetupFunc {
var _context:TopologyContext = _
var _collector:SpoutOutputCollector = _
@@ -21,18 +22,12 @@ abstract class StormSpout(val outputFields: List[String],
- def close() = {}
// nextTuple needs to be defined by each spout inheriting from here
//def nextTuple() {}
def declareOutputFields(declarer: OutputFieldsDeclarer) =
declarer.declare(new Fields(outputFields))
- def ack(tuple: AnyRef) = {}
- def fail(tuple: AnyRef) = {}
// DSL for emit and emitDirect.
// [toStream(<streamId>)] emit (val1, val2, ..)
// [using][msgId <messageId>] [toStream <streamId>] emit (val1, val2, ...)
Something went wrong with that request. Please try again.