Upgrade to Storm API 1.0.2

Timur Abishev committed Nov 21, 2016
1 parent 0a01efa · commit 3397c7d
Showing 22 changed files with 104 additions and 140 deletions.
17 changes: 4 additions & 13 deletions build.sbt
@@ -11,9 +11,7 @@ val chillVersion = "0.7.3"
 val scalaCheckVersion = "1.12.2"
 val scalaTestVersion = "2.2.4"
 val slf4jVersion = "1.6.6"
-val stormKafkaVersion = "0.9.0-wip6-scala292-multischeme"
-val stormKestrelVersion = "0.9.0-wip5-multischeme"
-val stormVersion = "0.9.0-wip15"
+val stormVersion = "1.0.2"
 val twitter4jVersion = "3.0.3"

 val extraSettings =
@@ -35,16 +33,14 @@ val sharedSettings = extraSettings ++ ciSettings ++ Seq(
   javacOptions in doc := Seq("-source", "1.6"),
   libraryDependencies ++= Seq(
     "org.slf4j" % "slf4j-api" % slf4jVersion,
-    "storm" % "storm" % stormVersion % "provided",
+    "org.apache.storm" % "storm-core" % stormVersion % "provided",
     "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
     "org.scalatest" %% "scalatest" % scalaTestVersion % "test"
   ),
   scalacOptions ++= Seq("-unchecked", "-deprecation", "-Yresolve-term-conflict:package"),
   resolvers ++= Seq(
     Opts.resolver.sonatypeSnapshots,
-    Opts.resolver.sonatypeReleases,
-    "Clojars Repository" at "http://clojars.org/repo",
-    "Conjars Repository" at "http://conjars.org/repo"
+    Opts.resolver.sonatypeReleases
   ),

   parallelExecution in Test := false,
@@ -144,7 +140,6 @@ lazy val tormenta = Project(
   publishLocal := { }
 ).aggregate(
   tormentaCore,
-  tormentaKestrel,
   tormentaKafka,
   tormentaTwitter,
   tormentaAvro
@@ -168,11 +163,7 @@ lazy val tormentaTwitter = module("twitter").settings(
 ).dependsOn(tormentaCore % "test->test;compile->compile")

 lazy val tormentaKafka = module("kafka").settings(
-  libraryDependencies += "storm" % "storm-kafka" % stormKafkaVersion
-).dependsOn(tormentaCore % "test->test;compile->compile")
-
-lazy val tormentaKestrel = module("kestrel").settings(
-  libraryDependencies += "storm" % "storm-kestrel" % stormKestrelVersion
+  libraryDependencies += "org.apache.storm" % "storm-kafka" % stormVersion
 ).dependsOn(tormentaCore % "test->test;compile->compile")

 lazy val tormentaAvro = module("avro").settings(
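
With Storm's move to Apache, the Maven coordinates change from "storm" % "storm" to "org.apache.storm" % "storm-core", and the Clojars/Conjars resolvers can go: the 1.x artifacts are published to Maven Central. The kestrel module and its aggregate reference are dropped, presumably because storm-kestrel was never ported to the org.apache.storm namespace. A minimal sketch of what a downstream build.sbt might declare against the new coordinates (the fragment below is illustrative, not part of this commit):

    // Hypothetical downstream build.sbt fragment mirroring the change above.
    // "provided" keeps storm-core out of the assembled jar, since the Storm
    // cluster supplies it at runtime.
    val stormVersion = "1.0.2"

    libraryDependencies ++= Seq(
      "org.apache.storm" % "storm-core" % stormVersion % "provided",
      "org.apache.storm" % "storm-kafka" % stormVersion
    )
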
@@ -19,15 +19,20 @@ package com.twitter.tormenta.scheme.avro
 import com.twitter.bijection.Injection
 import scala.util.{ Failure, Success }
 import com.twitter.tormenta.scheme.Scheme
+import java.nio.ByteBuffer

 /**
  * @author Mansur Ashraf
  * @since 9/14/13
  */
 trait AvroScheme[T] extends Scheme[T] {
-  def decodeRecord(bytes: Array[Byte])(implicit inj: Injection[T, Array[Byte]]): TraversableOnce[T] = Injection.invert[T, Array[Byte]](bytes) match {
-    case Success(x) => Seq(x)
-    case Failure(x) => throw x
+  def decodeRecord(buffer: ByteBuffer)(implicit inj: Injection[T, Array[Byte]]): TraversableOnce[T] = {
+    val bytes = Array.ofDim[Byte](buffer.remaining)
+    buffer.get(bytes)
+
+    Injection.invert[T, Array[Byte]](bytes) match {
+      case Success(x) => Seq(x)
+      case Failure(x) => throw x
+    }
   }
 }
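
Storm 1.x changed MultiScheme.deserialize to accept a java.nio.ByteBuffer rather than an Array[Byte], so decodeRecord now drains the buffer into a fresh array before inverting the bijection. A minimal sketch of that pattern in isolation (the drain helper is illustrative, not part of this commit):

    import java.nio.ByteBuffer

    // Copy the readable region of the buffer into a new array. Using
    // `remaining` (not `capacity`) respects the buffer's current position,
    // in case Storm hands over a buffer that has already been partly read.
    def drain(buffer: ByteBuffer): Array[Byte] = {
      val bytes = Array.ofDim[Byte](buffer.remaining)
      buffer.get(bytes)
      bytes
    }
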
@@ -22,6 +22,7 @@ import com.twitter.bijection.avro.GenericAvroCodecs
 import com.twitter.bijection.Injection.connect
 import com.twitter.tormenta.scheme.avro.AvroScheme
 import com.twitter.tormenta.scheme.Scheme
+import java.nio.ByteBuffer

 /**
  * @author Mansur Ashraf
@@ -32,9 +33,9 @@ object GenericAvroScheme {
 }

 class GenericAvroScheme[T <: GenericRecord](schema: Schema) extends Scheme[T] with AvroScheme[T] {
-  def decode(bytes: Array[Byte]): TraversableOnce[T] = {
+  def decode(buffer: ByteBuffer): TraversableOnce[T] = {
     implicit val inj = GenericAvroCodecs[T](schema)
-    decodeRecord(bytes)
+    decodeRecord(buffer)
   }
 }

@@ -43,9 +44,9 @@ object BinaryAvroScheme {
 }

 class BinaryAvroScheme[T <: GenericRecord](schema: Schema) extends Scheme[T] with AvroScheme[T] {
-  def decode(bytes: Array[Byte]): TraversableOnce[T] = {
+  def decode(buffer: ByteBuffer): TraversableOnce[T] = {
     implicit val inj = GenericAvroCodecs.toBinary[T](schema)
-    decodeRecord(bytes)
+    decodeRecord(buffer)
   }
 }

@@ -54,10 +55,10 @@ object JsonAvroScheme {
 }

 class JsonAvroScheme[T <: GenericRecord](schema: Schema) extends Scheme[T] with AvroScheme[T] {
-  def decode(bytes: Array[Byte]): TraversableOnce[T] = {
+  def decode(buffer: ByteBuffer): TraversableOnce[T] = {
     implicit val avroInj = GenericAvroCodecs.toJson[T](schema)
     implicit val inj = connect[T, String, Array[Byte]]
-    decodeRecord(bytes)
+    decodeRecord(buffer)
   }
 }
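
The three scheme classes keep their behavior; only the decode entry point moves from Array[Byte] to ByteBuffer. A hypothetical call site, assuming a parsed org.apache.avro.Schema and a serialized record (serializedBytes) are already in scope:

    import java.nio.ByteBuffer
    import org.apache.avro.generic.GenericRecord

    // Callers that previously passed raw byte arrays now wrap them first.
    val scheme = new GenericAvroScheme[GenericRecord](schema)
    val records: TraversableOnce[GenericRecord] =
      scheme.decode(ByteBuffer.wrap(serializedBytes))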

@@ -21,6 +21,7 @@ import com.twitter.tormenta.scheme.avro.AvroScheme
 import com.twitter.tormenta.scheme.Scheme
 import com.twitter.bijection.avro.SpecificAvroCodecs
 import com.twitter.bijection.Injection._
+import java.nio.ByteBuffer
 import org.apache.avro.specific.SpecificRecordBase

 /**
@@ -32,7 +33,7 @@ object SpecificAvroScheme {
 }

 class SpecificAvroScheme[T <: SpecificRecordBase: Manifest] extends Scheme[T] with AvroScheme[T] {
-  def decode(bytes: Array[Byte]): TraversableOnce[T] = {
+  def decode(bytes: ByteBuffer): TraversableOnce[T] = {
     implicit val inj = SpecificAvroCodecs[T]
     decodeRecord(bytes)
   }
@@ -43,7 +44,7 @@ object BinaryAvroScheme {
 }

 class BinaryAvroScheme[T <: SpecificRecordBase: Manifest] extends Scheme[T] with AvroScheme[T] {
-  def decode(bytes: Array[Byte]): TraversableOnce[T] = {
+  def decode(bytes: ByteBuffer): TraversableOnce[T] = {
     implicit val inj = SpecificAvroCodecs.toBinary[T]
     decodeRecord(bytes)
   }
@@ -54,7 +55,7 @@ object JsonAvroScheme {
 }

 class JsonAvroScheme[T <: SpecificRecordBase: Manifest](schema: Schema) extends Scheme[T] with AvroScheme[T] {
-  def decode(bytes: Array[Byte]): TraversableOnce[T] = {
+  def decode(bytes: ByteBuffer): TraversableOnce[T] = {
     implicit val avroInj = SpecificAvroCodecs.toJson[T](schema)
     implicit val inj = connect[T, String, Array[Byte]]
     decodeRecord(bytes)
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.avro.specific.SpecificRecordBase
 import scala.collection.JavaConverters._
 import com.twitter.tormenta.scheme.Scheme
+import java.nio.ByteBuffer

 /**
  * @author Mansur Ashraf
@@ -51,7 +52,7 @@ trait BaseAvroProperties {
     forAll {
       (a: A) =>
         val b = inj(a)
-        val deserialize = scheme.deserialize(b)
+        val deserialize = scheme.deserialize(ByteBuffer.wrap(b))
         val c = deserialize.asScala
         !c.isEmpty && c.size == 1 && eqa.equiv(c.head.get(0).asInstanceOf[A], a)
     }
@@ -61,7 +62,7 @@
     forAll {
       (a: A) =>
         val b = inj(a)
-        val c = scheme.deserialize(b).asScala
+        val c = scheme.deserialize(ByteBuffer.wrap(b)).asScala
         !c.isEmpty && c.size == 1 && eqa.equiv(c.head.get(0).asInstanceOf[A], failedRecord)
     }
   }
@@ -16,13 +16,13 @@

 package com.twitter.tormenta.scheme.spout

-import backtype.storm.topology.TopologyBuilder
-import backtype.storm.testing.{ MockedSources, TestGlobalCount }
-import backtype.storm.LocalCluster
+import org.apache.storm.topology.TopologyBuilder
+import org.apache.storm.testing.{ MockedSources, TestGlobalCount }
+import org.apache.storm.LocalCluster
 import com.twitter.tormenta.spout.TraversableSpout
-import backtype.storm.testing.CompleteTopologyParam
-import backtype.storm.tuple.Values
-import backtype.storm.Testing
+import org.apache.storm.testing.CompleteTopologyParam
+import org.apache.storm.tuple.Values
+import org.apache.storm.Testing

 import org.scalatest._
 import scala.collection.JavaConverters._
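
The test imports move wholesale from backtype.storm to org.apache.storm; the harness itself is unchanged. For orientation, a rough sketch of how these pieces typically fit together in a Storm topology test (names like builder are illustrative; the actual assertions live in the surrounding spec):

    // Run a topology to completion on an in-process cluster and capture
    // the emitted tuples for inspection.
    val cluster = new LocalCluster()
    val topology = builder.createTopology() // builder: a TopologyBuilder wired with spouts/bolts
    val params = new CompleteTopologyParam()
    params.setMockedSources(new MockedSources())
    val results = Testing.completeTopology(cluster, topology, params)
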
@@ -20,10 +20,10 @@ import org.scalatest._
 import com.twitter.tormenta.AvroTestHelper
 import com.twitter.bijection.avro.SpecificAvroCodecs
 import com.twitter.tormenta.spout.{ TraversableSpout, Spout }
-import backtype.storm.topology.TopologyBuilder
-import backtype.storm.{ Testing, LocalCluster }
-import backtype.storm.testing.{ TestGlobalCount, MockedSources, CompleteTopologyParam }
-import backtype.storm.tuple.Values
+import org.apache.storm.topology.TopologyBuilder
+import org.apache.storm.{ Testing, LocalCluster }
+import org.apache.storm.testing.{ TestGlobalCount, MockedSources, CompleteTopologyParam }
+import org.apache.storm.tuple.Values
 import avro.FiscalRecord
 import scala.collection.JavaConverters._

@@ -26,10 +26,10 @@
  */
 public class ScalaInterop {
   protected ScalaInterop() { }
-  public static backtype.storm.drpc.DRPCSpout makeDRPC(String function) {
-    return new backtype.storm.drpc.DRPCSpout(function);
+  public static org.apache.storm.drpc.DRPCSpout makeDRPC(String function) {
+    return new org.apache.storm.drpc.DRPCSpout(function);
   }
-  public static storm.trident.Stream newDRPCStream(storm.trident.TridentTopology top,
+  public static org.apache.storm.trident.Stream newDRPCStream(org.apache.storm.trident.TridentTopology top,
       String streamName) {
     return top.newDRPCStream(streamName);
   }
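
Trident moved as well: storm.trident.* is now org.apache.storm.trident.*, alongside the backtype.storm to org.apache.storm rename. A hypothetical Scala call site for the helper above (assumes the function name and topology wiring are defined elsewhere):

    import org.apache.storm.trident.TridentTopology

    val topology = new TridentTopology()
    val drpcSpout = ScalaInterop.makeDRPC("my-function") // org.apache.storm.drpc.DRPCSpout
    val stream = ScalaInterop.newDRPCStream(topology, "my-function")
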
@@ -16,20 +16,22 @@

 package com.twitter.tormenta.spout;

-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.FixedTuple;
-import backtype.storm.testing.CompletableSpout;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.CompletableSpout;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;

 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import static backtype.storm.utils.Utils.get;
+
+import static org.apache.storm.utils.Utils.get;

 public class FixedTupleSpout implements IRichSpout, CompletableSpout {
   private static final Map<String, Integer> acked = new HashMap<String, Integer>();
@@ -16,11 +16,12 @@ limitations under the License.

 package com.twitter.tormenta.scheme

-import backtype.storm.tuple.{ Fields, Values }
-import backtype.storm.spout.MultiScheme
+import org.apache.storm.tuple.{ Fields, Values }
+import org.apache.storm.spout.MultiScheme
 import java.util.{ List => JList }
 import scala.collection.JavaConverters._
 import java.io.Serializable
+import java.nio.ByteBuffer
 import org.slf4j.LoggerFactory

 /**
@@ -29,19 +30,19 @@ import org.slf4j.LoggerFactory
  */

 object Scheme {
-  val identity: Scheme[Array[Byte]] = Scheme(Some(_))
+  val identity: Scheme[ByteBuffer] = Scheme(Some(_))

-  def apply[T](decodeFn: Array[Byte] => TraversableOnce[T]) =
+  def apply[T](decodeFn: ByteBuffer => TraversableOnce[T]) =
     new Scheme[T] {
-      override def decode(bytes: Array[Byte]) = decodeFn(bytes)
+      override def decode(bytes: ByteBuffer) = decodeFn(bytes)
     }
 }

 trait Scheme[+T] extends MultiScheme with Serializable { self =>
   /**
    * This is the only method you're required to implement.
    */
-  def decode(bytes: Array[Byte]): TraversableOnce[T]
+  def decode(bytes: ByteBuffer): TraversableOnce[T]

   def handle(t: Throwable): TraversableOnce[T] = {
     // We assume this is rare enough that the perf hit of
@@ -54,7 +55,7 @@ trait Scheme[+T] extends MultiScheme with Serializable { self =>
   def withHandler[U >: T](fn: Throwable => TraversableOnce[U]): Scheme[U] =
     new Scheme[U] {
       override def handle(t: Throwable) = fn(t)
-      override def decode(bytes: Array[Byte]) = self.decode(bytes)
+      override def decode(bytes: ByteBuffer) = self.decode(bytes)
     }

   def filter(fn: T => Boolean): Scheme[T] =
@@ -76,7 +77,7 @@ trait Scheme[+T] extends MultiScheme with Serializable { self =>
       else
         null

-  override def deserialize(bytes: Array[Byte]) =
+  override def deserialize(bytes: ByteBuffer) =
     try {
       toJava(decode(bytes))
     } catch {
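
With decode now taking a ByteBuffer, every concrete Scheme follows the same drain-then-decode shape. A minimal sketch of a scheme built through the updated Scheme.apply (the UTF-8 example is illustrative, not part of the commit):

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets
    import com.twitter.tormenta.scheme.Scheme

    // Decode each payload as a UTF-8 string; Some(_) satisfies
    // TraversableOnce via the standard Option-to-Iterable conversion,
    // just as Scheme.identity does above.
    val utf8: Scheme[String] = Scheme { buffer =>
      val bytes = Array.ofDim[Byte](buffer.remaining)
      buffer.get(bytes)
      Some(new String(bytes, StandardCharsets.UTF_8))
    }
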
@@ -16,13 +16,13 @@

 package com.twitter.tormenta.spout

-import backtype.storm.spout.SpoutOutputCollector
-import backtype.storm.task.TopologyContext
-import backtype.storm.topology.base.BaseRichSpout
-import backtype.storm.topology.IRichSpout
-import backtype.storm.topology.OutputFieldsDeclarer
-import backtype.storm.tuple.{ Fields, Values }
-import backtype.storm.utils.Time
+import org.apache.storm.spout.SpoutOutputCollector
+import org.apache.storm.task.TopologyContext
+import org.apache.storm.topology.base.BaseRichSpout
+import org.apache.storm.topology.IRichSpout
+import org.apache.storm.topology.OutputFieldsDeclarer
+import org.apache.storm.tuple.{ Fields, Values }
+import org.apache.storm.utils.Time
 import java.util.{ Map => JMap }

 trait BaseSpout[+T] extends BaseRichSpout with Spout[T] { self =>
@@ -16,9 +16,8 @@ limitations under the License.

 package com.twitter.tormenta.spout

-import backtype.storm.topology.IRichSpout
 import com.twitter.tormenta.scheme.Scheme
-import backtype.storm.task.TopologyContext
+import org.apache.storm.task.TopologyContext

 /**
  * Spout that performs a flatMap operation on its contained
@@ -16,10 +16,8 @@

 package com.twitter.tormenta.spout

-import backtype.storm.task.TopologyContext
-import backtype.storm.metric.api.IMetric
-
-import java.io.Serializable
+import org.apache.storm.task.TopologyContext
+import org.apache.storm.metric.api.IMetric

 /**
  * Abstraction for encapsulating metric options
@@ -1,9 +1,9 @@
 package com.twitter.tormenta.spout

-import backtype.storm.spout.SpoutOutputCollector
-import backtype.storm.task.TopologyContext
-import backtype.storm.topology.IRichSpout
-import backtype.storm.topology.OutputFieldsDeclarer
+import org.apache.storm.spout.SpoutOutputCollector
+import org.apache.storm.task.TopologyContext
+import org.apache.storm.topology.IRichSpout
+import org.apache.storm.topology.OutputFieldsDeclarer
 import com.twitter.tormenta.Externalizer
 import java.io.Serializable
 import java.util.{ Map => JMap }
@@ -16,9 +16,9 @@

 package com.twitter.tormenta.spout

-import backtype.storm.topology.IRichSpout
+import org.apache.storm.topology.IRichSpout
 import com.twitter.tormenta.scheme.Scheme
-import backtype.storm.task.TopologyContext
+import org.apache.storm.task.TopologyContext

 trait SchemeSpout[+T] extends BaseSpout[T] { self =>
   /**
[Diff truncated: the remaining changed files are not shown.]