Skip to content

Commit

Permalink
Updating README, adding wrappers for main scalding classes with additional functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
Kian Wilcox committed Aug 8, 2012
1 parent ac2eee8 commit 8ddac87
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
4 changes: 3 additions & 1 deletion README.md
@@ -1,9 +1,11 @@
hbase-scalding
==============

Project template for anyone wanting to write scalding jobs while keeping scalding as a dependency, instead of mixing in their own files into the scalding project. Also includes minimal support for HBase sources and sinks.
Project template for anyone wanting to write scalding jobs while keeping scalding as a dependency, instead of mixing in their own files into the scalding project. Also includes minimal support for HBase sources and sinks, and a few extensions to the pipe model we've found to be useful at StumbleUpon.

Simply add a file in src/main/scala/jobs, import com.twitter.scalding._, and make sure the class inside extends Job.
If you'd like to use our extensions, import com.stumbleupon.scalding.extensions._,
and import com.stumbleupon.scalding.extensions.ScaldingWrapperConversions._ in your Job extension.

To run on hadoop, first run sbt assembly in the main project directory.
Then copy it to whatever machine you have hadoop running on.
Expand Down
@@ -0,0 +1,59 @@
package com.stumbleupon.scalding.extensions

import scala.util.matching.Regex
import cascading.pipe.Pipe
import cascading.tuple.Fields
import com.twitter.scalding._
import cascading.flow._

class PipeWrapper(input: Pipe) extends java.io.Serializable {
  import Dsl._
  import ScaldingWrapperConversions._

  /** The wrapped cascading Pipe. */
  def pipe: Pipe = input

  // Name of the temporary field holding the per-group match count.
  private val countField = new Fields("__count__")

  /**
   * Shared implementation for the group filters below: groups the pipe by `f`,
   * counts the tuples in each group that satisfy `fn`, keeps only the groups
   * whose count satisfies `keep`, and joins the surviving group keys back
   * against the original pipe so every tuple of a kept group passes through.
   *
   * NOTE(review): the temporary `__count__` field may remain on the joined
   * output — confirm against scalding's joinWithSmaller and discard it
   * downstream if unwanted.
   */
  private def filterGroupsByCount[A](f: Fields)(fn: A => Boolean)(keep: Long => Boolean)
      (implicit conv: TupleConverter[A]): RichPipe = {
    conv.assertArityMatches(f)
    input.joinWithSmaller(
      (f -> f),
      input.groupBy(f) { _.count((f -> countField))(fn) }
        .filter(countField) { count: Long => keep(count) })
  }

  /** Groups by `f`, discarding every group that contains at least one tuple matching `fn`. */
  def discardGroupsWhere[A](f: Fields)(fn: A => Boolean)
      (implicit conv: TupleConverter[A]): RichPipe =
    filterGroupsByCount(f)(fn)(_ == 0L)

  /** Groups by `f`, keeping only the groups that contain at least one tuple matching `fn`. */
  def filterGroupsWhere[A](f: Fields)(fn: A => Boolean)
      (implicit conv: TupleConverter[A]): RichPipe =
    filterGroupsByCount(f)(fn)(_ > 0L)

}

/** Wrapper around a scalding GroupBuilder; exposes the underlying builder, no added behavior yet. */
class GroupBuilderWrapper(val group: GroupBuilder)

/** Wrapper around a scalding Job; exposes the underlying job, no added behavior yet. */
class JobWrapper(val job: Job)

/** Base class for StumbleUpon jobs; currently identical to scalding's Job. */
class StumbleJob(args: Args) extends Job(args)

/**
 * Implicit conversions between the raw scalding/cascading types and the
 * StumbleUpon wrapper classes defined in this package. Import
 * `ScaldingWrapperConversions._` to pick up the extension methods.
 *
 * All implicit defs carry explicit result types: the compiler warns on (and
 * future Scala versions reject) implicit conversions with inferred types.
 */
object ScaldingWrapperConversions {
  import Dsl._

  /** Enriches a cascading Pipe with the PipeWrapper extension methods. */
  implicit def inputToPipeWrapper(input: Pipe): PipeWrapper = new PipeWrapper(input)

  /** Unwraps back to the underlying Pipe so wrapped pipes compose with plain scalding code. */
  implicit def pipeWrapperToPipe(input: PipeWrapper): Pipe = input.pipe

  /** Enriches a scalding GroupBuilder with the wrapper's (future) extension methods. */
  implicit def wrapGroupBuilder(group: GroupBuilder): GroupBuilderWrapper = new GroupBuilderWrapper(group)

  /** Unwraps back to the underlying GroupBuilder. */
  implicit def groupBuilderWrapperToGroup(group: GroupBuilderWrapper): GroupBuilder = group.group

  /** Enriches a scalding Job with the wrapper's (future) extension methods. */
  implicit def wrapJob(job: Job): JobWrapper = new JobWrapper(job)

  /** Unwraps back to the underlying Job. */
  implicit def jobWrapperToJob(job: JobWrapper): Job = job.job
}

0 comments on commit 8ddac87

Please sign in to comment.