Skip to content

Commit

Permalink
misc refactors and name changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
fedesilva committed Jun 29, 2013
1 parent 17fe04d commit d709cc2
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 54 deletions.
14 changes: 7 additions & 7 deletions src/main/scala/seshat/plugin/Plugin.scala
Expand Up @@ -26,11 +26,11 @@ object InputPlugin {
* An input plugin must start consuming input ONLY when the `Start` message is received and
* must stop when `Stop` is received.
*
* An input plugin must send the `Event`s via [[seshat.processor.Processor.Common.Events]] messages with no more events than what
* An input plugin must send the `Event`s via [[seshat.processor.Processor.Internal.Batch]] messages with no more events than what
* [[seshat.SeshatConfig.queueSize]] indicates.
*
* Start means reading the input or accepting connections and send `Events` to the parent.
* Stops means stop reading but keep sending the remaining events when asked; no Events should be dropped.
* Start means reading the input or accepting connections and send `Batch` to the parent.
* Stops means stop reading but keep sending the remaining events when asked; no Batch should be dropped.
*
* Input plugins should never `context.stop` themselves.
*
Expand Down Expand Up @@ -97,22 +97,22 @@ abstract class OutputPlugin(val config: PluginConfig)
case Processor.Msg.Start => start()
case Processor.Msg.Stop => stop()

case Processor.Common.Events(events) =>
case Processor.Internal.Batch(events) =>
log.debug(s"Received ${events.size} events")
if ( events.size > 0 ) {
resetRetries()
performOutput(events)
scheduleAsk(context.parent, Processor.Common.GetEvents)
scheduleAsk(context.parent, Processor.Internal.NextBatch)
} else {
scheduleAsk(context.parent, Processor.Common.GetEvents)
scheduleAsk(context.parent, Processor.Internal.NextBatch)
}

}

def receive: Receive = defaultHandler

protected def start() {
scheduleAsk(context.parent, Processor.Common.GetEvents)
scheduleAsk(context.parent, Processor.Internal.NextBatch)
}

protected def stop() {}
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/seshat/plugin/input/Dummy.scala
Expand Up @@ -6,7 +6,7 @@ import java.util.Date

/** This input plugins exists only for testing purposes.
*
* One can send it `Events` or `Any` and it will send them to the parent.
* One can send it `Batch` or `Any` and it will send them to the parent.
* `Any`s are previously wrapped in an Event.
*
*/
Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/seshat/plugin/input/File.scala
@@ -1,11 +1,10 @@
package seshat.plugin.input

import seshat.plugin.{PluginConfig, InputPlugin}
import java.util.{Date, Scanner}
import concurrent.duration._
import seshat.processor.Processor.Common.Events
import java.util.Date
import seshat.processor.Processor.Internal.Batch
import seshat.Event
import scala.concurrent.{Future, blocking}
import scala.concurrent.blocking
import akka.actor.ActorRef
import java.io.RandomAccessFile
import seshat.processor.AskAgainProtocol
Expand Down Expand Up @@ -36,7 +35,7 @@ class File(config:PluginConfig)
case Moar if started =>
log.debug("Got Moar Msg")
val parent = context.parent // fix reference
Future {
blocking {
var i = 0
var lines = List[String]()
while(i<batchSize){
Expand All @@ -61,7 +60,7 @@ class File(config:PluginConfig)
Event( line, "file", (new Date()).getTime, fieldsToAdd )
}

parent ! Events( events.reverse )
parent ! Batch( events.reverse )

log.debug("Rescheduling Moar")
scheduleAsk(self,Moar)
Expand All @@ -77,6 +76,7 @@ class File(config:PluginConfig)

override def postStop() {
file.close()
log.debug("Closed")
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/seshat/plugin/input/Stdin.scala
Expand Up @@ -3,7 +3,7 @@ package seshat.plugin.input
import seshat.plugin.{PluginConfig, InputPlugin}
import java.util.{Date, Scanner}
import concurrent.duration._
import seshat.processor.Processor.Common.Events
import seshat.processor.Processor.Internal.Batch
import seshat.Event
import concurrent.blocking
import akka.actor.ActorRef
Expand Down Expand Up @@ -48,7 +48,7 @@ class Stdin(config:PluginConfig) extends InputPlugin(config) {
val event = Event( sc.nextLine(), "stdin", (new Date()).getTime )

log.debug(s"Sending Event $event")
parent ! Events( Seq(event) )
parent ! Batch( Seq(event) )

log.debug("Rescheduling Moar")
scheduler.scheduleOnce(5 milli, self, Moar)
Expand Down
20 changes: 10 additions & 10 deletions src/main/scala/seshat/processor/FilterHandler.scala
Expand Up @@ -9,7 +9,7 @@ import scala.concurrent.Future
import scala.util.{Failure, Success}

/** Composes together a set of filter functions and asks for events from the input.
* It maintains a queue of already filtered events and responds to GetEvents by sending
* It maintains a queue of already filtered events and responds to NextBatch by sending
* those events back.
*/
class FilterHandler( val input: ActorRef, val config: SeshatConfig, val descriptors: Seq[PluginDescriptor] )
Expand All @@ -35,22 +35,22 @@ class FilterHandler( val input: ActorRef, val config: SeshatConfig, val descript

case Processor.Msg.Start =>
log.debug("Starting")
input ! Processor.Common.GetEvents
input ! Processor.Internal.NextBatch

case Processor.Common.Events(events) =>
case Processor.Internal.Batch(events) =>
log.debug(s"Received ${events.size} events")
if (events.size > 0) runFilters(events)
else scheduleAsk(input, Processor.Common.GetEvents)
else scheduleAsk(input, Processor.Internal.NextBatch)

case Processor.Common.GetEvents =>
log.debug(s"Got GetEvents from ${sender}")
case Processor.Internal.NextBatch =>
log.debug(s"Got NextBatch from ${sender}")
log.debug(s"Available events ${filteredEvents.size}")
if( ! filteredEvents.isEmpty) {
sendEvents(sender)
}
else {
sender ! Processor.Common.Events(Seq.empty)
scheduleAsk(input, Processor.Common.GetEvents)
sender ! Processor.Internal.Batch(Seq.empty)
scheduleAsk(input, Processor.Internal.NextBatch)
}


Expand All @@ -64,7 +64,7 @@ class FilterHandler( val input: ActorRef, val config: SeshatConfig, val descript
case Msg.Filtered(e) =>
filteredEvents enqueue e
if (filteredEvents.isEmpty) {
input ! Processor.Common.GetEvents
input ! Processor.Internal.NextBatch
}

}
Expand Down Expand Up @@ -103,7 +103,7 @@ class FilterHandler( val input: ActorRef, val config: SeshatConfig, val descript
(1 to size).map( _ => filteredEvents.dequeue() )

log.debug(s"Sending ${events.size} events to $who")
who ! Processor.Common.Events(events)
who ! Processor.Internal.Batch(events)

}

Expand Down
21 changes: 9 additions & 12 deletions src/main/scala/seshat/processor/InputHandler.scala
Expand Up @@ -38,22 +38,22 @@ class InputHandler( val config: SeshatConfig, val descriptors: Seq[PluginDescrip
log.debug("Stopping inputs")
inputs foreach ( _ ! Processor.Msg.Stop )

case Processor.Common.Events(es) =>
case Processor.Internal.Batch(es) =>
// FIXME Check the soft limit.
log.debug(s"Got Processor.Common.Events(${es.size})")
log.debug(s"Got Processor.Internal.Batch(${es.size})")
receivedEvents.enqueue(es : _*)
if( receivedEvents.size > config.queueSize ) {
inputs foreach { _ ! Processor.Msg.Stop }
if( receivedEvents.size > config.queueSize*1.5 ) {
//FIXME use statistics to determine throttle time
inputs foreach { _ ! InputPlugin.Msg.Throttle(Some(100)) }
}
log.debug(s"Received events queue size ${receivedEvents.size}")

case Processor.Common.GetEvents =>
log.debug(s"Got Processor.Common.GetEvents from $sender")
if( receivedEvents.size > 0 ) sendEvents(sender)
else sender ! Processor.Common.Events(Seq.empty)
case Processor.Internal.NextBatch =>
log.debug(s"Got Processor.Internal.NextBatch from $sender")
if( receivedEvents.size > 0 )
sendEvents(sender)
else
sender ! Processor.Internal.Batch(Seq.empty)

}

Expand All @@ -79,11 +79,8 @@ class InputHandler( val config: SeshatConfig, val descriptors: Seq[PluginDescrip
(1 to size).map( _ => receivedEvents.dequeue() )

log.debug(s"Will send ${events.size} events")
who ! Processor.Common.Events(events)
who ! Processor.Internal.Batch(events)

if( receivedEvents.size < config.queueSize ) {
inputs foreach { _ ! Processor.Msg.Start }
}

}

Expand Down
22 changes: 11 additions & 11 deletions src/main/scala/seshat/processor/OutputHandler.scala
Expand Up @@ -39,34 +39,34 @@ class OutputHandler( val filter: ActorRef, val config: SeshatConfig, val descrip
case Processor.Msg.Start =>
log.debug("Starting")
outputs foreach ( _ ! Processor.Msg.Start )
scheduleAsk(filter, Processor.Common.GetEvents)
scheduleAsk(filter, Processor.Internal.NextBatch)

case Processor.Msg.Stop =>
log.debug("Stopping")
outputs foreach ( _ ! Processor.Msg.Stop )

case Processor.Common.Events(events) =>
log.debug(s"Got Events with ${events.size} events")
case Processor.Internal.Batch(events) =>
log.debug(s"Got Batch with ${events.size} events")
if (events.size > 0) {
resetRetries()
storeEvents(events)
if ( storeAvailable ) {
filter ! Processor.Common.GetEvents
filter ! Processor.Internal.NextBatch
}
}
else scheduleAsk(filter, Processor.Common.GetEvents)
else scheduleAsk(filter, Processor.Internal.NextBatch)

case Processor.Common.GetEvents =>
log.debug(s"Got GetEvents from $sender" )
case Processor.Internal.NextBatch =>
log.debug(s"Got NextBatch from $sender" )
if( stashedEvents(sender).size > 0 ) {
sendEvents(sender)
if ( storeAvailable ) {
filter ! Processor.Common.GetEvents
filter ! Processor.Internal.NextBatch
}
}
else {
sender ! Processor.Common.Events(Seq.empty)
scheduleAsk(filter, Processor.Common.GetEvents)
sender ! Processor.Internal.Batch(Seq.empty)
scheduleAsk(filter, Processor.Internal.NextBatch)
}

}
Expand Down Expand Up @@ -107,7 +107,7 @@ class OutputHandler( val filter: ActorRef, val config: SeshatConfig, val descrip

stashedEvents.put(who,remaining)

who ! Processor.Common.Events(nextBatch)
who ! Processor.Internal.Batch(nextBatch)

}

Expand Down
11 changes: 6 additions & 5 deletions src/main/scala/seshat/processor/Processor.scala
Expand Up @@ -20,13 +20,14 @@ object Processor {
case object Stop // Tell everyone to stop
}

object Common {
object Internal {

/** Used to ask for more events */
case object GetEvents
/** This message forces the sending of a GetEvents message to the input */
case object NextBatch
/** This message forces the sending of a NextBatch message. */
case class AskAgain(who: ActorRef, what: Any)
/** Used to reply to GetEvents */
case class Events(events: Seq[Event])
/** Used to reply to NextBatch */
case class Batch(events: Seq[Event])
/** Who's queue is full, notify downstream. */
case class Choked(who:ActorRef)

Expand Down

0 comments on commit d709cc2

Please sign in to comment.