Permalink
Browse files

Removing window plugin for the time being (in its current state it's …

…more

confusing than helpful although it should go in again eventually).
  • Loading branch information...
aschmolck committed Sep 15, 2009
1 parent dcb6255 commit c56f06836e87be50c0b5393aa6edb73d372c1a3b
View
@@ -1,20 +0,0 @@
-sources=$(shell find src -type f -name "*.scala") pom.xml
-jarfile=window-1.0.jar
-
-all: lib/$(jarfile)
-
-clean:
- rm -rf target
- rm -f .libs-done
- rm -rf lib
-
-target/$(jarfile): $(sources)
-ifeq ($(TEST),false)
- mvn package -Dmaven.test.skip=true
-else
- mvn package
-endif
-
-lib/$(jarfile): target/$(jarfile)
- mkdir -p lib
- cp target/*.jar lib/
View
@@ -1,19 +0,0 @@
-{
- "name": "Window",
- "author": {"name": "Alexander Schmolck", "email": "alexander@lshift.net"},
- "type": "plugin-specification",
- "harness": "java",
- "subtype": "pipeline_component",
-
- "global_configuration_specification": [],
- "configuration_specification": [
- {"name": "timeout", "label": "timeout", "type": "float"},
- {"name": "unit", "label": "unit", "type": "string"},
- {"name": "count", "label": "count", "type": "number"},
- {"name": "overlap", "label": "overlap", "type": "int"},
- {"name": "encoding", "label": "encoding", "type": "string"}
- ],
- "inputs_specification": [{"name": "input", "label": "Input"}],
- "outputs_specification": [{"name": "output", "label": "Output"}],
- "database_specification": {}
-}
View
@@ -1,123 +0,0 @@
-<?xml version="1.0"?><project>
- <parent>
- <artifactId>streams-plugins</artifactId>
- <groupId>com.rabbitmq.streams.plugins</groupId>
- <version>1.0-SNAPSHOT</version>
- </parent>
- <name>Window plugin</name>
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.rabbitmq.streams.plugins</groupId>
- <artifactId>window</artifactId>
- <version>1.0-SNAPSHOT</version>
- <inceptionYear>2008</inceptionYear>
- <build>
- <sourceDirectory>src/main/scala</sourceDirectory>
- <testSourceDirectory>src/test/scala</testSourceDirectory>
- <plugins>
- <plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <scalaVersion>${scala.version}</scalaVersion>
- <args>
- <arg>-target:jvm-1.5</arg>
- </args>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-eclipse-plugin</artifactId>
- <configuration>
- <downloadSources>true</downloadSources>
- <buildcommands>
- <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
- </buildcommands>
- <additionalProjectnatures>
- <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
- </additionalProjectnatures>
- <classpathContainers>
- <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
- <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
- </classpathContainers>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>copy</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>lib/</outputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <repositories>
- <repository>
- <id>scala-tools.org</id>
- <name>Scala-Tools Maven2 Repository</name>
- <url>http://scala-tools.org/repo-releases</url>
- </repository>
- </repositories>
- <pluginRepositories>
- <pluginRepository>
- <id>scala-tools.org</id>
- <name>Scala-Tools Maven2 Repository</name>
- <url>http://scala-tools.org/repo-releases</url>
- </pluginRepository>
- </pluginRepositories>
- <dependencies>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.4</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scala-tools.testing</groupId>
- <artifactId>specs</artifactId>
- <version>1.5.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.rabbitmq.streams</groupId>
- <artifactId>plugin</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- </dependencies>
- <reporting>
- <plugins>
- <plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <configuration>
- <scalaVersion>${scala.version}</scalaVersion>
- </configuration>
- </plugin>
- </plugins>
- </reporting>
- <properties>
- <scala.version>2.7.5</scala.version>
- </properties>
-</project>
-
-
@@ -1,130 +0,0 @@
-package com.rabbitmq.streams.plugins.window
-
-import scala.actors._
-import Actor._
-import Math._
-
-import net.sf.json.{JSONObject, JSONArray, JSONNull}
-
-import com.rabbitmq.streams.harness.{PipelineComponent, InputReader, InputMessage,
- NotificationType, PluginException}
-
-/**
- * A sliding window with timeout and different ways to measure window length.
- */
-class WindowPlugin() extends PipelineComponent() {
- //FIXME(alexander) cut'n pasted hack
- def unpickleBody(body: Object): Array[Byte] = {
- JSONArray.toArray(JSONArray.fromObject(body)).asInstanceOf[Array[Object]].map(
- _.asInstanceOf[Int].toByte)
- }
- class ByteBasedBatcher(config:JSONObject) extends Actor{
- object Fullness extends Enumeration {
- val NOT_FULL, JUST_FULL, OVER_FULL, TWICE_FULL, IMPOSSIBLY_FULL = Value}
-
- val count = config.getInt("count")
- val overlap = config.getInt("overlap")
- val timeout = if (! JSONNull.getInstance().equals(config.opt("timeout"))) (config.getDouble("timeout")*1000).asInstanceOf[Long]
- else java.lang.Long.MAX_VALUE
- val encoding: Seq[Char] = config.getString("encoding")
- val sep:String = encoding match {
- case Seq('u','t','f','-','8','-','s','e','p',':', rest@_*) => rest.mkString
- }
- val encodingOverhead:Long = sep.getBytes("utf-8").size
-
- var state: java.util.Map[java.lang.String,java.lang.Object] = getState()
- var (deadline:Long, msgs:List[InputMessage]) =
- if (! state.isEmpty) {
- (state.get("deadline").asInstanceOf[Long],
- List()) // FIXME
- }
- else {
- val dl = Math.max(timeout, timeout + System.currentTimeMillis)
- state.put("deadline", dl.asInstanceOf[Object])
- (dl, List())
- }
-
- // make totalSize for [] == -encodingOverhead to simplify things
- var totalSize:Long = (-encodingOverhead /: msgs){encodingOverhead+_+_.body.size}
- def act() {
- loop {
- receiveWithin(Math.max(0, deadline - System.currentTimeMillis)) {
- case msg: InputMessage => {
- msgs = msg::msgs
- howFullAfter(msg) match {
- case Fullness.JUST_FULL =>
- val (scala, sucks) = ship(true, msgs); msgs = scala; totalSize = sucks
- case Fullness.OVER_FULL =>
- val (scala, sucks) = ship(false, msgs); msgs = scala; totalSize = sucks
- case Fullness.TWICE_FULL =>
- ship(true, msgs.tail)
- val (scala, sucks) = ship(true, List(msgs.head)); msgs = scala; totalSize = sucks
- case Fullness.IMPOSSIBLY_FULL =>
- notifier.notify(NotificationType.BadData, "Message length(" + msg.body.size +
- ") exceeds window count(" + count + ")")
- throw new PluginException("Illegal (too large) input msg for window plugin.")
-
- case _ =>
- totalSize += msg.body.size + encodingOverhead
-// System.err.println("Not full yet: " + totalSize)
- ; //FIXME why doesn't scala give me a !@#$! exhaustiveness check?
- }
-// System.err.println("###DEBUG about to reply msgs:" + msgs + " ")
- reply()
- }
- case TIMEOUT =>
- System.err.println("###DEBUG timeout: " + (deadline - System.currentTimeMillis))
- if (! msgs.isEmpty) { // FIXME to do otherwise seems impossible w/ current plugin API
- System.err.println("###DEBUG deadline ship")
- val (scala, sucks) = ship(true, msgs); msgs = scala; totalSize = sucks
- //FIXME(alexander) update state
- }
- deadline = Math.max(timeout, timeout + System.currentTimeMillis)
- state.put("deadline", deadline.asInstanceOf[Object])
- setState(state)
- reply()
-
- }
-
- }
- }
- def howFullAfter(msg:InputMessage) = {
-// System.err.println("totalSize " + totalSize + " body.size " + msg.body.size +
-// " oh " + encodingOverhead + " msg " + msg.bodyAsString)
- val msgSize:Long = msg.body.size
- (totalSize + msgSize + encodingOverhead) match {
- case size if size > count && msgSize > count => Fullness.IMPOSSIBLY_FULL
- case size if size > count && msgSize == count => Fullness.TWICE_FULL
- case size if size > count => Fullness.OVER_FULL
- case size if size == count => Fullness.JUST_FULL
- case _ => Fullness.NOT_FULL
- }
- }
- def ship(withHead:Boolean, msgs:List[InputMessage]) = {
-// System.err.println("About to ship " + withHead)
- val (msgsToSend:List[InputMessage], newMsgs:List[InputMessage], newSize:Long) =
- if (withHead) (msgs, List():List[InputMessage], -encodingOverhead)
- else (msgs.tail, List(msgs.head), msgs.head.body.size.asInstanceOf[Long])
- val concatedBodies = (msgsToSend.map((msg:InputMessage) => msg.bodyAsString).
- reverse.mkString(sep))
- publishToChannel("output", msgsToSend.head.withBody(concatedBodies))
-// System.err.println("published " + concatedBodies)
-// state.put("msgs", msgs.map(_.toJson))
- setState(state)
-// System.err.println("state set " + state)
- (newMsgs, newSize)
- }
- }
- override def configure(config : JSONObject) {
- val batcher = config.getString("unit") match {
- case "B" | "bytes" => new ByteBasedBatcher(config)
- }
- batcher.start
- object input extends InputReader {
- override def handleMessage(msg : InputMessage) {
- batcher !? msg // *synchronous* to retain transactionality
- }
- }
- registerInput("input", input)
- }
-}
@@ -1,4 +0,0 @@
-import com.rabbitmq.streams.plugins.window.WindowPlugin
-
-class window() extends WindowPlugin() {
-}
@@ -1,90 +0,0 @@
-
-
-package com.rabbitmq.streams.plugins.window
-
-import net.sf.json._
-import com.rabbitmq.streams.harness.{MessageChannel, InputHandler}
-
-import org.specs._
-import org.specs.mock.Mockito
-import org.mockito.Matchers._
-import org.mockito.Matchers
-import org.specs.runner.{ConsoleRunner, JUnit4}
-import com.rabbitmq.streams.harness.testsupport.MockMessageChannel
-import com.rabbitmq.streams.harness.InputMessage
-import com.rabbitmq.streams.harness.StateResource
-import com.rabbitmq.streams.harness.PluginBuildException
-import com.rabbitmq.streams.harness.PipelineComponent
-import com.rabbitmq.streams.harness.Logger
-import com.rabbitmq.streams.harness.{Notifier, NotificationType}
-
-class WindowTest extends JUnit4(WindowSpec)
-//class MySpecSuite extends ScalaTestSuite(WindowSpec)
-object MySpecRunner extends ConsoleRunner(WindowSpec)
-
-
-object WindowSpec extends Specification with Mockito {
-
- def fullyMock(plugin : PipelineComponent) {
- plugin.setLog(mock[Logger])
- plugin.setMessageChannel(mock[MessageChannel])
- plugin.setNotifier(mock[Notifier])
- plugin.setStateResource(mock[StateResource])
- }
-
- "Window" should {
-
- val win = new WindowPlugin
- fullyMock(win)
- val config = JSONObject.fromObject(
- """{"timeout": null, "count":7,
- "overlap":0, "unit":"bytes", "encoding":"utf-8-sep:."}""")
-
- "be nullary-constructable" in {
- win must notBeNull
- }
-
- "not complain given OK values" in {
- win.configure(config)
- }
-
- //XXX Actually I don't care about testing this here, that should be handled
- //in the plugin.js veryfier
-// "complain about a nonsense timeout value" in {
-// val c = JSONObject.fromObject(config)
-// c.put("timeout", "zappa")
-// win.configure(c) must throwA[PluginBuildException]
-// }
-
- "register an input handler" in {
- win must notBeNull
- val mc = mock[MessageChannel]
- win.setMessageChannel(mc)
- win.configure(config)
- mc.consume(Matchers.eq("input"), any(classOf[InputHandler])) was called
- }
-
- //FIXME(alexander): figure out why the test below fails
-
- // "publish full message onward" in {
- // val mc = new MockMessageChannel()
- // win.setMessageChannel(mc)
- // win.configure(config)
- // val m = mock[InputMessage]
- // m.body returns "1234567".getBytes("utf-8")
- // m.bodyAsString returns "1234567"
- // System.err.println("###DEBUG" + m.bodyAsString)
- // mc.inject("input", m)
- // mc.outputs.size() must_== 1
- // val out = mc.outputs.get(0)
- // System.err.println("###DEBUG"+out.msg)
- // out.channel must_== "output"
- // out.msg must_== m
- // }
-
- "" in {
-
- }
-
- }
-}
@@ -1,7 +0,0 @@
-# -*- encoding: utf-8 -*-
->PLUGIN_INSTANCE_CONFIG {"timeout": null, "count":9, "encoding": "utf-8-sep:|||", "overlap":0, "unit": "B"}
->input 123
-# one utf-8 character but three bytes
->input 三
-
-<output 123|||
Oops, something went wrong.

0 comments on commit c56f068

Please sign in to comment.