Skip to content

Commit

Permalink
Merge branch 'release/0.5.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Argyris Zymnis committed Apr 25, 2012
2 parents a54a5a3 + 4ec6a75 commit 6dedd17
Show file tree
Hide file tree
Showing 22 changed files with 1,776 additions and 109 deletions.
15 changes: 12 additions & 3 deletions CHANGES.md
@@ -1,9 +1,18 @@
# Scalding #

### Version 0.5.0 ###
* ISSUE 67: Upgrade cascading to wip-281.
* ISSUE 66: Allow default time zone in DefaultDateRangeJob.
* ISSUE 65: Fixed the error message thrown by FileSource.validateTaps.
* ISSUE 62: Kryo Upgrade to 2.04
* ISSUE 60: Feature/abstract algebra
* ISSUE 52: Feature/cogroup builder
* ISSUE 51: Feature/headfix

### Version 0.4.1 ###
* ISSUE 42 Feature/iterable source
* ISSUE 41 Adds blockJoinWithSmaller to JoinAlgorithms.
* ISSUE 39 Adding default value to pivot
* ISSUE 42: Feature/iterable source
* ISSUE 41: Adds blockJoinWithSmaller to JoinAlgorithms.
* ISSUE 39: Adding default value to pivot

### Version 0.4.0 ###
* ISSUE 38: Fix bug with hash code collisions of Source objects
Expand Down
9 changes: 5 additions & 4 deletions README.md
Expand Up @@ -4,7 +4,7 @@ Scalding is a Scala library that makes it easy to write MapReduce jobs in Hadoop

Scalding is built on top of [Cascading](http://www.cascading.org/), a Java library that abstracts away much of the complexity of Hadoop.

Current version: 0.4.1
Current version: 0.5.0

## Word Count

Expand All @@ -20,12 +20,12 @@ class WordCountJob(args : Args) extends Job(args) {
.flatMap('line -> 'word) { line : String => tokenize(line) }
.groupBy('word) { _.size }
.write( Tsv( args("output") ) )

// Split a piece of text into individual words.
def tokenize(text : String) : Array[String] = {
// Lowercase each word and remove punctuation.
text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")
}
}
}
```

Expand All @@ -49,7 +49,7 @@ You can find more example code under [examples/](https://github.com/twitter/scal
We use [Travis CI](http://travis-ci.org/) to verify the build:
[![Build Status](https://secure.travis-ci.org/twitter/scalding.png)](http://travis-ci.org/twitter/scalding)

The current version is 0.4.1 and is available from maven central: org="com.twitter",
The current version is 0.5.0 and is available from maven central: org="com.twitter",
artifact="scalding_2.8.1" or artifact="scalding_2.9.1".

## Contact
Expand All @@ -74,6 +74,7 @@ Thanks for assistance and contributions:
* Dmitriy Ryaboy <http://twitter.com/squarecog>
* Dong Wang <http://twitter.com/dongwang218>
* Edwin Chen <http://twitter.com/edchedch>
* Sam Ritchie <http://twitter.com/sritchie09>

## License
Copyright 2012 Twitter, Inc.
Expand Down
14 changes: 8 additions & 6 deletions build.sbt
Expand Up @@ -2,23 +2,25 @@ import AssemblyKeys._

name := "scalding"

version := "0.4.1"
version := "0.5.0"

organization := "com.twitter"

scalaVersion := "2.8.1"

resolvers += "Concurrent Maven Repo" at "http://conjars.org/repo"

libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-238"
libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-281"

libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-238"
libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-281"

libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-238"
libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-281"

libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.2.1"
libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.3.1"

libraryDependencies += "com.twitter" % "meat-locker" % "0.1.6"
libraryDependencies += "com.twitter" % "meat-locker" % "0.2.1"

libraryDependencies += "com.twitter" % "maple" % "0.1.4"

libraryDependencies += "commons-lang" % "commons-lang" % "2.4"

Expand Down
191 changes: 191 additions & 0 deletions scripts/ntuple_generators.rb
@@ -0,0 +1,191 @@
# @author Edwin Chen (@echen)
# Automatically write product monoid, product group, and product ring
# classes for tuples up to size 22.
#
# Run it like this:
#
# ruby scripts/ntuple_generators.rb > src/main/scala/com/twitter/scalding/mathematics/GeneratedAbstractAlgebra.scala

# Scala package that the generated source file is declared in.
PACKAGE_NAME = "com.twitter.scalding.mathematics"

# Tuple arities to generate product classes for: 3 through 22 inclusive.
TUPLE_SIZES = Array(3..22)

# Alphabet of type-parameter names. A product of size n uses the first n
# letters, one per tuple component.
TYPE_SYMBOLS = Array("A".."Z")

# Whitespace written in front of each line of a generated implicit definition.
INDENT = " "

# Builds the doc comment placed above each generated product class.
#
# n                   - number of components in the product.
# algebraic_structure - structure name, e.g. "monoid", "group" or "ring".
#
# Example: get_comment(3, "monoid") returns the three lines
#   /**
#   * Combine 3 monoids into a product monoid
#   */
# joined by newlines, with no trailing newline.
def get_comment(n, algebraic_structure)
  [
    "/**",
    "* Combine #{n} #{algebraic_structure}s into a product #{algebraic_structure}",
    "*/"
  ].join("\n")
end

# Builds the class header (everything before the opening brace) for a
# product monoid/group/ring.
#
# n                   - number of components in the product.
# algebraic_structure - structure name, e.g. "monoid", "group" or "ring".
#
# Example: get_class_definition(3, "monoid") returns
#   "class Tuple3Monoid[A, B, C](implicit amonoid : Monoid[A], bmonoid : Monoid[B], cmonoid : Monoid[C]) extends Monoid[(A, B, C)]"
def get_class_definition(n, algebraic_structure)
  structure_type = algebraic_structure.capitalize
  # Example: "A, B, C"
  type_list = TYPE_SYMBOLS.first(n).join(", ")
  implicit_params = get_type_parameters(n, algebraic_structure)

  "class Tuple#{n}#{structure_type}[#{type_list}]" \
    "(implicit #{implicit_params}) " \
    "extends #{structure_type}[(#{type_list})]"
end

# Builds the implicit parameter list shared by the generated classes and
# implicit definitions: one structure instance per tuple component.
#
# n                   - number of components in the product.
# algebraic_structure - structure name, e.g. "monoid", "group" or "ring".
#
# Example: get_type_parameters(3, "monoid") returns
#   "amonoid : Monoid[A], bmonoid : Monoid[B], cmonoid : Monoid[C]"
def get_type_parameters(n, algebraic_structure)
  structure_type = algebraic_structure.capitalize

  TYPE_SYMBOLS.first(n).map do |symbol|
    "#{symbol.downcase}#{algebraic_structure} : #{structure_type}[#{symbol.upcase}]"
  end.join(", ")
end

# Builds the override for a constant of the product structure by taking the
# same constant from every component structure.
#
# n                   - number of components in the product.
# algebraic_structure - structure name, e.g. "monoid", "group" or "ring".
# constant            - constant name, e.g. "zero" or "one".
#
# Example: get_constant(3, "group", "zero") returns
#   "override def zero = (agroup.zero, bgroup.zero, cgroup.zero)"
def get_constant(n, algebraic_structure, constant)
  components = TYPE_SYMBOLS.first(n).map do |symbol|
    "#{symbol.downcase}#{algebraic_structure}.#{constant}"
  end

  "override def #{constant} = (#{components.join(", ")})"
end

# Builds the override for negation, which negates each tuple component with
# its own structure (the structure must provide a negate method).
#
# n                   - number of components in the product.
# algebraic_structure - structure name, e.g. "group" or "ring".
#
# Example: get_negate(3, "group") returns
#   "override def negate(v : (A, B, C)) = (agroup.negate(v._1), bgroup.negate(v._2), cgroup.negate(v._3))"
def get_negate(n, algebraic_structure)
  symbols = TYPE_SYMBOLS.first(n)
  tuple_type = "(#{symbols.join(", ")})"

  # Scala tuple accessors are 1-based (_1, _2, ...), hence index + 1.
  negated = symbols.each_with_index.map do |symbol, index|
    "#{symbol.downcase}#{algebraic_structure}.negate(v._#{index + 1})"
  end

  "override def negate(v : #{tuple_type}) = (#{negated.join(", ")})"
end

# Builds the override for a binary operation of the product structure,
# applied component-wise to the two argument tuples l and r.
#
# n                   - number of components in the product.
# algebraic_structure - structure name, e.g. "monoid", "group" or "ring".
# operation           - operation name, e.g. "plus", "minus" or "times".
#
# Example: get_operation(3, "monoid", "plus") returns
#   "override def plus(l : (A, B, C), r : (A, B, C)) = (amonoid.plus(l._1, r._1), bmonoid.plus(l._2, r._2), cmonoid.plus(l._3, r._3))"
def get_operation(n, algebraic_structure, operation)
  symbols = TYPE_SYMBOLS.first(n)

  # Example: "(A, B, C)" -- the type of each operand and of the result.
  tuple_type = "(#{symbols.join(", ")})"

  # One call per component; Scala tuple accessors are 1-based.
  applied = symbols.each_with_index.map { |symbol, index|
    position = index + 1
    "#{symbol.downcase}#{algebraic_structure}.#{operation}(l._#{position}, r._#{position})"
  }

  "override def #{operation}(l : #{tuple_type}, r : #{tuple_type}) = (#{applied.join(", ")})"
end

# Builds the implicit factory method that exposes a generated product class.
#
# n                   - number of components in the product.
# algebraic_structure - structure name, e.g. "monoid", "group" or "ring".
#
# Returns three lines joined by newlines (no trailing newline), each prefixed
# with INDENT. Example for get_implicit_definition(3, "monoid"):
#   implicit def monoid3[A, B, C](implicit amonoid : Monoid[A], bmonoid : Monoid[B], cmonoid : Monoid[C]) : Monoid[(A, B, C)] = {
#    new Tuple3Monoid[A, B, C]()(amonoid, bmonoid, cmonoid)
#   }
def get_implicit_definition(n, algebraic_structure)
  symbols = TYPE_SYMBOLS.first(n)

  # Example: "A, B, C"
  tuple_types = symbols.join(", ")

  # Example: "Monoid"
  structure_type = algebraic_structure.capitalize

  # Example: "Monoid[(A, B, C)]"
  return_type = "#{structure_type}[(#{tuple_types})]"

  # The names forwarded to the constructor must be spelled exactly like the
  # parameters declared by get_type_parameters in this same signature, i.e.
  # "<lowercased letter><algebraic_structure>". Lowercasing the structure name
  # here as well would make the generated code reference undeclared names for
  # any structure name that contains capitals.
  argument_names = symbols.map { |symbol| "#{symbol.downcase}#{algebraic_structure}" }.join(", ")

  [
    "#{INDENT}implicit def #{algebraic_structure}#{n}[#{tuple_types}](implicit #{get_type_parameters(n, algebraic_structure)}) : #{return_type} = {",
    "#{INDENT} new Tuple#{n}#{structure_type}[#{tuple_types}]()(#{argument_names})",
    "#{INDENT}}"
  ].join("\n")
end

# Prints, for every size in TUPLE_SIZES, the product monoid, group and ring
# classes (in that order) to standard output.
def print_class_definitions
  # One row per generated class, in emission order:
  # [structure, constants, emit negate?, binary operations].
  # Members are written as constants, then negate, then operations.
  layouts = [
    ["monoid", ["zero"],        false, ["plus"]],
    ["group",  ["zero"],        true,  ["plus", "minus"]],
    ["ring",   ["zero", "one"], true,  ["plus", "minus", "times"]]
  ]

  TUPLE_SIZES.each do |tuple_size|
    lines = []

    layouts.each do |structure, constants, negatable, operations|
      lines << get_comment(tuple_size, structure)
      lines << "#{get_class_definition(tuple_size, structure)} {"
      constants.each { |constant| lines << get_constant(tuple_size, structure, constant) }
      lines << get_negate(tuple_size, structure) if negatable
      operations.each { |operation| lines << get_operation(tuple_size, structure, operation) }
      lines << "}"
    end

    # The explicit trailing newline keeps the text identical to a heredoc body.
    puts(lines.join("\n") + "\n")
  end
end

# Prints the GeneratedMonoidImplicits, GeneratedGroupImplicits and
# GeneratedRingImplicits traits to standard output. Each trait holds one
# implicit definition per size in TUPLE_SIZES, each followed by a blank line.
def print_implicit_definitions
  ["monoid", "group", "ring"].each_with_index do |structure, position|
    # A blank line separates consecutive traits; none follows the last one.
    puts unless position.zero?

    puts "trait Generated#{structure.capitalize}Implicits {"
    TUPLE_SIZES.each do |n|
      puts get_implicit_definition(n, structure)
      puts
    end
    puts "}"
  end
end

# Script entry point: write the banner, the package clause and a blank line,
# then the product classes, a blank line, and finally the implicit traits.
puts "// following were autogenerated by #{__FILE__} at #{Time.now} do not edit",
     "package #{PACKAGE_NAME}",
     ""
print_class_definitions
puts
print_implicit_definitions
2 changes: 1 addition & 1 deletion scripts/scald.rb
Expand Up @@ -2,7 +2,7 @@
require 'fileutils'
require 'thread'

SCALDING_VERSION="0.4.1"
SCALDING_VERSION="0.5.0"

#Usage : scald.rb [--hdfs|--local|--print] job <job args>
# --hdfs: if job ends in ".scala" or ".java" and the file exists, link it against JARFILE (below) and then run it on HOST.
Expand Down
61 changes: 61 additions & 0 deletions src/main/scala/com/twitter/scalding/CoGroupBuilder.scala
@@ -0,0 +1,61 @@
/*
Copyright 2012 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding

import cascading.pipe.{CoGroup, Every, Pipe}
import cascading.pipe.joiner.MixedJoin
import cascading.tuple.Fields

/**
 * Builder used internally to implement coGroups (joins).
 * Can also be used for more generalized joins, e.g., star joins.
 *
 * @param groupFields the fields the left-hand pipe is grouped (joined) on
 * @param joinMode    how the left-hand pipe takes part in the join
 */
class CoGroupBuilder(groupFields : Fields, joinMode : JoinMode) extends GroupBuilder(groupFields) {
  // Every right-hand side registered through coGroup, as (fields, pipe, mode).
  // Entries are prepended, so the most recently added one comes first.
  protected var coGroups : List[(Fields, Pipe, JoinMode)] = Nil

  /**
   * Joins (cogroups) with pipe p on fields f, using join mode j for that pipe.
   * Make sure that pipe p is smaller than the left side pipe, otherwise this
   * might take a while.
   *
   * Returns this builder so that calls can be chained.
   */
  def coGroup(f : Fields, p : Pipe, j : JoinMode = InnerJoinMode) = {
    val namedPipe = RichPipe.assignName(p)
    coGroups = (f, namedPipe, j) :: coGroups
    this
  }

  // TODO: move the automatic renaming of fields here
  // and remove it from joinWithSmaller/joinWithTiny
  /**
   * Builds the CoGroup of `pipe` with every registered pipe and then applies
   * the operations accumulated in `evs` on top of it.
   *
   * The `name` argument is not used by this implementation. Fails an assertion
   * if a sortBy was set or if no pipe was registered through coGroup.
   */
  override def schedule(name : String, pipe : Pipe) : Pipe = {
    assert(!sortBy.isDefined, "cannot use a sortBy when doing a coGroup")
    assert(coGroups.nonEmpty, "coGroupBy requires at least one other pipe to .coGroup")

    // The left-hand side always goes first; the three arrays are index-aligned.
    val joinedFields = coGroups.map { case (fs, _, _) => fs }
    val joinedPipes = coGroups.map { case (_, other, _) => other }
    val joinedModes = coGroups.map { case (_, _, mode) => mode }

    val fields = (groupFields :: joinedFields).toArray
    val pipes = (pipe :: joinedPipes).map{ RichPipe.assignName(_) }.toArray
    // One flag per pipe: true for InnerJoinMode, false for OuterJoinMode.
    val joinModes = (joinMode :: joinedModes).map{ _.booleanValue }.toArray

    val mixedJoiner = new MixedJoin(joinModes)
    val cg : Pipe = new CoGroup(pipes, fields, null, mixedJoiner)
    // NOTE(review): the result is discarded, so this presumably configures cg
    // in place -- confirm against GroupBuilder.overrideReducers.
    overrideReducers(cg)
    evs.foldRight(cg) { (op : Pipe => Every, p) => op(p) }
  }
}

/**
 * How a single pipe takes part in a coGroup. `booleanValue` is the flag that
 * is handed to the joiner for that pipe.
 */
sealed abstract class JoinMode {
  def booleanValue : Boolean
}

/** Inner join: represented by the flag `true`. */
case object InnerJoinMode extends JoinMode {
  override def booleanValue : Boolean = true
}

/** Outer join: represented by the flag `false`. */
case object OuterJoinMode extends JoinMode {
  override def booleanValue : Boolean = false
}

0 comments on commit 6dedd17

Please sign in to comment.