Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use chill.ClosureCleaner to remove outer$ refs on Functions #273

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 48 additions & 3 deletions src/main/scala/com/twitter/scalding/Operations.scala
Expand Up @@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration

import com.esotericsoftware.kryo.Kryo;
import com.twitter.chill.ClosureCleaner

object CascadingUtils {
def flowProcessToConfiguration(fp : FlowProcess[_]) : Configuration = {
Expand Down Expand Up @@ -54,6 +55,9 @@ import CascadingUtils.kryoFor
conv : TupleConverter[S], set : TupleSetter[T])
extends BaseOperation[Any](fields) with Function[Any] {

//Clean the outer$ refs off if they are not accessed
ClosureCleaner(fn)

def operate(flowProcess : FlowProcess[_], functionCall : FunctionCall[Any]) {
fn(conv(functionCall.getArguments)).foreach { arg : T =>
val this_tup = set(arg)
Expand All @@ -66,6 +70,9 @@ import CascadingUtils.kryoFor
conv : TupleConverter[S], set : TupleSetter[T])
extends BaseOperation[Any](fields) with Function[Any] {

//Clean the outer$ refs off if they are not accessed
ClosureCleaner(fn)

def operate(flowProcess : FlowProcess[_], functionCall : FunctionCall[Any]) {
val res = fn(conv(functionCall.getArguments))
functionCall.getOutputCollector.add(set(res))
Expand All @@ -80,15 +87,19 @@ import CascadingUtils.kryoFor
ef: C => Unit, // end function to clean up context object
fields: Fields
) extends BaseOperation[C](fields) {

//Clean the outer$ refs off if they are not accessed
ClosureCleaner(ef)

/**
 * Stores `bf` as the context of the given operation call.
 *
 * @param flowProcess   the current flow process (not used here)
 * @param operationCall the call whose context is set
 */
override def prepare(flowProcess: FlowProcess[_], operationCall: OperationCall[C]): Unit = {
  operationCall.setContext(bf)
}

/**
 * Applies the end function `ef` to the context held by the operation call.
 *
 * @param flowProcess   the current flow process (not used here)
 * @param operationCall the call whose context is passed to `ef`
 */
override def cleanup(flowProcess: FlowProcess[_], operationCall: OperationCall[C]): Unit = {
  ef(operationCall.getContext)
}
}

/*
* A map function that allows state object to be set up and tear down.
*/
Expand All @@ -101,6 +112,9 @@ import CascadingUtils.kryoFor
set: TupleSetter[T]
) extends SideEffectBaseOperation[C](bf, ef, fields) with Function[C] {

//Clean the outer$ refs off if they are not accessed
ClosureCleaner(fn)

override def operate(flowProcess: FlowProcess[_], functionCall: FunctionCall[C]) {
val context = functionCall.getContext
val s = conv(functionCall.getArguments)
Expand All @@ -121,6 +135,9 @@ import CascadingUtils.kryoFor
set: TupleSetter[T]
) extends SideEffectBaseOperation[C](bf, ef, fields) with Function[C] {

//Clean the outer$ refs off if they are not accessed
ClosureCleaner(fn)

override def operate(flowProcess: FlowProcess[_], functionCall: FunctionCall[C]) {
val context = functionCall.getContext
val s = conv(functionCall.getArguments)
Expand All @@ -129,6 +146,9 @@ import CascadingUtils.kryoFor
}

class FilterFunction[T](fn : T => Boolean, conv : TupleConverter[T]) extends BaseOperation[Any] with Filter[Any] {
//Clean the outer$ refs off if they are not accessed
ClosureCleaner(fn)

/**
 * Converts the call's arguments with `conv`, applies `fn`, and returns the
 * negation of the result: `true` when `fn` is false, `false` when `fn` is true.
 *
 * @param flowProcess the current flow process (not used here)
 * @param filterCall  the call supplying the argument tuple entry
 * @return `!fn(conv(arguments))`
 */
def isRemove(flowProcess : FlowProcess[_], filterCall : FilterCall[Any]) : Boolean = {
  !fn(conv(filterCall.getArguments))
}
Expand All @@ -140,6 +160,9 @@ import CascadingUtils.kryoFor
conv : TupleConverter[T], set : TupleSetter[X])
extends BaseOperation[X](fields) with Aggregator[X] {

//Clean the outer$ refs off if they are not accessed
ClosureCleaner(fn)

def start(flowProcess : FlowProcess[_], call : AggregatorCall[X]) {
val deepCopyInit = kryoFor(flowProcess).copy(init)
call.setContext(deepCopyInit)
Expand All @@ -166,6 +189,12 @@ import CascadingUtils.kryoFor
class MRMAggregator[T,X,U](fsmf : T => X, rfn : (X,X) => X, mrfn : X => U, fields : Fields,
conv : TupleConverter[T], set : TupleSetter[U])
extends BaseOperation[Tuple](fields) with Aggregator[Tuple] {

//Clean the outer$ refs off if they are not accessed
ClosureCleaner(fsmf)
ClosureCleaner(rfn)
ClosureCleaner(mrfn)

// The context is a singleton Tuple, which is mutable so
// we don't have to allocate at every step of the loop:
def start(flowProcess : FlowProcess[_], call : AggregatorCall[Tuple]) {
Expand Down Expand Up @@ -264,6 +293,10 @@ import CascadingUtils.kryoFor
conv : TupleConverter[T], set : TupleSetter[X])
extends FoldFunctor[X](fields) {

//Clean the outer$ refs off if they are not accessed
ClosureCleaner(mrfn)
ClosureCleaner(rfn)

// Converts the incoming tuple entry with `conv`, then applies `mrfn` to it.
override def first(args : TupleEntry) : X = {
  val converted = conv(args)
  mrfn(converted)
}
override def subsequent(oldValue : X, newArgs : TupleEntry) = {
val right = mrfn(conv(newArgs))
Expand All @@ -287,12 +320,21 @@ import CascadingUtils.kryoFor
endSet : TupleSetter[U]) extends AggregateBy(
arguments,
new MRMFunctor[T,X](mfn, rfn, middleFields, startConv, midSet),
new MRMAggregator[X,X,U](args => args, rfn, mfn2, declaredFields, midConv, endSet))
new MRMAggregator[X,X,U](args => args, rfn, mfn2, declaredFields, midConv, endSet)) {

//Clean the outer$ refs off if they are not accessed
ClosureCleaner(mfn)
ClosureCleaner(rfn)
ClosureCleaner(mfn2)
}

class BufferOp[I,T,X](init : I, iterfn : (I, Iterator[T]) => TraversableOnce[X], fields : Fields,
conv : TupleConverter[T], set : TupleSetter[X])
extends BaseOperation[Any](fields) with Buffer[Any] {

//Clean the outer$ refs off if they are not accessed
ClosureCleaner(iterfn)

def operate(flowProcess : FlowProcess[_], call : BufferCall[Any]) {
val deepCopyInit = kryoFor(flowProcess).copy(init)
val oc = call.getOutputCollector
Expand All @@ -314,6 +356,9 @@ import CascadingUtils.kryoFor
set: TupleSetter[X]
) extends SideEffectBaseOperation[C](bf, ef, fields) with Buffer[C] {

//Clean the outer$ refs off if they are not accessed
ClosureCleaner(iterfn)

def operate(flowProcess : FlowProcess[_], call : BufferCall[C]) {
val deepCopyInit = kryoFor(flowProcess).copy(init)
val context = call.getContext
Expand Down