Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Adds Abstract Algebra

  • Loading branch information...
commit ba2571c32811442440af6a3f3892c4d89ad1fa97 1 parent a49099b
@johnynek authored
View
173 scripts/ntuple_generators.rb
@@ -0,0 +1,173 @@
+# @author Edwin Chen (@echen)
+# Automatically write product monoid, product group, and product ring
+# classes for tuples up to size 22.
+
+# The tuple sizes we want.
+TUPLE_SIZES = (3..22).to_a
+
+# Each element in a product tuple is of a certain type.
+# This provides an alphabet to draw types from.
+TYPE_SYMBOLS = ("A".."Z").to_a
+
+# This returns the comment for each product monoid/group/ring definition.
+# n is the size of the product.
+# algebraic_structure is "monoid", "group", "ring", etc.
+#
+# Example return:
+# "/**
+# * Combine two monoids into a product monoid
+# */"
+def get_comment(n, algebraic_structure)
+ ret = <<EOS
+/**
+* Combine #{n} #{algebraic_structure}s into a product #{algebraic_structure}
+*/
+EOS
+ ret.strip
+end
+
+# This returns the class definition for each product monoid/group/ring.
+# n is the size of the product.
+# algebraic_structure is "monoid", "group", "ring", etc.
+#
+# Example return:
+# "class Tuple2Monoid[T,U](implicit tmonoid : Monoid[T], umonoid : Monoid[U]) extends Monoid[(T,U)]"
+def get_class_definition(n, algebraic_structure)
+ # Example: "T,U"
+ type_values_commaed = TYPE_SYMBOLS.first(n).join(", ")
+ "class Tuple#{n}#{algebraic_structure.capitalize}[#{type_values_commaed}](implicit #{get_type_parameters(n, algebraic_structure)}) extends #{algebraic_structure.capitalize}[(#{type_values_commaed})]"
+end
+
+# This returns the parameters for each product monoid/group/ring class.
+# n is the size of the product.
+# algebraic_structure is "monoid", "group", "ring", etc.
+#
+# Example return:
+# "tmonoid : Monoid[T], umonoid : Monoid[U]"
+def get_type_parameters(n, algebraic_structure)
+ params = TYPE_SYMBOLS.first(n).map{ |t| "#{t.downcase}#{algebraic_structure} : #{algebraic_structure.capitalize}[#{t.upcase}]"}
+ params.join(", ")
+end
+
+# This returns the method definition for constants in the algebraic structure.
+# n is the size of the product.
+# algebraic_structure is "monoid", "group", "ring", etc.
+# constant is "zero", "one", etc.
+#
+# Example return:
+# "override def zero = (tgroup.zero, ugroup.zero)"
+def get_constant(n, algebraic_structure, constant)
+ # Example: "tgroup.zero, ugroup.zero"
+ constants_commaed = TYPE_SYMBOLS.first(n).map{ |t| "#{t.downcase}#{algebraic_structure}.#{constant}" }.join(", ")
+ "override def #{constant} = (#{constants_commaed})"
+end
+
+# This returns the method definition for negation in the algebraic structure
+# (assuming the structure has an additive inverse).
+# n is the size of the product.
+# algebraic_structure is "group", "ring", etc.
+#
+# Example return:
+# "override def negate(v : (T,U)) = (tgroup.negate(v._1), ugroup.negate(v._2))"
+def get_negate(n, algebraic_structure)
+ negates_commaed = TYPE_SYMBOLS.first(n).map.with_index{ |t, i| "#{t.downcase}#{algebraic_structure}.negate(v._#{i+1})" }.join(", ")
+ "override def negate(v : (#{TYPE_SYMBOLS.first(n).join(", ")})) = (#{negates_commaed})"
+end
+
+# This returns the method definition for associative binary operations in
+# the algebraic structure.
+# n is the size of the product.
+# algebraic_structure is "monoid", "group", "ring", etc.
+# operation is "plus", "minus", "times", etc.
+#
+# Example return:
+# "override def plus(l : (T,U), r : (T,U)) = (tmonoid.plus(l._1,r._1), umonoid.plus(l._2, r._2))"
+def get_operation(n, algebraic_structure, operation)
+ # Example: "(T, U)"
+ individual_element_type = "(#{TYPE_SYMBOLS.first(n).join(", ")})"
+
+ # Example: "l : (T, U), r : (T, U)"
+ method_params = "l : #{individual_element_type}, r : #{individual_element_type}" # (1..n).to_a.map{ |i| "x#{i}" }.map{ |p| "#{p} : #{individual_element_type}" }.join(", ")
+
+ # Example: "(tmonoid.plus(l._1,r._1), umonoid.plus(l._2, r._2))"
+ values_commaed = TYPE_SYMBOLS.first(n).map.with_index do |t, i|
+ "#{t.downcase}#{algebraic_structure}.#{operation}(l._#{i+1}, r._#{i+1})"
+ end.join(", ")
+ values_commaed = "(#{values_commaed})"
+
+ "override def #{operation}(#{method_params}) = #{values_commaed}"
+end
+
+# Example return:
+# "implicit def pairMonoid[T,U](implicit tg : Monoid[T], ug : Monoid[U]) : Monoid[(T,U)] = {
+# new Tuple2Monoid[T,U]()(tg,ug)
+# }"
+def get_implicit_definition(n, algebraic_structure)
+ type_params_commaed = get_type_parameters(n, algebraic_structure)
+
+ # Example: "T,U"
+ tuple_type_commaed = TYPE_SYMBOLS.first(n).join(", ")
+
+ # Example: "Monoid[(T,U)]"
+ return_type = "#{algebraic_structure.capitalize}[(#{tuple_type_commaed})]"
+
+ ret = <<EOS
+implicit def #{algebraic_structure}#{n}[#{tuple_type_commaed}](implicit #{type_params_commaed}) : #{return_type} = {
+ new Tuple#{n}#{algebraic_structure.capitalize}[#{tuple_type_commaed}]()(#{TYPE_SYMBOLS.first(n).map{ |t| t.downcase + algebraic_structure.downcase }.join(", ")})
+}
+EOS
+ ret
+end
+
+def print_class_definitions
+ TUPLE_SIZES.each do |tuple_size|
+
+ code = <<EOS
+#{get_comment(tuple_size, "monoid")}
+#{get_class_definition(tuple_size, "monoid")} {
+ #{get_constant(tuple_size, "monoid", "zero")}
+ #{get_operation(tuple_size, "monoid", "plus")}
+}
+
+#{get_comment(tuple_size, "group")}
+#{get_class_definition(tuple_size, "group")} {
+ #{get_constant(tuple_size, "group", "zero")}
+ #{get_negate(tuple_size, "group")}
+ #{get_operation(tuple_size, "group", "plus")}
+ #{get_operation(tuple_size, "group", "minus")}
+}
+
+#{get_comment(tuple_size, "ring")}
+#{get_class_definition(tuple_size, "ring")} {
+ #{get_constant(tuple_size, "ring", "zero")}
+ #{get_constant(tuple_size, "ring", "one")}
+ #{get_negate(tuple_size, "ring")}
+ #{get_operation(tuple_size, "ring", "plus")}
+ #{get_operation(tuple_size, "ring", "minus")}
+ #{get_operation(tuple_size, "ring", "times")}
+}
+EOS
+
+ puts code
+ end
+end
+
+def print_implicit_definitions
+ TUPLE_SIZES.each do |n|
+ puts get_implicit_definition(n, "monoid")
+ puts
+ end
+
+ TUPLE_SIZES.each do |n|
+ puts get_implicit_definition(n, "group")
+ puts
+ end
+
+ TUPLE_SIZES.each do |n|
+ puts get_implicit_definition(n, "ring")
+ puts
+ end
+end
+
+# print_class_definitions
+# print_implicit_definitions
View
46 src/main/scala/com/twitter/scalding/GroupBuilder.scala
@@ -23,6 +23,8 @@ import cascading.operation.filter._
import cascading.tuple.Fields
import cascading.tuple.{Tuple => CTuple}
+import com.twitter.scalding.mathematics.{Monoid, Ring}
+
import scala.collection.JavaConverters._
import scala.annotation.tailrec
import scala.math.Ordering
@@ -387,6 +389,50 @@ class GroupBuilder(val groupFields : Fields) extends FieldConversions
reduce(fieldDef -> fieldDef)(fn)(setter,conv)
}
+ // Abstract algebra reductions (plus, times, dot):
+
+ /** use Monoid.plus to compute a sum. Not called sum to avoid conflicting with standard sum
+ * Your Monoid[T] should be associated and commutative, else this doesn't make sense
+ */
+ def plus[T](fd : (Fields,Fields))
+ (implicit monoid : Monoid[T], tconv : TupleConverter[T], tset : TupleSetter[T]) : GroupBuilder = {
+ // We reverse the order because the left is the old value in reduce, and for list concat
+ // we are much better off concatenating into the bigger list
+ reduce[T](fd)({ (left, right) => monoid.plus(right, left) })(tset, tconv)
+ }
+
+ // The same as plus(fs -> fs)
+ def plus[T](fs : Symbol*)
+ (implicit monoid : Monoid[T], tconv : TupleConverter[T], tset : TupleSetter[T]) : GroupBuilder = {
+ plus[T](fs -> fs)(monoid,tconv,tset)
+ }
+
+ // Returns the product of all the items in this grouping
+ def times[T](fd : (Fields,Fields))
+ (implicit ring : Ring[T], tconv : TupleConverter[T], tset : TupleSetter[T]) : GroupBuilder = {
+ // We reverse the order because the left is the old value in reduce, and for list concat
+ // we are much better off concatenating into the bigger list
+ reduce[T](fd)({ (left, right) => ring.times(right, left) })(tset, tconv)
+ }
+
+ // The same as times(fs -> fs)
+ def times[T](fs : Symbol*)
+ (implicit ring : Ring[T], tconv : TupleConverter[T], tset : TupleSetter[T]) : GroupBuilder = {
+ times[T](fs -> fs)(ring,tconv,tset)
+ }
+
+ // First do "times" on each pair, then "plus" them all together.
+ // Example: groupBy('x) { _.dot('y,'z, 'ydotz) }
+ def dot[T](left : Fields, right : Fields, result : Fields)
+ (implicit ttconv : TupleConverter[Tuple2[T,T]], ring : Ring[T],
+ tconv : TupleConverter[T], tset : TupleSetter[T]) : GroupBuilder = {
+ mapReduceMap[(T,T),T,T](Fields.merge(left, right) -> result) { init : (T,T) =>
+ ring.times(init._1, init._2)
+ } { (left : T, right: T) =>
+ ring.plus(left, right)
+ } { result => result }
+ }
+
def reverse : GroupBuilder = {
assert(reds.isEmpty, "Cannot sort when reducing")
assert(!isReversed, "Reverse called a second time! Only one allowed")
View
2  src/main/scala/com/twitter/scalding/Source.scala
@@ -137,7 +137,7 @@ abstract class Source extends java.io.Serializable {
case Read => {
val buffer = buffers(this)
val fields = hdfsScheme.getSourceFields
- new MemorySourceTap(buffer.toList.asJava, fields)
+ (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[RawTap]
}
case Write => {
val path = hdfsTest.getWritePathFor(this)
View
1,109 src/main/scala/com/twitter/scalding/mathematics/AbstractAlgebra.scala
1,109 additions, 0 deletions not shown
View
33 src/test/scala/com/twitter/scalding/AlgebraicReductionsTest.scala
@@ -0,0 +1,33 @@
+package com.twitter.scalding
+
+import org.specs._
+/**
+ */
+class AlgebraJob(args : Args) extends Job(args) {
+ Tsv("input", ('x,'y,'z,'w))
+ .map('w -> 'w) { w : Int => Set(w) }
+ .groupBy('x) {
+ _.plus[(Int,Int)](('y,'z) -> ('sy, 'sz))
+ .plus[Set[Int]]('w -> 'setw)
+ .times[(Int,Int)](('y, 'z) -> ('py, 'pz))
+ .dot[Int]('y,'z,'ydotz)
+ }
+ .write(Tsv("output"))
+}
+
+class AlgebraJobTest extends Specification {
+ import RichPipe._
+ val inputData = List((1,2,3,5),(1,4,5,7),(2,1,0,7))
+ val correctOutput = List((1,6,8,Set(5,7), 8,15,(6 + 20)),(2,1,0,Set(7),1,0,0))
+ "A AlgebraJob" should {
+ JobTest("com.twitter.scalding.AlgebraJob")
+ .source(Tsv("input",('x,'y,'z,'w)), inputData)
+ .sink[(Int, Int, Int, Set[Int], Int, Int, Int)](Tsv("output")) { buf =>
+ "correctly do algebra" in {
+ buf.toList must be_==(correctOutput)
+ }
+ }
+ .run
+ .finish
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.