Skip to content

Commit

Permalink
Merge pull request #257 from tresata/feat-context
Browse files Browse the repository at this point in the history
Change type paramatrization of RichPipe.using and provide context for GroupBuilder
  • Loading branch information
johnynek committed Dec 21, 2012
2 parents d81d3d2 + 5423b70 commit 33d99e0
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 20 deletions.
26 changes: 26 additions & 0 deletions src/main/scala/com/twitter/scalding/GroupBuilder.scala
Expand Up @@ -323,6 +323,32 @@ class GroupBuilder(val groupFields : Fields) extends
* groupAll and groupRandomly.
*/
def pass : GroupBuilder = takeWhile(0) { (t: TupleEntry) => true }

/**
* begining of block with access to expensive nonserializable state. The state object should
* contain a function release() for resource management purpose.
*/
def using[C <: { def release() }](bf: => C) = new {

/**
* mapStream with state.
*/
def mapStream[T,X](fieldDef : (Fields,Fields))(mapfn : (C, Iterator[T]) => TraversableOnce[X])
(implicit conv : TupleConverter[T], setter : TupleSetter[X]) = {
val (inFields, outFields) = fieldDef
//Check arity
conv.assertArityMatches(inFields)
setter.assertArityMatches(outFields)

val b = new SideEffectBufferOp[Unit,T,C,X](
(), bf,
(u : Unit, c : C, it: Iterator[T]) => mapfn(c, it),
new Function1[C, Unit] with java.io.Serializable { def apply(c: C) { c.release() }},
outFields, conv, setter)
every(pipe => new Every(pipe, inFields, b, defaultMode(inFields, outFields)))
}
}

}

/**
Expand Down
60 changes: 42 additions & 18 deletions src/main/scala/com/twitter/scalding/Operations.scala
Expand Up @@ -72,6 +72,23 @@ import CascadingUtils.kryoFor
}
}

/*
* BaseOperation with support for context
*/
abstract class SideEffectBaseOperation[C] (
bf: => C, // begin function returns a context
ef: C => Unit, // end function to clean up context object
fields: Fields
) extends BaseOperation[C](fields) {
override def prepare(flowProcess: FlowProcess[_], operationCall: OperationCall[C]) {
operationCall.setContext(bf)
}

override def cleanup(flowProcess: FlowProcess[_], operationCall: OperationCall[C]) {
ef(operationCall.getContext)
}
}

/*
* A map function that allows state object to be set up and tear down.
*/
Expand All @@ -82,22 +99,14 @@ import CascadingUtils.kryoFor
fields: Fields,
conv: TupleConverter[S],
set: TupleSetter[T]
) extends BaseOperation[C](fields) with Function[C] {

override def prepare(flowProcess: FlowProcess[_], operationCall: OperationCall[C]) {
operationCall.setContext(bf)
}
) extends SideEffectBaseOperation[C](bf, ef, fields) with Function[C] {

override def operate(flowProcess: FlowProcess[_], functionCall: FunctionCall[C]) {
val context = functionCall.getContext
val s = conv(functionCall.getArguments)
val res = fn(context, s)
functionCall.getOutputCollector.add(set(res))
}

override def cleanup(flowProcess: FlowProcess[_], operationCall: OperationCall[C]) {
ef(operationCall.getContext)
}
}

/*
Expand All @@ -110,21 +119,13 @@ import CascadingUtils.kryoFor
fields: Fields,
conv: TupleConverter[S],
set: TupleSetter[T]
) extends BaseOperation[C](fields) with Function[C] {

override def prepare(flowProcess: FlowProcess[_], operationCall: OperationCall[C]) {
operationCall.setContext(bf)
}
) extends SideEffectBaseOperation[C](bf, ef, fields) with Function[C] {

override def operate(flowProcess: FlowProcess[_], functionCall: FunctionCall[C]) {
val context = functionCall.getContext
val s = conv(functionCall.getArguments)
fn(context, s) foreach { t => functionCall.getOutputCollector.add(set(t)) }
}

override def cleanup(flowProcess: FlowProcess[_], operationCall: OperationCall[C]) {
ef(operationCall.getContext)
}
}

class FilterFunction[T](fn : T => Boolean, conv : TupleConverter[T]) extends BaseOperation[Any] with Filter[Any] {
Expand Down Expand Up @@ -299,4 +300,27 @@ import CascadingUtils.kryoFor
iterfn(deepCopyInit, in).foreach { x => oc.add(set(x)) }
}
}

/*
* A buffer that allows state object to be set up and tear down.
*/
class SideEffectBufferOp[I,T,C,X](
init : I,
bf: => C, // begin function returns a context
iterfn: (I, C, Iterator[T]) => TraversableOnce[X],
ef: C => Unit, // end function to clean up context object
fields: Fields,
conv: TupleConverter[T],
set: TupleSetter[X]
) extends SideEffectBaseOperation[C](bf, ef, fields) with Buffer[C] {

def operate(flowProcess : FlowProcess[_], call : BufferCall[C]) {
val deepCopyInit = kryoFor(flowProcess).copy(init)
val context = call.getContext
val oc = call.getOutputCollector
val in = call.getArgumentsIterator.asScala.map { entry => conv(entry) }
iterfn(deepCopyInit, context, in).foreach { x => oc.add(set(x)) }
}
}

}
4 changes: 2 additions & 2 deletions src/main/scala/com/twitter/scalding/RichPipe.scala
Expand Up @@ -71,12 +71,12 @@ class RichPipe(val pipe : Pipe) extends java.io.Serializable with JoinAlgorithms
* begining of block with access to expensive nonserializable state. The state object should
* contain a function release() for resource management purpose.
*/
def using[A, C <: { def release() }](bf: => C) = new {
def using[C <: { def release() }](bf: => C) = new {

/**
* For pure side effect.
*/
def foreach(f: Fields)(fn: (C, A) => Unit)
def foreach[A](f: Fields)(fn: (C, A) => Unit)
(implicit conv: TupleConverter[A], set: TupleSetter[Unit], flowDef: FlowDef, mode: Mode) = {
conv.assertArityMatches(f)
val newPipe = new Each(pipe, f, new SideEffectMapFunction(bf, fn,
Expand Down
52 changes: 52 additions & 0 deletions src/test/scala/com/twitter/scalding/SideEffectTest.scala
Expand Up @@ -51,3 +51,55 @@ class SideEffectTest extends Specification with TupleConversions with FieldConve
.finish
}
}

/*
* ZipBuffer uses (unneccessary) side effect to construct zipped.
*/
class ZipBuffer(args : Args) extends Job(args) {

//import RichPipe._
def createState = new {
var lastLine: String = null
def release() {}
}

val zipped = Tsv("line",('line)).pipe
.map('line -> 'oddOrEven) { line : String => line.substring(line.length-1).toInt % 2 match {
case 0 => "even"
case 1 => "odd"
}}
.groupBy('oddOrEven) {
_.using { createState }
.mapStream('line -> ('l1, 'l2)) { (accu, iter : Iterator[String]) => {
accu.lastLine = iter.next()
for (line <- iter) yield {
val result = (accu.lastLine, line)
accu.lastLine = line
result
}
}}
}
.project('l1, 'l2)

zipped.write(Tsv("zipped"))
}

class SideEffectBufferTest extends Specification with TupleConversions with FieldConversions {
"ZipBuffer should do create two zipped sequences, one for even lines and one for odd lines. Coded with side effect" should {
JobTest("com.twitter.scalding.ZipBuffer")
.source(Tsv("line",('line)), List(Tuple1("line1"), Tuple1("line2"), Tuple1("line3"), Tuple1("line4"), Tuple1("line5"), Tuple1("line6")))
.sink[(String, String)](Tsv("zipped")) { ob =>
"correctly compute zipped sequence" in {
val res = ob.toList.sorted
val expected = List(("line1", "line3"), ("line3", "line5"), ("line2", "line4"), ("line4", "line6")).sorted
res.zip(expected) foreach {
case ((a, b), (c, d)) =>
a must be_== ( c )
b must be_== ( d )
}
}
}
.run
.finish
}
}

0 comments on commit 33d99e0

Please sign in to comment.